Questions tagged [apache-kafka]

0

votes
1

answer
11

Views

How does Spring Kafka/Spring Cloud Stream guarantee the transactionality / atomicity involving a Database and Kafka?

Spring Kafka, and thus Spring Cloud Stream, allow us to create transactional Producers and Processors. We can see that functionality in action in one of the sample projects: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transaction-kafka-samples: @Transactional @StreamListe...
codependent
1

votes
1

answer
1.7k

Views

Spark Dataframe to Kafka

I am trying to stream a Spark DataFrame to a Kafka consumer, but I am unable to do so. Can you please advise me? I am able to read the data from the Kafka producer into Spark, and I have performed some manipulation on it. After manipulating the data, I want to stream it back to Kafka (consumer).
1

votes
1

answer
926

Views

Spring kafka and Kafka Cluster

I've configured three Kafka brokers in a cluster and I'm trying to use them with spring-kafka. But after I kill the Kafka leader, I'm not able to send any more messages to the queue. I'm setting the spring.kafka.bootstrap-servers property as: 'kafka-1:9092;kafka-2:9093,kafka-3:9094' and all of the names are in my hosts file. Kafka...
1

votes
1

answer
918

Views

Kafka producer difference between flush and poll

We have a Kafka consumer which reads messages, does some processing, and publishes to a Kafka topic again using the script below. producer config : { 'bootstrap.servers': 'localhost:9092' } I haven't configured any other settings such as queue.buffering.max.messages queue.buffering.max.ms batch.num.messages I...
shakeel
1

votes
1

answer
198

Views

What Happens when there is only one partition in Kafka topic and multiple consumers?

I have a Kafka topic with only one partition and I don't understand what will happen in the following cases. How will messages be delivered to consumers if all consumers are in the same group? If all consumers are in different groups? I am not sure whether consumers will receive unique messages or duplicate ones.
hard coder
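The two cases in the question above can be sketched with a toy model in plain Python (no Kafka client involved). The `deliver` function and the consumer names are hypothetical. The rule it models is that a partition is owned by at most one consumer per group, while each group reads the full log independently; picking the owner as "first name in sorted order" is a simplification of what a real partition assignor does.

```python
from collections import defaultdict

def deliver(messages, groups):
    """Simulate delivery from a single-partition topic.

    groups maps a group id to the list of consumer names in that group.
    Returns {consumer_name: [messages received]}.
    """
    received = defaultdict(list)
    for group_id, consumers in groups.items():
        # Make sure idle consumers show up in the result with an empty list.
        for name in consumers:
            received[name]
        # One partition means exactly one owner per group (simplified choice).
        owner = sorted(consumers)[0]
        # Every group reads the whole log, independently of other groups.
        received[owner].extend(messages)
    return dict(received)

messages = ["m0", "m1", "m2"]

# Case 1: three consumers in the same group.
same_group = deliver(messages, {"g1": ["c1", "c2", "c3"]})

# Case 2: three consumers, each in its own group.
different_groups = deliver(messages, {"g1": ["c1"], "g2": ["c2"], "g3": ["c3"]})

print(same_group)
print(different_groups)
```

In this model, the same-group case leaves two consumers idle, and the different-groups case gives every consumer its own full copy of the messages.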
1

votes
1

answer
218

Views

Kafka Connect JDBC Sink Connector

I am trying to write data from a topic (JSON data) into a MySQL database. I believe I want a JDBC Sink Connector. How do I configure the connector to map the JSON data in the topic to inserts into the database? The only documentation I can find is this. 'The sink connector requires knowl...
Chris
1

votes
4

answer
53

Views

Can't set Kafka retention policy to both compact and delete

According to the below links I'm supposed to be able to set the retention.policy value to 'compact,delete'. https://issues.apache.org/jira/browse/KAFKA-4015 https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist However when I try to alter the rete...
emirhosseini
1

votes
2

answer
41

Views

Exchange files (up to many GB)

For my project, I have to create a file manager that stores many files (from many locations) and exposes URLs to download them. In a microservice ecosystem (I am used to Spring Boot), I wonder what the best way is to exchange such files, that is, to send files to the file manager. On a o...
OlivierTerrien
1

votes
2

answer
43

Views

Kafka consumer is reading last committed offset on re-start (Java)

I have a Kafka consumer for which enable.auto.commit is set to false. Whenever I restart my consumer application, it always reads the last committed offset again and then the next offsets. For example, the last committed offset is 50. When I restart the consumer, it reads offset 50 again first and then the next...
svbp
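One common cause of the symptom described above is committing the offset of the last processed record rather than the position of the next record to read. Kafka treats a committed offset as the resume position. The sketch below models that in plain Python; `ToyConsumer` is a hypothetical stand-in, not a real client class, and the excerpt does not confirm that this is the asker's actual bug.

```python
class ToyConsumer:
    """Minimal stand-in for a consumer reading one partition (hypothetical)."""

    def __init__(self, log, committed=0):
        self.log = log
        self.committed = committed  # position to resume from after a restart

    def poll_all(self):
        # Return the offsets that would be read, starting at the committed position.
        return list(range(self.committed, len(self.log)))

    def commit(self, offset):
        self.committed = offset

log = [f"record-{i}" for i in range(53)]  # offsets 0..52

consumer = ToyConsumer(log)

# Case 1: commit the offset of the last processed record (50), then "restart".
consumer.commit(50)
reread = ToyConsumer(log, committed=consumer.committed).poll_all()

# Case 2: commit last processed offset + 1 (51), then "restart".
consumer.commit(50 + 1)
resumed = ToyConsumer(log, committed=consumer.committed).poll_all()

print(reread)   # starts at 50, so record 50 is read a second time
print(resumed)  # starts at 51
```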
1

votes
1

answer
148

Views

Spring boot Kafka doesn't work - consumer not receiving messages

I'm trying to run a simple Spring Boot Kafka application but I can't make it work. I've followed various tutorials, now I'm implementing this one, but when I start the application this is what happens: I can write in the console, but the consumer doesn't receive any message. This is my SpringApplica...
Dseaster
1

votes
2

answer
2.2k

Views

Error while creating topic in kafka

I am using Kafka on Windows via Cygwin and trying to create a topic, and I am getting the error below: log4j:ERROR Could not read configuration file from URL [file:/cygdrive/d/kafka/bin/../config/tools-log4j.properties]. java.io.FileNotFoundException: \cygdrive\d\kafka\bin\..\config\tools-log4j.properties (...
1209
0

votes
0

answer
5

Views

Spring Kafka Error Handler how to avoid endless loop

I hope someone can give me a hint about what I'm doing wrong here. I wrote a custom error handler for a batch listener that should seek behind the received records and send them to a DLQ. I tried a lot but can't get it working. My current implementation hangs in an endless loop, receiving the records over a...
xstring
1

votes
1

answer
2.7k

Views

Kafka - Idempotent producer in “exactly once delivery” semantic

Since the latest version of Kafka (0.11.0.0), released on the 28th of June 2017, the Kafka team has provided new features to support exactly-once delivery. After I downloaded the latest version, I tried configuring the producer (executed through the kafka-console-producer.sh script) as described in Producer co...
rh0x
1

votes
2

answer
860

Views

What is the best practice for consuming messages from multiple Kafka topics?

I need to consume messages from different Kafka topics. Should I create a different consumer instance per topic and then start a new processing thread per partition, or should I subscribe to all topics from a single consumer instance and then start different processing threads...
Megha
1

votes
2

answer
430

Views

Differences between zookeeper-server-start.sh and kafka-server-start.sh

Is one of them more recommended/preferred to use than the other?
Danny
1

votes
1

answer
412

Views

What Kafka broker metrics should be monitored if producer side ack lag is very high

Are there broker metrics we can use to monitor a Kafka broker when acknowledgment lag is very high on the producer side? We are using Datadog to monitor both the producer and the Kafka broker. It can be seen that the producer ack lag is more than 10 secs. However, on the broker side, I feel like only usin...
Xiaohe Dong
1

votes
3

answer
821

Views

How can I do functional tests for Kafka Streams with Avro (schemaRegistry)?

A brief explanation of what I want to achieve: I want to do functional tests for a Kafka Streams topology (using TopologyTestDriver) for Avro records. Issues: Can't 'mock' schemaRegistry to automate the schema publishing/reading. What I tried so far is to use MockSchemaRegistryClient to try to mock the...
Ramon jansen gomez
1

votes
2

answer
372

Views

Check for the existence of a Kafka topic programmatically in Java

How can I know if a topic has been created in a Kafka cluster, programmatically, without using CLI tools, and before trying to produce into the topic? I'm running into a problem where the topic doesn't exist, and our application is trying to produce to a non-existent topic, but it's only notified a...
mjuarez
1

votes
3

answer
900

Views

Kafka SSL handshake failed issue

I am trying to enable SSL authentication on my Kafka server. I am following section 7.2 of the documentation linked below. http://kafka.apache.org/documentation.html#security_ssl I followed all the steps, but when calling the producer.bat file to send data into the topic I get the error below. ERROR [Producer cl...
Osman Jabri
1

votes
2

answer
180

Views

Kafka, will different partitions have the same offset number

I have one Kafka topic with five partitions. There will be 5 consumer groups. Each consumer group has one service instance consuming from that topic. Will the offset be the same in each consumer for the same record in Kafka?
mrmannione
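A toy illustration of how offsets relate to partitions and consumer groups, in plain Python. `ToyTopic` is a hypothetical model, not a Kafka API. It reflects two facts about Kafka: an offset is assigned per partition when a record is appended, and a stored record keeps that offset no matter which consumer group reads it.

```python
class ToyTopic:
    """Hypothetical model of a topic as a list of append-only partition logs."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, value):
        log = self.partitions[partition]
        log.append(value)
        # The offset is the record's index within its own partition.
        return len(log) - 1

    def read(self, partition):
        # Any reader sees the same (offset, value) pairs for a partition.
        return list(enumerate(self.partitions[partition]))

topic = ToyTopic(num_partitions=5)

offset_a = topic.append(0, "a")  # first record in partition 0
offset_b = topic.append(0, "b")  # second record in partition 0
offset_c = topic.append(3, "c")  # first record in partition 3

# Five consumer groups reading partition 0 all see identical offsets.
groups = [f"group-{i}" for i in range(1, 6)]
views = {group: topic.read(0) for group in groups}

print(offset_a, offset_b, offset_c)
print(views["group-1"] == views["group-5"])
```

Note that `offset_a` and `offset_c` coincide even though they are different records: offset numbers repeat across partitions, so a record is identified by partition and offset together.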
1

votes
2

answer
102

Views

Spring Kafka Listener on Http Request

I am using Spring and Kafka, I make an HTTP POST request as shown below and send some info to another service via a Kafka topic. @RequestMapping(method = RequestMethod.POST, value = '/portfolio') public void getPortfolio( Authentication auth, @RequestBody User user ) { //Data Transfer Object UserDTO...
Rory Harrison
1

votes
1

answer
66

Views

Can I set Kafka Stream consumer group.id?

I'm using the Kafka Streams library for a streaming application. I wanted to set the Kafka consumer group id, so I set the Kafka Streams configuration as below. streamsCopnfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 'JoinTestApp'); streamsCopnfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, 'JonTestClien...
Jeahyun Kim
1

votes
1

answer
42

Views

Kafka Streams group by and concatenation

I have a Kafka stream that receives records and I want to concatenate messages based on a particular field. A message in the stream looks like the following: Key: 2099 Payload{ email: [email protected] eventCode: 2099 } Expected Output: key: 2099 Payload{ emails: [email protected], [email protected], [email protected] }...
Nimo1981
1

votes
1

answer
39

Views

Listen to the same port via multiple Docker containers

I have brought up a Kafka messaging service where the Kafka broker is set up in a Docker container on port 9092. Host port 9092 is mapped to the container port 9092. Now I am trying to bring up a consumer which listens to host port 9092 to consume messages. I'm trying to set up the consumer container after all b...
Darshu Bc
0

votes
0

answer
2

Views

Should a Kafka Consumer be a part of Django Web Layer or a separate service?

I have a Django application which has a Kafka integration to process some orders. The topics on the Kafka queue are created dynamically, so the consumers also have to be subscribed dynamically. Now when I initialise a consumer it blocks the main thread, so I have to start the consumer in a backg...
Paras
1

votes
0

answer
8

Views

createDirectStream with Avro messages

Initially, I had to process the information from a text file: C1_4,C2_4,C1______10,01/12/2015,30/12/2015,123456789,S,12345 Now I need to process the same information but in Avro format. How can I do it? Before, I used this code: createDirectStream[String, String, StringDecoder, StringDecode...
user2140391
1

votes
4

answer
817

Views

How to check if Kafka Consumer is ready

I have the Kafka commit policy set to latest and I am missing the first few messages. If I sleep for 20 seconds before starting to send messages to the input topic, everything works as desired. I am not sure if the problem is the consumer taking a long time for partition rebalancing. Is there a way...
Nagireddy Hanisha
1

votes
1

answer
206

Views

overloaded method value aggregate with alternatives

I have the following function, which does not compile: private def save(pea: KStream[String, String]): Unit = { pea .groupByKey() .aggregate(() => '''{folder: ''}''', (_: String, _: String, value: String) => value, EventStoreTopology.Store) } the error message is: [error] [VR](x$1: org.apache.kafka.str...
zero_coding
1

votes
0

answer
102

Views

Kafka Streams DSL API has published message at consumer without processing

I tried to run the program below using Kafka version 0.11.0.2. I compiled it and created a runnable jar, which is run from the server where Kafka is set up. The input published to the input topic through the producer arrives unchanged at the consumer/output topic, without applying the processing log...
Keerthi
1

votes
0

answer
420

Views

How to use CustomJsonParser to parse json string in Spark Structured Streaming?

Instead of parsing whole JSON string, user will provide a CustomJsonParser to parse partial JSON string into CustomObject. How to use this CustomJsonParser to convert JSON string in Spark Structured Streaming instead of using from_json and get_json_object methods? Sample Code like this: val jsonDF =...
Casel Chen
1

votes
0

answer
308

Views

Robust web Socket Streaming to Kafka in Java

I need to record from an unreliable web socket connection and stream into Kafka. Our Kafka cluster is pretty reliable and we can make it highly available. What is the best approach to make the web socket connection as reliable as possible? I would like to minimize data loss. One solution would be...
Daniel
1

votes
1

answer
661

Views

Spring Boot: Rest endpoint integration with Kafka

I am working on a REST endpoint which has to send a message to another service to process. It is a microservice architecture and all the services are connected via a Kafka message broker. Spring supports @Async for asynchronous methods but it doesn't work as expected. Code is something like @RequestMapping...
Himanshu Yadav
1

votes
1

answer
378

Views

How to throttle message delivery to consumers?

I am looking for a message queue with an in-built throttling feature. Use case is that the recipient worker pool may accept a lot of messages but a service that workers depend on may not be able to handle the load. It's not possible to reduce the worker pool since the worker instances handle differe...
Juzer Ali
1

votes
2

answer
337

Views

Error rolling upgrade Kafka

I have installed Cloudera 3.0.0, so I have Apache Kafka version 0.11.0. I want to do a rolling upgrade from 0.11.0 to 1.0.0. I have read the documentation and followed these instructions: Repeat for each broker: 1.1. shut down the broker 1.2. update the code 1.2.1. Add to server.properties: inter.bro...
adamista
1

votes
1

answer
357

Views

Could not read data from kafka using pyspark

I have streaming data in my Kafka topic. I need to read this data from the topic using PySpark in the form of a PySpark DataFrame. But I continuously receive an error when I call the readStream function. The error is 'py4j.protocol.Py4JJavaError: An error occurred while calling o35.load'. My code is as f...
Nayana Madhu
1

votes
0

answer
179

Views

Spark streaming from Kafka returns result on local but not on YARN

I am using Cloudera's VM CDH 5.12, spark v1.6, kafka(installed by yum) v0.10 and python 2.66. I am following this link spark settings: Below is a simple spark application that I am running. It takes events from kafka and prints it after map reduce. from __future__ import print_function import sys fr...
Samhash
1

votes
1

answer
2.1k

Views

kafka-run-class error could not find or load main class

https://kafka.apache.org/10/documentation/streams/quickstart I'm trying to run my own application (on Linux) using Kafka Streams. I was able to successfully follow the instructions on their page and run the WordCountDemo application. Now I'm trying to use my own app (right now it is the same code, b...
practicemakesperfect
1

votes
0

answer
293

Views

Bridging Kafka and Akka Streams

Kafka Streams is only supported on the JVM platform, so I am looking for a way to leverage Akka Streams in C# to build a Kafka consumer with the benefits of Akka Streams. The Confluent C# Kafka consumer is based on polling and event handling, e.g. using (var consumer = new Consumer(consumerConfi...
carstenj
1

votes
0

answer
368

Views

Cannot deserialize data using apache avro

I have a Spring Boot application that sends and receives data from a Kafka broker; I'm using Apache Avro as a SerDe. What I've done so far is generate the class using the Maven plugin. The schema is fairly simple: {'namespace': 'com.domain', 'type': 'record', 'name': 'User', 'fields': [ {'name': 'name...
Ouerghi Yassine
1

votes
1

answer
544

Views

How do I install mssql jdbc driver to confluent / kafka

I'm trying to create a connector through the REST API and it won't work, stating that 'Failed to find any class that implements Connector and which name matches com.microsoft.sqlserver.jdbc.SQLServerDriver I'm on Ubuntu 16.04 and I've tried the following, and each time I try something new I stop then sta...
matthewdaniel
