Couchbase Kafka Source Connector with Custom Schema

Hi everyone,

I am using com.couchbase.client:kafka-connect-couchbase:3.4.8 with Couchbase version 6.*. I want the connector to publish records to the topic using the Avro schema already registered for that topic in the Schema Registry.

I have tried multiple configurations, but none of them work: the connector always uses its default schema, which is not valid for my use case.

If you have any samples showing how to integrate with the Schema Registry using custom schemas, please share them.

Thanks in advance.

The config used:

{
  "name": "events-source-connector",
  "config": {
    "name": "events-source-connector",
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "topic.name": "bttv-intent-to-play-events",
    "connection.cluster_address": "events-db",
    "connection.timeout.ms": "10000",
    "connection.bucket": "myBucket",
    "connection.username": "user",
    "connection.password": "drowssap",
    "couchbase.event_types": "CREATE,UPDATE",
    "use_snapshots": "false",
    "schema.url": "http://schema-registry:8081",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.format": "AVRO",
    "value.converter.schemas.enable": "true",
    "value.converter.auto.register.schemas": "false",
    "value.converter.connect.meta.data": "false",
    "value.converter.enhanced.avro.schema.support": "true",
    "internal.key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "internal.value.converter": "io.confluent.connect.avro.AvroConverter",
    "internal.value.converter.schema.registry.url": "http://schema-registry:8081",
    "internal.key.converter.schemas.enable": "false",
    "internal.value.converter.schemas.enable": "false",
    "dcp.message.converter.class": "com.couchbase.connect.kafka.converter.SchemaConverter",
    "offset.storage.file.filename": "/tmp/connect.offsets",
    "offset.flush.interval.ms": "10000",
    "couchbase.stream.from": "NOW",
    "couchbase.filter.type": "DOCUMENTS_ONLY",
    "couchbase.compression": "ENABLED",
    "couchbase.flow_control_buffer": "128m",
    "couchbase.persistence_polling_interval": "100ms",
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "couchbase.log.redaction": "FULL"
  }
}

Hi @dasarivinod ,

The Couchbase connector’s built-in source handlers do not support custom Avro schemas. You would need to write a custom SourceHandler that inspects the document and applies the appropriate Avro schema.

My personal recommendation would be to use the default built-in RawJsonSourceHandler to get the schema-less JSON into a Kafka topic, and then use a separate processing stage (maybe Kafka Streams?) to transform the JSON into Avro+schema.
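For illustration, here is a minimal, untested Kafka Streams sketch of that second stage. The raw topic name (bttv-intent-to-play-events-raw), the field names, and the one-field Avro schema are all placeholders I made up for the example; substitute the schema actually registered in your registry:

package com.example.streams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Map;
import java.util.Properties;

public class JsonToAvroBridge {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Hypothetical one-field Avro schema; in practice use the one
    // registered in the Schema Registry for the target topic.
    private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"IntentToPlayEvent\"," +
            "\"fields\":[{\"name\":\"correlationId\",\"type\":\"string\"}]}");

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-to-avro-bridge");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        // Registry-backed serde for the Avro output values.
        GenericAvroSerde avroSerde = new GenericAvroSerde();
        avroSerde.configure(
                Map.of("schema.registry.url", "http://schema-registry:8081"),
                /* isKey= */ false);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("bttv-intent-to-play-events-raw",
                        Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(JsonToAvroBridge::toAvro)
                .to("bttv-intent-to-play-events",
                        Produced.with(Serdes.String(), avroSerde));

        new KafkaStreams(builder.build(), props).start();
    }

    // Parses the schemaless JSON written by RawJsonSourceHandler and
    // copies the fields into an Avro GenericRecord.
    private static GenericRecord toAvro(String json) {
        try {
            JsonNode node = MAPPER.readTree(json);
            GenericRecord record = new GenericData.Record(AVRO_SCHEMA);
            record.put("correlationId", node.path("correlationId").asText());
            return record;
        } catch (Exception e) {
            throw new RuntimeException("Could not parse document: " + json, e);
        }
    }
}

This keeps the connector itself simple and schema-agnostic, and puts all the schema-mapping logic in one place you fully control.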

David

Hi @david.nault ,
Thanks for the quick response.
I tried writing my own custom handler, but the connector is throwing an error:
connect | [2023-05-02 11:22:51,300] ERROR Error encountered in task events-source-connector-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={bucket=myBucket, partition=484}, sourceOffset={vbuuid=278397975133430, collectionsManifestUid=0, snapshotEndSeqno=13, bySeqno=13, snapshotStartSeqno=12}} ConnectRecord{topic='bttv-intent-to-play-events', kafkaPartition=null, key=validDocument-3, keySchema=Schema{STRING}, value=Struct{correlationId=mutation}, valueSchema=Schema{com.couchbase.connect.kafka.handler.source:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter)
connect | org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic bttv-intent-to-play-events :
connect | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)

Can you share an example of a custom-schema source handler?

Hi @dasarivinod,

I’m afraid we don’t have any official examples of custom source handlers that use a custom schema, but a rough sketch of the idea is below.
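This is untested and the names are placeholders. It assumes the 3.x handler SPI, i.e. a handle(SourceHandlerParams) method returning a SourceRecordBuilder in the com.couchbase.connect.kafka.handler.source package (the same package your error log shows); verify the exact signatures against the connector version you are running. The Connect schema and field names must match the schema already registered in your registry, especially with value.converter.auto.register.schemas set to false:

package com.example.connect;

import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import com.couchbase.connect.kafka.handler.source.SourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.io.IOException;
import java.io.UncheckedIOException;

// If SourceHandler is an abstract class rather than an interface in your
// connector version, use `extends` instead of `implements`.
public class IntentToPlayEventSourceHandler implements SourceHandler {

    // Connect schema that the AvroConverter translates into an Avro schema.
    // Field names are placeholders; align them with the registered schema.
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("com.example.IntentToPlayEvent")
            .field("correlationId", Schema.STRING_SCHEMA)
            .field("eventType", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public SourceRecordBuilder handle(SourceHandlerParams params) {
        DocumentEvent event = params.documentEvent();
        try {
            // Parse the raw document bytes and copy fields into a Connect Struct.
            JsonNode doc = MAPPER.readTree(event.content());
            Struct value = new Struct(VALUE_SCHEMA)
                    .put("correlationId", doc.path("correlationId").asText())
                    .put("eventType", doc.path("eventType").asText(null));
            return new SourceRecordBuilder()
                    .key(Schema.STRING_SCHEMA, event.key())
                    .value(VALUE_SCHEMA, value);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not parse document " + event.key(), e);
        }
    }
}

You would wire it in through the same property you are already using for the built-in handler, i.e. dcp.message.converter.class (in the 4.x connector the equivalent property is couchbase.source.handler).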

"internal.key.converter": "org.apache.kafka.connect.storage.StringConverter",
"internal.value.converter": "io.confluent.connect.avro.AvroConverter",
"internal.value.converter.schema.registry.url": "http://schema-registry:8081",
"internal.key.converter.schemas.enable": "false",
"internal.value.converter.schemas.enable": "false",

Those lines look suspicious to me. I would recommend removing all of the internal key and value converter properties. These converters are used for things like serializing connector offsets, and I suspect they are the cause of the error you’re seeing.
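Concretely, the converter section of your config would keep only the topic-facing entries from your original post, e.g.:

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.auto.register.schemas": "false",

with all five internal.* properties dropped entirely.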

See: KIP-174: Deprecate and remove internal converter configs in WorkerConfig

Thanks,
David
