Failing to produce messages to topic from Couchbase Kafka connector

Hi,

I am trying to set up a full Couchbase Kafka connector on Docker. I manage to receive data from Couchbase, but no messages are produced to the couchbase.topic.

Here is my Dockerfile:

FROM confluentinc/cp-kafka-connect:5.2.7

RUN confluent-hub install --no-prompt couchbase/kafka-connect-couchbase:4.0.6

COPY connect-distributed.properties /etc/kafka/

CMD ["sh", "-c", "connect-distributed /etc/kafka/connect-distributed.properties"]

Here is my couchbase-source.json (I also tried setting couchbase.stream.from to NOW, which doesn't work either):

{
  "name": "vistest-couchbase-source",
  "config": {
    "name": "vistest-couchbase-source",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "couchbase.topic": "vistest-cb-visite",
    "couchbase.seed.nodes": "*****",
    "couchbase.bootstrap.timeout": "10s",
    "couchbase.bucket": "vistest-visite",
    "couchbase.username": "Administrator",
    "couchbase.password": "*****",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.stream.from": "BEGINNING", //(NOW doesn't work either)
    "couchbase.compression": "ENABLED",
    "couchbase.flow.control.buffer": "128m",
    "couchbase.persistence.polling.interval": "100ms",
    "couchbase.log.document.lifecycle": "true",
    "couchbase.log.redaction": "FULL",
    "couchbase.black.hole.topic": "vistest-cb-visite-blackhole"
  }
}
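
For reference, I register the connector through the standard Connect REST API call:

curl -X POST -H "Content-Type: application/json" --data @couchbase-source.json http://localhost:8083/connectors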

And here is my connect-distributed.properties:

bootstrap.servers=*****
security.protocol=SSL

group.id=vistest-connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=vistest-connect-offsets
offset.storage.replication.factor=1

config.storage.topic=vistest-connect-configs
config.storage.replication.factor=1

status.storage.topic=vistest-connect-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components

I see a bunch of messages coming from Couchbase, like:

2024-12-12T13:13:03.928466629Z [2024-12-12 13:13:03,928] INFO {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":52697,"documentId":"_default._default._sync:unusedSeq:6434914","connectTaskId":"?","revision":3431544,"type":"deletion","partition":485,"sequenceNumber":18338,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":305937970568} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)

But none of them are produced to my vistest-cb-visite topic.

  • Kafka version on MSK: 2.2.1
  • so Confluent Platform version: 5.2.7
  • and couchbase/kafka-connect-couchbase: v4.0.6 (from the compatibility matrix)

I only see these errors in the logs, but I can't tell whether they are related:

[2024-12-12 13:15:32,150] ERROR WorkerSourceTask{id=vistest-couchbase-source-1} Failed to flush, timed out while waiting for producer to flush outstanding 1 messages (org.apache.kafka.connect.runtime.WorkerSourceTask:438)
2024-12-12T13:15:32.151196761Z [2024-12-12 13:15:32,151] ERROR WorkerSourceTask{id=vistest-couchbase-source-1} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:111)
2024-12-12T13:15:32.151242840Z [2024-12-12 13:15:32,151] INFO WorkerSourceTask{id=vistest-couchbase-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
2024-12-12T13:15:32.151284192Z [2024-12-12 13:15:32,151] INFO WorkerSourceTask{id=vistest-couchbase-source-0} flushing 1 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)

EDIT:

When I trigger GET http://localhost:8083/connectors/vistest-couchbase-source/status, I get this error for task 1:

org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback
	at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:258)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:312)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic vistest-cb-visite not present in metadata after 60000 ms.

And this one for task 2:

org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is already flushing
	at org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:111)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:428)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
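
In case it helps with debugging, whether the topic exists and is reachable over SSL can be checked with something like this (client-ssl.properties is a placeholder for a client config containing security.protocol=SSL):

kafka-topics --bootstrap-server ***** --command-config client-ssl.properties --describe --topic vistest-cb-visite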

If you have any idea, I would be glad to hear it.

Thanks

I will answer my own question:

I had to add this property to connect-distributed.properties: producer.security.protocol=SSL

It appears that the producer was not able to connect to the broker because of the security protocol. The worker could talk to Kafka with security.protocol=SSL, but the tasks could not produce. This makes sense in hindsight: in Kafka Connect, top-level client settings in the worker config only apply to the worker's own internal clients (group membership and the config/offset/status topics), while the producers used by source tasks pick up only settings prefixed with producer. (sink task consumers use the consumer. prefix). Without producer.security.protocol=SSL, the task producer fell back to PLAINTEXT and could never fetch topic metadata, hence the TimeoutException.
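
Concretely, the relevant part of my connect-distributed.properties now looks like this (if you also run sink connectors, you would presumably want consumer.security.protocol=SSL as well):

bootstrap.servers=*****
security.protocol=SSL
producer.security.protocol=SSL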

I hope it will help someone else.
