Questions tagged [spring-cloud-stream]
306 questions
0
votes
1
answer
11
Views
How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?
Spring Kafka, and thus Spring Cloud Stream, allow us to create transactional Producers and Processors. We can see that functionality in action in one of the sample projects: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples:
@Transactional
@StreamListe...
1
votes
1
answer
196
Views
Spring Cloud Stream @SendTo Annotation not working
I'm using Spring Cloud Stream with Spring Boot. My application is very simple:
ExampleService.class:
@EnableBinding(Processor1.class)
@Service
public class ExampleService {
@StreamListener(Processor1.INPUT)
@SendTo(Processor1.OUTPUT)
public String dequeue(String message){
System.out.println('New mes...
1
votes
1
answer
158
Views
Spring Boot Cloud + RabbitMQ
I'm new to Spring, Spring Boot and RabbitMQ. However I'm an okay programmer/problem solver.
Recently I started go through this book Learning Spring Boot 2.0 - Second Edition, code for this can be found here https://github.com/learning-spring-boot/learning-spring-boot-2nd-edition-code.
I think that t...
1
votes
2
answer
256
Views
What version of spring-cloud-stream-binder-kstream is compatible with Kafka 1.0.0
When trying to run a slightly adapted version of word-count example, I am having an error that says 'No qualifying bean of type'org.apache.kafka.streams.kstream.KStreamBuilder''. In my POM, I am using spring-cloud-stream-dependencies:Elmhurst.M3 to import dependencies, which imported spring-cloud-s...
1
votes
1
answer
191
Views
Issues encountered switching from RabbitMQ to Kafka
The code below can also be found in the answer to How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels? When attempting to switch from RabbitMQ to Kafka, I'm encountering the following exception:
org.springframework.messaging.MessageHandlingException: Missing header 'foo'...
1
votes
0
answer
313
Views
throwing exception in spring cloud stream get a ClassCastException
I am using Spring Cloud Stream 2.0.0 RC3. If I throw an exception in consumer, I will get ClassCastException.
I am confused why throwing an exception incurs ClassCastException. Is it an expected behavior in Spring Cloud Stream? If a consumer fails to consume message, instead of throwing exception, w...
1
votes
1
answer
137
Views
How to refactor this piece of code into a more reactive one
We are building microservices using Spring Cloud Stream.
We have this piece of code running, which basically subscribes to Messages from the Tasks channel with a specific type header, execute a job and then publish a new Message to the Events channel:
@EnableBinding({InboundChannels.class, OutboundC...
1
votes
1
answer
420
Views
Error testing with Spring Cloud Stream Test
We are using spring-cloud-stream to manage messages between our applications.
We have custom bindings:
public interface InboundChannels {
String TASKS = 'domainTasksInboundChannel';
String EVENTS = 'eventsInboundChannel';
@Input(TASKS)
SubscribableChannel tasks();
@Input(EVENTS)
SubscribableChannel...
1
votes
0
answer
190
Views
Spring boot graphql-java, subscriptions and streamlisteners
I'm trying to create an observable from a streamlistener response but are unable to do so. I'm quite new to the idea of EventObservable and Reactive Java. It would be helpful if someone could look into my code to see if it has been constructed the right way and if my understanding of the concept is...
1
votes
0
answer
306
Views
Spring cloud stream confluent schema registry not working
Below code is not working with confluent schema registry. its not giving connection timeout errors with schema registry url.
consumer configuration
@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class ConsumerApplication {
private final Log logger = LogFactory....
1
votes
1
answer
185
Views
Logging Spring Sleuth Span Id of source application error channel
I have a system setup with a RabbitMQ event bus and applications using spring cloud stream. To help keep track of the messages through the system I'm also using spring-cloud-sleuth.
The question I have involves the errorChannel flow/binding. I'm not using the the error binding in my configuration (r...
1
votes
0
answer
472
Views
Getting java.io.EOFException on Spring Kafka configuration with SSL
I am working on Kafka Streams code using Spring cloud Stream.
Maven version - Spring Boot 2.0.2 Release
Apache kafka-streams - 1.1.0
Apache kafka-client - 1.1.0.
configuration -
spring:
profiles: dev, test
cloud:
stream:
kafka:
binder:
autoCreateTopics: false
auto-add-partitions: false
brokers: ${KA...
1
votes
0
answer
50
Views
Extract trace id from external service
I was recently upgrading to Spring Cloud Sleuth 2.0.0 and facing now the issue to correctly extract the trace id from an external service.
There is ExternalService (not under my control) which is publishing messages into a ActiveMQ queue with a header value 'X-B3-TraceId', a 16 chars long String lik...
1
votes
1
answer
149
Views
How to send a message with priority with Spring cloud stream binder
I'm using rabbitmq. I've defined a queue with priority, but when I have to send a message with priority, I don't know how to specify the priority on it.
This is my code
StreamProcessor.java
public interface StreamProcessor {
public static final String TEST_JOB_OUTPUT = 'test-job-output';
@Output(Str...
1
votes
2
answer
275
Views
spring cloud stream kafka 2.0 - StreamListener with condition
I'm trying to create a consumer using StreamListener annotation and condition attirbute.However , i'm getting the following exception :
org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [java.lang.Integer] for value 'test'; nested exce...
1
votes
0
answer
72
Views
Delayed Exchange not directly publish message
I have tried to create a queue and I want to create delayed exchange and send message to a corresponding queue. But I find that just after creating the exchange, the message is not sent to the queue (it will not also be consumed as well).
But this strange thing happens, after a while, lets say 30 mi...
1
votes
1
answer
332
Views
Kinesis as producer in Spring Boot Reactive Stream API
I'm trying to build a small Spring Boot Reactive API. The API should let the users subscribe to some data, returned as SSE.
The data is located on a Kinesis Topic.
Creating the Reactive API, and the StreamListener to Kinesis is fairly easy - but can I combine these, so the Kinesis Topic are used as...
1
votes
0
answer
234
Views
Spring Cloud Stream Message Handling
While playing around with Spring Cloud Stream, Kafka, Debezium and Postgresql I am facing the following issue :
To highlight :
A table person(id,name) is there in postgresql , debezium fetched the change events and throws to Kafka topic(person) , verified messages are there in the topic , the form o...
1
votes
2
answer
515
Views
Stop Spring Cloud Stream @StreamListener from listening until receipt of some Spring Event
I am working on a Camunda BPM Spring Boot Application. The application reads messages from rabbitmq queue using Spring Cloud Stream. Once the message is received, the application invokes a process instance in Camunda.
If there are messages already in the rabbitmq queue during application startup, th...
1
votes
1
answer
63
Views
The constructor TaskLaunchRequest(String, List<String>, null, null) is undefined - Spring Cloud Task
I am developing a code for the Spring Cloud with Spring Boot. In this example, I am working in Spring Cloud Tasks. I was using Spring Boot 1.4.1.RELEASE then below code was working fine. When I updated 2.0.4.RELEASE then I am getting below error.
The constructor TaskLaunchRequest(String, List, null,...
1
votes
1
answer
471
Views
Spring cloud stream kafka avro deserialization
I'm writing a spring cloud stream sink application which consumes Avro messages. I'm trying to get the application consume the messages however I'm getting the following error.
org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.avro.generic.Ge...
1
votes
1
answer
346
Views
Missing bean 'zipkin2.reporter.Sender' when using `@AutoConfigureAfter(TraceAutoConfiguration.class)`
TLDR :
Repro project here : https://github.com/snussbaumer/zipkin-app-wont-start-repo
I want to use a Zipkin Kafka Sender
I also need a piece of AutoConfiguration to run after TraceAutoConfiguration from Sleuth
If I use @AutoConfigureAfter the application does not start and fails with message No qu...
1
votes
1
answer
108
Views
When deploying a stream with MongoDB Sink I got a MappingException “Couldn't find PersistentEntity for type class [B!”
I'm facing this problem when I'm trying to use the MongoDB Sink app starter with an information read by a JDBC source:
MappingException: Couldn't find PersistentEntity for type class [B!
Class [B! means that is a byte[]. I printed in a log and this is the JSON of the object. Is this a bug? The other...
1
votes
0
answer
326
Views
Error management in Spring Cloud Stream with Kafka Stream
For a project, I use Spring Cloud Stream with Kafka as binder with several Spring Boot Apps using a Kafka stream (KStream). The goal is to have a pipeline of Spring boot apps in Spring Cloud Data Flow.
I would like to know how best to manage exceptions raised during the stream because an uncaught ex...
1
votes
0
answer
415
Views
JsonParseException: Unexpected character ('i' (code 105)): was expecting double-quote
I am trying to integrate Spring Cloud streams and publishing a custom Java Object across services with RabbitMQ as broker. The object I am publishing looks like:
public class AppMessageEnvelope implements Serializable {
...
private Object messageBody;
private Date sentAt = new Date();
...
// setters...
1
votes
0
answer
79
Views
Spring Cloud Stream Reactive - which kafka-binder is appropriate in case of reactive stream pipeline?
We are using reactive spring cloud stream with the Kafka. Any suggestions on using the right binder? which one is most suitable for the reactive pipeline?
spring-cloud-stream-binder-kafka
spring-cloud-stream-binder-kafka-streams
Obviously, we would love and wish if we get reactive-kafka-binder. Sinc...
1
votes
0
answer
52
Views
Spring Cloud Streams RabbitMQ multi-binder vs the Shovel Plugin/Application
Let's consider two systems.
Each of the systems uses a dedicated messaging broker for communication between its micro-services.
We want to communicate these two systems using Spring Cloud Stream.
Case A
Both the systems use RabbitMQ as a message broker.
Which approach is better:
Multi-binder (spring...
1
votes
1
answer
88
Views
Kafka is marking the coordinator dead when autoscaling is on
We run a Kubernetes cluster with Kafka 0.10.2. In the cluster we have a replica set of 10 replicas running one of our services, which consume from one topic as one consumer-group.
Lately we turned on the autoscaling feature for this replica-set, so it can increase or decrease the number of pods, bas...
1
votes
1
answer
94
Views
Kafka consumer process order with concurrency
I have a producer writing to a Kafka topic with 100 partitions, and it choose the partition by the user ID, therefore user's messages are necessarily being processed by the order they were submitted to the queue.
The service which is responsible for consuming has 2-10 instances, each one has in its...
1
votes
0
answer
202
Views
Failed to create producer binding
Kafka producer binding issue 2019-01-04 03:58:18.497 ERROR
[api-gateway-server,,,] 1 --- [ask-scheduler-1]
o.s.cloud.stream.binding.BindingService : Failed to create producer
binding; retrying in 30 seconds
[email protected] |
headless_api-gateway-service.1...
1
votes
1
answer
129
Views
Version compatibility of the kafka, Spring Cloud stream and Spring cloud stream binder kafka
I am using Kafka-client version of 1.0.0 and Spring-cloud-stream version of 2.0.2. now I want to know the version of Spring-cloud-stream-binder-kafka version to stream the events to kafka.
Failed to create producer binding; retrying in 30 seconds
[email protected]
1
votes
0
answer
87
Views
Spring Cloud Stream App VS Spring Cloud Task App
I'm exploring the the SCDF for orchestrating the existing batch task for my system. I noticed that the SC Stream App Starters already provides some OOTB ready to use streaming applications, which suits my use case of:
Extracting xml file from FTP -> the ftp source
Customize transformation
Upload t...
1
votes
0
answer
150
Views
KafkaStreams interactive queries with Spring-Kafka
I am writing a Spring Boot app with Spring Kafka.
As my app is focused on Kafka Streams and I need to use interactive queries and query my state stores I am wondering: is there any particular way to access kafka streams state stores with Spring Kafka?
From what I've seen there is some support in S...
1
votes
1
answer
57
Views
Trace of exception in message header on dead letter queue
I am using spring-cloud-stream to consume message from Rabbit MQ.
I have a queue and dead-letter-queue binding to it. Whenever there is any exception, messages are getting routed to dead-letter-queue.
My question is - Can we put a header in message with Exception class before receiving it on dead le...
1
votes
0
answer
68
Views
Spring Cloud Bus Kafka Bean Configuration
I'm not very fond of using application.yml or bootstrap.yml in my configurations.
Do you know any class that can be used as a bean in order to configure the properties below?
spring:
cloud:
bus:
enabled: true
stream:
kafka:
binder:
zkNodes: localhost:9092
brokers: localhost:2181
My goal was to...
1
votes
1
answer
50
Views
How spring-cloud-stream-rabbit-binder works when RabbitMQ disk or memory alarm is activated?
Versions:
Spring-cloud-stream-starter-rabbit --> 2.1.0.RELEASE
RabbitMQ --> 3.7.7
Erlang --> 21.1
(1) I have created a sample mq-publisher-demo & mq-subscriber-demo repositories on github for reference.
When Memory Alarm was activated
Publisher: was able to publish messages.
Subscriber: seems like,...
1
votes
1
answer
49
Views
Spring Cloud Stream Kinesis Group in autoscaling group over JdbcLockRegistry
I am trying to create a consumer application that is using spring cloud stream to consume events from a Kinesis stream.
My consumer application is running inside an AWS autoscaling group and I would like it to scale up and down at any point without affecting the number of instances that can process...
1
votes
1
answer
74
Views
Consume Kafka message with a custom header
I'm trying to create a kafka Consumer with Spring Cloud Stream in order to listen a Kafka message, built outside any Spring context, with a custom header (operationType).
I'm using Spring Boot 1.5.x / Spring Cloud Egdware.SR5 and the 1.1.1 version of kafka-client and kafka_2.11.
My Listener class...
1
votes
1
answer
33
Views
Spring Cloud Stream Kafka Binder autoCommitOnError=false get unexpected behavior
I am using Spring Boot 2.1.1.RELEASE and Spring Cloud Greenwich.RC2, and the managed version for spring-cloud-stream-binder-kafka is 2.1.0RC4. The Kafka version is 1.1.0. I have set the following properties as the messages should not be consumed if there is an error.
spring.cloud.stream.bindings.inp...
1
votes
0
answer
25
Views
Springcloud bus custom messages cannot be sent through rabbitmq
When using the springcloud bus, a custom message is created and sent through rabbitmq, but after the message is sent, it does not go to rabbitmq. When you try to call /actuator/bus-refresh, you can see the bus messages emitted from the console page of rabbitmq.
I tried to start a micro service to re...