I want the JDBC sink connector to automatically delete the corresponding rows as documents are deleted from Couchbase.
How will the Couchbase source connector represent these data changes?
Hi Naftali,
When a document is deleted, the Couchbase source connector publishes a tombstone record – a record with no value.
The JDBC sink connector can delete a row when it sees one of these tombstones. See Delete Mode for how to enable this feature.
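For example, enabling delete mode on the Confluent JDBC sink typically looks something like this (connection details and topic name are placeholders; delete mode requires the primary key to come from the record key):

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://localhost:5432/mydb
topics=couchbase-topic
insert.mode=upsert
# primary key must come from the Kafka record key for deletes to work
pk.mode=record_key
# turn tombstone records into SQL DELETEs
delete.enabled=true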
Thanks,
David
Hey @david.nault
As it turns out, I'm not able to leverage the auto-delete functionality provided by the two sides of the pipeline (Couchbase source, JDBC sink).
I need to configure another Couchbase source connector and filter for document deletes only.
In a perfect world, there is some combination of
"couchbase.custom.filter.field": "/type",
"couchbase.custom.filter.values": "delete-test"
that I can use for that, but I doubt it. Am I wrong?
Otherwise, I have written a custom SourceHandler and also a custom Filter before, and I could do it again. Would you have a tip as to how I should go about it?
The goal is simple: I need only document deletes to go into the connector's default Kafka topic, so the filter logic is really simple.
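Roughly what I have in mind is something like the sketch below. It assumes the 4.x connector API, where a custom filter implements com.couchbase.connect.kafka.filter.Filter and receives a DocumentEvent; the exact accessor names may differ between connector versions, so treat the method calls as assumptions.

import com.couchbase.connect.kafka.filter.Filter;
import com.couchbase.connect.kafka.handler.source.DocumentEvent;

// Hypothetical filter: pass only events that are not mutations
// (deletions/expirations), so the topic receives nothing but deletes.
public class DeletesOnlyFilter implements Filter {
    @Override
    public boolean pass(DocumentEvent event) {
        return !event.isMutation(); // assumption: DocumentEvent exposes isMutation()
    }
}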
UPDATE!
I think I got it.
There is a provided DropIfNullValue transform for filtering out null-value records.
All I need to do is flip the logic from
return record.value() == null ? null : record;
to
return record.value() != null ? null : record;
Rename, compile, and install, and I should be good to go.
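As a sketch, the flipped transform would look something like this (class name is mine; the surrounding boilerplate is the standard Kafka Connect Transformation interface):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical inverse of DropIfNullValue: keep only tombstone (null-value)
// records and drop everything else.
public class DropIfNonNullValue<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Returning null tells Connect to discard the record.
        return record.value() != null ? null : record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}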
Makes sense, @david.nault?
Hey @david.nault, I need a little help if you can. Unfortunately my solution above – to only pass through record.value() == null records in order to route deletes to their own topic – isn't quite working.
The problem is that downstream Kafka tooling (ksqlDB) treats these null-value records as tombstones (which makes sense), whereas I want to route them and store their occurrence in the data warehouse. (Think a soft-delete use case.)
What I want to do is detect that record.value() is null and then put some sort of dummy value there so that it gets treated as a regular record by the downstream processes.
Something like this:
public R apply(R record) {
    if (record.value() == null) {
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), record.keySchema(), record.key(), record.timestamp());
    } else {
        return null;
    }
    //return record.value() != null ? null : record;
}
The problem is that the connector fails downstream with "Invalid schema type for ByteArrayConverter", which makes sense since I just put the key schema and key value in where the record.value items should be.
What’s the minimal thing I can put there?
It's hard to say without understanding the whole system end-to-end, but some candidates might be a zero-length string, a zero-length byte array, or, if it needs to be JSON, a JSON null, "", {}, or [].
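For example, if the value side stays on ByteArrayConverter, the apply method might look something like the sketch below (untested, and assuming a zero-length byte array is an acceptable dummy value for your downstream processing):

import org.apache.kafka.connect.data.Schema;

@Override
public R apply(R record) {
    if (record.value() == null) {
        // Replace the tombstone's null value with an empty byte array so that
        // downstream tools treat it as a regular record rather than a tombstone.
        // Schema.OPTIONAL_BYTES_SCHEMA keeps ByteArrayConverter happy.
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                Schema.OPTIONAL_BYTES_SCHEMA,
                new byte[0],
                record.timestamp());
    }
    // Drop everything that is not a delete.
    return null;
}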