Hi there.
I was assigned the task of testing Couchbase and the Java client to see if it fits the needs of our company.
During the evaluation with the java client, I saw that Rx Observables are recommended to make async calls and interact with couchbase server.
The case is that we are inserting 1M records from a file using a blocking queue and several threads. We tried doing this with the sync client and the async client (using both blocking and non-blocking operations).
At the same time, we are reading the values back from a copy of the file, in sequential order, in a single thread.
I was surprised to see that the async non-blocking methods take a lot more time than the async blocking method and the sync methods.
The main questions are:
Why is the Async non-blocking approach taking much more time than the other two?
Which approach is supposed to be faster?
Are there any suggestions to minimize the time in the async non-blocking approach?
The sync method returns some not-found keys, which I think is the expected result since we are writing and reading at almost the same time.
Our test environment is composed of 2 CentOS servers running Couchbase 3.0.1, with the cluster replicated across both servers and a default Couchbase bucket, using Couchbase Java Client 2.0.1.
Any input will be really appreciated.
Cheers,
Ivan
Normally, fully asynchronous flows are much quicker than blocking operations, but of course they need to be handled differently. If you have some code to look at, I’m happy to suggest improvements.
The reason why async flows are much faster is that they allow for better resource utilization (CPU, IO, …). For one example, see the docs on async batching: http://docs.couchbase.com/developer/java-2.0/documents-bulk.html: running the same workload once sync and once async shows how much better the async version utilizes the underlying resources. We've put lots of effort into making this happen, and to some degree you also benefit in a synchronous context when you have a thread pool firing at it.
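To make the effect concrete, here is a minimal JDK-only sketch (no Couchbase code; a `Thread.sleep` stands in for a network round trip, and the 20 ms latency and names like `fakeRequest` are arbitrary assumptions) comparing sequential blocking calls against the same calls dispatched concurrently:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchingDemo {
    // Simulated network round trip of roughly 20 ms.
    static String fakeRequest(int i) {
        try { Thread.sleep(20); } catch (InterruptedException e) { throw new RuntimeException(e); }
        return "doc" + i;
    }

    // Blocking style: issue each request and wait, so total time is ~ n * latency.
    public static long timeSequential(int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) fakeRequest(i);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Async style: all n requests are in flight at once, total time is ~ one latency.
    public static long timeConcurrent(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        long start = System.nanoTime();
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int id = i;
            inFlight.add(CompletableFuture.supplyAsync(() -> fakeRequest(id), pool));
        }
        inFlight.forEach(CompletableFuture::join); // wait for all responses
        long elapsed = (System.nanoTime() - start) / 1_000_000;
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + timeSequential(10) + " ms");
        System.out.println("concurrent: " + timeConcurrent(10) + " ms");
    }
}
```

With 10 simulated requests, the sequential run takes roughly 10x the single round-trip time, while the concurrent run finishes in about one round trip: that is the gap better resource utilization buys you.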
For a look at the overall architecture of the core module, check out this slide deck: Building a Reactive Database Driver on the JVM - Speaker Deck. There is a RingBuffer sitting right at the beginning of the request flow; it decouples the publishers from the consumers, giving us nice batching effects and reduced contention. If you want to learn more about this, read: Mechanical Sympathy: Smart Batching.
Can you share some code that I can run and suggest improvements?
Actually, it looks like you are mixing sync and async calls here (and leaking scope). I think you want something more like this for storing:
try {
    bucket
        .async()
        .counter("counter", 1)
        .flatMap(new Func1<JsonLongDocument, Observable<?>>() {
            @Override
            public Observable<?> call(JsonLongDocument counterRes) {
                String id = "doc" + counterRes.content();
                JsonObject jo = JsonObject.empty(); // your data here
                return bucket.async().upsert(JsonDocument.create(id, jo));
            }
        })
        .timeout(1, TimeUnit.SECONDS) // don't forget the timeout
        .toBlocking()
        .single();
} catch (Exception ex) {
    ex.printStackTrace();
}
Also note that on the querying part you are also mixing sync and async, which will inevitably get you into race conditions and unwanted behavior. I recommend an approach like the following (or you can also use latches - see the docs for that). Also note that if you already use GSON, you can avoid the intermediate JSON parsing by using RawJsonDocument:
Cluster cluster = CouchbaseCluster.create();
final Bucket bucket = cluster.openBucket();
String id = "foo";
final Gson gson = new Gson();

DummyObject result = bucket
    .async()
    .get(id, RawJsonDocument.class)
    .map(new Func1<RawJsonDocument, DummyObject>() {
        @Override
        public DummyObject call(RawJsonDocument doc) {
            return gson.fromJson(doc.content(), DummyObject.class);
        }
    })
    .timeout(1, TimeUnit.SECONDS) // don't forget the timeout
    .toBlocking()
    .single();
Thanks @daschl.
I was aware of the leaking scope, which is bad practice of course, but I had “planned” to solve that later.
This example is really helpful, I am new to Rx so this is really useful.
Still one question remains:
Is using toBlocking preventing me from getting the full benefits from the async API?
Thanks for the RawJsonDocument tip, I will use Json for that.
Such great support.
I will let you know if the performance improves. =)
hey @isanchez,
I’ll try to build on michael’s answer and address your last question.
As was said, the point of going async is to waste as little time and as few resources as possible. When you do a synchronous query and response, you are effectively doing nothing between firing the request and receiving the response.
In async mode, this chunk of time can be used to do further processing, for example reacting to the response of a previous asynchronous query.
Using the basic sync operations directly on the Bucket is equivalent to the first situation. Fully using the AsyncBucket obtained via Bucket.async() is equivalent to the second situation.
In fact, the simple sync bucket uses the AsyncBucket under the hood, blocking on each operation.
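As a hypothetical illustration of that wrapping (plain JDK `CompletableFuture` standing in for the SDK's async API; `asyncGet`/`syncGet` are made-up names, not SDK methods, and the 2-second timeout is an arbitrary choice):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class SyncFacadeDemo {
    // Stand-in for an async API call: returns immediately with a future.
    static CompletableFuture<String> asyncGet(String id) {
        return CompletableFuture.supplyAsync(() -> "content-of-" + id);
    }

    // Stand-in for the sync facade: the same call, but it blocks with a timeout,
    // mirroring how the sync Bucket delegates to AsyncBucket per operation.
    public static String syncGet(String id) {
        try {
            return asyncGet(id).get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("get failed for " + id, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(syncGet("foo")); // prints "content-of-foo"
    }
}
```

The caller of `syncGet` sees a plain blocking method; the asynchrony still exists underneath, it is just consumed one operation at a time.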
Just doing a toBlocking().last() (for instance) at the end of an async stream with flatMaps, multiple-key processing, etc. should only be slightly slower, because the multiple requests are still processed asynchronously on the IO thread. There is still a price to pay, but chances are your application is not fully asynchronous, so it can be a good compromise.
If you are still designing the app, going fully reactive would reap the most benefits. You can have a look at the Reactive Manifesto: www.reactivemanifesto.org. It describes an approach for highly efficient and resilient applications, and RxJava fits well into it.
Main takeaway point though: never mix blocking behavior into an asynchronous processing stream (be it calls to the SDK's blocking API or any other long-running call). This will lead to race conditions and unexpected errors.
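For reference, the latch approach mentioned earlier can be sketched with plain JDK types (`CompletableFuture.runAsync` stands in for the SDK's async operations here; the counts and the 5-second timeout are arbitrary assumptions): block once at the edge of the application, never inside the stream itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {
    public static int processAll(int n) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(n);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            CompletableFuture.runAsync(() -> {
                processed.incrementAndGet(); // non-blocking work in the callback
                latch.countDown();           // signal completion of this item
            });
        }
        // The only blocking call, at the boundary of the async flow.
        latch.await(5, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(processAll(100)); // prints 100
    }
}
```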
I hope you enjoy coding with RxJava and the new SDK!
Simon
Thanks @simonbasle & @daschl ,
I tried several configurations, and with your advice I managed to overcome some issues and get better performance.
I wanted to ask: what is an acceptable rate of operations per second in Couchbase, specifically the number of gets per second and puts per second, when using Couchbase 3.0 with the Java client 2.0.1?
I think I might have a bottleneck in my client, since I am using async requests and am only seeing measurements around 50 puts/s and 20 gets/s.
Some info about my Couchbase cluster:
Couchbase servers:
Amazon type instances: m3.2xlarge (4 CPUs, 2048 MB Ram, 15 GB disk)
OS: Centos 2.6.32-431.el6.x86_64
Environment:
3 Couchbase nodes (3 dual-core processors).
Disk Structure: Undetermined.
8 GB Total Ram.
Bucket Name: Serialized
Bucket Type: Couchbase
DocumentType: SerializableDocument
1024 MB per bucket per node
2 replicas in Serialized bucket
@isanchez yes, your numbers seem “odd”. Thousands of ops/s are always possible. Can you please share your code so we can take a look and provide input?
Sorry @daschl,
I was on holiday; thanks for all the help. Unfortunately, because of company policies I cannot upload any code made within the company, even test code (not a very good policy, I think).
Anyway, I finally found the bottleneck: it was IO on the machine running the client.
My log configuration in Log4J was writing all events, and that was slowing down the whole process.
Now results are in the order of thousands.
Read Threads:
1 reading thread, non-blocking async
Write Threads:
100 writing threads, non-blocking async (using a blocking queue)
Failover time: 120 seconds (automatic)
Rebalance Time: Off
Average Statistics:
Ops per second: around 6K
Writes per second: 4520
Read per second: 1147
@bkachhal I see nothing obviously wrong with what you’re doing…
Are you loading a very large number of documents in your bulk method? How large are the documents?
OutOfMemory errors and the like would need further analysis on your side, with tools like JVisualVM or Java Mission Control (packed with the JDK, the second one only for Oracle JDKs) to see what kind of objects fill up the memory…
(also unless you’re seeing the exact same problem, as diagnosed throughout the thread, as original poster, please consider creating a new topic)
@bkachhal hmmm, that is actually not the best thing. I mean, if it works it's okay, but it's not how it should be. Can you raise an issue here: Couchbase: Best NoSQL Cloud Database Service? Maybe it's a bug in LegacyDocument. Would it be possible for you to simulate with a JsonDocument and see if you have the same issues?
Hello, could you please explain why you use the transformation to BlockingObservable in each example, if according to the Javadoc it is not recommended at all?
@dmartyanov examples and snippets should usually be easily runnable. When you put such a snippet e.g. in a main method, if you don't block somehow then the asynchronous call will happen in a background thread and method execution will continue immediately… in effect terminating the program before anything could happen.
So it is not very interesting! From there you have two choices: either you sleep() or you block on your async call (toBlocking()). Sleeping is even less elegant, as it will potentially force the program to run for more time than it really needs (e.g. the async code executes in 1 ms but you sleep for 1000 ms).
Hence the use of toBlocking() in various examples: if you copy-paste one into a main() method and run it, something actually happens.
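A tiny JDK-only sketch of the same idea (a `CompletableFuture` stands in for the SDK's Observable, and `runSnippet` is a made-up name for illustration): without the final `join()`, main() could exit before the background work ever runs.

```java
import java.util.concurrent.CompletableFuture;

public class SnippetDemo {
    // A stand-in for an async SDK call: the work runs on a background thread.
    public static String runSnippet() {
        CompletableFuture<String> async =
            CompletableFuture.supplyAsync(() -> "done");
        // Without this join(), the calling thread could return before the
        // background task runs, and nothing visible would happen.
        return async.join(); // blocks exactly as long as needed, unlike sleep()
    }

    public static void main(String[] args) {
        System.out.println(runSnippet()); // prints "done"
    }
}
```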
But yeah, to reap the most benefits from the asynchronous API in a “normal” long-running application, you should avoid the use of toBlocking() and try to make as much of the application code reactive as possible.