When I insert (asynchronously) to Couchbase server (3 nodes cluster) with ReplicateTo.ONE and with retry, the Java Client always throws java.lang.ClassCastException exception.
If no retry, the client will rather throws “DurabilityException: Durability requirement failed: null” exception.
I am using couchbase-java-client 2.1.4. Any input is very much appreciated!
Code snippet (which throws ClassCastException):
final CountDownLatch latch = new CountDownLatch(1);
Observable.defer(new Func0<Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call() {
return bucket
.async()
.insert(JsonDocument.create(documentId, jsonContent), replicateTo)
.timeout(10, TimeUnit.SECONDS);
}
})
.retryWhen(RetryBuilder
.any()
.max(10)
.delay(Delay.exponential(TimeUnit.SECONDS, 1))
.build())
.subscribe(new Subscriber<JsonDocument>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
latch.countDown();
}
@Override
public void onNext(JsonDocument t) {
}
});
try {
latch.await();
} catch (InterruptedException ex) {
}
The cluster environment:
3 nodes cluster, all with version 3.1.0-1776 Enterprise Edition (build-1776-rel).
Bucket:
Access Control: Authentication
Replicas: 1 replica copy
Compaction: Not active
Cache Metadata: Value Eviction
Disk I/O priority: Low
Index replicas: not enabled
Flush: enabled
Exception (ClassCastException):
$
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword='null', queryEnabled=false, queryPort=8093, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=32, computationPoolSize=32, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=1, queryServiceEndpoints=1, ioPool=NioEventLoopGroup, coreScheduler=CoreScheduler, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.1.4 (git: 2.1.4), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, disconnectTimeout=25000, dnsSrvEnabled=false}
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.3.1
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.config.DefaultConfigurationProvider$6 call
INFO: Opened bucket Aggr
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.6.1
Aug 25, 2015 6:02:29 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.8.1
Inserted: 0 objects
Aug 25, 2015 6:02:30 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Aug 25, 2015 6:02:31 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Aug 25, 2015 6:02:31 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
java.lang.ClassCastException: com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument
at cbInsert$2.onNext(cbInsert.java:114)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:231)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:159)
at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:81)
at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onNext(OperatorTimeoutBase.java:126)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.internal.util.RxRingBuffer.accept(RxRingBuffer.java:432)
at rx.internal.operators.OperatorMerge$InnerSubscriber.drainAll(OperatorMerge.java:725)
at rx.internal.operators.OperatorMerge$InnerSubscriber.drainQueue(OperatorMerge.java:741)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:612)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onNext(OperatorOnErrorResumeNextViaFunction.java:89)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:67)
at rx.internal.operators.OperatorSkipWhile$1.onNext(OperatorSkipWhile.java:45)
at rx.internal.operators.OnSubscribeRedo$2$1.onNext(OnSubscribeRedo.java:231)
at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Error on insertWithRetry, exception: 'com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument'. Document id: 1440496949571_1880
Exception in thread "main" java.lang.Exception: com.couchbase.client.core.message.kv.ObserveResponse cannot be cast to com.couchbase.client.java.document.JsonDocument
at cbInsert.insertWithRetry(cbInsert.java:141)
at cbInsert.execute(cbInsert.java:48)
at cbInsert.main(cbInsert.java:146)
$
Exception (DurabilityException):
$
Aug 25, 2015 6:01:01 PM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslKeystorePassword='null', queryEnabled=false, queryPort=8093, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=32, computationPoolSize=32, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=1, queryServiceEndpoints=1, ioPool=NioEventLoopGroup, coreScheduler=CoreScheduler, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.1.4 (git: 2.1.4), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, disconnectTimeout=25000, dnsSrvEnabled=false}
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.3.1
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.config.DefaultConfigurationProvider$6 call
INFO: Opened bucket Aggr
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.6.1
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.node.CouchbaseNode$1 call
INFO: Connected to Node 172.28.8.1
Inserted: 0 objects
Aug 25, 2015 6:01:02 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Aug 25, 2015 6:01:04 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Aug 25, 2015 6:01:05 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$1 call
WARNING: Caught exception while onNext on observable
java.lang.IllegalStateException: This instance has been unsubscribed and the queue is no longer usable.
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:335)
at rx.internal.operators.OperatorMerge$InnerSubscriber.enqueue(OperatorMerge.java:680)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:657)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
com.couchbase.client.java.error.DurabilityException: Durability requirement failed: null
at com.couchbase.client.java.CouchbaseAsyncBucket$9$1.call(CouchbaseAsyncBucket.java:410)
at com.couchbase.client.java.CouchbaseAsyncBucket$9$1.call(CouchbaseAsyncBucket.java:406)
at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:77)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
at rx.internal.operators.OperatorTake$1.onError(OperatorTake.java:57)
at rx.internal.operators.OperatorSkipWhile$1.onError(OperatorSkipWhile.java:54)
at rx.internal.operators.OnSubscribeRedo$4$1.onError(OnSubscribeRedo.java:300)
at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:404)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onError(OperatorZip.java:316)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:324)
at rx.internal.operators.OnSubscribeRange$RangeProducer.request(OnSubscribeRange.java:93)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:39)
at rx.internal.operators.OnSubscribeRange.call(OnSubscribeRange.java:27)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154)
at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OnSubscribeRedo$4.call(OnSubscribeRedo.java:292)
at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:81)
at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:59)
at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:289)
at rx.internal.operators.OnSubscribeRedo.call(OnSubscribeRedo.java:54)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.util.RxRingBuffer.onNext(RxRingBuffer.java:338)
at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:322)
... 49 more
Error on insert, exception: 'Durability requirement failed: null'. Document id: 1440496862517_4044
Exception in thread "main" java.lang.Exception: Durability requirement failed: null
at cbInsert.insert(cbInsert.java:89)
at cbInsert.execute(cbInsert.java:50)
at cbInsert.main(cbInsert.java:146)
$
Complete source code:
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.util.retry.RetryBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
public class cbInsert {
private final Cluster cluster;
private final Bucket bucket;
private final ReplicateTo replicateTo;
private final String documentPrefix;
public enum Retry {
YES, NO
}
public cbInsert() {
cluster = CouchbaseCluster.create("172.28.3.1");
bucket = cluster.openBucket("bucket", "pass123");
replicateTo = ReplicateTo.ONE;
documentPrefix = System.currentTimeMillis() + "_";
}
public void execute(Retry retry) throws Exception {
JsonObject json = JsonObject.create();
json.put("name", "unique name");
long counter = 0;
boolean run = true;
while (run) {
if (counter % 5_000 == 0) {
System.out.println("Inserted: " + counter + " objects");
}
String id = documentPrefix + (++counter);
json.put("counter", counter).put("id", id);
if (retry == Retry.YES) {
insertWithRetry(id, json);
} else {
insert(id, json);
}
}
}
private void insert(String documentId, JsonObject jsonContent) throws Exception {
final StringBuilder exception = new StringBuilder();
final CountDownLatch latch = new CountDownLatch(1);
bucket.async()
.insert(JsonDocument.create(documentId, jsonContent), replicateTo)
.timeout(50, TimeUnit.SECONDS)
.subscribe(new Subscriber<JsonDocument>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
exception.append(e.getMessage());
latch.countDown();
}
@Override
public void onNext(JsonDocument t) {
}
});
try {
latch.await();
} catch (InterruptedException ex) {
}
if (exception.length() > 0) {
System.out.println("\nError on insert, exception: '" + exception.toString() + "'. Document id: " + documentId);
throw new Exception(exception.toString());
}
}
private void insertWithRetry(final String documentId, final JsonObject jsonContent) throws Exception {
final StringBuilder exception = new StringBuilder();
final CountDownLatch latch = new CountDownLatch(1);
Observable.defer(new Func0<Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call() {
return bucket
.async()
.insert(JsonDocument.create(documentId, jsonContent), replicateTo)
.timeout(10, TimeUnit.SECONDS);
}
})
.retryWhen(RetryBuilder
.any()
.max(10)
.delay(Delay.exponential(TimeUnit.SECONDS, 1))
.build())
.subscribe(new Subscriber<JsonDocument>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
exception.append(e.getMessage());
latch.countDown();
}
@Override
public void onNext(JsonDocument t) {
}
});
try {
latch.await();
} catch (InterruptedException ex) {
}
if (exception.length() > 0) {
System.out.println("\nError on insertWithRetry, exception: '" + exception.toString() + "'. Document id: " + documentId);
throw new Exception(exception.toString());
}
}
public static void main(String[] args) throws Exception {
new cbInsert().execute(Retry.YES);
}
}