Kafka connector for publishing messages to multiple topics

Hello Team,
I’m using the Couchbase Kafka source connector 4.1.11 with Couchbase Server 7.
Publishing messages from a Couchbase bucket to a single Kafka topic works successfully, but when I try to publish to multiple topics it fails. I’ve pasted my config below; can you please check it and let me know what’s wrong?

My config file is as follows:

# Arbitrary unique name for the connector. Attempting to register
# two connectors with the same name will fail.
name=test-couchbase-source

# The Java class for the connector.
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector

# The maximum number of tasks that should be created for this connector.
tasks.max=2

# Publish to this Kafka topic.
couchbase.topics=KafkaExample5,KafkaExample6
#couchbase.topic=KafkaExample5
#topics=KafkaExample5,KafkaExample6

# Connect to this Couchbase cluster (comma-separated list of bootstrap nodes).
couchbase.seed.nodes=127.0.0.1
couchbase.bootstrap.timeout=10s

# If connecting to Capella (or any Couchbase cluster that requires TLS),
# enable TLS by uncommenting the following line:
#couchbase.enable.tls=true

# Unless you're connecting to Capella, enabling TLS also requires specifying
# the Certificate Authority certificate(s) to trust.
# The simplest way is to supply a filesystem path to a PEM file:
#couchbase.trust.certificate.path=/path/to/my-root-certificate.pem

# Alternatively, you can specify the filesystem path to a Java keystore file
# containing the certificate(s) to trust, and the password for the keystore.
# If the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable is set,
# it will override the password specified here.
#couchbase.trust.store.path=/path/to/keystore
#couchbase.trust.store.password=secret

# Read from this Couchbase bucket using these credentials.
# If the KAFKA_COUCHBASE_PASSWORD environment variable is set,
# it will override the password specified here.
#couchbase.bucket=default
couchbase.bucket=tstb
#couchbase.scope=default
#couchbase.collections=default.profile
#couchbase.collection.to.topic:default.profile=KafkaExample5
couchbase.username=
couchbase.password=

# Keys of published messages are just Strings with no schema.
key.converter=org.apache.kafka.connect.storage.StringConverter

# A "source handler" converts the Couchbase document into a Kafka record.
# This quickstart config uses "RawJsonSourceHandler" which creates a Kafka
# record whose content is exactly the same as the Couchbase JSON document.
# When using RawJsonSourceHandler (or its cousin RawJsonWithMetadataSourceHandler)
# the value converter must be ByteArrayConverter... unless you're using
# Single Message Transforms. See the quickstart documentation for more details.
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
#couchbase.source.handler=/Users/NSI29/kafka-connect-couchbase/examples/custom-extensions/src/main/java/com/couchbase/connect/kafka/Sample_connector/CustomSourceHandler1.java
#couchbase.source.handler=com.couchbase.connect.kafka.handler.source.CustomSourceHandler1
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Control which Couchbase document change notifications get published to Kafka
# using this Java class, which must implement com.couchbase.connect.kafka.filter.Filter.
couchbase.event.filter=com.couchbase.connect.kafka.filter.AllPassFilter
#couchbase.event.filter=/Users/NSI29/kafka-connect-couchbase/examples/custom-extensions/src/main/java/com/couchbase/connect/kafka/Sample_connector/CustomSourceHandler1.java
#couchbase.event.filter=com.couchbase.connect.kafka.filter.CustomFilter1

# Specifies when in Couchbase history the connector should start streaming from.
# Modes starting with "SAVED_OFFSET" tell the connector to resume from when each
# vBucket's state was most recently saved by the Kafka Connect framework, falling back
# to the secondary mode if no saved state exists for a vBucket.
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING
#couchbase.stream.from=SAVED_OFFSET_OR_NOW
couchbase.stream.from=BEGINNING
#couchbase.stream.from=NOW

# To reduce bandwidth usage, Couchbase Server 5.5 and later can send documents to the connector in compressed form.
# (Messages are always published to the Kafka topic in uncompressed form, regardless of this setting.)
# If the requested mode is not supported by your version of Couchbase Server, compression will be disabled.
#
# ENABLED - (default) Couchbase Server decides whether to use compression
#   on a per-document basis, depending on whether the compressed form of the
#   document is readily available. Select this mode to prioritize Couchbase Server
#   performance and reduced bandwidth usage (recommended).
#   Requires Couchbase Server 5.5 or later.
#
# DISABLED - No compression. Select this mode to prioritize reduced CPU load for the Kafka connector.
#
# FORCED - Compression is used for every document, unless compressed size is greater than uncompressed size.
#   Select this mode to prioritize bandwidth usage reduction above all else.
#   Requires Couchbase Server 5.5 or later.
couchbase.compression=ENABLED
#couchbase.compression=DISABLED
#couchbase.compression=FORCED

# The flow control buffer limits how much data Couchbase will send before waiting for the connector to acknowledge
# the data has been processed. See the connector documentation for details on how this affects connector memory usage.
couchbase.flow.control.buffer=16m

# In some failover scenarios, Couchbase Server may roll back (undo) database
# changes that have not yet been persisted across all replicas. By default,
# the Kafka connector will poll Couchbase Server and defer event publication
# until the change has been persisted to all replicas in the cluster,
# at which time the change is unlikely to be rolled back. This feature
# introduces some latency, and increases connector memory usage and network
# traffic, but prevents rolled-back changes from appearing in the Kafka topic.
# The longer the polling interval, the larger the flow control buffer required
# in order to maintain steady throughput.
#
# If instead you wish to publish events immediately, set the polling interval to 0.
# If you do, be aware that when rollbacks occur you may end up with events
# in the Kafka topic from an "alternate timeline" in Couchbase Server's history.
#
# If the source is an ephemeral bucket (which never persists documents)
# this value must be set to 0 to disable the persistence check.
couchbase.persistence.polling.interval=100ms

# Set this to true to log document lifecycle milestones at INFO level instead of DEBUG.
# Lets you watch how documents flow through the connector.
couchbase.log.document.lifecycle=true

# If your custom Filter or SourceHandler ignores certain Couchbase events,
# and you have a low-traffic deployment where it's possible a non-ignored message
# might not arrive for some time, you can prevent spurious rollbacks to zero
# by specifying a "black hole" topic.
# For each ignored event, a tiny placeholder record is published to this topic
# so Kafka Connect is informed of the source offset of the ignored event.
# Because the contents of the record are not important, you can minimize storage
# requirements by setting this topic's cleanup.policy to compact.
#couchbase.black.hole.topic=

Thanks,
Naveen

Hi @naveensrinevas17,

Publishing from a Couchbase document to a single topic was successful, but it’s not working when trying to publish to multiple topics.

If I understand correctly, you want the source connector to publish the same message to more than one topic? I’m afraid that’s not supported.

TIP: the next time you need to share some code or bits of a config file on the forum, try putting 3 backticks (```) on the lines before and after the code/config. This formats the code/config so it’s more readable.

Thanks,
David

Hi @david.nault,
Thanks for the response. I’m referring to the Couchbase Kafka source connector.
Yes, is there some custom configuration that would let us publish a message to multiple topics based on a key? We have a use case that requires this.
I tried it based on a document that says the following:

if we need to publish messages to multiple topics from a single Couchbase Kafka Source Connector instance, you would typically set the configuration property couchbase.topic to a comma-separated list of topics.

Hi @naveensrinevas17,

The Couchbase Kafka source connector definitely does not support publishing the same message to more than one topic. As far as I know, it’s not possible for us to do this and retain the “at least once” delivery guarantee.

Are you familiar with another source connector that supports publishing to multiple topics? If so, maybe we could take a look and learn from it.

In the meantime, it might be possible to do what you want by publishing the message to a single topic, and then processing that topic using KSQL or Kafka Streams.
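
For example, a small Kafka Streams topology can re-publish each record to a topic chosen from the record’s content. This is just a rough sketch: the topic names, the “eventType” field, and the substring-based routing rule are placeholders for illustration, and it assumes the value is the raw JSON string the connector produces.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class TopicRouter {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "couchbase-topic-router");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Read everything the source connector publishes to a single topic...
        KStream<String, String> source = builder.stream("KafkaExample5");

        // ...and route each record to a destination topic chosen from its content.
        // Naive substring check for illustration only; a real application would
        // parse the JSON and inspect a field such as "eventType".
        source.to((key, value, recordContext) ->
                value.contains("\"eventType\":\"order\"") ? "KafkaExample6" : "KafkaExample5-other");

        new KafkaStreams(builder.build(), props).start();
    }
}
```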

Thanks,
David

Thanks @david.nault, I can try that option.
Can you please let me know if there is a way to filter documents based on event type?

There’s a Filter interface you can implement by writing custom code.
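
A minimal sketch of such a filter, assuming the 4.x `Filter` interface (a single `pass(DocumentEvent)` method) and a hypothetical `eventType` field in your documents; the `DocumentEvent` accessors shown here (`type()`, `content()`) are from memory, so double-check them against the Javadoc:

```java
package com.example.filter; // hypothetical package; put the compiled class on the Connect plugin path

import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;
import java.nio.charset.StandardCharsets;

/**
 * Sketch of a custom Filter: only pass mutations whose JSON contains a
 * particular (hypothetical) eventType value. Configure it with:
 *   couchbase.event.filter=com.example.filter.EventTypeFilter
 */
public class EventTypeFilter implements Filter {

    @Override
    public boolean pass(DocumentEvent event) {
        // Deletions and expirations carry no content; decide how you want to treat them.
        if (event.type() != DocumentEvent.Type.MUTATION) {
            return false;
        }
        // Naive substring check for illustration; a real filter would parse the JSON.
        String json = new String(event.content(), StandardCharsets.UTF_8);
        return json.contains("\"eventType\":\"order\"");
    }
}
```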

If you want to do something like publish to a different topic based on the content of the document, you’ll need to write a custom SourceHandler that inspects the document and sets the destination topic.
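
A sketch of that idea, assuming you extend the stock `RawJsonSourceHandler` and that `SourceRecordBuilder` lets you override the destination topic; the class name, the `eventType` field, and the routing rule are made up:

```java
package com.example.handler; // hypothetical package

import com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler;
import com.couchbase.connect.kafka.handler.source.SourceHandlerParams;
import com.couchbase.connect.kafka.handler.source.SourceRecordBuilder;
import java.nio.charset.StandardCharsets;

/**
 * Sketch of a custom SourceHandler that routes documents to different topics
 * based on their content. Configure it with:
 *   couchbase.source.handler=com.example.handler.RoutingSourceHandler
 */
public class RoutingSourceHandler extends RawJsonSourceHandler {

    @Override
    public SourceRecordBuilder handle(SourceHandlerParams params) {
        SourceRecordBuilder builder = super.handle(params);
        if (builder == null) {
            return null; // the parent handler decided to skip this event
        }
        // Naive routing rule for illustration; a real handler would parse the JSON.
        String json = new String(params.documentEvent().content(), StandardCharsets.UTF_8);
        String topic = json.contains("\"eventType\":\"order\"") ? "KafkaExample6" : "KafkaExample5";
        return builder.topic(topic);
    }
}
```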

Another solution, if it makes sense for your data model, would be to put the different types of documents in different collections, and configure the connector to stream from only the collections you’re interested in.
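
For example (the scope and collection names here are hypothetical):

```
# Stream only from these collections; documents in other collections are ignored.
couchbase.collections=events.orders,events.shipments
```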

Thanks,
David

Thanks @david.nault.
I was looking at the collections option as well, since it could solve the problem of separating documents by event type, but it adds complexity for the producers, which would need to be configured to write to specific collections.
I will explore the Filter option you mentioned.

As for our requirements:
We are trying to store events in Couchbase and use the Kafka connector to publish messages to Kafka.
The Kafka connector reads messages from the DCP queue.
What happens if the DCP queue goes down, even with replication enabled?
Can the connector read the events and publish them to Kafka without any loss of events once the queue is back?
Assume something like a node failure on the Couchbase side.

Can the connector read the events and publish them to Kafka without any loss of events once the queue is back?

Yes. For more details, see Delivery Guarantees | Couchbase Docs
