Hello experts,
I am looking for expert advice on a peculiar issue that we are facing with kafka-connect source connectors in our organisation.
To briefly describe the use-case: we store millions of created and updated events in Couchbase, which are to be sent to another team via Kafka topics. We added a
kafka-source-connector to dump all of the events from CB to the topic.
The configurations are as follows :
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
"couchbase.stream.from": "SAVED_OFFSET_OR_BEGINNING",
"couchbase.compression": "ENABLED",
"couchbase.flow.control.buffer": "16m",
"couchbase.persistence.polling.interval": "100ms"
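For reference, these properties sit inside a standard Kafka Connect connector definition that is submitted to the Connect REST API (POST /connectors, or PUT /connectors/<name>/config for updates). A rough sketch of such a definition, where the connector name, seed nodes, bucket and credentials are placeholders and the exact Couchbase property names should be verified against the connector version in use, looks like :
{
  "name": "cb-events-source",
  "config": {
    "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
    "tasks.max": "2",
    "couchbase.seed.nodes": "<couchbase-node-ips>",
    "couchbase.bucket": "<bucket-name>",
    "couchbase.username": "<username>",
    "couchbase.password": "<password>",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
    "couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
    "couchbase.stream.from": "SAVED_OFFSET_OR_BEGINNING",
    "couchbase.compression": "ENABLED",
    "couchbase.flow.control.buffer": "16m",
    "couchbase.persistence.polling.interval": "100ms"
  }
}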
During our DR activity, we created another CB cluster and another kafka-connect cluster and replicated everything into them. But when we switched the flow over to the
secondary cluster, we found that all of the previously sent events were replayed as well, resulting in almost 32 million events sent to the Kafka topic.
I am still wondering: if the offset.storage.topic was successfully replicated, why didn't the connector continue from there, and why did it jump back to the beginning on restart?
Adding to this, we are also migrating to an upgraded CB 7.2.x cluster. Now we fear that whenever we point our application to this new cluster,
the same thing will happen. The only difference in this scenario is that we won't be migrating the kafka-connect cluster; we'll just update the connector with the
new CB cluster IPs and restart it.
We have been scratching our heads for weeks trying to understand the actual behaviour of kafka-connect, the "couchbase.stream.from" property, how the saved offsets
behave in these cases, and in fact how the offsets are calculated in the first place.
Any expert advice/opinion/explanation is highly appreciated.
Without actually checking… my understanding is that the “saved offset” is per cluster. So there will be no saved offset for the new cluster and it will start from the beginning.
There is an additional subtlety for saved offsets: the offsets are also per partition (vbucket). So if there is no saved offset for a partition, changes for that partition will begin from the beginning.
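To make that concrete: Kafka Connect keys each record in offset.storage.topic by the connector name plus a "source partition" map, and for the Couchbase connector that source partition identifies the bucket and the vbucket, while the offset value tracks that vbucket's DCP stream position. Purely as an illustration (the field names here are indicative, not the connector's exact schema), one such record looks conceptually like :
key:   ["my-couchbase-source", {"bucket": "events", "partition": "512"}]
value: {"bySeqno": 184467, "vbuuid": 102938475601234}
So a saved offset only means something to the cluster whose vbucket produced those numbers.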
Ok. I get it. But for the new kafka-connect cluster, we did replicate everything from the older cluster: the data from offset.storage.topic, config.storage.topic and status.storage.topic. So if the Couchbase replication is set up and kafka-connect is replicated as well, my first guess would be that it should resume from the exact position on the new cluster, but that is not happening.
Somehow, after a lot of testing of similar scenarios, I have a feeling that these offsets are very closely tied to the cluster, and as soon as you change the cluster the offsets lose their significance. As far as I have understood, these offsets are created internally to track the DCP stream's position, and DCP provides a way to stream changes (mutations, deletions, etc.) from Couchbase.
Additionally, I have seen that the steps below result in a clean migration for source connectors (a config sketch follows the list):
1) Stop the application
2) Let the connector keep sending the existing events for a while, and as soon as the pending events reach 0 :
a) Stop the kafka source connector
b) Update the connector config to point to the new Couchbase cluster
c) Update the stream-from parameter as :
"couchbase.stream.from": "NOW"
3) Stop the replication from the old Couchbase cluster to the new Couchbase cluster
4) Start the replication from the new Couchbase cluster to the old Couchbase cluster (just for the sake of backup; it can be removed later)
5) Start the kafka source connector
6) Start the application
7) And once the traffic has been flowing for a while, update the stream-from parameter as :
"couchbase.stream.from": "SAVED_OFFSET_OR_NOW"
Somehow, after a lot of testing of similar scenarios, I have a feeling that these offsets are very closely tied to the cluster
That's what I meant by "the saved offset is per cluster".
Update the stream-from parameter as : "couchbase.stream.from": "NOW"
Right. Because "NOW" does not rely on saved offsets. And once there has been a DCP operation in every partition, all the partitions have saved offsets, and then you can go forward using saved offsets. However, if any of the partitions did not get a DCP operation, there will be no saved offset for that partition (the subtlety I mentioned) and that partition will get the "OR_BEGINNING" part of SAVED_OFFSET_OR_BEGINNING.
Do you have a support ticket open? Maybe 61145?
What version of connector are you using? What version of connector was used on the original cluster?
Thanks a lot for explaining @mreiche
No. I haven’t created any support ticket for this.
On both of the clusters, the connector version is v4.1.1.
And once there has been a DCP operation in every partition, all the partitions have saved offsets, and then you can go forward using saved offsets.
Ok. That explains it.
However, I also noticed one more thing: if you have two Couchbase clusters with bi-directional replication set up, and we configure the source kafka-connector against one cluster (say A) and then update the connector configuration to switch to the second cluster (say B), then even though the DCP operation is already ongoing, what I've seen is that the number of events being pushed into the topic (topic.name) does not match the events being sent to either CB. Whenever there is a cluster switch in the connector and the configuration is not "NOW", there will always be some extra events dumped into the topic.
The only reason for doing so much testing is that I do not want to miss a single event during migrations, hence I was trying with "SAVED_OFFSET_OR_NOW" or "SAVED_OFFSET_OR_BEGINNING". But clearly the saved offsets do not behave in the same manner in the case of a cluster switch, even if the DCP operation is ongoing.
Yes. Because if it is BEGINNING or SAVED_OFFSET_OR_BEGINNING, it will be from the beginning. Because an offset from a different cluster does not match the new cluster, it has no saved offset and must go back to the beginning. (There may be some funkiness in that it keeps trying to use that offset from the old cluster over and over. I'm investigating that.) [Edit: it seems that SAVED_OFFSET_OR_NOW should not send extra events]
I think what you might need to do is (a) use a different name for the connector so it doesn't try to use saved offsets from the old cluster; and (b) create a filter that discards DCP operations which occurred on the old cluster and were handled by the old connector.
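In config terms that would mean registering the connector under a new name (so Connect looks up offsets under a fresh key) and pointing couchbase.event.filter at a custom filter class. Both names below are hypothetical; the filter class would be something you write yourself (e.g. keying off a marker field in the document) and place on the Connect worker's classpath :
"name": "cb-events-source-clusterB",
"couchbase.event.filter": "com.example.kafka.filter.SkipOldClusterEventsFilter"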
[Edit: it seems that SAVED_OFFSET_OR_NOW should not send extra events]
As a matter of fact, this does send extra events as well while switching the connector back and forth between two CB clusters, but far fewer compared to SAVED_OFFSET_OR_BEGINNING.
I assume the mutation count differs between the master cluster and the replicated one, which leads to this no matter how long the replication has been running. So when making the switch, the offsets either do not exist or are inconsistent, which causes the extra events to be sent. So not all of the events are resent, but quite a few are always sent in the case of SAVED_OFFSET_OR_NOW or SAVED_OFFSET_OR_BEGINNING, more so with the latter config.
It appears that the only rescue here is to use NOW, and once the new cluster is set up and events are flowing fine, switch back to SAVED_OFFSET_OR_BEGINNING or SAVED_OFFSET_OR_NOW to be resilient.
If you upgrade node-by-node (i.e. add a 7.2.x node to your cluster, then rebalance, then remove an old node from your cluster, then rebalance, repeat until all your nodes are 7.2.x) then the vb uids do not change and the kafka saved offsets will remain valid. https://docs.couchbase.com/server/current/install/upgrade-cluster-online.html
Can you please check this part once? How does this work internally?
While testing these scenarios, I've seen that if bi-directional replication has been set up for a long period of time and we switch the connector config from one cluster to another and then back to the older one, extra events are dumped into the topic with either of the configs : SAVED_OFFSET_OR_NOW and SAVED_OFFSET_OR_BEGINNING.
I'm trying to understand the behaviour not just for the cluster upgrade but for migrations as well, including the DR activities.