Couchbase Kafka - JdbcSink

Hi folks,

I’m using the Couchbase Source Connector to send messages to a Kafka topic, and I’m running into an issue when I use it alongside a JDBC Sink connector and Schema Registry converter configuration.

This is my configuration:

{
    "name": "couchbase-source-connector",
    "config": {
        "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
        "couchbase.persistence.polling.interval": "100ms",
        "tasks.max": "2",
        "couchbase.seed.nodes": "couchbasedb",
        "couchbase.collections": "art-school-scope.student-record-collection",
        "couchbase.bucket": "student-bucket",
        "couchbase.username": "username",
        "couchbase.password": "password",
        "couchbase.stream.from": "NOW",
        "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "couchbase.topic": "student_art_topic6"
    }
}

This is how the message looks when I read the topic with the Kafka console consumer. So far so good at this point.

{"name": "Estudent Test","date_of_birth": "2001-08-10"}

The problem comes with Schema Registry. I registered the following schema:

kafka-json-schema-console-producer --bootstrap-server IP_HERE:9092 \
> --property schema.registry.url=http://IP_HERE:8081 --topic student_art_topic6 \
> --property value.schema='{"type":"object", "properties":{"name":{"type":"string"},"date_of_birth":{"type":"string"} }}'

Then, using the same structure as the documents I add to Couchbase, I produced a message that looks like this:

{"name":"Test Number 3","date_of_birth":"2001-08-10"}

The only difference I noticed is the whitespace between the keys and values in the message. In the end it’s JSON, just like the previous one.

Then I registered my JDBC Sink. There is no problem with the second message, the one I produced when I registered the schema in Schema Registry. The problem appears with the messages written when a new document is created in Couchbase Server.

I’m also sharing my JDBC Sink config:

{
    "name": "mysql-sink",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "student_art_topic6",
        "connection.url": "jdbc:mysql://mysql:3306/inventory",
        "connection.username": "*",
        "connection.password": "*",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.mode": "none",
        "value.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "true",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true
    }
}

I’m quite new to this topic and I don’t understand why it doesn’t work even though both messages are in JSON format. The only error I see is the one below; I looked it up on other forums, but the solutions don’t work for my configuration.

2024-08-10 10:12:18 org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic student_art_topic6: 
2024-08-10 10:12:18     at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
2024-08-10 10:12:18     at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
2024-08-10 10:12:18     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
2024-08-10 10:12:18     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
2024-08-10 10:12:18     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
2024-08-10 10:12:18     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
2024-08-10 10:12:18     at java.base/java.lang.Thread.run(Thread.java:1583)
2024-08-10 10:12:18 Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
2024-08-10 10:12:18     at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)
2024-08-10 10:12:18     at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)
2024-08-10 10:12:18     at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
2024-08-10 10:12:18     at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
2024-08-10 10:12:18     ... 17 more
2024-08-10 10:12:18 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2024-08-10 10:12:18     at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:635)
2024-08-10 10:12:18     at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)
2024-08-10 10:12:18     ... 20 more

Any help or advice on how to work around this situation would be appreciated.

Best regards,
Rigoberto Calderon

Hi @Rigoberto12 ,

I suspect the problem may be that the Couchbase connector is not emitting documents in a schema-aware format.

From your config, it looks like you’re using com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler, which does not emit schemas.
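
That also lines up with the “Unknown magic byte!” error: the JsonSchemaConverter on your sink expects records in the Confluent Schema Registry wire format, where the value starts with a 0x00 “magic byte” and a 4-byte schema ID before the actual JSON payload. RawJsonSourceHandler publishes the document bytes as-is, so the value starts with ‘{’ and the deserializer gives up. If you want to confirm that, a quick throwaway consumer along these lines would show the difference between the two kinds of messages (just a sketch; the bootstrap server is a placeholder):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class WireFormatCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wire-format-check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("student_art_topic6"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(10))) {
                byte[] value = record.value();
                if (value == null || value.length == 0) {
                    continue;
                }
                // A record written by the JSON Schema serializer starts with magic byte 0x00
                // followed by a 4-byte schema ID; raw JSON from the connector starts with '{' (0x7B).
                System.out.printf("offset %d, first byte: 0x%02X%n", record.offset(), value[0]);
            }
        }
    }
}

The message you produced with kafka-json-schema-console-producer should print 0x00; the ones written by the Couchbase connector should print 0x7B.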

Out of the box, the connector supports a single schema, the one used by DefaultSchemaSourceHandler. That schema is documented here: Quickstart | Couchbase Docs

Frankly, that schema is not very useful, because it says nothing about the structure of your documents; instead, it applies a schema to the document metadata, and treats the document content as a blob.

In order to use a real custom schema, you would need to write your own custom SourceHandler that builds the source record in a schema-aware manner. Unfortunately, we don’t have a great example of how to do this. My impression is that your custom SourceHandler would first need to parse the document JSON to ensure it conforms to your schema, then add each document field to the Kafka Connect SourceRecord one by one, specifying the appropriate schema information for each field. It’s a real pain in the neck, and in many cases not worth the trouble, in my opinion.
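
To give a rough idea of the schema-aware part, here is a minimal sketch of the record-building logic such a handler would need. I’m leaving out the connector-specific SourceHandler plumbing and only showing how the document JSON could be parsed (Jackson here, just as an example) and turned into a Kafka Connect Struct with an explicit schema. The field names come from your example document; the class and naming are made up:

import java.io.IOException;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StudentRecordSchema {

    // Connect schema describing the document structure (matches the example document).
    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("student.record")
            .field("name", Schema.STRING_SCHEMA)
            .field("date_of_birth", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parse the raw document bytes and build a schema-aware value.
    // Assumes both fields are present; a real handler should validate the document first.
    public static Struct toStruct(byte[] documentContent) throws IOException {
        JsonNode json = MAPPER.readTree(documentContent);
        return new Struct(VALUE_SCHEMA)
                .put("name", json.get("name").asText())
                .put("date_of_birth", json.get("date_of_birth").asText());
    }
}

A custom handler would then attach VALUE_SCHEMA and the resulting Struct to the source record it builds, instead of the raw byte[] that RawJsonSourceHandler emits.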

If you want to try it anyway, there’s a sample project you can use to get started here: kafka-connect-couchbase/examples/custom-extensions at master · couchbase/kafka-connect-couchbase · GitHub

Another alternative might be to let the Couchbase connector publish the raw document JSON, and then use a different component to transform the record by applying the schema. Maybe a Single Message Transform (SMT) could do this, or a solution based on Kafka Streams. (I’m just guessing about this, though.)
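
If you do experiment with the SMT idea, the transform would live on the source connector (source-side SMTs run before the value converter), take the raw JSON byte[] that RawJsonSourceHandler emits, and replace it with a Struct plus schema, so that a schema-aware converter such as io.confluent.connect.json.JsonSchemaConverter can serialize it and register the schema for you. Purely as a sketch of the idea, reusing the hypothetical StudentRecordSchema helper from above (untested, names made up):

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: converts a raw JSON byte[] value into a Struct with a fixed schema.
public class RawJsonToStruct<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }
        if (!(record.value() instanceof byte[])) {
            throw new DataException("Expected raw JSON bytes, got: " + record.value().getClass());
        }
        try {
            return record.newRecord(
                    record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    StudentRecordSchema.VALUE_SCHEMA,                       // from the sketch above
                    StudentRecordSchema.toStruct((byte[]) record.value()),
                    record.timestamp());
        } catch (IOException e) {
            throw new DataException("Could not parse document JSON", e);
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}

You would then list it under transforms in the source connector config and switch that connector’s value.converter to JsonSchemaConverter, so the messages land on the topic in the wire format your sink expects.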

Thanks,
David
