I am working with version 2.7.13 of the Java SDK against a local instance of Couchbase Community Edition Server version 6.0. I have written a simple program to insert a number of test documents into a bucket. The program is similar to the one given at the end of the Batching Operations page of the SDK documentation except that, instead of adding the documents to a List and then creating the Observable from that List, I use the range() method to generate a series of integers that are used to generate the document identifiers:
The number of exceptions varies from run to run but always seems to be in the range 3-6. However, I would expect to get 10 exceptions as all the documents are already present in the bucket. I am hoping that someone might be able to explain this behavior or suggest how to investigate further.
Curious. I don’t spot anything, but @daschl or @graham.pople would probably know. By chance are you System.exit()ing before all the responses come back? I believe the thread handling the error will be a daemon thread.
@jsb here is what I think is happening: the Observable contract says that a single error terminates the stream, and an error in this case is also DocumentAlreadyExistsException. So if you want to “run through” all 10 ops, you need to move some error handling inside your flatMap (onError*), log in there, and return an empty Observable instead so that the stream continues.
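A minimal sketch of that suggestion, assuming RxJava 1.x (which the 2.x SDK uses) is on the classpath. To keep it runnable without a cluster, `insert` and `AlreadyExists` are hypothetical stand-ins for `bucket.async().insert(doc)` and DocumentAlreadyExistsException, and here every insert fails as if the document were already present:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import rx.Observable;

public class ResumeInsideFlatMap {

    // Hypothetical stand-in for DocumentAlreadyExistsException.
    static class AlreadyExists extends RuntimeException {
        AlreadyExists(String id) {
            super(id + " already exists");
        }
    }

    // Hypothetical stand-in for bucket.async().insert(doc): always fails.
    static Observable<String> insert(String id) {
        return Observable.error(new AlreadyExists(id));
    }

    // Attempts `count` inserts and returns the ids whose insert failed.
    static List<String> run(int count) {
        // Thread-safe list, since real inserts would complete on other threads.
        List<String> failed = new CopyOnWriteArrayList<>();
        Observable.range(0, count)
            .map(i -> "doc-" + i)
            .flatMap(id -> insert(id)
                // Handle the error on the inner Observable so that the
                // outer stream never sees it and keeps going.
                .onErrorResumeNext(err -> {
                    failed.add(id);
                    return Observable.empty();
                }))
            .toList()
            .toBlocking()
            .single();
        return failed;
    }

    public static void main(String[] args) {
        System.out.println(run(10).size() + " inserts failed");
    }
}
```

Because the handler sits on the inner Observable, each failure is recorded individually and the outer stream completes normally instead of terminating on the first error.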
Hi @daschl, many thanks for responding so quickly. Your suggestion works! However, there is still one thing that I do not understand clearly: if a single error terminates the stream, why was I seeing multiple errors? Is it because several inserts are run in parallel so that, by the time the first error is noticed downstream, several failures have accumulated?
To answer my own question, after some more thought, the single exception that stops the flow is the CompositeException instance which bundles up several DocumentAlreadyExistsException instances. Where is the CompositeException instance created? The throwable passed to the onErrorResumeNext method is a DocumentAlreadyExistsException.
Yeah, well, technically RxJava still raises only one error down the pipeline; it seems to wrap outstanding errors into a CompositeException before terminating the stream. This is not happening in the Couchbase code; it is done purely in RxJava land.
If you switch the .flatMap for a .concatMap, it should make it execute one insert at a time in serial.
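A rough sketch of the difference, again assuming RxJava 1.x on the classpath and using hypothetical stand-ins (`insert`, `AlreadyExists`) rather than a real bucket. It counts how many inserts are actually started when concatMap is used without any error handling:

```java
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;

public class SerialInserts {

    // Hypothetical stand-in for DocumentAlreadyExistsException.
    static class AlreadyExists extends RuntimeException {
        AlreadyExists(String id) {
            super(id + " already exists");
        }
    }

    // Number of inserts that were actually started (subscribed to).
    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical stand-in for bucket.async().insert(doc): always fails.
    // defer() runs the counter only when the insert is subscribed to.
    static Observable<String> insert(String id) {
        return Observable.defer(() -> {
            attempts.incrementAndGet();
            return Observable.<String>error(new AlreadyExists(id));
        });
    }

    // Runs the inserts serially; returns the error that ended the stream,
    // or null if the stream completed normally.
    static Throwable run(int count) {
        attempts.set(0);
        try {
            Observable.range(0, count)
                .map(i -> "doc-" + i)
                // concatMap subscribes to the next insert only after
                // the previous one has completed.
                .concatMap(id -> insert(id))
                .toList()
                .toBlocking()
                .single();
            return null;
        } catch (RuntimeException e) {
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable error = run(10);
        System.out.println(error + " after " + attempts.get() + " attempt(s)");
    }
}
```

With serial execution the first failure ends the stream before later inserts are started, so there is nothing to bundle into a CompositeException. The trade-off is that the inserts no longer overlap, so a large batch would be expected to take longer than with flatMap.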