How are deleted documents represented by the Kafka source connector?

I want the JDBC sink connector to automatically delete rows as the corresponding documents are deleted from Couchbase.

How will the Couchbase source connector represent these data changes?

Hi Naftali,

When a document is deleted, the Couchbase source connector publishes a tombstone record – a record with no value.

The JDBC sink connector can delete a row when it sees one of these tombstones. See Delete Mode for how to enable this feature.
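
For reference, the relevant sink-side settings look roughly like this (just a sketch, assuming the Confluent JDBC sink connector; delete.enabled only takes effect when pk.mode is record_key, and the "id" column name is only an example):

    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "pk.mode": "record_key",
    "pk.fields": "id",
    "delete.enabled": "true"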

Thanks,
David


Hey @david.nault,

As it turns out, I'm not able to leverage the auto-delete functionality provided by the two sides of the pipeline (Couchbase source, JDBC sink).

I need to configure another Couchbase source connector and filter for document deletes only.

In a perfect world, there is some combination of

    "couchbase.custom.filter.field": "/type",
    "couchbase.custom.filter.values": "delete-test"

that I can use for that, but I doubt it. Am I wrong?

Otherwise, I have written a custom SourceHandler and also a custom Filter before, and I could do it again. Would you have a tip as to how I should go about it?

The goal is straightforward: I need only document deletes to go into the connector's default Kafka topic, so the filter logic is really simple.
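
For context, what I have in mind for that second connector is something along these lines (just a sketch; couchbase.event.filter is the property I would expect to use, and com.example.DeleteOnlyFilter is a custom Filter class I would still have to write):

    "couchbase.event.filter": "com.example.DeleteOnlyFilter"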

UPDATE!

I think I got it.

There is a provided DropIfNullValue transform that filters out null-value records.

All I need to do is flip the logic from return record.value() == null ? null : record;
to return record.value() != null ? null : record;
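
Roughly, the whole transform would look like this (a sketch of the flipped version against the standard Kafka Connect Transformation API; the package and class name KeepOnlyTombstones are just what I would call it):

    package com.example;

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    // Inverse of DropIfNullValue: pass a record through only when its value is null,
    // so the topic receives nothing but deletion tombstones.
    public class KeepOnlyTombstones<R extends ConnectRecord<R>> implements Transformation<R> {

      @Override
      public R apply(R record) {
        // Returning null from a transform drops the record entirely.
        return record.value() != null ? null : record;
      }

      @Override
      public ConfigDef config() {
        return new ConfigDef(); // no configuration options
      }

      @Override
      public void configure(Map<String, ?> configs) {
        // nothing to configure
      }

      @Override
      public void close() {
        // nothing to clean up
      }
    }

and wired up on the source connector with something like this (keepOnlyTombstones is just an arbitrary alias):

    "transforms": "keepOnlyTombstones",
    "transforms.keepOnlyTombstones.type": "com.example.KeepOnlyTombstones"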

Rename, compile, and install, and I should be good to go.

Makes sense, @david.nault?

Hi @naftali,

That sounds like an elegant solution. Nice! 🙂

Thanks,
David

Hey @david.nault, I need a little help if you can. Unfortunately my solution above – passing through only record.value() == null records in order to route deletes to their own topic – isn't quite working.

The problem is that downstream Kafka tooling (ksqlDB) treats these null-value records as tombstones (which makes sense), whereas I want to route them and store their occurrence in the data warehouse. (Think of a soft-delete use case.)

What I want to do is detect that record.value() is null and then put some sort of dummy value there so that it gets treated as a regular record by the downstream processes.

Something like this:

  public R apply(R record) {
    if (record.value() == null) {
      // Rebuild the record; note the second schema/value pair passed here
      // is still the key schema and key, not a real value.
      return record.newRecord(record.topic(), record.kafkaPartition(),
          record.keySchema(), record.key(),
          record.keySchema(), record.key(),
          record.timestamp());
    } else {
      // Drop anything that is not a deletion.
      return null;
    }
    // previous version: return record.value() != null ? null : record;
  }

The problem is that the connector fails downstream with Invalid schema type for ByteArrayConverter, which makes sense since I just put the key schema and key value where the record.value items should be.

What’s the minimal thing I can put there?

It’s hard to say without understanding the whole system end-to-end, but some candidates might be a zero-length string, a zero-length byte array, or, if it needs to be JSON, then a JSON null, "", {}, or [].
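
For the zero-length byte array option, the apply method might look roughly like this (a sketch, assuming the downstream value converter is ByteArrayConverter; Schema comes from org.apache.kafka.connect.data.Schema, and a BYTES schema, or even a null schema, should keep that converter happy):

      @Override
      public R apply(R record) {
        if (record.value() == null) {
          // Keep the original key, but swap the missing value for an empty byte array
          // so downstream consumers see a regular record instead of a tombstone.
          return record.newRecord(record.topic(), record.kafkaPartition(),
              record.keySchema(), record.key(),
              Schema.BYTES_SCHEMA, new byte[0],
              record.timestamp());
        } else {
          // Drop anything that is not a deletion.
          return null;
        }
      }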