Can you provide the code for A,B and C?
I attempted to reproduce with the Kafka 4.1.15-SNAPSHOT against an 8.0.0 dev server. I added logging of expiration - it remains 0.
TransactionResult result = cluster.transactions().run((ctx) -> {
ctx.insert(collection, "k", JsonObject.create().put(id, "k"));
TransactionGetResult docA = ctx.get(collection, id);
ctx.remove(docA);
});
collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().preserveExpiry(true));
[2023-11-30 11:38:09,486] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":21,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":40,"type":"mutation","partition":34,"sequenceNumber":3339,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":49139,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,487] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":22,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":41,"type":"deletion","partition":34,"sequenceNumber":3341,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":14619,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,588] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":23,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":42,"type":"deletion","partition":34,"sequenceNumber":3343,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":101725,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,588] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":24,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":41,"type":"mutation","partition":34,"sequenceNumber":3345,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":92878,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,589] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":25,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":42,"type":"mutation","partition":34,"sequenceNumber":3347,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":90208,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,589] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":26,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":43,"type":"mutation","partition":34,"sequenceNumber":3348,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":76642,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
Or maybe the insert/delete were in separate transactions…
TransactionResult result1 = cluster.transactions().run((ctx) -> {
ctx.insert(collection, "k", JsonObject.create().put(id, "k"));
});
Thread.sleep(2000); // to delineate the two transactions
TransactionResult result2 = cluster.transactions().run((ctx) -> {
TransactionGetResult docA = ctx.get(collection, id);
ctx.remove(docA);
});
collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().preserveExpiry(true));
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":67,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:client-record","connectTaskId":"1","revision":4,"type":"mutation","partition":62,"sequenceNumber":3310,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":32404,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":68,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":61,"type":"mutation","partition":34,"sequenceNumber":3417,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":92960,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":69,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":63,"type":"deletion","partition":34,"sequenceNumber":3419,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":55408,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":70,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":62,"type":"mutation","partition":34,"sequenceNumber":3421,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":43939,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":71,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":64,"type":"mutation","partition":34,"sequenceNumber":3423,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":32815,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":72,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":63,"type":"mutation","partition":34,"sequenceNumber":3425,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":26458,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,443] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":73,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":64,"type":"mutation","partition":34,"sequenceNumber":3427,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":17425,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,444] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":74,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":65,"type":"mutation","partition":34,"sequenceNumber":3429,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":14951,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":75,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":65,"type":"mutation","partition":34,"sequenceNumber":3431,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":13199,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":76,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":66,"type":"deletion","partition":34,"sequenceNumber":3433,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":7104,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":77,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":66,"type":"mutation","partition":34,"sequenceNumber":3435,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":5531,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,546] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":78,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":67,"type":"mutation","partition":34,"sequenceNumber":3436,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":94603,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
Just to make sure the logging for expiry is working…
collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().expiry(Duration.ofSeconds(1)));
[2023-11-30 12:12:27,969] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":92,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":74,"type":"mutation","partition":34,"sequenceNumber":3459,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":89698,"expiration":1701375148,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)