Hi,
I am seeing issues while doing a bulk get using reactive code. This is the code
List<JsonDocument> documents =Observable.from(subList)
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String metaId) {
if(metaId != null) {
System.out.println("Sending: " + metaId);
return asyncBucket.get(metaId);
}
return Observable.empty();
}
},100)
.doOnNext(t -> {
System.out.println("Received: " + t.id());
})
.toList()
.toBlocking()
.single();
The sublist contains 100 elements. While the sending is showing that all the request for 100 items have been sent which are there in database, I only get response for 90 elements (always) and then I get this warning:
Jul 12, 2018 1:35:26 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$KeepAliveResponseAction onError
WARNING: [/127.0.0.1:8093][QueryEndpoint]: Got error while consuming KeepAliveResponse.
java.util.concurrent.TimeoutException
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.onTimeout(OnSubscribeTimeoutTimedWithFallback.java:166)
at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber$TimeoutTask.call(OnSubscribeTimeoutTimedWithFallback.java:191)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Am I doing something incorrectly ?
I am using java-client 2.5.9 and rx-java 1.3.8 and here is my couchbase configuration details:
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslTruststoreFile='null', sslKeystorePassword=false, sslTruststorePassword=false, sslKeystore=null, sslTruststore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=3, computationPoolSize=3, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, configPollInterval=2500, configPollFloorInterval=50, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.5.9 (git: 2.5.9, core: 1.5.9), retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, continuousKeepAliveEnabled=true, keepAliveErrorThreshold=4, keepAliveTimeout=2500, autoreleaseAfter=75000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=30000, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$2@43b6a5a6, certAuthEnabled=false, coreSendHook=null, forceSaslPlain=false, queryTimeout=75000, viewTimeout=75000, searchTimeout=75000, analyticsTimeout=75000, kvTimeout=2500, connectTimeout=30000, dnsSrvEnabled=false}