Transaction - Upsert scenario

Hi Guys,
I am playing with Transaction API with reactive nature. During this, i have a scenario for upserting a document. for example: I am trying to insert an document, if that document is already available i want to replace it or vice versa i have document and trying to replace but that document is not even available in bucket. How do we handle this scenario.
Below is the code i am trying. So basically i want to handle this document not exists or document already exists at code level and make sure transaction is moving forward with out any issue.

Transactions transactions = null;
	try {
		transactions = Transactions.create(cluster,
				TransactionConfigBuilder.create().durabilityLevel(TransactionDurabilityLevel.PERSIST_TO_MAJORITY)
						.expirationTime(Duration.ofMinutes(1)).build());
		ReactiveCollection reactiveCollection = cluster.bucket("XXXX").reactive().defaultCollection();
     Map<String,JsonObject> errorDocs=new HashMap<String, JsonObject>();
		TransactionResult result = transactions.reactive().run((ctx) -> {
			return Flux.fromIterable(jsonDocs)
					.flatMap(jsonDoc -> insertToTargetBucket(jsonDoc, ctx, reactiveCollection))
					.doOnError(err -> {
						if (err instanceof TransactionCommitAmbiguous) {
							System.err.println("Transaction possibly committed: ");
						} else {
							System.err.println("Transaction failed: " + err.getLocalizedMessage());
						}
					})
					.then(Mono.defer(() -> {
						System.out.println("commit the data");
						return ctx.commit();
					}));
		}).block();
		System.out.println(result.toString());
	} catch (Exception ex) {
		System.out.println("Critical error is transaction:" + ex.getLocalizedMessage());
	} finally {
		if (transactions != null)
			transactions.close();
	}

private Mono insertToTargetBucket(GetResult jsonDoc, AttemptContextReactive ctx,
ReactiveCollection reactiveCollection) {

	TransactionGetResult existingDoc = ctx.get(reactiveCollection, jsonDoc.contentAsObject().getString("Id"))
			.flatMap(doc -> {
				return ctx.replace(doc, jsonDoc.contentAsObject());
			}).block();

	if (existingDoc.id()==null) {
		System.out.println("Am i able to catch this error");
		return ctx.insert(reactiveCollection, jsonDoc.contentAsObject().getString("Id"), jsonDoc.contentAsObject());
	} else
		return Mono.just(existingDoc);

}

Hi @nagarajan_subbaraj

You can use ctx.getOptional for this. It returns an Optional, based on whether the document exists or not. Then use ctx.insert or ctx.replace as appropriate.

(In future we would like to add a ctx.upsert operation, but it’s not on the immediate roadmap.)

Btw you’re checking the TransactionCommitAmbiguous at the wrong point - those will only be thrown from the .run(), so you want to catch it in (or instead of) your ‘catch (Exception ex)’ block.

Thanks Graham. I have modified a bit, also i have added onErrorResume( with upsert scenario- just for a fall back. Does this makes sense? or just optional will take care of this UPSERT scenario.

Transactions transactions = null;
try {
transactions = Transactions.create(cluster,
TransactionConfigBuilder.create().durabilityLevel(TransactionDurabilityLevel.PERSIST_TO_MAJORITY)
.expirationTime(Duration.ofMinutes(1)).build());
ReactiveCollection reactiveCollection = cluster.bucket(“XXXX”).reactive().defaultCollection();
TransactionResult result = transactions.reactive().run((ctx) -> {
return Flux.fromIterable(jsonDocs)
.flatMap(jsonDoc -> insertToTargetBucket(jsonDoc, ctx, reactiveCollection).onErrorResume(e -> {
return upsertToTargetBucket(jsonDoc, ctx, reactiveCollection);
})).doOnError(err -> {
System.err.println("Transaction failed: " + err.getLocalizedMessage());
}).then(Mono.defer(() -> {
System.out.println(“commit the data”);
return ctx.commit();
}));
}).block();
System.out.println(result.toString());

	} catch (Exception ex) {
		System.out.println("Critical error is transaction:" + ex.getLocalizedMessage());
	} finally {
		if (transactions != null)
			transactions.close();
	}
}

private Mono<TransactionGetResult> insertToTargetBucket(GetResult jsonDoc, AttemptContextReactive ctx,
		ReactiveCollection reactiveCollection) {
	System.out.println("Calling insert scenario....");
	Optional<TransactionGetResult> cbdoc = ctx
			.getOptional(reactiveCollection, jsonDoc.contentAsObject().getString("Id")).block();
	if (cbdoc.isPresent()) {
		System.out.println("Calling UPSERT inside INSERT scenario....");
		return ctx.get(reactiveCollection, jsonDoc.contentAsObject().getString("Id")).flatMap(doc -> {
			return ctx.replace(doc, jsonDoc.contentAsObject());
		});
	} else {
		return ctx.insert(reactiveCollection, jsonDoc.contentAsObject().getString("Id"), jsonDoc.contentAsObject());
	}

}

/*
 * This is just a fall back method
 */
