Source connector in failing state

We are running a distributed connector cluster on version 3.3.2 and decided to upgrade it to 3.4.2. First, we shut down the entire cluster and upgraded the SDK JARs on one of the nodes. Then we started that node and checked the REST API; the existing connector job is now in a FAILED state:
{
  "name": "couchbase-event-connector",
  "connector": {
    "state": "FAILED",
    "trace": "org.apache.kafka.connect.errors.ConnectException: Cannot fetch configuration for bucket test
      at com.couchbase.connect.kafka.CouchbaseSourceConnector.start(CouchbaseSourceConnector.java:60)
      at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
      at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
      at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:195)
      at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:241)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:916)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:111)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:673)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder$11.call(DistributedHerder.java:659)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:271)
      at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:220)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)",
    "worker_id": "redacted:8083"
  },
  "tasks": [
    {
      "state": "FAILED",
      "trace": "java.lang.NoSuchMethodError: com.couchbase.client.dcp.DefaultConnectionNameGenerator.forProduct(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/couchbase/client/dcp/DefaultConnectionNameGenerator;
      at com.couchbase.connect.kafka.CouchbaseReader.<init>(CouchbaseReader.java:72)
      at com.couchbase.connect.kafka.CouchbaseSourceTask.start(CouchbaseSourceTask.java:124)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)",
      "id": 0,
      "worker_id": "redacted:8083"
    },
    {
      "state": "FAILED",
      "trace": "java.lang.NoSuchMethodError: com.couchbase.client.dcp.DefaultConnectionNameGenerator.forProduct(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/couchbase/client/dcp/DefaultConnectionNameGenerator;
      at com.couchbase.connect.kafka.CouchbaseReader.<init>(CouchbaseReader.java:72)
      at com.couchbase.connect.kafka.CouchbaseSourceTask.start(CouchbaseSourceTask.java:124)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)",
      "id": 1,
      "worker_id": "redacted:8083"
    },
    {
      "state": "FAILED",
      "trace": "java.lang.NoSuchMethodError: com.couchbase.client.dcp.DefaultConnectionNameGenerator.forProduct(Ljava/lang/String;Ljava/lang/String;[Ljava/lang/String;)Lcom/couchbase/client/dcp/DefaultConnectionNameGenerator;
      at com.couchbase.connect.kafka.CouchbaseReader.<init>(CouchbaseReader.java:72)
      at com.couchbase.connect.kafka.CouchbaseSourceTask.start(CouchbaseSourceTask.java:124)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)",
      "id": 2,
      "worker_id": "redacted:8083"
    }
  ],
  "type": "source"
}

Restarting the connector does not make the error go away, nor does creating a new connector job. The connector was running fine and publishing messages before the upgrade, so I doubt it is an authentication issue, and I have double-checked the settings. Can anybody tell me what is wrong here?
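
For reference, here is roughly how I am triggering the restarts, via the standard Connect REST endpoints (the host below is a placeholder for our redacted worker address):

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class RestartConnector {
    public static void main(String[] args) throws IOException {
        // Placeholder for our redacted worker address.
        String base = "http://localhost:8083";

        // POST /connectors/{name}/restart restarts the connector instance;
        // each task must then be restarted through its own endpoint.
        post(base + "/connectors/couchbase-event-connector/restart");
        for (int task = 0; task < 3; task++) {
            post(base + "/connectors/couchbase-event-connector/tasks/" + task + "/restart");
        }
    }

    private static void post(String endpoint) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        System.out.println(endpoint + " -> HTTP " + conn.getResponseCode());
        conn.disconnect();
    }
}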

Hi Wen,

It looks like the previous version of the connector was not completely uninstalled, and is conflicting with the more recent version.
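
If you want to confirm that theory, a quick scan of the Kafka installation for Couchbase-related JARs should turn up the duplicates. A minimal sketch, assuming Kafka is installed under /opt/kafka (adjust the path to your installation):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class FindLeftoverJars {
    public static void main(String[] args) throws IOException {
        // Placeholder path; point this at your actual Kafka installation.
        Path kafkaHome = Paths.get("/opt/kafka");
        try (Stream<Path> files = Files.walk(kafkaHome)) {
            // After a clean upgrade there should be exactly one version of the
            // connector and no stray Couchbase SDK or DCP client JARs.
            files.filter(p -> p.getFileName().toString().toLowerCase()
                               .matches(".*(couchbase|dcp).*\\.jar"))
                 .forEach(System.out::println);
        }
    }
}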

Thanks,
David

Hi @david.nault, thank you for your reply. I have a few questions:

  1. Is there a way to upgrade a distributed connector cluster without downtime? (i.e. using rolling upgrade)
  2. How do you properly “install” the SDK? What I did was move the new SDK JARs into the kafka/libs directory and move the old ones out. Is that the proper way to install it?
  3. kafka-connect-couchbase 3.4.2 uses Java SDK 2.7.2 and DCP client 0.21.0. How do I include these SDKs? What I did was download them from https://docs.couchbase.com/java-sdk/2.7/sdk-release-notes.html#version-2-7-2-5-december-2018 and the releases page of couchbase/java-dcp-client on GitHub, and replace the old ones in the kafka/libs directory.

Best regards,
Wen Tat

Hi Wen,

  1. Is there a way to upgrade a distributed connector cluster without downtime? (i.e. using rolling upgrade)

Not that I’m aware of. If you have a relationship with your Kafka vendor, they may have a better answer.

  2. How do you properly “install” the SDK? What I did was move the new SDK JARs into the kafka/libs directory and move the old ones out. Is that the proper way to install it?

The preferred way to install the connector is via the plugin path. Detailed instructions here: https://docs.confluent.io/current/connect/userguide.html#connect-installing-plugins
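
For illustration, a minimal sketch, assuming Kafka lives under /opt/kafka (adjust to your installation): add a plugin.path line to the worker config, then unpack the connector archive into its own subdirectory beneath that path.

# connect-distributed.properties
plugin.path=/opt/kafka/plugins

Each plugin then sits in its own directory (e.g. /opt/kafka/plugins/kafka-connect-couchbase/), isolated from the classes in kafka/libs by a dedicated classloader, which is exactly what prevents the kind of version conflict you are seeing.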

  3. kafka-connect-couchbase 3.4.2 uses Java SDK 2.7.2 and DCP client 0.21.0. How do I include these SDKs? What I did was download them from https://docs.couchbase.com/java-sdk/2.7/sdk-release-notes.html#version-2-7-2-5-december-2018 and the releases page of couchbase/java-dcp-client on GitHub, and replace the old ones in the kafka/libs directory.

The Couchbase SDK and DCP client are bundled into the kafka-connect-couchbase JAR. You do not need to install them separately. Try removing them from the kafka/libs directory.
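
After cleaning up, one way to double-check (a sketch; run it with the same classpath the worker uses) is to ask the JVM where the DCP class from the task trace now resolves from. It should point at the kafka-connect-couchbase JAR rather than a standalone dcp-client JAR:

public class WhichJar {
    public static void main(String[] args) throws ClassNotFoundException {
        // The class named in the NoSuchMethodError from the task trace.
        Class<?> cls = Class.forName("com.couchbase.client.dcp.DefaultConnectionNameGenerator");
        // Prints the JAR (or directory) the class was actually loaded from.
        System.out.println(cls.getProtectionDomain().getCodeSource().getLocation());
    }
}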

Thanks,
David

Hi @david.nault,

Thank you for your replies. After removing the separately installed SDK JARs, everything is working fine now.

As for the rolling upgrade, I have a cluster running both the old version 3.3.2 and the new version 3.4.2, and it doesn’t seem to throw any errors. I guess it should be fine.

Hi Wen,

I’m glad you got the connector working!

Depending on your workload, running two versions of the connector alongside each other might be fine. However, you should exercise caution if you sometimes modify the same Couchbase document multiple times within a short time window.

For example, let’s say you modify document A twice, so you have versions A1 and A2. If the new connector writes both A1 and A2, and then the old connector writes A1 but you shut it down before it has a chance to write A2, then you’ll end up with the wrong version of the document (you’ll have A1 when you should have A2). You’ll recover from this situation the next time document A is modified, since the new connector will write A3. Depending on your own specific use case, that may or may not be tolerable.

Thanks,
David

Hi David,

Thanks for sharing that concern. In your scenario, isn’t A2 still written to the topic by the new connector? My scenario is two connector “nodes” with different versions running in the same distributed cluster and writing to the same topic, rather than two separate distributed clusters.

In any case, we have logic at the Kafka consumer end to take the latest update (roughly like the sketch below), but will we miss any updates if we perform a rolling upgrade? (i.e. shut down one node, upgrade it and rejoin it to the cluster, then the second node, and so on)
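
For context, our take-latest logic is conceptually like this simplified sketch (the field names are made up; it assumes each event carries a per-document sequence number that is higher for newer revisions):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LatestUpdateFilter {
    // Highest sequence number seen so far for each document ID.
    private final Map<String, Long> latestSeqByDoc = new ConcurrentHashMap<>();

    // Returns true if the event is at least as new as anything seen for its
    // document; stale events (e.g. replays from the old connector node) are
    // rejected regardless of arrival order.
    public boolean accept(String documentId, long seqNo) {
        long winner = latestSeqByDoc.merge(documentId, seqNo, Math::max);
        return winner == seqNo;
    }
}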

Oh, sorry! I misunderstood how you are planning to do the rolling upgrade.

Your rolling upgrade strategy sounds reasonable, and I’m not aware of a reason it wouldn’t be safe. Might be a good idea to try it in a dev environment first, of course.

Doing a rolling upgrade from 3.x to 4.x might present more of a challenge, since some of the config key names have changed in 4.0. The upgrade from 3.3.2 to 3.4.2 is easier, since both versions use the same config keys and the new version should be able to read the existing config.
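
For example (quoting from memory, so please verify against the 4.0 documentation when the time comes), the 3.x connection.* style keys become couchbase.* keys in 4.0:

# 3.x style keys:
connection.cluster_address=127.0.0.1
connection.bucket=test

# Roughly equivalent 4.x keys:
couchbase.seed.nodes=127.0.0.1
couchbase.bucket=test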

Thanks,
David

Hi David,

In our case, we are going with 3.4.2 to stay compatible with our current Couchbase 5.1.1 and our future upgrade to Enterprise 6.1.1. We will be testing the upgrade in lower environments with a load test to see whether any updates are missed. Thank you, you have been helpful!