Questions tagged [spring-kafka]

0

votes
1

answer
11

Views

How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?

Spring Kafka, and thus Spring Cloud Stream, allow us to create transactional Producers and Processors. We can see that functionality in action in one of the sample projects: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples: @Transactional @StreamListe...
codependent
1

votes
1

answer
926

Views

Spring kafka and Kafka Cluster

I've configured 3 kafka's in cluster and I'm trying to use with spring-kafka. But After I kill the kafka leader I'm not able to send other messages to queue. I'm setting the spring.kafka.bootstrap-servers property as: 'kafka-1:9092;kafka-2:9093,kafka-3:9094' and all of names in my hosts file. Kafka...
0

votes
0

answer
5

Views

Spring Kafka Error Handler how to avoid endless loop

I hope someone can give me a hint want I'm doing wrong here. I wrote a custom error handler for a batch listener that should seek behind the received records, send them to a dlq. I tired a lot but dont get it working. My current implementation will hang in a endless loop receiving the records over a...
xstring
1

votes
2

answer
102

Views

Spring Kafka Listener on Http Request

I am using Spring and Kafka, I make an HTTP POST request as shown below and send some info to another service via a Kafka topic. @RequestMapping(method = RequestMethod.POST, value = '/portfolio') public void getPortfolio( Authentication auth, @RequestBody User user ) { //Data Transfer Object UserDTO...
Rory Harrison
1

votes
1

answer
901

Views

How to listen for the right ACK message from Kafka

I am doing a POC with Spring Boot & Kafka for a transactional project and I have the following doubt: Scenario: One microservices MSPUB1 receives Requests from the customer. That requests publish a message on topic TRANSACTION_TOPIC1 on Kafka but that Microservice could receive multiple requests in...
jabrena
1

votes
1

answer
956

Views

KafkaConsumer to read topic from beginning

I want to start consuming from beginning of the topic. I have set the property 'AUTO_OFFSET_RESET_CONFIG' to earliest but it somehow still not reading from beginning. Any thoughts if I missing anything? I am creating a new consumer group every time. @Bean public ConcurrentKafkaListenerContainerFac...
itontips
1

votes
1

answer
200

Views

Kafka message publishing failure for random valid messages , how to fix this issue?

We are using kafka in our app , we are sending too many many messages , each message is small in size , what i mean message size is not issue , is their any issue if you try to publish toooo many message to kafka topic that some messages to failed to publish ? 'Publishing error messages to kafka to...
Bravo
1

votes
1

answer
191

Views

Issues encountered switching from RabbitMQ to Kafka

The code below can also be found in the answer to How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels? When attempting to switch from RabbitMQ to Kafka, I'm encountering the following exception: org.springframework.messaging.MessageHandlingException: Missing header 'foo'...
Keith Bennett
1

votes
0

answer
244

Views

Kafka consumer message consumption delayed for one VM in cluster

I have a cluster running kafka 10.2.1 in which a kafka message is sent using kafka-console-producer.sh and 2/3 subscribers receive the message immediately, yet the other receives the message significantly later (4 minutes). I'm assuming this is an issue with the subscriber, so below is my subscriber...
cjavier70
1

votes
0

answer
1k

Views

Kafka - how to use @KafkaListener(topicPattern=“${kafka.topics}”) where property kafka.topics is 'sss.*'?

I'm trying to implement Kafka consumer with topic names as a pattern. E.g. @KafkaListener(topicPattern='${kafka.topics}') where property kafka.topics is 'sss.*'. Now when I send message to topic 'sss.test' or any other topic name like 'sss.xyz', 'sss.pqr', it's throwing error as below: WARN o.apach...
Shailendra
1

votes
1

answer
1.1k

Views

Spring Boot 2.0.0.RC2 KafkaHealthIndicator, Actuator {“status”:“DOWN”}

I ported my application from Spring Boot 2.0.0.M6 to Spring Boot 2.0.0.RC2 and ran into the issue with KafkaHealthIndicator that thinks right now, that my Kafka status is DOWN. kafka':{ 'status':'DOWN', 'details':{ 'clusterId':'wpAKGc_DQBWy9YfPTLNctQ', 'brokerId':'0', 'nodes':1 } } org.springframewo...
alexanoid
0

votes
0

answer
7

Views

Send supertype as type information in Kafka JSON Serialization

I'm using Spring Boot to send data from one application to the other using Kafka. My design uses an interface to declare the data being sent: package domain; interface Data { public String getData(); public void setData(String data); } Producer In the source app, I implement this interface as a db E...
daniu
1

votes
1

answer
327

Views

Spring Kafka - How to set Commit Async property

I am trying to set the property for allowing commitAsync() to be called from the KafkaMessageListenerContainer: if (this.containerProperties.isSyncCommits()) { this.consumer.commitSync(commits); } else { this.consumer.commitAsync(commits, is.commitCallback); } Is there a way to set this in my applic...
shaktech786
1

votes
0

answer
1.1k

Views

kafka server TimeoutException: Expiring 1 record(s) for

I have Kafka instances running in 2 different VMs. I'm able to send messages to Kafka running in vm-1 using spring kafka-template, but while sending a message to kafka running in vm-2, I'm getting the exception below: 2018-04-19 15:12:57 [kafka-producer-network-thread | producer-1] ERROR o.s.k.s.Log...
Ajey kumar HB
1

votes
1

answer
572

Views

spring kafka template producer performance

I am using Spring Kafka template for producing messages. And the rate at which it is producing the messages is too slow. Takes around 8 mins for producing 15000 messages. Following is How I created the Kafka template: @Bean public ProducerFactory highSpeedAvroProducerFactory( @Qualifier('highSpeedPr...
Prabhakar D
1

votes
2

answer
270

Views

Set zookeeper host and port for spring-cloud-bus

I have a project with spring-cloud-starter-bus-kafka and I set the kafka URL inside application.yml like so: spring.kafka.bootstrap-servers=localhost:9092 This works find when kafka and zookeeper are deployed locally, but if I move kafka and zookeeper to their own servers I get an error when spring-...
madmaux
1

votes
1

answer
206

Views

Can we run embedded Kafka broker like ActiveMQ?

When I say embedded this what I mean please refer to following thread - https://stackoverflow.com/a/28858630/5375223 Let me give some context, in our project we are currently using ActiveMQ as messaging broker and now we are planning to migrate to Kafka. Please see the attached picture. Like in abo...
krishna_5c3
1

votes
1

answer
224

Views

spring kafka 1.2.2 gracefull shutdown

I'm using spring kafka 1.2.2.RELEASE. Currently I have configured retry template for container that has no BackOffPolicy and AlwaysRetryPolicy. Ack mode is MANUAL_IMMEDIATE. When a SIGTERM, I will let the current message to be processed and when @KafkaListener is called again with a new message I t...
1

votes
2

answer
218

Views

Not able to shutdown the jms listener which posts message to kafka spring boot application with Runtime.exit, context.close, System.exit()

I am developing a spring boot application which will listen to ibm mq with @JmsListener(id='abc', destination='${queueName}', containerFactory='defaultJmsListenerContainerFactory') I have a JmsListenerEndpointRegistry which starts the listenerContainer. On message will try to push the same messag...
1

votes
0

answer
278

Views

Kafka async send does not retry on failure

I have set up kafka using this way: @Bean public ProducerFactory producerFactory() { return new DefaultKafkaProducerFactory(producerConfigs()); } @Bean public Map producerConfigs() { Map props = new HashMap(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 'localhost:9092'); props.put(ProducerCo...
HeyItsMe
1

votes
1

answer
581

Views

Cannot query local state store in Kafka Streams Application

I'm building a kafka streams application with spring-kafka to group records by key and apply some business logic. I'm following the configuration stated on spring-kafka-streams doc, but the problem is that when I want to retrieve a value from the local store I get the following error: org.apache.kaf...
Daniel Camarasa
1

votes
0

answer
472

Views

Getting java.io.EOFException on Spring Kafka configuration with SSL

I am working on Kafka Streams code using Spring cloud Stream. Maven version - Spring Boot 2.0.2 Release Apache kafka-streams - 1.1.0 Apache kafka-client - 1.1.0. configuration - spring: profiles: dev, test cloud: stream: kafka: binder: autoCreateTopics: false auto-add-partitions: false brokers: ${KA...
R K
1

votes
1

answer
1k

Views

Spring Kafka Consumer Sometime stop receive messages

Who can help me with Kafka Consumer what in sometime stop receive messages from topic. I use Spring Boot 2.0.2 Finchley.RC2 and Spring Kafka 2.1.7.RELEASE. Kafka Version 1.0.1 Kafka version : 1.0.1 Kafka commitId : c0518aa65f25317e My Consumer Config: @EnableKafka @Configuration public class Kafka...
Fonexn
1

votes
1

answer
96

Views

Apache KafkaMetric value method deprecated, how to use metricValue instead?

so I have created a function to get metrics from Kafka topic public Double getMetricValue(String key, T kafkaTemplate) { for (final Object set : kafkaTemplate.metrics().entrySet()) { Map.Entry map = (Map.Entry) set; MetricName metricName = (MetricName) map.getKey(); KafkaMetric kafkaMetric = (Kafka...
Carmageddon
1

votes
1

answer
543

Views

How does spring kafka handle maintaining a heartbeat

In the kafka consumer documentation https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html it states that care needs to taken to make sure poll is called every so often or the broker will assume the consumer is dead. The most reliable procedure was pretty...
JacobSTL
1

votes
3

answer
996

Views

Spring Boot/Spring Kafka SSL Configuration by environment variables impossible

I have a spring boot application which communicates with Kafka. I configure this application in production by injecting environment variables. For kafka, I can configure most things with environment variables - bootstrap servers, ssl truststore location, ssl truststore password, group id, topic, eg:...
Cameron Sours
1

votes
0

answer
274

Views

Kafka-Manager showing Inactive Consumer Groups and incorrect Lag

I have a kafka cluster of 3 separate physical nodes, with each hosting 1 zk and 1 broker each. We are using Kafka 0.10.1.1. The Topics are created on ZK ensemble and the brokers connect to the ZK ensemble. My Producer and Consumer is a spring-boot app on version 1.5.13 RELEASE. We have a few topics...
Divs
1

votes
0

answer
470

Views

Embedded Kafka tests (ran by SBT) intermittently fails with ZooKeeperServer errors

I'm writing a group of tests using the spring KafkaEmbedded test util. The tests each individually stand up an embedded kafka instance, produce events and assert the resultant downstream events. The tests consistently pass when run in an IDE (e.g. IntelliJ), however the tests fail intermittently (ap...
Freestyle076
1

votes
0

answer
192

Views

Embedded Kafka: KTable+KTable leftJoin produces duplicate records

I come seeking knowledge of the arcane. First, I have two pairs of topics, with one topic in each pair feeding into the other topic. Two KTables are being formed by the latter topics, which are used in a KTable+KTable leftJoin. Problem is, the leftJoin producing THREE records when I produce a single...
Freestyle076
1

votes
1

answer
275

Views

How to pause/start/stop Kafka Producer / Kafka Template

I'm using a spring boot app with kafka integration and I want to implement an endpoint to stop and start kafka from publishing messages. The message are triggered in a async way by another endpoints. The beans KafkaTemplate or ProducerFactory producerFactory() does not have any stop and pause action...
Leonel
1

votes
1

answer
471

Views

Spring cloud stream kafka avro deserialization

I'm writing a spring cloud stream sink application which consumes Avro messages. I'm trying to get the application consume the messages however I'm getting the following error. org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.avro.generic.Ge...
k123
1

votes
0

answer
470

Views

Kafka with SASL_PLAINTEXT authentication

I'm using the following docker-compose configuration: app-zookeeper: image: wurstmeister/zookeeper container_name: app-zookeeper ports: - 2181:2181 app-kafka: build: ../images/kafka container_name: app-kafka ports: - 9092:9092 environment: KAFKA_ADVERTISED_HOST_NAME: ${DOCKER_LOCAL_HOST} KAFKA_ADVER...
Leonel
1

votes
1

answer
346

Views

Missing bean 'zipkin2.reporter.Sender' when using `@AutoConfigureAfter(TraceAutoConfiguration.class)`

TLDR : Repro project here : https://github.com/snussbaumer/zipkin-app-wont-start-repo I want to use a Zipkin Kafka Sender I also need a piece of AutoConfiguration to run after TraceAutoConfiguration from Sleuth If I use @AutoConfigureAfter the application does not start and fails with message No qu...
Sébastien Nussbaumer
1

votes
1

answer
85

Views

Spring KafkaEmbedded - problem consuming messages

I have problem using KafkaEmbedded from https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test/2.1.10.RELEASE I'm using KafkaEmbedded to create Kafka broker for testing producer/consumer pipelines. These producers/consumers are standard clients from kafka-clients. I'm not usi...
Tomasz
1

votes
0

answer
326

Views

Error management in Spring Cloud Stream with Kafka Stream

For a project, I use Spring Cloud Stream with Kafka as binder with several Spring Boot Apps using a Kafka stream (KStream). The goal is to have a pipeline of Spring boot apps in Spring Cloud Data Flow. I would like to know how best to manage exceptions raised during the stream because an uncaught ex...
GuanacoBE
1

votes
1

answer
67

Views

Spring Kafka Batch Option: ClassCastException using SeekToCurrentBatchErrorHandler

Trying to configure the Factory as below: private ConcurrentKafkaListenerContainerFactory getKafkaContainerFactory() throws IOException { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(this.getConsumerFactory());//DefaultKa...
PVS
1

votes
2

answer
237

Views

Default values for ProducerConfigs in Spring-Boot Apache Kafka

I have an Spring Boot project up and running with Kafka being implemented . I configured a Producer and sent an message with topic 'Demo' . But i didn't set any of the properties like bootstrap.servers = [localhost:9092] value.serializer = class org.apache.kafka.common.serialization.StringSeriali...
Ajinkya Karode
1

votes
0

answer
48

Views

Testing with EmbeddedKafka - Test report

I'm using EmbeddedKafka to test the integration with Kafka in my microservice. And also, I'm using jacoco for publishing the test result. Inorder to make the test run without any issue, I have to make the forkCount parameter as 0. org.apache.maven.plugins maven-failsafe-plugin 2.22.1 0 **/*ITest.jav...
Thiru
1

votes
1

answer
31

Views

controlling kafka listeners consumption

in addition to previously asked questions about transactions, I'd like ask about controlling consumption: I have listener, which processes production data. Now something bad happens, and for any reason we want to have our app UP, but stop processing records. So I'd like to have option to manually (t...
Martin Mucha
1

votes
1

answer
45

Views

spring boot properties yaml

I run spring boot and kafka with auto configuration(via annotattions only) and having props defined in .yaml file, ie: spring: kafka: bootstrap-servers: someserver:9999 consumer: group-id: mygroup .... @KafkaListener() public void receive(ConsumerRecord consumerRecord) { .... } And it works fine, sp...
markus

View additional questions