Hi,
I am trying to set up the Couchbase Kafka connector on Docker. The connector receives data from Couchbase, but it does not produce any messages to the topic configured in couchbase.topic.
Here is my Dockerfile:
FROM confluentinc/cp-kafka-connect:5.2.7
RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:4.0.6
COPY connect-distributed.properties /etc/kafka/
CMD ["sh", "-c", "connect-distributed /etc/kafka/connect-distributed.properties"]
Here is my couchbase-source.json:
{
  "name": "vistest-couchbase-source",
  "config": {
    "name": "vistest-couchbase-source",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "couchbase.topic": "vistest-cb-visite",
    "couchbase.seed.nodes": "*****",
    "couchbase.bootstrap.timeout": "10s",
    "couchbase.bucket": "vistest-visite",
    "couchbase.username": "Administrator",
    "couchbase.password": "*****",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.stream.from": "BEGINNING",
    "couchbase.compression": "ENABLED",
    "couchbase.flow.control.buffer": "128m",
    "couchbase.persistence.polling.interval": "100ms",
    "couchbase.log.document.lifecycle": "true",
    "couchbase.log.redaction": "FULL",
    "couchbase.black.hole.topic": "vistest-cb-visite-blackhole"
  }
}
(Setting couchbase.stream.from to NOW instead of BEGINNING doesn't work either.)
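For context, this is roughly how a file like this can be registered with the Connect worker. It is only a minimal Python sketch: it assumes the worker's REST API is reachable at http://localhost:8083 (the same address as in the status call further down), and the function names are made up for illustration. Note that json.loads only accepts strict JSON, so any // comment in the file has to be removed first.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API listens here.
CONNECT_URL = "http://localhost:8083"


def build_create_request(config_text: str, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Parse the connector JSON and build a POST /connectors request (hypothetical helper)."""
    # json.loads raises json.JSONDecodeError on comments or trailing commas.
    payload = json.loads(config_text)
    if "name" not in payload or "config" not in payload:
        raise ValueError("expected top-level 'name' and 'config' keys")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(path: str) -> str:
    """Send the request and return the response body; needs a running Connect worker."""
    with open(path, encoding="utf-8") as f:
        request = build_create_request(f.read())
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

Calling register_connector("couchbase-source.json") would submit the file; build_create_request can be used on its own to check that the file parses before sending anything.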
And here is my connect-distributed.properties:
bootstrap.servers=*****
security.protocol=SSL
group.id=vistest-connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=vistest-connect-offsets
offset.storage.replication.factor=1
config.storage.topic=vistest-connect-configs
config.storage.replication.factor=1
status.storage.topic=vistest-connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components
I see a bunch of log messages for events coming from Couchbase, like:
2024-12-12T13:13:03.928466629Z [2024-12-12 13:13:03,928] INFO {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":52697,"documentId":"_default._default._sync:unusedSeq:6434914","connectTaskId":"?","revision":3431544,"type":"deletion","partition":485,"sequenceNumber":18338,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":305937970568} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
But none of them are produced to my vistest-cb-visite topic.
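To check whether any document gets past the RECEIVED_FROM_COUCHBASE milestone, the lifecycle log lines can be tallied by milestone. The following is a minimal Python sketch under the assumption that every lifecycle line looks like the one above (a JSON object followed by the SourceDocumentLifecycle logger name); the function names are illustrative, and it makes no assumption about which other milestone names exist.

```python
import json
from collections import Counter
from typing import Iterable, Optional

# Logger name taken from the log line above.
LIFECYCLE_LOGGER = "com.couchbase.connect.kafka.SourceDocumentLifecycle"


def parse_lifecycle_line(line: str) -> Optional[dict]:
    """Return the JSON payload of a document lifecycle log line, or None for other lines."""
    if LIFECYCLE_LOGGER not in line:
        return None
    # The payload runs from the first '{' to the last '}' on the line.
    start = line.find("{")
    end = line.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        return json.loads(line[start:end + 1])
    except json.JSONDecodeError:
        return None


def count_milestones(lines: Iterable[str]) -> Counter:
    """Count lifecycle events per milestone name."""
    counts = Counter()
    for line in lines:
        event = parse_lifecycle_line(line)
        if event is not None:
            counts[event.get("milestone", "UNKNOWN")] += 1
    return counts
```

Feeding the container log lines into count_milestones gives one count per milestone name; if RECEIVED_FROM_COUCHBASE is the only key, nothing is getting any further than that.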
Versions in use:
- Kafka version on MSK: 2.2.1
- so Confluent Platform version: 5.2.7
- and couchbase/kafka-connect-couchbase: v4.0.6 (from the compatibility matrix)
I only see these errors in the logs, but I can't tell whether they are related:
[2024-12-12 13:15:32,150] ERROR WorkerSourceTask{id=vistest-couchbase-source-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:438)
2024-12-12T13:15:32.151196761Z [2024-12-12 13:15:32,151] ERROR WorkerSourceTask{id=vistest-couchbase-source-1} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)
2024-12-12T13:15:32.151242840Z [2024-12-12 13:15:32,151] INFO WorkerSourceTask{id=vistest-couchbase-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
2024-12-12T13:15:32.151284192Z [2024-12-12 13:15:32,151] INFO WorkerSourceTask{id=vistest-couchbase-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
EDIT:
When I call GET http://localhost:8083/connectors/vistest-couchbase-source/status, I get this error for task 1:
org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:258)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:312)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic vistest-cb-visite not present in metadata after 60000 ms.
and this one for task 2:
org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:428)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:250)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
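For anyone who wants to run the same check, here is a minimal Python sketch that pulls a one-line summary per failed task out of that status response. It assumes the usual Kafka Connect status shape (a "tasks" array whose entries carry "id", "state" and, for failed tasks, "trace"); the function names are illustrative only.

```python
import json
import urllib.request

# Same endpoint as the GET call above.
STATUS_URL = "http://localhost:8083/connectors/vistest-couchbase-source/status"


def summarize_trace(trace: str) -> str:
    """Return the last 'Caused by:' line of a stack trace, or its first line if there is none."""
    lines = [line.strip() for line in trace.splitlines() if line.strip()]
    causes = [line for line in lines if line.startswith("Caused by:")]
    if causes:
        return causes[-1]
    return lines[0] if lines else ""


def failed_task_summaries(status: dict) -> dict:
    """Map task id to a one-line error summary for every task in state FAILED."""
    return {
        task["id"]: summarize_trace(task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    }


def fetch_status(url: str = STATUS_URL) -> dict:
    """Fetch and decode the status document; needs a running Connect worker."""
    with urllib.request.urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling failed_task_summaries(fetch_status()) returns a dict keyed by task id, which makes it easier to compare the two failures side by side than reading the full traces.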
If you have any ideas, I would be glad to hear them.
Thanks