private Mono<TransactionGetResult> upsertToTargetBucket(GetResult jsonDoc, AttemptContextReactive ctx,
		ReactiveCollection reactiveCollection) {
	System.out.println("Calling UPSERT scenario....");
	return ctx.get(reactiveCollection, jsonDoc.contentAsObject().getString("Id")).flatMap(doc -> {
		return ctx.replace(doc, jsonDoc.contentAsObject());
	});
}

Close, but for this:

you don’t want to ever do anything slow or blocking inside of a reactive operation. Instead flatmap the ctx.getOptional call.

And for this:

	return ctx.get(reactiveCollection, jsonDoc.contentAsObject().getString("Id")).flatMap(doc -> {
		return ctx.replace(doc, jsonDoc.contentAsObject());
	});

You don’t need to do the get again, you already the TransactionGetResult inside cbdoc.

Also, it looks from this:

return Flux.fromIterable(jsonDocs)
    .flatMap(jsonDoc -> insertToTargetBucket(jsonDoc, ctx, reactiveCollection).onErrorResume(e -> {

that you are trying to do a bunch of these upserts in parallel. Please make sure to follow the rules here: https://docs.couchbase.com/java-sdk/3.0/howtos/distributed-acid-transactions-from-the-sdk.html#concurrent-operations-with-the-async-api (do the first operation in serial, and track any errors from any of the individual operations, and rollback transaction if they do error).

If you don’t need to be doing the operations in parallel it would be simpler to use the synchronous blocking API? Since you’re already .block()ing on the transaction anyway.

Oh and this:

   .flatMap(jsonDoc -> insertToTargetBucket(jsonDoc, ctx, reactiveCollection).onErrorResume(e -> {
        return upsertToTargetBucket(jsonDoc, ctx, reactiveCollection);

doesn’t really make sense. You don’t need to insert then if that fails do the upsert… The logic in upsertToTargetBucket already checks if the document exists, and inserts it if not.

Hi Graham,
I am just learning reactive way. will you able share the best way to do this? i am not able to achieve without blocking it.

Scenario we are trying to achieve is to insert bulk records using transactions, is synchronous api can handle 100k records mutation?

If you are doing such a large bulk insert then I would definitely go with the async API, I was just checking you actually needed its power (and complexity - as you find, reactive programming is not the easiest!)
100k docs is a lot though. You will certainly need to increase the transaction’s expiration time.

I tried rewriting the bulk get-and-replace example to do what you needed, but have hit something that I want to investigate further while doing so. So, this code is a starting point and hopefully shows you how to do the getOptional-then-insert/replace logic, but I may need to come back and revise it once I have it actually working:

                List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");

            ReactiveCollection coll = collection.reactive();

            TransactionResult result = transactions.reactive((ctx) -> {

                // Tracks whether all operations were successful
                AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);

                // The first mutation must be done in serial, as it also creates a metadata entry
                return ctx.getOptional(coll, docIds.get(0))
                        .flatMap(docOpt -> {
                            if (docOpt.isPresent()) {
                                TransactionGetResult doc = docOpt.get();
                                JsonObject content = doc.contentAsObject();
                                content.put("value", "updated");
                                return ctx.replace(doc, content);
                            }
                            else {
                                return ctx.insert(coll, docIds.get(0),
                                        JsonObject.create().put("value", "initial"));
                            }
                        })

                        // Do all other docs in parallel
                        .thenMany(Flux.fromIterable(docIds.subList(1, docIds.size()))
                                        .flatMap(docId -> ctx.getOptional(coll, docId)
                                                        .flatMap(docOpt -> {
                                                            if (docOpt.isPresent()) {
                                                                TransactionGetResult doc = docOpt.get();
                                                                JsonObject content = doc.contentAsObject();
                                                                content.put("value", "updated");
                                                                return ctx.replace(doc, content);
                                                            }
                                                            else {
                                                                return ctx.insert(coll, docIds.get(0),
                                                                        JsonObject.create().put("value", "initial"));
                                                            }
                                                        })
                                                        .onErrorResume(err -> {
                                                            System.out.println("An operation failed, will retry transaction after finishing this batch");

                                                            allOpsSucceeded.set(false);
                                                            // App should replace this with logging
                                                            err.printStackTrace();

                                                            // Allow other ops to finish
                                                            return Mono.empty();
                                                        }),

                                                // Run these in parallel
                                                docIds.size())

                                // The commit or rollback must also be done in serial
                        ).then(Mono.defer(() -> {
                            // Commit iff all ops succeeded
                            if (allOpsSucceeded.get()) {
                                System.out.println("Committing the transaction");
                                return ctx.commit();
                            } else {
                                System.out.println("Retrying the transaction as one or more ops failed");
                                throw new RetryTransaction();
                            }
                        }));
            })
                    .onErrorResume(err -> {
                        if (err instanceof TransactionCommitAmbiguous) {
                            System.err.println("Transaction possibly committed: ");
                        }
                        else {
                            System.err.println("Transaction failed: ");
                        }

                        for (LogDefer e : ((TransactionFailed) err).result().log().logs()) {
                            // System.err is used for example, log failures to your own logging system
                            System.err.println(err.toString());
                        }

                        // TODO This is not the best way of handling this, as the application doesn't know the
                        // transaction failed.
                        return Mono.just(((TransactionFailed) err).result());
                    })
                    .block();

Thanks Graham, i will try with this.