I am trying to do a bulk insert using the following Rx code in my application logic:
LOGGER.info("Main thread : " + Thread.currentThread() + "----" + Thread.currentThread().getId());
Observable
    .from(items)
    .subscribeOn(Schedulers.io())
    .map(item -> convertJavaToJsonObject(item, CbKeyHelper.createCbKey(item.getId())))
    .flatMap(docToInsert -> asyncBucket.insert(docToInsert))
    .retryWhen(RetryBuilder
        .anyOf(TemporaryFailureException.class)
        .delay(Delay.exponential(TimeUnit.MILLISECONDS, 5))
        .max(3)
        .build())
    .toBlocking()
    .forEach(insertedDoc -> {
        final Item item = convertToJava(JsonObject.fromJson(insertedDoc.content()));
        storedIds.add(item.getId());
        LOGGER.info("Current thread : " + Thread.currentThread() + "----" + Thread.currentThread().getId());
    });
Thread output:
Main thread : Thread[main,5,main]----1
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48
Current thread : Thread[cb-computations-5,5,main]----48
My understanding is:
The items emitted by the Observable are converted one by one to RawJsonDocument by map. flatMap takes these documents and inserts them in parallel on separate threads; Couchbase takes care of this through its request/response buffer. If an insert fails, retryWhen comes into play. toBlocking blocks the main thread until all the documents have been inserted. forEach receives every document emitted by the Observable until an error occurs.
The subscribeOn method tells the pipeline which thread pool to use, in this case the IO pool.
I doubt my understanding because of what I see when I print the thread IDs. First, the callbacks run on a computation thread (cb-computations-5), whereas I was expecting an IO thread. Second, all the thread IDs are the same. Why?
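To make the thread question concrete, here is a minimal sketch in plain Java (no RxJava or Couchbase involved) of what I suspect might be happening. It is only a guess on my part, not how the SDK actually works: `fake-io` is a hypothetical stand-in for Schedulers.io(), `fake-sdk` is a hypothetical stand-in for whatever thread the async client delivers its responses on, and the class and method names are made up for illustration. The pipeline is started on one thread, but each callback runs on the thread that delivers the result.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CallbackThreadDemo {

    // Returns the name of the thread on which each "inserted" callback ran.
    static List<String> callbackThreads(int itemCount) throws InterruptedException {
        // Hypothetical stand-in for Schedulers.io(): the thread that starts the pipeline.
        ExecutorService fakeIo =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-io"));
        // Hypothetical stand-in for the async client's own response thread.
        ExecutorService fakeSdk =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-sdk"));

        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(itemCount);
        try {
            // The work is kicked off on the "io" thread ...
            fakeIo.execute(() -> {
                for (int i = 0; i < itemCount; i++) {
                    // ... but each result is delivered on the "sdk" thread,
                    // so that is where the downstream callback runs.
                    fakeSdk.execute(() -> {
                        seen.add(Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for callbacks");
            }
            return seen;
        } finally {
            fakeIo.shutdown();
            fakeSdk.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints [fake-sdk, fake-sdk, fake-sdk, fake-sdk]
        System.out.println(callbackThreads(4));
    }
}
```

In this sketch none of the callbacks runs on `fake-io`, and they all share one thread because the delivering pool has a single thread. Whether the same reasoning applies to subscribeOn combined with asyncBucket.insert is exactly what I would like confirmed.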
Also, what happens if I put an onNext after retryWhen instead of forEach? What would the result be in that case?
Please help.