Kafka Connect JDBC sink to Oracle task FAILED

Refresh

November 2018

Views

369 time

2

I am trying to sink data from Kafka avro topic to an existing Oracle database table I created beforehand. I run Kafka Connect in distributed mode (3 workers). When I submit a new connector through REST, it creates a connector, a new task, but the task immediately fails. Can't understand why? Below are the task error and my configs.

Task Error

{"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:457)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)","id":0,"worker_id":"kafka_host02:8083"}

Kafka Connect config

bootstrap.servers=kafka_host01:9092,kafka_host02:9092,kafka_host03:9092
group.id=connect-cluster-00
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://kafka_host01:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://kafka_host01:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
rest.port=8083

Connector submit command

curl -X POST -H "Content-Type: application/json" --data '{"name": "heat-predict-ora-sink", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"MY-TEST-02_topic", "connection.user":"scott", "connection.password":"tiger", "connection.url":"jdbc:oracle:thin:@orahost.localdomain:1521:orcl", "key.converter":"io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url":"http://localhost:8081", "value.converter":"io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url":"http://localhost:8081", "insert.mode":"insert", "batch.size":"0", "table.name.format":"TEST.MY_TABLE_IN", "pk.mode":"none", "pk.fields":"none" }}' http://localhost:8083/connectors

Please tell me any ideas why it behaves so?

2 answers

0

Наконец удалось записать в РСУБД, когда я позволил таблицу быть автоматически созданный пользователем Кафка Connect в своей собственной схеме. Странно, что имена полей в новой таблице, в кавычках, но это не пробка. Спасибо всем, кто пытался помочь с ответом!

1

На основе https://github.com/confluentinc/kafka-connect-jdbc/issues/221 я хотел бы предложить вам подключиться к Oracle в качестве пользователя схемы , к которой вы хотите записать данные, а не указыватьschema.table