ReactiveCollection.unlock hangs

The program below does the following:

  1. creates a document
  2. gets and locks the document
  3. unlocks the document

It hangs on line 22, which is the unlock.
Am I doing something wrong, or is it a bug?
I use Java SDK 3.4.6 and Couchbase Server 7.1.0 on a Mac.

package dr.couchbase;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;

public class Test {
	public static void main(final String[] args) throws InterruptedException {
		final CountDownLatch latch = new CountDownLatch(1);
		final Cluster cluster = Cluster.connect(
			      "couchbase://127.0.0.1", "Administrator", "cbadmin"
			    );
		final Bucket bucket = cluster.bucket("Aggregation");
		final ReactiveCollection reactiveColl = bucket.reactive().defaultCollection();
		reactiveColl.upsert("k", "hello").subscribe(r1 -> {
			System.out.println("add");
			reactiveColl.getAndLock("k", Duration.ofSeconds(10)).subscribe(r2 -> {
				System.out.println("lock");
				reactiveColl.unlock("k", r2.cas()).subscribe(v -> {
					System.out.println("unlocked");
					latch.countDown();
				});
			});
			
		});
		latch.await();
	}
}

Hi @anders_eliasson

No, there’s no bug here. You just need to chain the reactive operations together with an operator such as concatMap (or flatMap, when working with a Mono) rather than calling them inside the subscribe in this way.
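
For illustration, here is a rough sketch of what the chained version of the program from the question could look like. It reuses the connection string, credentials, bucket name and key from the original post, and it needs a running cluster to do anything. Since Mono has no concatMap operator (that one lives on Flux), the sketch assumes flatMap is the intended equivalent; the class name ChainedUnlock is made up for this example.

```java
import java.time.Duration;

import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;

public class ChainedUnlock {
    public static void main(final String[] args) {
        // Connection details copied from the question; adjust them for your cluster.
        final Cluster cluster = Cluster.connect("couchbase://127.0.0.1", "Administrator", "cbadmin");
        final ReactiveCollection coll = cluster.bucket("Aggregation").reactive().defaultCollection();

        coll.upsert("k", "hello")
            .doOnNext(r1 -> System.out.println("add"))
            // flatMap starts the next operation only after the previous one has emitted its result.
            .flatMap(r1 -> coll.getAndLock("k", Duration.ofSeconds(10)))
            .doOnNext(r2 -> System.out.println("lock"))
            .flatMap(r2 -> coll.unlock("k", r2.cas()))
            // unlock() is a Mono<Void>: it completes without a value, so hook into success, not onNext.
            .doOnSuccess(ignored -> System.out.println("unlocked"))
            // block() waits for the whole chain and rethrows any error, so no CountDownLatch is needed.
            .block();

        cluster.disconnect();
    }
}
```

With a single chain there is only one subscription (made by block()), and an error in any step surfaces as an exception instead of being silently dropped.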


Thanks for the reply!
If I change unlock to remove, it works fine with all the subscribes. Do you know why?

Don’t use this construct, as Graham said. This Stack Overflow post explains it a bit further: "reactive programming - Function now executing properly after subscribe".

It doesn’t hang on unlock(), it hangs on latch.await() because latch.countDown() is never called.

It works with remove because remove emits a (next) object, and the consumer you’re passing to subscribe is the consumer for emitted objects.

https://projectreactor.io/docs/core/3.3.0.RELEASE/api/reactor/core/publisher/Mono.html#subscribe-java.util.function.Consumer-

unlock returns Mono<Void>, so there is no (next) object and the consumer for objects is never called. Use the subscribe() overload that takes a completeConsumer (the last argument).

https://projectreactor.io/docs/core/3.3.0.RELEASE/api/reactor/core/publisher/Mono.html#subscribe-java.util.function.Consumer-java.util.function.Consumer-java.lang.Runnable-

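    reactiveColl.unlock("k", r2.cas()).subscribe(
      t -> {},                                           // never called: Mono<Void> emits no value
      e -> { e.printStackTrace(); latch.countDown(); },  // report the error instead of hanging
      () -> {                                            // called when unlock completes
        System.out.println("unlocked");
        latch.countDown();
      });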
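
If it helps to see the signal flow in isolation, below is a small JDK-only sketch. It does not use Reactor or the Couchbase SDK: a hand-written java.util.concurrent.Flow publisher stands in for a Mono<Void>, on the assumption that the two behave alike in the one respect that matters here (complete without emitting). The names EmptyCompletionDemo, emptyPublisher and signals are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class EmptyCompletionDemo {

    /** Hypothetical stand-in for a Mono<Void>: completes without emitting any item. */
    static Flow.Publisher<Void> emptyPublisher() {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // There is nothing to emit, so the first request goes straight to completion.
                if (!done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes to the publisher and records which signals arrive, in order. */
    static List<String> signals(Flow.Publisher<Void> publisher) {
        List<String> seen = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Void>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Void item) {
                seen.add("next"); // the analogue of the single-argument subscribe(consumer)
            }

            @Override
            public void onError(Throwable t) {
                seen.add("error");
            }

            @Override
            public void onComplete() {
                seen.add("complete"); // the analogue of the completeConsumer
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(signals(emptyPublisher())); // prints [complete]
    }
}
```

Only the completion signal is recorded, which is why a latch.countDown() placed in the value consumer never runs for unlock.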

Thanks a lot, now I get it.
