Hello Team,
I’m using Kafka couchbase source connector 4.1.11 and couchbase version 7.
I’m trying to publish messages from Couchbase bucket to individual kafka topic. its successful.
When I’m trying to publish to multiple topics it’s failing. I’ve pasted the config, can you please check and let me know accordingly
Config file as below
Arbitrary unique name for the connector. Attempting to register
two connectors with the same name will fail.
name=test-couchbase-source
The Java class for the connector.
connector.class=com.couchbase.connect.kafka.CouchbaseSourceConnector
The maximum number of tasks that should be created for this connector.
tasks.max=2
Publish to this Kafka topic.
couchbase.topics=KafkaExample5,KafkaExample6
#couchbase.topic=KafkaExample5
#topics=KafkaExample5,KafkaExample6
Connect to this Couchbase cluster (comma-separated list of bootstrap nodes).
couchbase.seed.nodes=127.0.0.1
couchbase.bootstrap.timeout=10s
If connecting to Capella (or any Couchbase cluster that requires TLS),
enable TLS by uncommenting the following line:
couchbase.enable.tls=true
Unless you’re connecting to Capella, enabling TLS also requires specifying
the Certificate Authority certificate(s) to trust.
The simplest way is to supply a filesystem path to a PEM file:
couchbase.trust.certificate.path=/path/to/my-root-certificate.pem
Alternatively, you can specify the filesystem path to a Java keystore file
containing the certificate(s) to trust, and the password for the keystore.
If the KAFKA_COUCHBASE_TRUST_STORE_PASSWORD environment variable is set,
it will override the password specified here.
couchbase.trust.store.path=/path/to/keystore
couchbase.trust.store.password=secret
Read from this Couchbase bucket using these credentials.
If the KAFKA_COUCHBASE_PASSWORD environment variable is set,
it will override the password specified here.
#couchbase.bucket=default
couchbase.bucket=tstb
#couchbase.scope=default
#couchbase.collections=default.profile
#couchbase.collection.to.topic:default.profile=KafkaExample5
couchbase.username=
couchbase.password=
Keys of published messages are just Strings with no schema.
key.converter=org.apache.kafka.connect.storage.StringConverter
A “source handler” converts the Couchbase document into a Kafka record.
This quickstart config uses “RawJsonSourceHandler” which creates a Kafka
record whose content is exactly the same as the Couchbase JSON document.
When using RawJsonSourceHandler (or its cousin RawJsonWithMetadataSourceHandler)
the value converter must be ByteArrayConverter… unless you’re using
Single Message Transforms. See the quickstart documentation for more details.
couchbase.source.handler=com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler
#couchbase.source.handler=/Users/NSI29/kafka-connect-couchbase/examples/custom-extensions/src/main/java/com/couchbase/connect/kafka/Sample_connector/CustomSourceHandler1.java
#couchbase.source.handler=com.couchbase.connect.kafka.handler.source.CustomSourceHandler1
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
Control which Couchbase document change notifications get published to Kafka
using this Java class, which must implement com.couchbase.connect.kafka.filter.Filter.
couchbase.event.filter=com.couchbase.connect.kafka.filter.AllPassFilter
#couchbase.event.filter=/Users/NSI29/kafka-connect-couchbase/examples/custom-extensions/src/main/java/com/couchbase/connect/kafka/Sample_connector/CustomSourceHandler1.java
#couchbase.event.filter=com.couchbase.connect.kafka.filter.CustomFilter1
Specifies when in Couchbase history the connector should start streaming from.
Modes starting with “SAVED_OFFSET” tell the connector to resume from when each
vBucket’s state was most recently saved by the Kafka Connect framework, falling back
to the secondary mode if no saved state exists for a vBucket.
couchbase.stream.from=SAVED_OFFSET_OR_BEGINNING
#couchbase.stream.from=SAVED_OFFSET_OR_NOW
couchbase.stream.from=BEGINNING
#couchbase.stream.from=NOW
To reduce bandwidth usage, Couchbase Server 5.5 and later can send documents to the connector in compressed form.
(Messages are always published to the Kafka topic in uncompressed form, regardless of this setting.)
If the requested mode is not supported by your version of Couchbase Server, compression will be disabled.
ENABLED - (default) Couchbase Server decides whether to use compression
on a per-document basis, depending on whether the compressed form of the
document is readily available. Select this mode to prioritize Couchbase Server
performance and reduced bandwidth usage (recommended).
Requires Couchbase Server 5.5 or later.
DISABLED - No compression. Select this mode to prioritize reduced CPU load for the Kafka connector.
FORCED - Compression is used for every document, unless compressed size is greater than uncompressed size.
Select this mode to prioritize bandwidth usage reduction above all else.
Requires Couchbase Server 5.5 or later.
couchbase.compression=ENABLED
#couchbase.compression=DISABLED
#couchbase.compression=FORCED
The flow control buffer limits how much data Couchbase will send before waiting for the connector to acknowledge
the data has been processed. See the connector documentation for details on how this affects connector memory usage.
couchbase.flow.control.buffer=16m
In some failover scenarios, Couchbase Server may roll back (undo) database
changes that have not yet been persisted across all replicas. By default,
the Kafka connector will poll Couchbase Server and defer event publication
until the change has been persisted to all replicas in the cluster,
at which time the change is unlikely to be rolled back. This feature
introduces some latency, and increases connector memory usage and network
traffic, but prevents rolled-back changes from appearing in the Kafka topic.
The longer the polling interval, the larger the flow control buffer required
in order to maintain steady throughput.
If instead you wish to publish events immediately, set the polling interval to 0
.
If you do, be aware that when rollbacks occur you may end up with events
in the Kafka topic from an “alternate timeline” in Couchbase Server’s history.
If the source is an ephemeral bucket (which never persists documents)
this value must be set to 0
to disable the persistence check.
couchbase.persistence.polling.interval=100ms
Set this to true to log document lifecycle milestones at INFO level instead of DEBUG.
Lets you watch how documents flow through the connector.
couchbase.log.document.lifecycle=true
If your custom Filter or SourceHandler ignores certain Couchbase events,
and you have a low-traffic deployment where it’s possible a non-ignored message
might not arrive for some time, you can prevent spurious rollbacks to zero
by specifying a “black hole” topic.
For each ignored event, a tiny placeholder record is published to this topic
so Kafka Connect is informed of the source offset of the ignored event.
Because the contents of the record are not important, you can minimize storage
requirements by setting this topic’s cleanup.policy
to compact
.
#couchbase.black.hole.topic=
Thanks,
Naveen