I am looking into the latest Kafka Connect Couchbase, version 3.1.1…
In the documentation everything is described in terms of the Confluent distribution, but it is mentioned that a stock Kafka distribution can also be used, which is what I have for now…
I am wondering what I should use as the key and value converters. I would like to use Avro as in the example, but without relying on the Schema Registry; is there a default Avro converter provided?
We are planning to update the documentation to cover non-Confluent distributions. I’m not sure about working around Schema Registry; I’ll get back to you.
The Couchbase Kafka connector does not ship any converters of its own; it emits org.apache.kafka.connect.source.SourceRecord objects, which are encoded by the Kafka Connect framework. So if you want to write objects to a topic in Avro format, you have to use your own converter or take one from Confluent.
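For reference, this is roughly what “taking one from Confluent” looks like in the worker configuration (a sketch only; it depends on a running Schema Registry, which is exactly the dependency the original question wants to avoid, and the localhost:8081 URL is just a placeholder):

# Confluent Avro converter -- requires a running Schema Registry
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081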
For the moment I used:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
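(A side note on JsonConverter: by default it wraps every record in a schema-plus-payload envelope, which inflates every message. If the schema is not needed downstream it can be switched off; a sketch using the stock converter option. Since the key side here is a StringConverter, only the value flag matters:)

value.converter.schemas.enable=false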
But I ran into an OutOfMemoryError, as shown below:
[2017-02-24 18:26:09,921] INFO Finished WorkerSourceTask{id=kafka-connect-couchbase-0} commitOffsets successfully in 714 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
[2017-02-24 18:26:09,923] ERROR Task kafka-connect-couchbase-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
java.lang.OutOfMemoryError: Java heap space
at com.fasterxml.jackson.core.util.ByteArrayBuilder.toByteArray(ByteArrayBuilder.java:118)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:2957)
at org.apache.kafka.connect.json.JsonSerializer.serialize(JsonSerializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:183)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:160)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-02-24 18:26:09,928] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2017-02-24 18:26:09,930] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:685)
[2017-02-24 18:26:13,584] INFO Finished WorkerSourceTask{id=kafka-connect-couchbase-1} commitOffsets successfully in 735 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
I have some documents bigger than the 1 MB default max message size, so I raised the limit to 10 MB by setting:
in the couchbase-source.properties:
producer.max.request.size=10485760
in the Kafka brokers:
message.max.bytes=10485760
replica.fetch.max.bytes=10485760
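(If anything downstream consumes these topics with the new consumer API, its per-partition fetch size presumably needs the same bump, since it also defaults to 1 MB; a hedged sketch of the consumer-side property:)

max.partition.fetch.bytes=10485760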
I am also running with tasks.max=2 in couchbase-source.properties…
We have also set the environment variable KAFKA_HEAP_OPTS to -Xms512m -Xmx512m.
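(Concretely, that is exported before starting the worker, something like the following sketch; it assumes standalone mode and the stock scripts, which pick up KAFKA_HEAP_OPTS via kafka-run-class.sh, and the file paths are only placeholders:)

# raise/limit the Connect worker heap before launching it
export KAFKA_HEAP_OPTS="-Xms512m -Xmx512m"
bin/connect-standalone.sh config/connect-standalone.properties couchbase-source.properties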
Is there any other property I can tune to limit memory usage and avoid the OOM? The number of messages prefetched from DCP, or something similar?