Couchbase Kafka Connector for Collections

Can we use the Couchbase Kafka Connector to collect data from Couchbase Collections into Kafka?

Hi Naveen - here is a good place to start: Introduction | Couchbase Docs

Thanks, I’ll check. I have found a way using couchbase.collection.to.topic.
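For anyone finding this later, a collection-to-topic mapping in the connector properties can look roughly like the sketch below. The bucket, scope, collection, and topic names are placeholders made up for illustration; check the connector documentation for the exact syntax supported by your version.

# Sketch only: stream two collections and route each to its own Kafka topic.
couchbase.bucket=example-bucket
couchbase.collections=example-scope.orders,example-scope.customers
couchbase.collection.to.topic=example-scope.orders=orders-topic,example-scope.customers=customers-topic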
As we had planned to persist events in Couchbase and pass them on to Kafka,
I have a few more questions:

  • Is there an index to get the events in data order?
  • How can you have jobs shared between multiple workers, some sort of hashing or something?
  • How do we ensure that the events are processed in order?
  • How do we ensure events aren’t processed multiple times?

Each connector task is assigned a different set of Couchbase partitions to handle.

For the answers to your other questions, please see:
https://docs.couchbase.com/kafka-connector/current/delivery-guarantees.html
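As a rough illustration of the partition split described above (the numbers are common Couchbase defaults, not something stated in this thread): a Couchbase bucket typically has 1024 partitions (vBuckets), so a setting like the one below splits them across two tasks, each streaming roughly 512 vBuckets. All changes to a given document land in the same vBucket, and therefore flow through the same task.

# Sketch only: two Connect tasks, each handling about half of the bucket's 1024 vBuckets.
tasks.max=2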

Thanks @david.nault for your valuable input

Hi @david.nault
I’m trying to build a PoC (proof of concept) for my client.
I’ve got the Couchbase Kafka source connector 4.1.11, Couchbase 7.0, and Confluent Kafka version 3.16.0.
Publishing to a single topic from Couchbase documents worked, but publishing to multiple topics does not. My config file is attached below.

# Arbitrary unique name for the connector. Attempting to register
# two connectors with the same name will fail.
name=test-couchbase-source

# The Java class for the connector.
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector

# The maximum number of tasks that should be created for this connector.
tasks.max=2

# Publish to this Kafka topic.
couchbase.topics=KafkaExample5,KafkaExample6
#couchbase.topic=KafkaExample5
#topics=KafkaExample5,KafkaExample6

# Connect to this Couchbase cluster (comma-separated list of bootstrap nodes).
couchbase.seed.nodes=127.0.0.1
couchbase.bootstrap.timeout=10s

# If connecting to Capella (or any Couchbase cluster that requires TLS),
# enable TLS by uncommenting the following line:
# couchbase.enable.tls=true

# Unless you’re connecting to Capella, enabling TLS also requires specifying
# the Certificate Authority certificate(s) to trust.
# The simplest way is to supply a filesystem path to a PEM file:
# couchbase.trust.certificate.path=/path/to/my-root-certificate.pem

# Alternatively, you can specify the filesystem path to a Java keystore file
# containing the certificate(s) to trust, and the password for the keystore.
# If the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable is set,
# it will override the password specified here.
# couchbase.trust.store.path=/path/to/keystore
# couchbase.trust.store.password=secret

# Read from this Couchbase bucket using these credentials.
# If the KAFKA_COUCHBASE_PASSWORD environment variable is set,
# it will override the password specified here.
#couchbase.bucket=default
couchbase.bucket=tstb
#couchbase.scope=default
#couchbase.collections=default.profile
#couchbase.collection.to.topic:default.profile=KafkaExample5
couchbase.username=
couchbase.password=

# Keys of published messages are just Strings with no schema.
key.converter=org.apache.kafka.connect.storage.StringConverter

# A “source handler” converts the Couchbase document into a Kafka record.
# This quickstart config uses “RawJsonSourceHandler” which creates a Kafka
# record whose content is exactly the same as the Couchbase JSON document.
# When using RawJsonSourceHandler (or its cousin RawJsonWithMetadataSourceHandler)
# the value converter must be ByteArrayConverter… unless you’re using
# Single Message Transforms. See the quickstart documentation for more details.
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
#couchbase.source.handler=/Sample_connector/CustomSourceHandler1.java
#couchbase.source.handler=com.couchbase.connect.kafka.handler.source.CustomSourceHandler1
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Control which Couchbase document change notifications get published to Kafka
# using this Java class, which must implement com.couchbase.connect.kafka.filter.Filter.
couchbase.event.filter=com.couchbase.connect.kafka.filter.AllPassFilter
#couchbase.event.filter=/path/Sample_connector/CustomSourceHandler1.java
#couchbase.event.filter=com.couchbase.connect.kafka.filter.CustomFilter1

# Specifies when in Couchbase history the connector should start streaming from.
# Modes starting with “SAVED_OFFSET” tell the connector to resume from when each
# vBucket’s state was most recently saved by the Kafka Connect framework, falling back
# to the secondary mode if no saved state exists for a vBucket.
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING
#couchbase.stream.from=SAVED_OFFSET_OR_NOW
couchbase.stream.from=BEGINNING
#couchbase.stream.from=NOW

# To reduce bandwidth usage, Couchbase Server 5.5 and later can send documents to the connector in compressed form.
# (Messages are always published to the Kafka topic in uncompressed form, regardless of this setting.)
# If the requested mode is not supported by your version of Couchbase Server, compression will be disabled.
#
#   ENABLED - (default) Couchbase Server decides whether to use compression
#     on a per-document basis, depending on whether the compressed form of the
#     document is readily available. Select this mode to prioritize Couchbase Server
#     performance and reduced bandwidth usage (recommended).
#     Requires Couchbase Server 5.5 or later.
#
#   DISABLED - No compression. Select this mode to prioritize reduced CPU load for the Kafka connector.
#
#   FORCED - Compression is used for every document, unless compressed size is greater than uncompressed size.
#     Select this mode to prioritize bandwidth usage reduction above all else.
#     Requires Couchbase Server 5.5 or later.
couchbase.compression=ENABLED
#couchbase.compression=DISABLED
#couchbase.compression=FORCED

# The flow control buffer limits how much data Couchbase will send before waiting for the connector to acknowledge
# the data has been processed. See the connector documentation for details on how this affects connector memory usage.
couchbase.flow.control.buffer=16m

# In some failover scenarios, Couchbase Server may roll back (undo) database
# changes that have not yet been persisted across all replicas. By default,
# the Kafka connector will poll Couchbase Server and defer event publication
# until the change has been persisted to all replicas in the cluster,
# at which time the change is unlikely to be rolled back. This feature
# introduces some latency, and increases connector memory usage and network
# traffic, but prevents rolled-back changes from appearing in the Kafka topic.
# The longer the polling interval, the larger the flow control buffer required
# in order to maintain steady throughput.
#
# If instead you wish to publish events immediately, set the polling interval to 0.
# If you do, be aware that when rollbacks occur you may end up with events
# in the Kafka topic from an “alternate timeline” in Couchbase Server’s history.
#
# If the source is an ephemeral bucket (which never persists documents)
# this value must be set to 0 to disable the persistence check.
couchbase.persistence.polling.interval=100ms

# Set this to true to log document lifecycle milestones at INFO level instead of DEBUG.
# Lets you watch how documents flow through the connector.
couchbase.log.document.lifecycle=true

# If your custom Filter or SourceHandler ignores certain Couchbase events,
# and you have a low-traffic deployment where it’s possible a non-ignored message
# might not arrive for some time, you can prevent spurious rollbacks to zero
# by specifying a “black hole” topic.
# For each ignored event, a tiny placeholder record is published to this topic
# so Kafka Connect is informed of the source offset of the ignored event.
# Because the contents of the record are not important, you can minimize storage
# requirements by setting this topic’s cleanup.policy to compact.
#couchbase.black.hole.topic=
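One side note on the commented-out custom handler and filter lines in the config above: couchbase.source.handler and couchbase.event.filter take the fully qualified name of a compiled class available on the Kafka Connect plugin path, not a filesystem path to a .java source file. If a custom filter is needed, it might look roughly like the sketch below. The class and package names are invented, and this assumes the 4.x Filter interface exposes a single pass(DocumentEvent) method and that DocumentEvent exposes the document ID via key(); verify against the connector's Javadoc for your version.

package com.example.filters;

import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;

// Hypothetical filter: publish only events whose document ID starts with "profile::".
public class KeyPrefixFilter implements Filter {
    @Override
    public boolean pass(DocumentEvent event) {
        return event.key().startsWith("profile::");
    }
}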

I’ve responded over in the new topic: Kafka connector for publishing messages to multiple topics

Thanks for starting the new topic, by the way 🙂
