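The issue is that your update(order) intentionally overwrites the document that was in Couchbase with the order that was passed in as an argument, but it uses the CAS from the document in Couchbase (via currentDoc). Because the CAS is fresh, the check passes even though the new content was not derived from the current document, so any changes made to the other fields in the meantime are silently lost.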
ctx.replace(currentDoc, order);
What you should do is get the order from Couchbase, change only what you want to change (leaving everything else as-is), and then call replace().
TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), order.getId());
Order currentOrder = currentDoc.contentAs(Order.class);
currentOrder.name = order.name; // just change the name
ctx.replace(currentDoc, currentOrder);
Here’s a complete example. You’ll need a bucket named ‘my_bucket’. The second thread, t2, can use either updateNameOrDescriptionTx() or updateNameOrDescriptionNoTx().
package com.couchbase.client.java.examples.query;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.kv.ReplaceOptions;
import com.couchbase.client.java.transactions.TransactionGetResult;
import java.util.UUID;
/**
 * Demonstrates two concurrent writers that each change a different field of the same
 * {@link Order} document: one inside a transaction, the other with a plain CAS-guarded
 * replace. Both writers read the current document, modify a single field and write the
 * result back, so neither overwrites the other's field with stale data.
 *
 * <p>Requires a bucket named {@code my_bucket} on the cluster at {@code 127.0.0.1}.
 */
public class SimpleQueryExample {

    /**
     * Creates a fresh order document, then starts two threads that update it concurrently:
     * {@code t1} changes the description inside a transaction (holding it open for two
     * seconds) and {@code t2}, started one second later, changes the name without a
     * transaction. Prints the final document to {@code System.err}.
     *
     * @param args unused
     */
    public static void main(String... args) {
        Cluster cluster = Cluster.connect("127.0.0.1", "Administrator", "password");
        try {
            Bucket bucket = cluster.bucket("my_bucket");
            Order order = new Order(UUID.randomUUID().toString(), "original_name", "original_description");
            bucket.defaultCollection().upsert(order.getId(), order);

            Thread t1 = new Thread(
                    () -> updateNameOrDescriptionTx(cluster, bucket, order.getId(), null, "new_description", 2));
            Thread t2 = new Thread(
                    () -> updateNameOrDescriptionNoTx(cluster, bucket, order.getId(), "new_name", null, 0));

            t1.start();
            sleep(1); // let t1 read the document before t2 starts, so the two updates overlap
            t2.start();
            try {
                t1.join();
                t2.join();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            System.err.println("finally: " + bucket.defaultCollection().get(order.getId()));
        } finally {
            // Release the SDK's connections and threads even if the example fails.
            cluster.disconnect();
        }
    }

    /**
     * Changes either the name or the description of an order inside a transaction.
     *
     * <p>The document is read inside the transaction lambda, so the content written back is
     * always derived from the version whose CAS is used for the replace.
     * NOTE(review): the transactions library may re-run the lambda on a write conflict
     * (see the Couchbase transactions documentation); the read-modify-write below is safe
     * to repeat.
     *
     * @param cluster      cluster used to run the transaction
     * @param bucket       bucket whose default collection holds the order
     * @param orderId      id of the order document
     * @param name         new name, or {@code null} to change the description instead
     * @param description  new description, or {@code null} to change the name instead
     * @param delaySeconds seconds to sleep between modifying the order and replacing it
     * @throws IllegalArgumentException if {@code name} and {@code description} are both
     *                                  {@code null} or both non-{@code null}
     */
    public static void updateNameOrDescriptionTx(Cluster cluster, Bucket bucket, String orderId, String name,
            String description, long delaySeconds) {
        requireExactlyOneChange(name, description);
        cluster.transactions().run((ctx) -> {
            TransactionGetResult currentDoc = ctx.get(bucket.defaultCollection(), orderId);
            Order order = currentDoc.contentAs(Order.class);
            System.err.println(Thread.currentThread().getName() + " before: " + order);
            applyChange(order, name, description);
            sleep(delaySeconds);
            System.err.println(Thread.currentThread().getName() + " changing: " + order);
            ctx.replace(currentDoc, order);
        });
    }

    /**
     * Changes either the name or the description of an order without a transaction, using
     * the CAS of the document that was read to guard the replace.
     *
     * <p>NOTE(review): per the SDK's CAS semantics the replace is expected to fail, rather
     * than overwrite, if the document changed after it was read; this method does not retry.
     *
     * @param cluster      unused; kept so both update methods share the same signature
     * @param bucket       bucket whose default collection holds the order
     * @param orderId      id of the order document
     * @param name         new name, or {@code null} to change the description instead
     * @param description  new description, or {@code null} to change the name instead
     * @param delaySeconds seconds to sleep between modifying the order and replacing it
     * @throws IllegalArgumentException if {@code name} and {@code description} are both
     *                                  {@code null} or both non-{@code null}
     */
    public static void updateNameOrDescriptionNoTx(Cluster cluster, Bucket bucket, String orderId, String name,
            String description, long delaySeconds) {
        requireExactlyOneChange(name, description);
        GetResult currentDoc = bucket.defaultCollection().get(orderId);
        Order order = currentDoc.contentAs(Order.class);
        System.err.println(Thread.currentThread().getName() + " before: " + order);
        applyChange(order, name, description);
        sleep(delaySeconds);
        System.err.println(Thread.currentThread().getName() + " changing: " + order);
        bucket.defaultCollection().replace(order.getId(), order,
                ReplaceOptions.replaceOptions().cas(currentDoc.cas()));
    }

    /** Verifies that exactly one of {@code name} and {@code description} is non-null. */
    private static void requireExactlyOneChange(String name, String description) {
        if (name == null && description == null) {
            throw new IllegalArgumentException("both name and description cannot be null");
        }
        if (name != null && description != null) {
            throw new IllegalArgumentException("both name and description cannot be specified");
        }
    }

    /** Sets the order's name when {@code name} is non-null, otherwise sets its description. */
    private static void applyChange(Order order, String name, String description) {
        if (name != null) {
            order.name = name;
        } else {
            order.description = description;
        }
    }

    /** Order document with an id, a name and a description. */
    public static class Order {
        String id;
        String name;
        String description;

        /** No-argument constructor; presumably needed by the JSON mapper behind {@code contentAs}. */
        public Order() {
        }

        /**
         * @param id          document id of the order
         * @param name        order name
         * @param description order description
         */
        public Order(String id, String name, String description) {
            this.id = id;
            this.name = name;
            this.description = description;
        }

        /** @return the document id of the order */
        public String getId() {
            return id;
        }

        /** @return the order name */
        public String getName() {
            return name;
        }

        /** @return the order description */
        public String getDescription() {
            return description;
        }

        @Override
        public String toString() {
            return "id=" + id + ", name=" + name + ", description=" + description;
        }
    }

    /**
     * Sleeps the current thread for the given number of seconds, logging before and after.
     *
     * @param seconds how long to sleep, in seconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is
     *                          interrupted while sleeping; the interrupt flag is restored first
     */
    static void sleep(long seconds) {
        try {
            System.err.println(Thread.currentThread() + " sleeping for " + seconds);
            Thread.sleep(seconds * 1000);
            System.err.println(Thread.currentThread() + " resuming");
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}