Hi folks,
I’m using Couchbase as a source connector to send messages to a Kafka topic, and I’m facing an issue when it works alongside a JDBC sink connector with Schema Registry converters configured.
This is my configuration:
{
  "name": "couchbase-source-connector",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "couchbase.persistence.polling.interval": "100ms",
    "tasks.max": "2",
    "couchbase.seed.nodes": "couchbasedb",
    "couchbase.collections": "art-school-scope.student-record-collection",
    "couchbase.bucket": "student-bucket",
    "couchbase.username": "username",
    "couchbase.password": "password",
    "couchbase.stream.from": "NOW",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.topic": "student_art_topic6"
  }
}
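For context, my understanding from the connector docs is that RawJsonSourceHandler passes each document through as raw bytes and ByteArrayConverter publishes them unchanged, so the record value on the topic should be literally the document’s JSON, with no extra envelope. A minimal Python sketch of what I believe the value is (just for illustration):

# What I believe ends up as the record value with RawJsonSourceHandler
# plus ByteArrayConverter: the document's UTF-8 JSON bytes, unchanged.
doc = '{"name": "Estudent Test","date_of_birth": "2001-08-10"}'
value = doc.encode("utf-8")
print(value[:1])  # b'{' -- the value starts directly with the JSON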
This is how the message looks in the Kafka console consumer; so far so good at this point.
{"name": "Estudent Test","date_of_birth": "2001-08-10"}
The problem comes with Schema Registry. I registered the following schema:
kafka-json-schema-console-producer --bootstrap-server IP_HERE:9092 \
  --property schema.registry.url=http://IP_HERE:8081 --topic student_art_topic6 \
  --property value.schema='{"type":"object","properties":{"name":{"type":"string"},"date_of_birth":{"type":"string"}}}'
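If I understand correctly, this console producer does not write plain JSON: it prepends the Confluent wire-format header (one zero “magic” byte plus a 4-byte big-endian schema id) to every value before the JSON payload. A Python sketch of that framing (the schema id here is a made-up example; the real one is assigned by Schema Registry):

import json
import struct

# Confluent wire format: magic byte 0x00, then the schema id as a
# 4-byte big-endian integer, then the serialized JSON payload.
MAGIC_BYTE = 0
schema_id = 1  # hypothetical; Schema Registry assigns the real id

payload = json.dumps(
    {"name": "Test Number 3", "date_of_birth": "2001-08-10"}
).encode("utf-8")

framed = struct.pack(">bI", MAGIC_BYTE, schema_id) + payload
print(framed[:5].hex())  # 0000000001 -- the 5-byte header
print(framed[5:])        # the JSON itself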
Then I produced a message with the same structure I use when adding documents in Couchbase, and it looks like this:
{"name":"Test Number 3","date_of_birth":"2001-08-10"}
The only difference I noticed is the whitespace between the keys and values in the message, but in the end it’s JSON just like the previous one.
Then I registered my JDBC sink. There is no problem with the second message, the one I produced when I registered the schema in Schema Registry. The problem appears with messages produced by the Couchbase source connector when a new document is created in Couchbase Server.
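If it helps, a quick way to compare the two kinds of messages is to dump the leading bytes of each record value (a sketch assuming the kafka-python package; the broker address is the same placeholder as above):

from kafka import KafkaConsumer

# Print the first bytes of each record value on the topic. Values from
# the json-schema console producer should start with the wire-format
# header (0x00 + 4-byte schema id); values from the Couchbase source
# should start with b'{' because they are raw JSON.
consumer = KafkaConsumer(
    "student_art_topic6",
    bootstrap_servers="IP_HERE:9092",  # placeholder
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for record in consumer:
    print(record.value[:5], record.value[:60])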
I’ll also share my JDBC sink config:
{
  "name": "mysql-sink",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "student_art_topic6",
    "connection.url": "jdbc:mysql://mysql:3306/inventory",
    "connection.username": "*",
    "connection.password": "*",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
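For comparison, my understanding is that a producer using Schema Registry’s own JSON serializer would frame the value the way this converter expects. A rough sketch with the confluent-kafka Python client (a hypothetical snippet, not something from my setup):

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Serialize the value against the registered JSON schema so it carries
# the wire-format header that JsonSchemaConverter expects on the sink.
schema_str = '{"type":"object","properties":{"name":{"type":"string"},"date_of_birth":{"type":"string"}}}'
registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})
serializer = JSONSerializer(schema_str, registry)

producer = Producer({"bootstrap.servers": "IP_HERE:9092"})  # placeholder
value = serializer(
    {"name": "Test Number 3", "date_of_birth": "2001-08-10"},
    SerializationContext("student_art_topic6", MessageField.VALUE),
)
producer.produce("student_art_topic6", value=value)
producer.flush()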
I’m quite new to this topic and I can’t figure out why it doesn’t work even though both messages are in JSON format. The only error I see is the following; I looked it up on other forums, but the solutions there don’t work for my configuration.
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic student_art_topic6:
	at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)
	at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
	at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
	... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:635)
	at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)
	... 20 more
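From what I can tell, the last cause is the key part: the deserializer behind JsonSchemaConverter first checks that the value starts with the wire-format header, and a value that begins with '{' fails that check immediately. Roughly what I think the getByteBuffer check does (a sketch, not the actual Confluent code):

# Rough sketch of the failing check (not the actual Confluent code):
# a value whose first byte is 0x7b ('{') instead of 0x00 triggers
# the "Unknown magic byte!" error.
def get_byte_buffer(value: bytes) -> bytes:
    if not value or value[0] != 0:
        raise ValueError("Unknown magic byte!")
    return value[1:]  # 4-byte schema id + payload follow

raw_json = b'{"name":"Test Number 3","date_of_birth":"2001-08-10"}'
get_byte_buffer(raw_json)  # raises: first byte is 0x7b, not 0x00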
Any help or advice on how to work through this situation would be appreciated.
Best regards,
Rigoberto Calderon