We are using the Couchbase Kafka source connector (v4.1.1) with a custom filter. After the connector restarts, events that were rejected (i.e., the custom filter returned false for them) are streamed again.
I think there is an inconsistency between the sequence number in the Kafka committed offset and the one in Couchbase, based on this blog post: Couchbase DCP Rollback and How QA Tests Them | The Couchbase Blog. Because rejected events are not sent to Kafka, there is no committed offset in Kafka for them. We do not want to reprocess previously rejected events after a restart.
Is this the default behavior, or are we missing something?
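To make the hypothesis concrete, here is a simplified model of what I think is happening on one partition (vbucket). It is a hypothetical sketch, not the connector's implementation. It assumes (1) an offset is saved only when a record is actually published to Kafka, so events the filter rejects never advance it, and (2) with `couchbase.stream.from` set to `SAVED_OFFSET_OR_NOW`, the stream resumes right after the saved seqno if one exists, otherwise from "now". The class name, the `run` method, and the seqno values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongPredicate;

/**
 * Hypothetical model of one source partition. Assumption: only published
 * records commit an offset; filtered-out events leave the offset unchanged.
 * This is NOT the Couchbase connector's real code.
 */
public class OffsetReplayModel {

    // Last seqno whose record was published (and committed); empty if none.
    private OptionalLong saved = OptionalLong.empty();

    public OptionalLong savedOffset() {
        return saved;
    }

    /**
     * Streams seqnos from the resume point up to highSeqno, runs the filter,
     * and returns every seqno the filter was asked about during this run.
     */
    public List<Long> run(long highSeqno, LongPredicate filter, long nowSeqno) {
        // Assumed SAVED_OFFSET_OR_NOW behavior: resume after the saved
        // offset if there is one, otherwise start after "now".
        long start = saved.isPresent() ? saved.getAsLong() + 1 : nowSeqno + 1;
        List<Long> seenByFilter = new ArrayList<>();
        for (long seqno = start; seqno <= highSeqno; seqno++) {
            seenByFilter.add(seqno);
            if (filter.test(seqno)) {
                // Passing event becomes a record, so its offset is committed.
                saved = OptionalLong.of(seqno);
            }
            // Rejected event: no record is produced, so no offset is committed.
        }
        return seenByFilter;
    }

    public static void main(String[] args) {
        OffsetReplayModel partition = new OffsetReplayModel();
        // Hypothetical filter that rejects only seqno 6.
        LongPredicate rejectSix = seqno -> seqno != 6;

        // First run: no saved offset yet, so start after "now" (seqno 0 here).
        System.out.println("run 1 filter saw: " + partition.run(6, rejectSix, 0));
        System.out.println("saved offset: " + partition.savedOffset().getAsLong());

        // Restart: resume after saved offset 5, so seqno 6 is filtered again.
        System.out.println("run 2 filter saw: " + partition.run(6, rejectSix, 6));
    }
}
```

Running `main` prints `run 1 filter saw: [1, 2, 3, 4, 5, 6]`, `saved offset: 5`, then `run 2 filter saw: [6]`. Under these assumptions the rejected event reaches the filter again after every restart, which is the same pattern as the repeated SKIPPED_BECAUSE_FILTER_SAYS_IGNORE lines in the example log.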
Example log:
Starting to Stream for 1 partitions
Stopping stream for 1 partitions
Received rollback for vbucket 581 to seqno 0
{"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":6,"documentId":"_default._default.12345678","connectTaskId":"0","revision":6,"type":"mutation","partition":469,"sequenceNumber":6,"sizeInBytes":4297,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":256087724954}
{"milestone":"SKIPPED_BECAUSE_FILTER_SAYS_IGNORE","tracingToken":6,"documentId":"_default._default.12345678"}
Poll returns 0 result(s) (filtered out 1)
-- restart
(the same log lines appear again)
Connector configuration:
{
"name": "kafka-connector",
"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
"tasks.max": "1",
"couchbase.topic": "topic_name",
"couchbase.seed.nodes": "...",
"couchbase.bootstrap.timeout": "5s",
"couchbase.persistence.polling.interval": "100ms",
"couchbase.compression": "ENABLED",
"couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
"couchbase.event.filter": "our custom filter class path",
"couchbase.log.redaction": "PARTIAL",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms": "dropIfNullValue,deserializeJson",
"transforms.dropIfNullValue.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
"transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
"couchbase.connector.name.in.offsets": "true"
}