Hi,
Is the CouchbaseQueue API at http://docs.couchbase.com/sdk-api/couchbase-java-client-2.5.5/com/couchbase/client/java/datastructures/collections/CouchbaseQueue.html thread safe? Or can I achieve similar behavior with Bucket.getAsync().queuePop()?
I want to access this async behavior from multiple JVMs, and the JVMs don’t know about each other. I do not want to go down the Kafka path at the moment. Can anyone shed some light on the best way to achieve async queue behavior, or refer me to the appropriate section?
Thanks
Hi Yr,
Yes, CouchbaseQueue is safe for concurrent use by multiple threads and multiple JVMs.
The Java class itself has no mutable state. The backing array document is modified using subdocument operations (which are atomic) and optimistic locking. It should perform reasonably well as long as there’s not a huge amount of contention. Here’s the source code if you’d like to see for yourself.
A couple of things to keep in mind regarding the implementation as of SDK 2.6.1:
- The API for this class is not committed, meaning it could change at any time.
- CouchbaseQueue.poll() throws ConcurrentModificationException if there’s too much contention and the “compare and swap” retry limit is exceeded. The retry limit is controlled by a Java System Property, com.couchbase.datastructureCASRetryLimit, and defaults to 10.
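If the retry limit is exceeded, one option is to back off and try again on the caller's side. Below is a minimal sketch, assuming only that the queue is used through the java.util.Queue interface. The class name PollWithBackoff and its parameters are hypothetical and not part of the SDK.

```java
import java.util.ConcurrentModificationException;
import java.util.Queue;

public class PollWithBackoff {

    /**
     * Calls queue.poll() and retries with exponential backoff whenever the
     * queue reports too much contention via ConcurrentModificationException.
     * Hypothetical helper for illustration; not part of the Couchbase SDK.
     */
    public static <T> T pollWithBackoff(Queue<T> queue, int maxAttempts, long initialDelayMillis) {
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return queue.poll();
            } catch (ConcurrentModificationException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: let the caller decide what to do
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw e;
                }
                delay *= 2; // wait twice as long before the next attempt
            }
        }
    }
}
```

Since the limit is read from a system property, it could presumably also be raised at JVM startup with something like -Dcom.couchbase.datastructureCASRetryLimit=20, although backing off between attempts tends to reduce contention rather than just tolerate it.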
EDIT: I see you were asking specifically about SDK 2.5.5 – CouchbaseQueue is the same in 2.5.5 and 2.6.1.
Thanks,
David
If it’s labeled that way, that’s probably just a documentation bug as we graduated that feature a while ago. @subhashni or @daschl?
The behavior is the same, but the implementation is a bit different. CouchbaseQueue.poll() uses a subdocument operation to get just the head of the queue, while AsyncBucket.queuePop() loads the entire document. I’m not sure if this is intentional, or an opportunity for applying the same optimization in queuePop(). What do you think, @subhashni?
Both the collections and bucket APIs use a full document get followed by a subdocument remove, because the subdocument remove operation does not return the removed value. The document CAS is used to make sure there wasn’t any concurrent modification between the two operations.
Yes, it is a documentation bug for the collections one; the bucket API is committed.
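For readers unfamiliar with the pattern, the read-then-remove sequence guarded by a CAS check can be illustrated in plain Java. This is only an in-memory analogy, not SDK code: the class CasQueueSketch is hypothetical, and an AtomicReference stands in for the document, with reference identity playing the role of the CAS value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class CasQueueSketch {

    // The "document": an immutable snapshot of the queue contents.
    // Replacing the reference is the analogue of writing a new document version.
    private final AtomicReference<List<String>> doc =
            new AtomicReference<>(Collections.<String>emptyList());

    /** Appends an item, retrying until the compare-and-set succeeds. */
    public void push(String item) {
        while (true) {
            List<String> current = doc.get();
            List<String> next = new ArrayList<>(current);
            next.add(item);
            if (doc.compareAndSet(current, Collections.unmodifiableList(next))) {
                return;
            }
        }
    }

    /**
     * Removes and returns the oldest item, or null if the queue is empty.
     * Gives up with ConcurrentModificationException after maxRetries failed attempts.
     */
    public String pop(int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            List<String> current = doc.get();            // step 1: read the document
            if (current.isEmpty()) {
                return null;
            }
            String head = current.get(0);
            List<String> rest = Collections.unmodifiableList(
                    new ArrayList<>(current.subList(1, current.size())));
            if (doc.compareAndSet(current, rest)) {      // step 2: remove only if unchanged
                return head;
            }
            // Another thread changed the document between the two steps: retry.
        }
        throw new ConcurrentModificationException(
                "Couldn't pop in " + maxRetries + " attempts");
    }
}
```

Under heavy contention most attempts lose the compare-and-set race and have to repeat the read, which is why this style of queue suits low-to-moderate concurrency.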
Thanks for the replies @david.nault, @subhashni and @ingenthr. I will try a POC with high load against the “optimistic locking”. I want to build a producer/consumer setup with the flexibility to manipulate the queue, and I am exploring whether CouchbaseQueue/CouchbaseMap is good enough to serve the purpose. I will post my findings, as they could help others too.
Subhashni and I did a bit more investigation, and we confirmed that as of SDK 2.6.1, CouchbaseQueue.poll() is more highly optimized than [Async]Bucket.queuePop(). We’re tracking the issue as JCBC-1239.
Thanks,
David
Hi David,
It’s a very old thread, but I am trying to get a little attention here as it fits the context well. I am in the process of migrating to the CB 3.1.6 client SDK and am currently on CB 6.0 Community Edition. CouchbaseQueue.poll() does not seem to scale in a multithreaded (read/write), multi-JVM scenario. I am frequently seeing
com.couchbase.client.core.error.UnambiguousTimeoutException: SubdocGetRequest, Reason: TIMEOUT
at com.couchbase.client.java.AsyncUtils.block(AsyncUtils.java:51) ~[java-client-3.1.6.jar:?]
at com.couchbase.client.java.Collection.lookupIn(Collection.java:550) ~[java-client-3.1.6.jar:?]
at com.couchbase.client.java.datastructures.CouchbaseQueue.poll(CouchbaseQueue.java:135) ~[java-client-3.1.6.jar:?]
OR
com.couchbase.client.core.error.CouchbaseException: CouchbaseQueue poll failed
at com.couchbase.client.java.datastructures.CouchbaseQueue.poll(CouchbaseQueue.java:152) ~[java-client-3.1.6.jar:?]
com.couchbase.client.core.retry.reactor.RetryExhaustedException: Couldn't perform poll in less than 10 iterations. It is likely concurrent modifications of this document are the reason
I haven’t seen this error before. I’m not sure whether changing the client SDK or the increased load testing caused it. In the CB error.log I can see multiple errors like these:
[ns_server:error,2021-11-30T00:14:35.981-08:00,ns_1@172.18.61.84:ns_doctor<0.285.0>:ns_doctor:update_status:316]The following buckets became not ready on node 'ns_1@ 172.18.61.192': ["BucketName1",
"BucketName2"], those of them are active ["BucketName1",
"BucketName2"]
[ns_server:error,2021-11-30T00:14:41.098-08:00,ns_1@172.18.61.84:<0.31238.705>:janitor_agent:query_states_details:200]Failed to query vbucket states from some nodes:
[{'ns_1@172.18.61.192',warming_up}]
Currently, I am running a 3-node cluster. Could you please point me in the right direction on how to avoid this situation?
Thanks
Yr
Hi Yr,
CouchbaseQueue is intended for lightweight use cases with a low-to-moderate amount of concurrency. You can tune the Queue options to see if that helps:
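import java.time.Duration;
import com.couchbase.client.java.datastructures.CouchbaseQueue;
import com.couchbase.client.java.kv.QueueOptions;

CouchbaseQueue<String> queue = collection.queue(
"myQueue",
String.class,
QueueOptions.queueOptions()
.timeout(Duration.ofSeconds(10)) // allow each operation up to 10 seconds
.casMismatchRetries(Integer.MAX_VALUE) // effectively never give up on CAS mismatches
);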
The SubdocGetRequest timeout might indicate the server is overloaded or unresponsive for some other reason. I’m not a Couchbase Server expert, so I don’t know how to interpret those log messages.
Thanks,
David
Thank you so much, David. I want to explore the possibility of using CouchbaseQueue like any other MQ/Kafka and see if it’s scalable. I will try the suggested solution.