I am facing a problem and i cannot understand why is this happening.
I am using CB Server 4.6.2 ( local ) , Java sdk 2.4.7
I have a rx bulkGet method which end up doing a .toList().toBlocking().single();
This method is being called inside a for loop like this one:
int nThreads = 5;
ExecutorServices executorService = Executors.newFixedThreadPool(nThreads)
final CountDownLatch countDownLatch = new CountDownLatch(X);
for(...){
executorService.submit(new Runnable() {
@Override
public void run() {
call rx bulkGet method
countDownLatch.countDown()
}
});
}
countDownLatch.await();
The problem i am facing is that if nThreads >= 5 i am getting lot of timeouts.
If nThreads <= 3 no timeout.
The weird thing is that i did a tests using an rx flow and calling bulkGet as part of that flow using a subscribeOn with the same executor and no timeout error is happening, not even when nThreads = 10.
Seems that .toBlocking() isn’t working very good under heavy load ( 40K ops/second )
@german.barros, I don’t think the issue is with toBlocking. In your example, >= 5 threads causes too much parallelization of requests. Couchbase can’t keep up. The lower thread number limits requests sufficiently to avoid timeouts.
I’d have to see the RX Flow example to understand why subscribeOn with the same thread executor has no issues regardless of the thread count. It could be how you’re subscribing.
Observable.from(keys)
.flatMap(/* Here i am calling an asyncGet with fallback logic, here there's a timeout but is not being thrown */)
.flatMap(/* Convert the json retrieved to object and generates new observable with it */)
.timeout(/* This timeout is being thrown */)
.toList()
.toBlocking()
.singleORDefault(new ArrayList()).
I can’t say much more on the timeouts you’re seeing with your executor example. My only suggestion is to use flatMap with a maxConcurrent argument instead of using the ExecutorService. That would allow you to create a single timeout for all the work you’re performing.
See this forum post for more on the maxConcurrent approach:
You can also limit in-flight delete requests by limiting the number of subscribers to the flatMap operator performing the deletions. Choose an Observable.flatMap() method that takes a maxConcurrent parameter.