Unexpected Behavior on Couchbase Transactions replace method

Hello,
I’ve been experimenting with Transactions in our Java project. Suppose I have an order object and I want to update it using the update method. I’ve been trying to understand how CAS will work in this scenario. I added a breakpoint before the replace method and updated the document via the Couchbase UI (I also tried this by sending concurrent requests). I expected to see a CAS_MISMATCH error, but instead, the replace operation ran again and succeeded, causing me to lose the previous operation’s data. Why are we retrying in case of a CAS_MISMATCH error? Can we prevent this retry? In my case, I need to execute the flow again and calculate the up-to-date order from scratch. Thank you for your support.

public void update(Order order) {
cluster.transactions().run((ctx) → {
TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), order.getId());
ctx.replace(currentDoc, order);
order.getEvents().forEach(event → ctx.insert(domainEventCollection, event.id(), event));
});
}

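The issue is that your update(order) intentionally overwrites the document that was in Couchbase with the order that was passed in as an argument, but it uses the CAS from the document in Couchbase (via currentDoc). Because currentDoc is fetched inside the transaction lambda, its CAS is current at the time of the replace, so the replace succeeds even though the order being written is stale.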

// The content written is the caller-supplied order; currentDoc contributes only the CAS.
ctx.replace(currentDoc, order);

What you should do is get the order from Couchbase, change whatever you want to change - leaving everything else as-is - and then replace().

// Read the document through the transaction context.
TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), order.getId());
// Start from the stored content rather than from the caller's copy.
Order currentOrder = currentDoc.contentAs(Order.class);
currentOrder.name = order.name; // just change the name
// Write back the stored order with only the intended field changed.
ctx.replace(currentDoc, currentOrder);

Here’s a complete example. You’ll need a bucket named ‘my_bucket’. The second thread t2 can use either updateNameOrDescriptionTx() or updateNameOrDescriptionNoTx()

package com.couchbase.client.java.examples.query;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.transactions.TransactionGetResult;

import java.util.UUID;

/**
 * Runs two concurrent read-modify-write updates against the same order document.
 *
 * <p>Thread t1 updates the description inside a transaction and pauses for two seconds between
 * its read and its replace. Thread t2 starts one second after t1 and updates the name without a
 * transaction, using a CAS-guarded replace and no pause. The final document is printed once both
 * threads have finished.
 *
 * <p>Requires a bucket named {@code my_bucket} on the cluster at 127.0.0.1.
 */
public class SimpleQueryExample {

  public static void main(String... args) {
    Cluster cluster = Cluster.connect("127.0.0.1", "Administrator", "password");
    try {
      Bucket bucket = cluster.bucket("my_bucket");
      Order order = new Order(UUID.randomUUID().toString(), "original_name", "original_description");
      bucket.defaultCollection().upsert(order.id, order);

      Thread t1 = new Thread(
          () -> updateNameOrDescriptionTx(cluster, bucket, order.getId(), null, "new_description", 2));

      Thread t2 = new Thread(
          () -> updateNameOrDescriptionNoTx(cluster, bucket, order.getId(), "new_name", null, 0));

      t1.start();
      // Let t1 perform its read before t2 starts.
      sleep(1);
      t2.start();
      try {
        t1.join();
        t2.join();
      } catch (InterruptedException e) {
        // Restore the flag so code further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }

      System.err.println("finally: " + bucket.defaultCollection().get(order.id));
    } finally {
      // Release the SDK's resources even when the example fails part-way through.
      cluster.disconnect();
    }
  }

  /**
   * Updates either the name or the description of an order inside a transaction.
   *
   * <p>The order is read, modified and replaced entirely within the transaction lambda, so the
   * change is always applied to the content that this run of the lambda read.
   *
   * @param cluster      cluster whose transactions API is used
   * @param bucket       bucket whose default collection holds the order
   * @param orderId      key of the order document
   * @param name         new name, or {@code null} to leave the name unchanged
   * @param description  new description, or {@code null} to leave the description unchanged
   * @param delaySeconds pause between modifying the order and replacing it
   * @throws IllegalArgumentException unless exactly one of {@code name} and {@code description}
   *                                  is non-null
   */
  public static void updateNameOrDescriptionTx(Cluster cluster, Bucket bucket, String orderId, String name,
      String description, long delaySeconds) {
    requireExactlyOne(name, description);
    cluster.transactions().run((ctx) -> {
      TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), orderId);
      Order order = currentDoc.contentAs(Order.class);
      System.err.println(Thread.currentThread().getName() + " before: " + order);
      if (name != null) {
        order.name = name;
      } else {
        order.description = description;
      }
      sleep(delaySeconds);
      System.err.println(Thread.currentThread().getName() + " changing: " + order);
      ctx.replace(currentDoc, order);
    });
  }

  /**
   * Updates either the name or the description of an order without a transaction, passing the
   * CAS of the document that was read to the replace.
   *
   * @param cluster      unused; kept so the signature mirrors {@link #updateNameOrDescriptionTx}
   * @param bucket       bucket whose default collection holds the order
   * @param orderId      key of the order document
   * @param name         new name, or {@code null} to leave the name unchanged
   * @param description  new description, or {@code null} to leave the description unchanged
   * @param delaySeconds pause between modifying the order and replacing it
   * @throws IllegalArgumentException unless exactly one of {@code name} and {@code description}
   *                                  is non-null
   */
  public static void updateNameOrDescriptionNoTx(Cluster cluster, Bucket bucket, String orderId, String name,
      String description, long delaySeconds) {
    requireExactlyOne(name, description);

    GetResult currentDoc = bucket.defaultCollection().get(orderId);
    Order order = currentDoc.contentAs(Order.class);
    System.err.println(Thread.currentThread().getName() + " before: " + order);
    if (name != null) {
      order.name = name;
    } else {
      order.description = description;
    }
    sleep(delaySeconds);
    System.err.println(Thread.currentThread().getName() + " changing: " + order);
    // Replace under the key that was read: the CAS belongs to that document, whatever id the
    // content happens to carry.
    bucket.defaultCollection().replace(orderId, order, ReplaceOptions.replaceOptions().cas(currentDoc.cas()));
  }

  /** Rejects calls that specify neither or both of the two updatable fields. */
  private static void requireExactlyOne(String name, String description) {
    if (name == null && description == null) {
      throw new IllegalArgumentException("both name and description cannot be null");
    }
    if (name != null && description != null) {
      throw new IllegalArgumentException("both name and description cannot be specified");
    }
  }

  /** Order document with an id, a name and a description. */
  public static class Order {
    String id;
    String name;
    String description;

    /** No-argument constructor; leaves every field {@code null}. */
    public Order() {
    }

    public Order(String id, String name, String description) {
      this.id = id;
      this.name = name;
      this.description = description;
    }

    public String getId() {
      return id;
    }

    public String getName() {
      return name;
    }

    public String getDescription() {
      return description;
    }

    @Override
    public String toString() {
      return "id=" + id + ", name=" + name + ", description=" + description;
    }
  }

  /**
   * Pauses the current thread, logging before and after the pause.
   *
   * @param seconds how long to pause, in seconds
   * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
   */
  static void sleep(long seconds) {
    try {
      System.err.println(Thread.currentThread() + " sleeping for " + seconds);
      Thread.sleep(seconds * 1000);
      System.err.println(Thread.currentThread() + " resuming");
    } catch (InterruptedException e) {
      // Restore the flag before converting to an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}

Thank you for the response.
In our system, there are many complex update operations, and transforming them to this approach is challenging. I was wondering if there’s a way to disable retrying for the CAS_MISMATCH case. During the transaction, I received this TransactionOperationFailedException with a null cause.

disable retrying for the CAS_MISMATCH case.

So what would the application do in the case that CAS_MISMATCH is thrown? It would retry whatever code retrieved and modified the order and then tried to update it in the transaction, wouldn’t it? Why not just put all of that in the transaction where it would be retried automatically?

If you don’t want it retried, then don’t execute in a transaction.

GetResult result =  col.get(orderId)
order = result.getContentAs(Order.class)
...
col.replace(orderId,order, ReplaceOptions.replaceOptions.cas(result.cas()))

I need to use a transaction because I’m updating the order and inserting the order’s event into another collection. After retrieving the order from Couchbase, there are several validations and update operations based on the flow. It would be challenging to add everything to the transaction block. It would be helpful if we had a cause in the TransactionOperationFailedException. This way, I might be able to cancel the replace operation.

But that is the only correct use of Couchbase transactions. Otherwise you could end up overwriting data from another transaction, as you found out.

Can’t we include the cause when throwing TransactionOperationFailedException? Normally, the cause is provided for TransactionFailedException.

Hi @Nur , as Michael says, this is the expected model for transactions. It’s because on detecting a CAS mismatch (whether inside a transaction or not) there are only really two options: retry the get, or fail the whole operation. The latter does not seem useful so is not exposed as an option - hence TransactionOperationFailedException being used for this failure, which is intentionally an opaque error signal.

So it’s necessary to get the document inside the transaction lambda, apply whatever changes you need to it, and then write it. This allows your changes to be safely applied to the latest version of the doc if the transaction does need to be retried.

After retrieving the order from Couchbase, there are several validations and update operations based on the flow. It would be challenging to add everything to the transaction block.

Instead of passing in a completed Order object, perhaps you could pass in to update a list of the validations and changes that you want to make?

Alternatively, perhaps pass in a callback? E.g. you get the document inside the transaction, then pass the resulting TransactionGetResult to your callback which performs all validation + updates and returns the updated content?

1 Like

Here is an example of what Graham suggests.

Thread t1 shows use of the defined updateOrder(); thread t2 shows use of a lambda.

package com.couchbase.client.java.examples.query;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.transactions.TransactionGetResult;

import java.util.UUID;
import java.util.function.BiConsumer;

/**
 * Runs two concurrent transactional updates against the same order document, each supplying its
 * modification as a callback that is applied inside the transaction lambda.
 *
 * <p>Thread t1 passes the method reference {@link #updateOrder}; thread t2 passes an equivalent
 * inline lambda. The final document is printed once both threads have finished.
 *
 * <p>Requires a bucket named {@code my_bucket} on the cluster at 127.0.0.1.
 */
public class SimpleQueryExample {

  public static void main(String... args) {
    Cluster cluster = Cluster.connect("127.0.0.1", "Administrator", "password");
    try {
      Bucket bucket = cluster.bucket("my_bucket");
      Order order = new Order(UUID.randomUUID().toString(), "original_name", "original_description");
      bucket.defaultCollection().upsert(order.id, order);

      Thread t1 = new Thread(() -> {
        Order orderUpdates = new Order();
        orderUpdates.description = "new description";
        updateNameOrDescriptionTx(cluster, bucket, order.id, orderUpdates, 2, SimpleQueryExample::updateOrder);
      });

      Thread t2 = new Thread(() -> {
        Order orderUpdates = new Order();
        orderUpdates.name = "new name";
        updateNameOrDescriptionTx(cluster, bucket, order.id, orderUpdates, 2, (o, ou) -> {
          // does the same as updateOrder()
          if (ou.description != null) {
            o.description = ou.description;
          }
          if (ou.name != null) {
            o.name = ou.name;
          }
        });
      });

      t1.start();
      // Let t1 perform its read before t2 starts.
      sleep(1);
      t2.start();
      try {
        t1.join();
        t2.join();
      } catch (InterruptedException e) {
        // Restore the flag so code further up the stack can still observe the interrupt.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }

      System.err.println("finally: " + bucket.defaultCollection().get(order.id));
    } finally {
      // Release the SDK's resources even when the example fails part-way through.
      cluster.disconnect();
    }
  }

  /**
   * Reads an order inside a transaction, lets {@code func} modify it, and replaces it.
   *
   * <p>{@code func} is invoked inside the transaction lambda, so it runs again whenever the
   * lambda runs again and always receives the content that run has just read.
   *
   * @param cluster      cluster whose transactions API is used
   * @param bucket       bucket whose default collection holds the order
   * @param orderId      key of the order document
   * @param orderUpdates carrier for the requested change; exactly one of its {@code name} and
   *                     {@code description} must be non-null
   * @param delaySeconds pause between applying {@code func} and replacing the document
   * @param func         receives the order read in the transaction as its first argument and
   *                     {@code orderUpdates} as its second
   * @throws IllegalArgumentException unless exactly one of {@code orderUpdates.name} and
   *                                  {@code orderUpdates.description} is non-null
   */
  public static void updateNameOrDescriptionTx(Cluster cluster, Bucket bucket, String orderId, Order orderUpdates,
      long delaySeconds, BiConsumer<Order, Order> func) {
    if (orderUpdates.name == null && orderUpdates.description == null) {
      throw new IllegalArgumentException("both name and description cannot be null");
    }
    if (orderUpdates.name != null && orderUpdates.description != null) {
      throw new IllegalArgumentException("both name and description cannot be specified");
    }

    cluster.transactions().run((ctx) -> {
      TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), orderId);
      Order order = currentDoc.contentAs(Order.class);
      System.err.println(Thread.currentThread().getName() + " before: " + order);
      func.accept(order, orderUpdates);
      sleep(delaySeconds);
      System.err.println(Thread.currentThread().getName() + " changing: " + order);
      ctx.replace(currentDoc, order);
    });
  }

  /**
   * Copies every non-null updatable field of {@code ou} onto {@code o}.
   *
   * @param o  order to modify
   * @param ou requested changes; {@code null} fields are ignored
   */
  public static void updateOrder(Order o, Order ou) {
    if (ou.description != null) {
      o.description = ou.description;
    }
    if (ou.name != null) {
      o.name = ou.name;
    }
  }

  /** Order document with an id, a name and a description, exposed as public fields. */
  public static class Order {
    public String id;
    public String name;
    public String description;

    /** No-argument constructor; leaves every field {@code null}. */
    public Order() {
    }

    public Order(String id, String name, String description) {
      this.id = id;
      this.name = name;
      this.description = description;
    }

    @Override
    public String toString() {
      return "id=" + id + ", name=" + name + ", description=" + description;
    }
  }

  /**
   * Pauses the current thread, logging before and after the pause.
   *
   * @param seconds how long to pause, in seconds
   * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
   */
  static void sleep(long seconds) {
    try {
      System.err.println(Thread.currentThread() + " sleeping for " + seconds);
      Thread.sleep(seconds * 1000);
      System.err.println(Thread.currentThread() + " resuming");
    } catch (InterruptedException e) {
      // Restore the flag before converting to an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.