Advice on reprocessing a specific list of documents

Hi team,

I have the following pipeline:

Couchbase → Kafka Source Connector → Kafka Topic → Elasticsearch Sink Connector

In my sink connector, I am using a dead letter queue (DLQ) because there are various scenarios in which my downstream system or pre-processing can fail (e.g., mapping errors), and I do not want the entire sink connector task to fail when that happens.

However, this makes it difficult to safely reprocess these documents, for two reasons:

  1. Reading the document stored in the DLQ and sending it as is does not seem safe, because the document may have been updated in the meantime
  2. Querying Couchbase (in a separate process) while the source connector is running may lead to overwrites or concurrent modifications, because the document can be updated and the source connector may push that update at the same time

With that context, how would you suggest reprocessing a specific list of documents through the source connector?

One design that I am considering consists of externalizing the list of documents to process (e.g., in a file or another system like Redis), creating a Couchbase Source Connector that streams using SAVED_OFFSET_OR_BEGINNING,
and adding a custom Filter that drops all records that are not in that list. This design may work, but it has a few drawbacks:

  • Reprocessing the same document is not possible without resetting the offset of the source connector and/or recreating it with a different name
  • It seems like overkill to stream the entire history only to select a few records
  • I am unsure whether there can be conflicts between two Couchbase source connectors publishing to the same topic
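For illustration, the allow-list check at the heart of that custom filter could look roughly like the sketch below. It deliberately does not implement the connector's own filter interface; `ReprocessAllowList` and `shouldPass` are hypothetical names, and the set of IDs is assumed to have been loaded from the external file or Redis beforehand.

```java
import java.util.Set;

/**
 * Hypothetical allow-list for a reprocessing run. A custom connector filter
 * could delegate to shouldPass(...) with the document ID of each event.
 */
final class ReprocessAllowList {
    private final Set<String> documentIds;

    ReprocessAllowList(Set<String> documentIds) {
        // Immutable copy, so later changes to the caller's set have no effect.
        this.documentIds = Set.copyOf(documentIds);
    }

    /** Returns true only for document IDs that were requested for reprocessing. */
    boolean shouldPass(String documentId) {
        // Check for null first: immutable sets reject contains(null).
        return documentId != null && documentIds.contains(documentId);
    }
}
```

Everything not in the list is dropped, which is also why the second drawback applies: the connector still has to stream every event just to evaluate this check.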

Any suggestions would be appreciated

How about inserting the documents (or just the document ids) in a separate collection and (re)processing that collection with a Couchbase Source Connector?

Hi @mreiche, thank you for the response

I am not sure I understand what you mean.

Do you mean to:

  1. Query the original Couchbase collection for the given list of documents and insert them into a separate collection in Couchbase
  2. Create a Source Connector that reads from that new collection?

If so, I am not sure how that would solve the problem of concurrent updates to the documents

When this happens…

In my sink connector, I am using a dead letter queue as there are various scenarios in which my downstream system or pre-processing can fail(ie: mapping)

insert the document into a separate (reprocess) collection. Then, on that collection…

create a Couchbase Source Connector that streams using SAVED_OFFSET_OR_BEGINNING

Thank you for the suggestion

I think that I can implement something similar without Couchbase as I can read from the DLQ directly and publish to my sink and/or to the topic the original source is publishing to.

The problem with this solution is that it assumes the document in the dead letter queue is the latest version of the document, which is not always true. In fact, the only scenario where this holds is when the record fails 100% of the time (e.g., some sort of persistent mapping problem).

Other examples that can cause failures include:

  • Temporary downstream instability while there are concurrent updates in Couchbase
  • Temporary mapping problems that impact specific sections of the document that can be removed in subsequent updates

I suppose that I could implement logic to check whether the document is the latest version (using a timestamp or similar), but that would require a call to Couchbase to verify it.
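As a rough sketch of that check: assume every DLQ record carries some monotonically increasing version of the document (a last-modified timestamp, for example) and that the current version has already been fetched from Couchbase. `DlqReplayDecision`, `Action`, and `decide` are hypothetical names, and the lookup itself is left out.

```java
import java.util.OptionalLong;

/** Hypothetical decision helper for replaying a dead-lettered record. */
final class DlqReplayDecision {
    enum Action { REPLAY, SKIP_STALE, SKIP_DELETED }

    /**
     * @param dlqVersion     version carried by the DLQ record (assumed monotonic)
     * @param currentVersion version currently stored in Couchbase, or empty
     *                       if the document no longer exists
     */
    static Action decide(long dlqVersion, OptionalLong currentVersion) {
        if (!currentVersion.isPresent()) {
            return Action.SKIP_DELETED;
        }
        // A newer version exists, so replaying the DLQ copy would overwrite
        // fresher data downstream.
        return dlqVersion >= currentVersion.getAsLong()
            ? Action.REPLAY
            : Action.SKIP_STALE;
    }
}
```

Note that this is still check-then-act: the document can change between the lookup and the replay, so it narrows the window for stale writes rather than closing it.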

@david.nault Could you please share your input here?

Hi @yeikel,

Is your goal to re-trigger processing the latest version of the document in Couchbase? If so, perhaps you could use a Couchbase SDK (for the language of your choice!) to “tickle” the document you want to reprocess.

Here’s what that might look like with the Couchbase Java SDK:

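import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import java.util.List;

/**
 * Modifies an extended attribute (XATTR) on the document.
 * This is sufficient to trigger a DCP mutation event.
 *
 * @throws DocumentNotFoundException if document does not exist
 */
public static void tickle(
  Collection collection,
  String documentId
) {
  collection.mutateIn(
    documentId,
    // Increment a counter stored in the "tickle" XATTR; the document body is untouched.
    List.of(MutateInSpec.increment("tickle", 1).xattr()),
    MutateInOptions.mutateInOptions()
      // Keep the document's existing expiry instead of resetting it.
      .preserveExpiry(true)
  );
}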

CAVEAT: If the document was removed, there is no way to reprocess it using this strategy.
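To apply this to a whole list of document IDs, a small wrapper along the following lines might help. It is only a sketch: `BatchTickler` and `tickleAll` are hypothetical names, and the per-document action is passed in as a plain `Consumer<String>` so the snippet has no SDK dependency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper that tickles a batch of documents one by one. */
final class BatchTickler {
    /**
     * Applies tickleOne to every ID, in order, and returns the IDs for which
     * it threw (for example because the document no longer exists).
     */
    static List<String> tickleAll(List<String> documentIds, Consumer<String> tickleOne) {
        List<String> failed = new ArrayList<>();
        for (String id : documentIds) {
            try {
                tickleOne.accept(id);
            } catch (RuntimeException e) {
                // Record the failure and keep going with the remaining IDs.
                failed.add(id);
            }
        }
        return failed;
    }
}
```

With the method above this could be called as `BatchTickler.tickleAll(ids, id -> tickle(collection, id))`; the returned list then contains the documents that could not be reprocessed this way, such as removed ones.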

Out of curiosity, what requirements guided your decision to use the Couchbase Kafka connector instead of the Couchbase Elasticsearch connector?

Thanks,
David

Is your goal to re-trigger processing the latest version of the document in Couchbase? If so, perhaps you could use a Couchbase SDK (for the language of your choice!) to “tickle” the document you want to reprocess

Yes, this is exactly what we are looking for. We only need the latest version of the document, and deletions are out of scope for now. Thank you!

Out of curiosity, what requirements guided your decision to use the Couchbase Kafka connector instead of the Couchbase Elasticsearch connector?

Here are the reasons:

  1. We have an existing Kafka Connect pipeline that reads from the bucket and publishes to Kafka for another consumer (which then writes the data to a file). We thought it was convenient to tap into that data and use the sink connector supported by Confluent to publish it to Elasticsearch.

  2. It seems that running the “Couchbase Elasticsearch connector” requires separate infrastructure and maintenance, as well as the expertise to run it. As a side note, we also do not have access to “Consul”, although it is possible that “distributed mode” is enough for us.

  3. Please correct me if I am wrong, but it seems that the Couchbase Elasticsearch Connector streams the data as is and does not allow any mutation/transformation of the data before it is sent to Elasticsearch. As part of our pipeline, we map and transform the Couchbase data (with an SMT), and the document that reaches Elasticsearch is different from what we have stored in Couchbase.

Please let me know what you think and thank you again!

One note on #3 is that Elasticsearch supports ingest pipelines for modifying documents before they are indexed.

Based on #1 and #2, I think you made a reasonable choice :slight_smile:

Thanks,
David

One note on #3 is that Elasticsearch supports ingest pipelines for modifying documents before they are indexed

Thank you for the suggestion.

Based on the documentation, it seems that we would need to write our own ingest plugin to support the type of transformations that we do (the built-in transformations are not enough). It also seems that maintaining custom plugins in Elasticsearch involves cluster restarts, which is not ideal given how frequently we change our data model and transformations. Redeploying the Kafka Connect cluster with the SMT changes is more manageable because it is fully independent.