I noticed a strange ConcurrentModificationException
when connecting to a multi-node cluster with many buckets. It appears to be a race condition, because it does not fail if I lower the number of buckets or connect to a single-node cluster. The code used to reproduce this fairly consistently is:
import com.couchbase.client.java._
/** Minimal reproduction: opens a list of buckets back-to-back on an async cluster built from three hosts. */
object InitTest extends App {
  // Implicit Scala <-> Java collection conversions; presumably what lets the
  // Scala List below be handed to `CouchbaseAsyncCluster.create`.
  import collection.JavaConversions._

  val bucketNames = List(
    "cache", "session", "clicks", "campaignevent",
    "datacenter", "shorturl", "metering"
  )
  val serverUris = List("build04", "build02", "build01")
  val password = "foo"
  val cluster = CouchbaseAsyncCluster.create(serverUris)

  // Request every bucket in turn; the value returned by each call is discarded.
  for (name <- bucketNames) cluster.openBucket(name, password)

  // Pause for 30 seconds before the application exits, presumably to give the
  // asynchronous work time to run.
  Thread.sleep(30000)
}
The error produced is:
rx.exceptions.OnErrorNotImplementedException
at rx.Observable$30.onError(Observable.java:7092)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:154)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:111)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
at rx.internal.operators.OperatorSingle$1.onError(OperatorSingle.java:90)
at rx.internal.operators.OperatorTakeLast$1.onError(OperatorTakeLast.java:65)
at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onError(OperatorMerge.java:404)
at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:49)
at rx.internal.operators.OperatorMerge$MergeSubscriber.innerError(OperatorMerge.java:429)
at rx.internal.operators.OperatorMerge$MergeSubscriber.access$800(OperatorMerge.java:93)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:552)
at rx.Observable.unsafeSubscribe(Observable.java:7311)
at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158)
at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41)
at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.subscribe(Observable.java:7393)
at rx.Observable.subscribe(Observable.java:7083)
at com.couchbase.client.core.RequestHandler$1.call(RequestHandler.java:157)
at com.couchbase.client.core.RequestHandler$1.call(RequestHandler.java:151)
at rx.Observable$31.onNext(Observable.java:7139)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:121)
at com.couchbase.client.core.config.DefaultConfigurationProvider.upsertBucketConfig(DefaultConfigurationProvider.java:386)
at com.couchbase.client.core.config.DefaultConfigurationProvider.access$000(DefaultConfigurationProvider.java:91)
at com.couchbase.client.core.config.DefaultConfigurationProvider$7.call(DefaultConfigurationProvider.java:257)
at com.couchbase.client.core.config.DefaultConfigurationProvider$7.call(DefaultConfigurationProvider.java:254)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:84)
at rx.internal.operators.OperatorOnErrorResumeNextViaObservable$1.onNext(OperatorOnErrorResumeNextViaObservable.java:64)
at rx.internal.operators.OperatorTake$1.onNext(OperatorTake.java:67)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at com.couchbase.client.core.utils.Buffers$2$1.onNext(Buffers.java:90)
at rx.observers.SafeSubscriber.onNext(SafeSubscriber.java:130)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:47)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(HashMap.java:894)
at java.util.HashMap$ValueIterator.next(HashMap.java:922)
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:96)
at rx.Subscriber.setProducer(Subscriber.java:139)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47)
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:7304)
… 58 more