Questions tagged [akka-stream]

1

votes
2

answer
56

Views

Akka: is there a Sink that never pulls?

Need a Sink that never pulls, to use in unit tests. Is there one already available or do I need to code it myself? Please note that Sink.ignore() won't help, because it ALWAYS pulls. I need a Sink that NEVER pulls.
Michael M.
1

votes
0

answer
293

Views

Bridging Kafka and Akka Streams

Kafka Streams are only supported on the JVM platform. So I am looking for a way to leverage Akka Streams in C# to build a Kafka consumer with the benefit from Akka Streams. The Confluent C# Kafka consumer is based on polling and event handling, e.g. using (var consumer = new Consumer(consumerConfi...
carstenj
1

votes
1

answer
285

Views

How to create an Akka flow with backpressure and Control

I need to create a function with the following Interface: import akka.kafka.scaladsl.Consumer.Control object ItemConversionFlow { def build(config: StreamConfig): Flow[Item, OtherItem, Control] = { // Implementation goes here } My problem is that I don't know how to define the flow in a way that it...
Stoecki
1

votes
2

answer
178

Views

Akka Streams GraphStage

Using GraphStage is recommended in Akka Streams, but I could not find any documentation on using the getStageActor() method in Java (all of the documentation that I have found used Scala). How can I convert the following code to Java? lazy val self: StageActor = getStageActor(onMessage) and private...
Ali
1

votes
1

answer
107

Views

Combining arbitrary number of sources with materialized values

Given xs: Seq[Source[A, Mat]] and a function to combine individual materializers into a single one, is it possible to merge xs into a single aggregate source which materializes into an aggregate Mat? Consider this practical example: Having N Kafka topics represented by N Sources of type Source[A, Co...
tkroman
1

votes
0

answer
276

Views

akka stream kafka web socket clients stop receiving messages after 30 seconds of inactive period

I want to read messages from a Kafka topic and send to web socket clients. I created a sample web socket server app using akka-stream-kafka. I am using kafka-console-producer script to send messages to kafka topic. Works fine, browser clients receive data from kafka topic over web socket. If I do no...
Vms
1

votes
0

answer
256

Views

Akka-Stream: How to handle stream exception gracefully

I am working with streaming application where I received following exceptions: ParsingException FramingException StreamTcpException I have written Supervision.Decider but it doesn't catch all types of exception. My questions are: Why Supervision.Decider doesn't catch exception, it catches high level...
Nikhil
1

votes
0

answer
131

Views

Akka stream throws ParsingException when start decompressing .gz file from middle offset

I am reading .gz file from AWS S3 bucket using Akka streaming, The application reads chunk of data and process it. The following code is working fine when I read file from beginning but if I start file from particular offset then Decompression fails and throws ParsingException. Code: object StreamA...
Nikhil
1

votes
0

answer
153

Views

Akka-SSE (deprecated Ok.feed)

I have been working with Akka SSE and found this wonderful easy understanding of SSE using javascript in the client side. http://runkalrun.blogspot.in/2016/01/server-sent-events-sse-using-scala-akka.html The only problem is, the Ok.feed() method has been deprecated in play 2.6, though we can use Ok....
Sushant Adhikari
1

votes
0

answer
328

Views

Akka streams Source.actorRef vs Source.queue vs buffer, which one to use?

I am using akka-streams-kafka to created a stream consumer from a kafka topic. Using broadcast to serve events from kafka topic to web socket clients. I have found following three approaches to create a stream Source. Question: My goal is to serve hundreds/thousands of websocket clients (some of w...
Vms
1

votes
1

answer
141

Views

Akka Streams: Calling push(_,_) not within onPull(_,_) is blocking the stream - Why?

I'm having trouble understanding the behaviour of a small sample customer Akka Streams Source. The idea behind the sample is that the Source should ask an Actor for the next element. See the code below class ActorSource[T](context: ActorRefFactory, actor: ActorRef) extends GraphStage[SourceShape[T]]...
cokeSchlumpf
1

votes
0

answer
92

Views

How to use Source.queue with Alpakka

I'm trying to create a producer to a JMS queue that can be used more than once; i.e., I don't want to create a connection to the queue every time I send a message. I want an actor with a connection open, and each time a message comes in, it uses that same flow. Actor init Sink jmsSink = JmsProducer...
Kingpin2k
1

votes
0

answer
44

Views

akka slick alternative to anorm SQL(…).on(params)

I am doing as follows with anorm: val name = 'john' val age = 30 val params: Seq[NamedParameter] = List(NamedParameter('name', name), NamedParameter('age', age)) SQL'insert into person (name, age) values ({name}, {age})'.on(params:_*)).executeUpdate() I am aware it could be simpler as follows: SQL'i...
David Portabella
1

votes
0

answer
208

Views

Akka Streams: Cannot push port twice, or before it being pulled

I am trying to test my sliding window stage using the Akka Streams TestKit and I see this exception. Exception in thread 'main' java.lang.AssertionError: assertion failed: expected OnNext(Stream(2, ?)), found OnError(java.lang.IllegalArgumentException: Cannot push port (Sliding.out(2043106095)) twi...
manu
1

votes
0

answer
70

Views

akka tcp server-client heartbeat message block by scheduler processing

I am using Akka cluster (Server) and exchanging HeartBeat message with client using Akka TCP in every 5 seconds. HeartBeat is working fine till I am not using scheduler. but when I am starting 4-5 schedulers, Server is not receiving heartbeat buffer message from client (tcp connection). After sched...
Abhishek
1

votes
1

answer
114

Views

Propagate context through parser and action body in play framework

I have distributed tracing information that is carried with requests between services in HTTP headers (specifically for opentracing). I would like to be able to this information available everywhere, so I can include the trace-id in logs, propagate to downstream requests, etc. So, I want to parse...
Thayne
1

votes
0

answer
73

Views

akka stream does not consume and just logs WakeupTimeout

I've this app that we have been testing for months now. The usual setup has been either using embedded kafka, or kafka in docker-compose. I've to admit only with 1 partition topics. As soon as we deployed it (dockerized, connecting to a real broker, only one), it doesn't even start consuming but jus...
gotch4
1

votes
1

answer
56

Views

Waiting for a client websocket flow to connect before connecting source and sink

I'm using akka-streams to set up a client web socket. I'm trying to encapsulate the setup in a method with the following signature: def createConnectedWebSocket(url: String): Flow[Message, Message, _] It is clear how to create the web socket flow but it is not connected yet: val webSocketFlow: Flow[...
lex82
1

votes
2

answer
76

Views

Multiple Akka Sources from one blocking iterator?

I have an iterator which reads a binary records from an InputStream (blocking for new input), and produces elements which have one of three possible types, let's say types are T1, T2, T3. What's the easiest way to produce 3 independent Akka Sources S1, S2 and S3 from that iterator based on the type...
JPK
1

votes
1

answer
41

Views

How to implement pagination with akka-streams

I need to process large file by lines and do some heavy work (on 4 core cpu) on every item, I think code correct: implicit val system = ActorSystem('TestSystem') implicit val materializer = ActorMaterializer() import system.dispatcher val sink = Sink.foreach[String](elem => println('element proceed'...
zella
1

votes
0

answer
97

Views

How to get WebSocket close code from Akka HTTP?

We are using Akka HTTP to handle our web socket connections using the akka streams API. We are using a Flow that pipes the incoming messages to a 'connection actor'. A snippet of the code is below: val connection = system.actorOf(ConnectionActor.props()) val in = Flow[Message] .to(Sink.actorRef[Mess...
1

votes
1

answer
249

Views

How to combine akka stream source queue with graph?

I have a queue need to be broadcasted and merged using akka stream graphs. enter image description here I found the graph demo and queue demo. and don't know how to combine them. Can anyone help me out? Thanks Here is the graph demo val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit build...
liaolunhui
1

votes
0

answer
68

Views

How to save a websocket client's connection and send it later with akka-streams and akka-http

I'm trying to follow this part of the akka-http documentation where it talks about handling web socket messages asynchronously What I am trying to do is this: Receive a websocket request for a client Serve a payment invoice back to the client Run a background process that has the client's websocke...
Chris Stewart
1

votes
1

answer
186

Views

Akka Streams recreate stream in case of stage failure

I have very simple Akka Streams flow which reads msg from Kafka using alpakka, performs some manipulation on msg and indexes it to Elasticsearch. I'm using CommitableSource, therefore i'm in At-Least-Once strategy. I commit my offset only when index to ES succeed, if it fails I will read again the...
danny.lesnik
1

votes
1

answer
94

Views

Any way to reuse a Source[ByteString, Any] (without keeping it all in memory)

Is there any way to make a Source reusable? I have an akka-http server that receives a large file upload and then streams the (chunked) data to subscriber websockets and other HTTP servers via HTTP POST. In both cases, there is an API that accepts a Source[ByteString, Any]: HttpEntity(..., source) i...
Allan
1

votes
0

answer
58

Views

Websocket client does not receive data from Akka streams Source.queue

I am using Akka stream Source.queue as Source for websocket clients. Reading from kafka topic with 10k records using kaka consumer API and offering it to Source.queue with buffer 100k. I am using BroardcastHub for fan-out. The websocket client does not get any data but see records from kafka enqueue...
Vms
1

votes
1

answer
35

Views

Alpakka XML content between tags

Alpakka XML processing flow allows to read xml file element by element. But how to extract data between particular StartElement and EndElement including StartElement data? subslice is not an option because there is no constant prefix for needed elements.
St.
1

votes
1

answer
177

Views

'term akka.stream' is missing from the classpath when using with Akka Http Spray Json for SprayJsonSupport

I'm using akka-http and reactivemongo in my project and marshalling/unmarshalling case classes with akka-http-spray-json. Here are their dependencies - 'com.typesafe.akka' %% 'akka-http' % '10.1.3' 'com.typesafe.akka' %% 'akka-stream' % '2.5.14' 'com.typesafe.akka' %% 'akka-http-spray-json' % '10.1...
Ishan
1

votes
1

answer
75

Views

Websocket with Graph DSL

I am trying to implement a Websocket Login flow with Akka Flow. I get a myriad of nasty runtime exceptions around Inlets, Outlets and Connection issues. My latest is: java.lang.IllegalStateException: Illegal GraphDSL usage. Inlets [Map.in] were not returned in the resulting shape and not connected....
abergmeier
1

votes
0

answer
144

Views

Alpakka/Kafka - Partitions consumed faster than others

I’ve been using alpakka kafka to streaming data from kafka topics. I’m using: Consumer .committableSource(consumerSettings, Subscriptions.topics(topic)) Recently I’ve tried to spam more consumers like 3 on a topic which has 15 partitions. When I plug more consumers with the same group id, it k...
Thiago Pereira
1

votes
1

answer
64

Views

Alpakka UDP: How can I respond to received datagrams via the already bound socket?

I'm using Alpakkas UDP.bindFlow to forward incoming UDP datagrams to a Kafka broker. The legacy application that is sending these datagrams requires a UDP response from the same port the message was sent to. I am struggling to model this behaviour as it would require me to connect the output of the...
Uwe Sommerlatt
1

votes
0

answer
27

Views

How is fetch size controlled in akka stream alpakka slick

The documentation of slick caution that for certain databases, the fetch size must be set. However in general, I wonder how to control the fetchSize with the akka-stream-alpakka-slick integration ? We don't have access to the DB Action directly hence how to do that ? Thanks
MaatDeamon
1

votes
1

answer
207

Views

Play framework Scala: Create infinite source using scala akka streams and keep Server sent events connection open on server

We have requirement to implements server sent events for following uses cases: Send notification to UI after some processing on server. This processing is based on some logic Send notification to UI after reading messages from RabbitMQ followed by performing some operation on it. Technology set we a...
priyanka goel
1

votes
1

answer
87

Views

Error when loading static content using AKKA-HTTP Java

I'm trying out AKKA-Http in Java for the first time(akka-http_2.12 v 10.1.5, akka-stream_2.12 v 2.5.17), but can't seem to load static content for swagger UI. I've added the routes below and able to reach the json and yaml swagger endpoints via http://localhost:8080/api-docs/swagger.json and http://...
Chinedu Ekwunife
1

votes
0

answer
75

Views

akka stream CPU usage monitoring

We have several akka streams running in our app, and I'd like to find CPU usage by each of them. They share the same actor system. My idea is find out how much time each Runnable responsible for processing island inbox spends running and treat it as CPU usage time, but it seems to be impossible to i...
eprst
1

votes
1

answer
132

Views

The idiomatic way to manage shared state with Akka Streams

I need to filter my flow based on some blacklist which can be changed outside of the flow execution. So, I see two option to do that: Incapsulate blacklist in a separate service class Blacklist(init: Set[String]) { @volatile private var set: Set[String] = init def get: Set[String] = set def update(...
artemka
1

votes
0

answer
77

Views

Akka-Http, Server Sent Event and Akka Stream Source : Connection get “Reset” / Stream Source get closed after last page get stopped or refreshed

I’m trying to do some web application using the “Server Sent Event” feature of Akka HTTP. TL;DR : I can run and consume SSE from the browser, but if I have only one page and I refresh it or quit it and reload the page, the SSE entrypoint looks not working anymore as the whole Akka Stream on th...
Totetmatt
1

votes
1

answer
54

Views

Throttle API calls to external service using Scala

I have a service exposing a REST endpoint that, after a couple of transformations, calls a third-party service also via its REST endpoint. I would like to implement some sort of throttling on my service to avoid being throttled by this third-party service. Note that my service's endpoint accepts onl...
fwlega
1

votes
0

answer
74

Views

Dynamically changing the throttling level on Akka Streams

Is there any way to change the Source's throttling parameters dynamically so as to implement a ramp up mechanism e.g every passing second the throttling is to be reduced/increased ?
Erhan Bagdemir
1

votes
0

answer
40

Views

Throttle stream based on external input

Looking at the signature of throttle in Akka Streams I see that it can take a cost function of type (Out) ⇒ Int to compute how the element passing through the stage affects the cost/per speed limit of elements sent downstream. Is it possible to implement throttling based on external input for the...
Samuel Heaney

View additional questions