So, you want to do a get, detect some exceptions, and retry with a different delay depending on the exception (RequestCancelled = 31s, TemporaryFailure or Backpressure = 100ms, others = no retry)?
I see that at some point you tried to use the RxJava retry primitives. They can be a bit hard to put in place, but the good news is that since SDK 2.1.2 there's a RetryBuilder utility that can help you do that.
This, along with a call to toBlocking, could vastly simplify your code: no more storing return codes, exceptions and values in AtomicReferences, no more CountDownLatches, etc.
So instead I would do something more along the lines of:
    //imports this method needs
    import java.io.Serializable;
    import java.util.concurrent.TimeUnit;
    import com.couchbase.client.core.BackpressureException;
    import com.couchbase.client.core.RequestCancelledException;
    import com.couchbase.client.core.time.Delay;
    import com.couchbase.client.java.PersistTo;
    import com.couchbase.client.java.document.Document;
    import com.couchbase.client.java.error.CannotRetryException;
    import com.couchbase.client.java.error.TemporaryFailureException;
    import com.couchbase.client.java.util.retry.RetryBuilder;
    import rx.Observable;

    public <T extends Serializable> boolean add(Document<T> doc) throws Exception {
        boolean flag = false;
        try {
            System.out.println("test===>start");
            Document<T> inserted =
                Observable.just(doc)
                    //the lambda parameter needs its own name, it cannot reuse "doc"
                    .flatMap(d -> client.async().insert(d, PersistTo.ONE))
                    .retryWhen(RetryBuilder
                        .anyOf(RequestCancelledException.class)
                        .delay(Delay.fixed(31, TimeUnit.SECONDS))
                        .max(100)
                        .build())
                    .retryWhen(RetryBuilder
                        .anyOf(TemporaryFailureException.class, BackpressureException.class)
                        .delay(Delay.fixed(100, TimeUnit.MILLISECONDS))
                        .max(Integer.MAX_VALUE) //let it retry as long as it can
                        .build())
                    //some kind of global timeout here is probably a good idea
                    //.timeout(10, TimeUnit.MINUTES)
                    .toBlocking() //this replaces the CountDownLatch
                    .single(); //after toBlocking, this expects one and only one emitted value and returns it
            System.out.println("Successfully inserted " + inserted);
            flag = true;
            System.out.println("test===>end");
        //this catch block is reached when there were retries but the errors kept coming
        } catch (CannotRetryException e) {
            System.err.println(e.getMessage() + ", due to " + e.getCause());
            e.printStackTrace();
            flag = false;
        } catch (Exception e) {
            //here I'm not sure, you may want to throw for unexpected exceptions and return false for the expected ones?
            e.printStackTrace();
            flag = false;
        }
        return flag;
    }
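
If it helps to see what the two retryWhen stages boil down to, here is a rough plain-Java sketch of the same policy (a delay chosen per exception type, no retry for anything else), without RxJava or the SDK. Everything in it is an assumption made for illustration: the three exception classes are hypothetical stand-ins for the SDK ones so that the file compiles on its own, and delayMillisFor, callWithRetry and Sleeper are names I made up. It also simplifies two things compared to RetryBuilder: it uses a single attempt budget instead of one max per exception type, and it rethrows the last error instead of wrapping it in a CannotRetryException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

class RetryPolicySketch {

    // Hypothetical stand-ins for the SDK exceptions, so the sketch is self-contained.
    static class RequestCancelledException extends RuntimeException {}
    static class TemporaryFailureException extends RuntimeException {}
    static class BackpressureException extends RuntimeException {}

    /** Delay in milliseconds before the next attempt, or -1 if the error should not be retried. */
    static long delayMillisFor(Throwable error) {
        if (error instanceof RequestCancelledException) {
            return TimeUnit.SECONDS.toMillis(31);
        }
        if (error instanceof TemporaryFailureException || error instanceof BackpressureException) {
            return 100;
        }
        return -1; // any other error: no retry
    }

    /** Abstraction over Thread.sleep so the waiting can be swapped out (e.g. in tests). */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** Runs the operation, retrying retryable errors until it succeeds or maxAttempts calls were made. */
    static <V> V callWithRetry(Callable<V> operation, int maxAttempts, Sleeper sleeper) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                long delay = delayMillisFor(e);
                if (delay < 0 || attempt >= maxAttempts) {
                    throw e; // not retryable, or attempt budget exhausted
                }
                sleeper.sleep(delay);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a temporary failure, then succeeds on the third call.
        String result = callWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TemporaryFailureException();
            }
            return "ok";
        }, 5, Thread::sleep);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "ok after 3 attempts"
    }
}
```

The point of the Rx version above is that you get this behaviour without blocking a thread during the waits and without writing the loop yourself; the sketch is only there to make the policy explicit.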