If a Kafka message doesn’t have a key, the Couchbase sink connector uses a synthetic key consisting of the Kafka topic, partition, and offset; every Kafka message will generate a unique document in Couchbase. That’s probably why updates aren’t working – you need to assign a key to the Kafka records so Couchbase knows which document to update/delete.
The Couchbase Sink will delete a document when the Kafka record has a null value. To get that to work, you can use an SMT to transform the output of the JDBC connector so the Kafka record’s key is the ID of the document to delete, and the Kafka record’s value is null.
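To make the two rules above concrete, here is a small model of the sink's behaviour in plain Java. It is hypothetical and is not the Couchbase connector's code. The class and method names are made up, and the "topic/partition/offset" layout of the synthetic ID is only an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the sink behaviour described above.
// NOT the Couchbase connector's code; the synthetic ID layout
// ("topic/partition/offset") is an illustrative assumption.
public class SinkModel {
    // Stands in for the Couchbase bucket: document ID -> document body.
    private final Map<String, String> bucket = new HashMap<>();

    public void write(String topic, int partition, long offset, String key, String value) {
        // No key: fall back to an ID that is unique per record, so every
        // record creates a new document and nothing is ever updated.
        String docId = (key != null) ? key : topic + "/" + partition + "/" + offset;
        if (value == null) {
            bucket.remove(docId);      // null value -> delete the document
        } else {
            bucket.put(docId, value);  // otherwise insert or replace it
        }
    }

    public Map<String, String> bucket() {
        return bucket;
    }

    public static void main(String[] args) {
        SinkModel unkeyed = new SinkModel();
        unkeyed.write("ANUTCUSERS1", 0, 0, null, "{\"ID\":1,\"NAME\":\"a\"}");
        unkeyed.write("ANUTCUSERS1", 0, 1, null, "{\"ID\":1,\"NAME\":\"b\"}");
        System.out.println(unkeyed.bucket().size()); // 2: old and new versions coexist

        SinkModel keyed = new SinkModel();
        keyed.write("ANUTCUSERS1", 0, 0, "1", "{\"ID\":1,\"NAME\":\"a\"}");
        keyed.write("ANUTCUSERS1", 0, 1, "1", "{\"ID\":1,\"NAME\":\"b\"}");
        System.out.println(keyed.bucket().size()); // 1: second write replaced the first
        keyed.write("ANUTCUSERS1", 0, 2, "1", null);
        System.out.println(keyed.bucket().size()); // 0: null value removed it
    }
}
```

Without a key, the two versions of the same row end up as separate documents. With the row's ID as the key, the second write replaces the first, and a null value removes the document.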
I tried adding the content below to the file “/u01/kafka/confluent-5.5.1/etc/schema-registry/connect-avro-standalonekey.properties” and then starting the source connector. I also tried adding it to my JDBC source connector properties file, but with no luck.
Note: the column named “ID” is the primary key column in my Oracle database (the source).
#Added the below for key testing
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
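For reference, the following sketch shows what the two transforms above are meant to do to each record. It uses plain Java maps instead of Kafka Connect Struct objects. The class and method names are hypothetical. The exception is only this model's way of flagging a field name that doesn't match exactly; it is not necessarily how the real SMTs report the problem.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the createKey -> extractInt chain,
// using plain maps instead of Kafka Connect Structs. Not the Kafka code.
public class KeyTransformModel {
    // Like ValueToKey: build a key that holds only the listed value fields.
    static Map<String, Object> valueToKey(Map<String, Object> value, String... fields) {
        Map<String, Object> key = new LinkedHashMap<>();
        for (String f : fields) {
            // Names are matched exactly, so "id" does not match "ID".
            if (!value.containsKey(f)) {
                throw new IllegalArgumentException("Field does not exist: " + f);
            }
            key.put(f, value.get(f));
        }
        return key;
    }

    // Like ExtractField$Key: replace the key with one field taken from it.
    static Object extractKeyField(Map<String, Object> key, String field) {
        return key.get(field);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", 1);          // Oracle reports unquoted column names in upper case
        row.put("NAME", "alice");

        Object key = extractKeyField(valueToKey(row, "ID"), "ID");
        System.out.println(key);   // 1

        try {
            valueToKey(row, "id");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Field does not exist: id
        }
    }
}
```

After both steps the record key is just the row's ID value, which is what the sink needs in order to address the same document on every change.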
Questions:
1. Can you please tell me what I am missing here? Is there anything else to be done apart from this?
2. I still don't see updates and deletes working on the target (Couchbase). Instead of the document being updated in Couchbase, a new document is created and the old document stays there unchanged. It would help me if at least updates worked on the target.
3. Do I need to add these values in the sink connector as well?
Looking forward to your response. Thanks again for your help and time.
The column with the name as “ID” is the primary key column in my oracle database (Source).
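Maybe it’s a case sensitivity issue: Oracle reports unquoted column names in upper case, and the field names in the transform have to match exactly. Should it be ID instead of id in your transform definition?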
If you have trouble assigning the message key using the transforms, you can have the sink assign the document ID instead using the couchbase.document.id sink config property. For more details, take another look at the “use a field of message as the document id” link in my previous post.
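With couchbase.document.id, the sink builds the document ID from a field of the message itself. Every change to the same row then lands on the same document. The following sketch models that substitution. The ${/FIELD} placeholder syntax and the class and method names are assumptions for illustration, so check the connector documentation for the exact format the property accepts.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of filling a document-ID template from a message's
// fields. The "${/FIELD}" placeholder syntax is an assumption; consult the
// connector docs for the real couchbase.document.id format.
public class DocIdTemplate {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{/([^}]+)\\}");

    static String resolve(String template, Map<String, Object> doc) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object v = doc.get(m.group(1));   // field name after the "/"
            if (v == null) {
                throw new IllegalArgumentException("No field " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(String.valueOf(v)));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("ID", 42, "NAME", "alice");
        System.out.println(resolve("myDocumentIdPrefix::${/ID}", doc)); // myDocumentIdPrefix::42
    }
}
```

The ID is copied through as whatever the field contains. If the field arrives as a Base64 string instead of a number, the resulting document ID contains that string too.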
Do I need to add these values in the sink connector as well?
No, defining the transform in the source config should be sufficient. The only reason to modify the sink config would be if you want to define the couchbase.document.id sink config property.
As suggested, I changed the case from “id” to “ID”, but still no luck; the behaviour is the same.
There is no difference in the output. The following command still gives the same response:
kafka-console-consumer --bootstrap-server localhost:9092 --topic ANUTCUSERS1 --from-beginning
The suggested couchbase.document.id parameter makes updates work in Couchbase, instead of inserting a new document as before.
However, the document name I get is myDocumentIdPrefix::WQ== and the ID is “WQ==”, but the ID is actually auto-generated in the source (Oracle database) and is a number.
Is there anything to tweak here?
WQ== looks like a Base64 encoded value, which makes me suspect the ID field is being transported as a byte array (or at least, the Avro schema thinks it’s a byte array).
I guess I’d recommend investigating how your Avro schema is defined, or looking into whether the source might be generating a byte array (not a number value) for the ID field.
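One way to check that suspicion is to decode the Base64 string and see whether it turns back into the expected ID. The sketch below rests on an assumption to verify: the JDBC source mapped the Oracle NUMBER column to a Kafka Connect Decimal, whose unscaled value travels as big-endian two's-complement bytes. The scale of 0 is a second assumption, matching an integer ID. The class name is hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Decodes a Base64 string ASSUMED to hold a Connect Decimal: the unscaled
// value as big-endian two's-complement bytes, combined with a known scale.
public class DecimalIdDecoder {
    static BigDecimal decode(String base64, int scale) {
        byte[] bytes = Base64.getDecoder().decode(base64);  // "WQ==" -> { 0x59 }
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        System.out.println(decode("WQ==", 0)); // 89
    }
}
```

If 89 matches a real ID in the source table, the field is most likely travelling as a Decimal. In that case, the JDBC source connector's numeric.mapping setting, which controls how NUMBER columns are mapped to Connect types, is one place to look.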