Hello,
When i read from replica sometimes i have a strange exception like this :
java.lang.NullPointerException
at com.couchbase.client.java.bucket.ReplicaReader$3.call(ReplicaReader.java:116) ~[etl-cron-server.jar:?]
at com.couchbase.client.java.bucket.ReplicaReader$3.call(ReplicaReader.java:112) ~[etl-cron-server.jar:?]
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54) [etl-cron-server.jar:?]
at rx.observers.Subscribers$5.onNext(Subscribers.java:230) [etl-cron-server.jar:?]
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:202) [etl-cron-server.jar:?]
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:162) [etl-cron-server.jar:?]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) [etl-cron-server.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_91]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_91]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_91]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_91]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.cluster.GetClusterConfigResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109) ~[etl-cron-server.jar:?]
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:187) [etl-cron-server.jar:?]
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56) ~[etl-cron-server.jar:?]
I call the getFromReplica like this :
couchbaseService.getBucketProducts()
.async()
.getFromReplica(productId, ReplicaMode.ALL)
// retry 6 times with a delay of 10s
.retryWhen(new ReopenBucket().products())
.onBackpressureBuffer(batchInsertSize * 3)
.doOnError( error → LOG.error(“Error in doOnError (updateCouchbase): {}”, error.getMessage(), error) )
.firstOrDefault(null)
.filter(jsonDocument → jsonDocument != null)
.map(jsonDocument → {
// do something here
return jsonDocument;
})
.filter(jsonDocument → jsonDocument != null)
.subscribe(
jsonDocument → this.updateHotObservable.onNext(jsonDocument),
e → LOG.error(“Error in subscribe (updateCouchbase): {}”, e.getMessage(), e));
And my retry method is this :
private class ReopenBucket {
public Func1<Observable<? extends Throwable>, Observable<?>> products() {
return errors → errors
.zipWith(Observable.range(1, 6), (error, i) → error)
.flatMap(error → {
if (error instanceof BucketClosedException) {
// try to reopen the products bucket
couchbaseService.openBucketProducts();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
return Observable.just(null);
}
// fail for other exception
return Observable.error(error);
});
}
}
Do you have any clue about the problem ? …
Thanks