All, I am trying to upgrade my code to use Java SDK 3. We do batch upserts of 50K records and were using Observable and bucket.async(), but my understanding is that this is replaced by ReactiveCollection. Does anyone have sample code that I can follow?
Old code
Hi @aqua123
That's correct, with SDK3 we took the opportunity to migrate from RxJava2 to Project Reactor, which is now arguably the de facto standard for reactive streams on the JVM. They both implement the reactive streams spec so are fully interoperable. If you need to keep portions of your application in RxJava2, then please see these docs for how to convert between the Mono and Flux returned from SDK3, and the Single and Observable used by RxJava2.
On the other hand, if you're in the position of fully converting your application to Project Reactor, then generally it's easy to find the equivalent operation. In your example code for instance, Observable.from() is replaced by Flux.fromIterable(). And you will want to use collection.reactive() rather than bucket.async() in SDK3.
Incidentally, no need to do an explicit exists() check on the document before performing a mutation - you can just perform the mutation and then catch the DocumentNotFoundException. It'll be slightly more performant as it saves a round-trip.
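As a rough illustration of the Observable.from() to Flux.fromIterable() move described above, here is a minimal sketch that only depends on Reactor. The `upsert` method and the in-memory `STORE` map are hypothetical stand-ins so the example runs without a cluster; in real SDK3 code the write would be collection.reactive().upsert(id, content), which also returns a Mono.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BatchUpsertSketch {
    // Hypothetical stand-in for the bucket, so the sketch runs without Couchbase.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Hypothetical stand-in for collection.reactive().upsert(id, content).
    static Mono<String> upsert(String id, String content) {
        return Mono.fromCallable(() -> {
            STORE.put(id, content);
            return id;
        });
    }

    // Upserts every id and returns how many writes completed.
    static long upsertAll(List<String> ids) {
        Long count = Flux.fromIterable(ids)              // replaces Observable.from(ids)
                .flatMap(id -> upsert(id, "content-" + id))
                .count()
                .block();                                // wait once, at the end of the chain
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        System.out.println(upsertAll(List.of("doc-1", "doc-2", "doc-3")));
    }
}
```

Nothing runs until the chain is subscribed, which here happens inside block().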
Thank you @graham.pople. Is hitting an exception and then dealing with it better than an exists operation, performance-wise? Also we are trying to mutate the path, so it might be "PathNotExistsException" that we will have to check for.
Hi @aqua123
Yes it would be more performant, by a few orders of magnitude: it's the cost of a network round-trip (microseconds to milliseconds) vs the cost of creating an exception (nanoseconds).
Instead of try-catch, in the reactive world you'd hang an onErrorResume off the insertOrUpsert. You can either return Mono.empty() from that if you want the other operations in someList to continue, or propagate the error otherwise.
Personally I use .parallel() and .runOn() instead of Flux::flatMap, but that's more of a style thing. It just makes it clearer that there's parallelism, and what the parallelism is. (By default Flux::flatMap runs up to 256 operations in parallel.) Something like this:
int concurrency = 100; // This many operations will be in-flight at once
Flux.fromIterable(someList)
    .parallel(concurrency)
    .runOn(Schedulers.boundedElastic())
    .concatMap(t -> insertOrUpsert(t, submissionId)
        .onErrorResume(err -> {
            // Depends what we want to happen - here silently swallowing any errors
            return Mono.empty();
        }))
    .sequential()
    .subscribe();
@graham.pople how can I get a reference to "t" in onErrorResume? I see it only has a reference to the error object and doesn't know about the outer object "t". insertOrUpsert is currently returning a Publisher.
It is showing me an error - Cannot resolve method 'onErrorResume' in 'Publisher'. insertOrUpsert is returning a Publisher. Can I wrap it in Flux.just() so that onErrorResume is recognized?
It's probably easiest to make insertOrUpsert return a Mono (a Mono is when an operation returns one-or-nothing, like the RxJava Single; a Flux is the equivalent of Observable). Especially since the underlying SDK3 collection.reactive().insert() and collection.reactive().replace() calls will be returning a Mono.
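To make the point about `t` concrete: once insertOrUpsert returns a Mono, the onErrorResume lambda sits inside the lambda that receives `t`, so it can simply capture it. A minimal sketch, assuming a hypothetical `insertOrUpsert` that fails for ids starting with "bad" (a stand-in for the real SDK3 call):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorResumeSketch {
    // Hypothetical write: fails for any id that starts with "bad".
    static Mono<String> insertOrUpsert(String t) {
        if (t.startsWith("bad")) {
            return Mono.<String>error(new IllegalStateException("write failed for " + t));
        }
        return Mono.just(t);
    }

    // Runs every write and returns the items whose write failed.
    static List<String> failedItems(List<String> someList) {
        Queue<String> failed = new ConcurrentLinkedQueue<>();
        Flux.fromIterable(someList)
            .flatMap(t -> insertOrUpsert(t)
                .onErrorResume(err -> {
                    // 't' is captured from the enclosing lambda; 'err' is a Throwable
                    failed.add(t);
                    return Mono.empty();   // swallow the error so the other items continue
                }))
            .blockLast();
        return new ArrayList<>(failed);
    }

    public static void main(String[] args) {
        System.out.println(failedItems(List.of("doc-1", "bad-2", "doc-3")));
    }
}
```

Because the return type is Mono<String> rather than a bare Publisher, onErrorResume resolves and the error parameter is typed as Throwable.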
I tried returning Mono.empty() and commented out doErrorProcessing, but I still don't see it reaching the inner onErrorResume. Also, the inner onErrorResume is not recognizing the error object as a Throwable. It says it's just an Object.
The error being returned from insertOrUpsert is UnambiguousTimeoutException, as I set the timeouts for insert/upsert to 1 nanosecond to replicate the error scenario.
insertOrUpsert returns an exception if something goes wrong and that exception also has the outer object in it
My issue now is that subscribe doesn't wait for the code which checks whether errorDocumentProcess is null or not. I tried using blockLast(), but that is painfully slow when uploading 1K records. Is there any other alternative?
Hi @aqua123
Doing processing inside doOnError looks a bit suspicious - with reactive you generally want to make sure everything's part of the reactive chain.
For the slowness, I'd suggest adding some debugging statements to check if operations are actually being done in parallel. 1,000 operations should take no time at all. Or use .parallel(1000).runOn(Schedulers.boundedElastic()).concatMap(t -> ... to make absolutely certain it's running in parallel (though .flatMap should do up to 256 operations in parallel, by default).
Failing that, you could take a look at the personal generic checklist I use when looking at performance issues: Couchbase performance issue - slowness - #4 by graham.pople
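One way to read the advice above is to turn each failure into a value inside the chain, collect the results, and block exactly once at the end. The sketch below is an illustration under stated assumptions, not the original poster's code: `insertOrUpsert` is a hypothetical write that takes 20 ms and fails for ids ending in "7", and the concurrency of 128 is an arbitrary explicit choice.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectFailuresSketch {
    // Hypothetical write: takes about 20 ms and fails for ids ending in "7".
    static Mono<String> insertOrUpsert(String id) {
        return Mono.delay(Duration.ofMillis(20))
                .flatMap(tick -> id.endsWith("7")
                        ? Mono.<String>error(new IllegalStateException("failed: " + id))
                        : Mono.just(id));
    }

    // Runs all writes concurrently and returns the ids that failed.
    static List<String> failedIds(List<String> ids) {
        return Flux.fromIterable(ids)
                .flatMap(id -> insertOrUpsert(id)
                                .ignoreElement()                        // success: emit nothing
                                .onErrorResume(err -> Mono.just(id)),   // failure: emit the failed id
                        128)                                            // explicit in-flight limit
                .collectList()
                .block();                                               // single wait at the end
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add("id-" + i);
        }
        long start = System.nanoTime();
        List<String> failed = failedIds(ids);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(failed.size() + " failures in " + elapsedMs + " ms");
    }
}
```

If the elapsed time printed by main is close to 1000 x 20 ms, the operations are running one after another; with real concurrency it should be a small fraction of that.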
Hi @graham.pople, for some reason we were not able to get reactive working and went with Spring listener/publisher events, but it looks like the Couchbase client is not handling thousands of threads, so we are back to reactive.
Well it's hard to know with just pseudo-code for insertOrUpsertInTwoDcuments, but my guess would be that you probably need to chain the two reactive operations together e.g. with flatMap/concatMap. That will cause the .subscribe() to be passed up through both operations, causing them to happen.
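A small sketch of why chaining matters, using hypothetical `insertT1` and `insertT2` methods that just record that they ran (they stand in for the two real writes). A Mono that is created but never subscribed does nothing; chaining with flatMap makes the second write part of the same subscription.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class ChainTwoWritesSketch {
    // Records which hypothetical writes actually executed.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    static Mono<String> insertT1(String id) {
        return Mono.fromCallable(() -> { LOG.add("T1:" + id); return id; });
    }

    static Mono<String> insertT2(String id) {
        return Mono.fromCallable(() -> { LOG.add("T2:" + id); return id; });
    }

    // Pitfall: the Mono from insertT2 is built but never subscribed, so T2 never runs.
    static Mono<String> unchained(String id) {
        insertT2(id);
        return insertT1(id);
    }

    // Chained: subscribing to the result runs T1, then T2.
    static Mono<String> chained(String id) {
        return insertT1(id).flatMap(result -> insertT2(id));
    }

    public static void main(String[] args) {
        unchained("a").block();
        chained("b").block();
        System.out.println(LOG);
    }
}
```

Note that flatMap only runs the second write if the first Mono emits a value; if the first operation completes empty, `then(...)` is the usual alternative.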
I have chained them now. But now I am seeing that when an error happens it always goes to the first onErrorResume and never to the second onErrorResume. Is there a way I can chain them?
So without access to all the code or knowing what documents exist, it's really hard for me to know. Based on the info there I'd guess it's only insertT2() that is raising an error? It does maybe look like you're inserting the same doc twice, in which case I'd expect the 2nd to fail with DocumentExistsException, which sounds like what you're reporting.
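For what it's worth, one way to tell which of two chained inserts failed is to attach a type-specific onErrorResume directly to each operation, so each handler only sees errors from its own insert. This is a sketch under assumptions: `DocumentExistsStandIn` is a hypothetical exception standing in for the SDK's DocumentExistsException, and the two sets stand in for the T1 and T2 documents.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

public class PerOperationErrorSketch {
    // Hypothetical stand-in for the SDK's DocumentExistsException.
    static class DocumentExistsStandIn extends RuntimeException {
        DocumentExistsStandIn(String id) { super("already exists: " + id); }
    }

    // Hypothetical stand-ins for the two sets of documents.
    static final Set<String> T1 = ConcurrentHashMap.newKeySet();
    static final Set<String> T2 = ConcurrentHashMap.newKeySet();

    // Insert that fails if the id is already present, like an SDK insert would.
    static Mono<String> insert(Set<String> table, String id) {
        return Mono.defer(() -> {
            if (table.add(id)) {
                return Mono.just(id);
            }
            return Mono.<String>error(new DocumentExistsStandIn(id));
        });
    }

    // Each insert has its own handler, so 'events' shows which one failed.
    static Mono<String> insertBoth(String id, List<String> events) {
        return insert(T1, id)
                .onErrorResume(DocumentExistsStandIn.class, e -> {
                    events.add("T1 exists: " + id);
                    return Mono.just(id);           // recover and carry on to T2
                })
                .flatMap(r -> insert(T2, id)
                        .onErrorResume(DocumentExistsStandIn.class, e -> {
                            events.add("T2 exists: " + id);
                            return Mono.just(id);
                        }));
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        T2.add("doc-1");                            // simulate a pre-existing T2 document
        insertBoth("doc-1", events).block();
        System.out.println(events);
    }
}
```

Errors of any other type (a timeout, for example) are not matched by these handlers and propagate to the subscriber.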
Thank you for your help @graham.pople. Looks like "exists" was blocking the calls, so I removed it and now have onErrorResume checking for DocumentExistsException, adding the record to T1 in one Flux and T2 in the other. Now I am getting com.couchbase.client.core.error.AmbiguousTimeoutException.