Properly Configuring Kafka Connect S3 Sink TimeBasedPartitioner

Refresh

November 2018

Views

1.6k time

4

I am trying to use the TimeBasedPartitioner of the Confluent S3 sink. Here is my config:

{  
"name":"s3-sink",
"config":{  
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"1",
    "file":"test.sink.txt",
    "topics":"xxxxx",
    "s3.region":"yyyyyy",
    "s3.bucket.name":"zzzzzzz",
    "s3.part.size":"5242880",
    "flush.size":"1000",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "format.class":"io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class":"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "timestamp.extractor":"Record",
    "timestamp.field":"local_timestamp",
    "path.format":"YYYY-MM-dd-HH",
    "partition.duration.ms":"3600000",
    "schema.compatibility":"NONE"
}

}

The data is binary and I use an avro scheme for it. I would want to use the actual record field "local_timestamp" which is a UNIX timestamp to partition the data, say into hourly files.

I start the connector with the usual REST API call

curl -X POST -H "Content-Type: application/json" --data @s3-config.json http://localhost:8083/connectors

Unfortunately the data is not partitioned as I wish. I also tried to remove the flush size because this might interfere. But then I got the error

{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nMissing required configuration \"flush.size\" which has no default value.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}%

Any idea how to properly set the TimeBasedPartioner? I could not find a working example.

Also how can one debug such a problem or gain further insight what the connector is actually doing?

Greatly appreciate any help or further suggestions.

2 answers

2

Действительно исправленную конфигурация кажется правильным.

В частности, установка timestamp.extractorв RecordFieldпозволяет разбивать файлы , основываясь на поле метки времени , что ваши записи должны и которые вы определить, установив свойство timestamp.field.

Когда вместо того, чтобы один устанавливает timestamp.extractor=Record, то время на основе разметка будет использовать метки Кафка для каждой записи.

Что касается flush.size, установка этого свойства на высокое значение (например Integer.MAX_VALUE) будет практически синонимами игнорировать его.

Не , наконец, schema.generator.classбольше не требуется в самых последних версиях разъема.

4

После изучения кода на TimeBasedPartitioner.java и журналы с

confluent log connect tail -f

Я понял , что и часовой пояс и локаль являются обязательными, хотя это не указано в качестве таковых в Confluent S3 Connector документации. Следующие поля конфигурации решить эту проблему и дайте мне загрузить записи должным образом разбиты на разделы для S3 ведра:

"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"timestamp.extractor": "RecordField",
"timestamp.field": "local_timestamp",

Обратите внимание, еще две вещи: во-первых значение для flush.size также необходимо, файлы разбиты в конечном счете на более мелкие куски, не больше, чем указано flush.size. Во-вторых, path.format лучше выбран, как показано выше, таким образом формируется правильная структура дерева.

Я до сих пор не 100% уверен , что если на самом деле запись поле local_timestamp используется для разделения записей.

Любые комментарии или улучшения очень приветствуются.