We are seeing issues with basic reactive calls. The logs below show the problem for getAndTouch, but we see it with the other reactive APIs as well (get, upsert, delete …). We have configured timeouts and retries, but we still get "Thread blocked" warnings.
Java SDK version: 2.7.4
Couchbase version: 5.1
Source Code:
JsonDocument jsonDocumentPagination = couchbaseRead.getAndTouch(messageIdKey, msgIdDocumentExpiryTime).switchIfEmpty(Flowable.just(JsonDocument.create("GARBAGE"))).blockingSingle(); // Line # 1195
if (!jsonDocumentPagination.id().equals("GARBAGE")) { // Pagination key exists
…
}
/**
 * @param key The key to fetch from Couchbase and touch
 * @param documentExpiryTime The new expiry to set on the document
 * @return The JsonDocument for the provided key
 */
public Flowable<JsonDocument> getAndTouch(String key, int documentExpiryTime) {
    long startTime = System.currentTimeMillis();
    // Get the bucket based on region (IPC1 / IPC2)
    return getAndTouchPrimary(key, documentExpiryTime)
            .lift(circuitBreakerOperatorForKey)
            .onErrorResumeNext(throwable -> {
                return getAndTouchSecondary(key, documentExpiryTime);
            })
            .doOnEach(action -> {
                if (action.isOnComplete()) {
                    logger.info("{}", new GPCLogEvent("getAndTouch", "fetch key", new GPCEntry<>("key", key), new GPCEntry<>("Time Taken", System.currentTimeMillis() - startTime)));
                }
            });
}
// couchbaseRead.retryCountForKey=5
// couchbaseRead.timeoutForKey=1000
// couchbaseRead.delayBetweenRetriesForKey=10
/**
 * @param key The key to fetch from the near Couchbase instance
 * @param documentExpiryTime The new expiry to set on the document
 * @return The JsonDocument
 */
private Flowable<JsonDocument> getAndTouchPrimary(String key, int documentExpiryTime) {
    //logger.info("{}", new GPCLogEvent("getAndTouchPrimary", "IN", new GPCEntry<>("STATE", circuitBreakerForCouchbaseReadKey.getState().name()), new GPCEntry<>("Call Permitted", circuitBreakerForCouchbaseReadKey.isCallPermitted())));
    if (circuitBreakerForCouchbaseReadKey.isCallPermitted()) {
        if (currentBucket.getBucketType() != null && currentBucket.getBucketType().equals(BucketType.MAIN)) {
            return RxJavaInterop.toV2Flowable(readPrimaryMainAsyncBucket
                    .getAndTouch(key, documentExpiryTime, timeoutForKey, TimeUnit.MILLISECONDS)
                    .retryWhen(RetryBuilder.anyOf(TimeoutException.class)
                            .delay(Delay.fixed(delayBetweenRetriesForKey, TimeUnit.MICROSECONDS))
                            .max(retryCountForKey)
                            .build()));
        } else if (currentBucket.getBucketType() != null && currentBucket.getBucketType().equals(BucketType.BACKUP)) {
            return RxJavaInterop.toV2Flowable(readPrimaryBackupAsyncBucket
                    .getAndTouch(key, documentExpiryTime, timeoutForKey, TimeUnit.MILLISECONDS)
                    .retryWhen(RetryBuilder.anyOf(TimeoutException.class)
                            .delay(Delay.fixed(delayBetweenRetriesForKey, TimeUnit.MICROSECONDS))
                            .max(retryCountForKey)
                            .build()));
        } else {
            logger.error("{}", new GPCLogEvent("getAndTouchPrimary", "ERROR"));
            return Flowable.error(new DetermineBucketException("getAndTouchPrimary: " + RUN_TIME_BUCKET_CANNOT_BE_DETERMINED));
        }
    } else {
        return Flowable.error(new GenericException("getAndTouchPrimary: Circuit is OPEN"));
    }
}
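To put the retry settings quoted above in perspective, here is a rough sketch of the longest time we would expect a single getAndTouchPrimary call to take if every attempt timed out. It assumes that max(retryCountForKey) allows that many retries on top of the original attempt and that each attempt is bounded by timeoutForKey. The class and method names are made up for illustration and are not part of our code base.

```java
import java.util.concurrent.TimeUnit;

public class RetryBudgetSketch {

    // Worst-case duration in microseconds, assuming every attempt runs into its timeout.
    public static long worstCaseMicros(int retries, long timeoutMillis, long delayMicros) {
        long attempts = retries + 1L; // original call plus the retries (assumption)
        long timeoutTotal = attempts * TimeUnit.MILLISECONDS.toMicros(timeoutMillis);
        long delayTotal = retries * delayMicros; // one fixed delay before each retry
        return timeoutTotal + delayTotal;
    }

    public static void main(String[] args) {
        // Values from the settings above: 5 retries, 1000 ms timeout,
        // and a delay of 10 expressed in TimeUnit.MICROSECONDS.
        long micros = worstCaseMicros(5, 1000, 10);
        System.out.println("worst case: " + micros + " us"); // prints "worst case: 6000050 us"
    }
}
```

Under these assumptions the primary path should give up after roughly 6 seconds, which is nowhere near the 1729768 ms (almost 29 minutes) reported by the blocked thread checker.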
Logs:
05:57:35.295 [vertx-blocked-thread-checker] WARN i.v.core.impl.BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 1729768 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:85)
at io.reactivex.Single.blockingGet(Single.java:2835)
at io.reactivex.Flowable.blockingSingle(Flowable.java:5910)
at com.test.gpc.spd.verticle.SPDServerVerticle.getAllProduct(SPDServerVerticle.java:1195)
at com.test.gpc.spd.verticle.SPDServerVerticle$$Lambda$304/807571894.handle(Unknown Source)
at io.vertx.ext.web.impl.RouteImpl.handleContext(RouteImpl.java:227)
at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:121)
at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:134)
at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:296)
at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:276)
at io.vertx.ext.web.handler.impl.BodyHandlerImpl.lambda$handle$0(BodyHandlerImpl.java:87)
at io.vertx.ext.web.handler.impl.BodyHandlerImpl$$Lambda$412/503098263.handle(Unknown Source)
at io.vertx.core.http.impl.HttpServerRequestImpl.doEnd(HttpServerRequestImpl.java:537)
at io.vertx.core.http.impl.HttpServerRequestImpl.handleEnd(HttpServerRequestImpl.java:526)
at io.vertx.core.http.impl.Http1xServerConnection.handleEnd(Http1xServerConnection.java:167)
at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:136)
at io.vertx.core.net.impl.ConnectionBase.handleRead(ConnectionBase.java:390)
at io.vertx.core.net.impl.VertxHandler$$Lambda$380/266170253.handle(Unknown Source)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:188)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:174)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1476)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1225)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1272)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:421)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:321)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
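The top frames of the trace show the event loop thread parked in CountDownLatch.await underneath Flowable.blockingSingle. As a minimal, JDK-only sketch of that situation (no Vert.x, RxJava or Couchbase involved; the single-threaded executor merely stands in for one event loop thread, and all names are hypothetical), the following shows that while one task waits on a latch, nothing else queued on the same thread can run:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedLoopSketch {

    // Returns true if the second task managed to run while the first was still waiting.
    public static boolean secondTaskRanWhileBlocked() {
        ExecutorService loop = Executors.newSingleThreadExecutor(); // stand-in for one event loop thread
        CountDownLatch result = new CountDownLatch(1);
        CountDownLatch firstStarted = new CountDownLatch(1);
        try {
            // First "handler": parks the loop thread until the result arrives,
            // similar to a blocking get on an asynchronous source.
            Future<?> first = loop.submit(() -> {
                firstStarted.countDown();
                try {
                    result.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            firstStarted.await();

            // Second "handler": queued behind the first on the same thread.
            Future<String> second = loop.submit(() -> "handled");
            boolean ran;
            try {
                second.get(200, TimeUnit.MILLISECONDS);
                ran = true;
            } catch (TimeoutException e) {
                ran = false; // still waiting behind the blocked task
            }

            result.countDown(); // release the first task so the queue can drain
            first.get(1, TimeUnit.SECONDS);
            return ran;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "second task ran while blocked: false"
        System.out.println("second task ran while blocked: " + secondTaskRanWhileBlocked());
    }
}
```

In our case the parked thread is vert.x-eventloop-thread-0, so we would expect any request routed to that event loop to wait in the same way until blockingSingle returns. The sketch does not explain why the Couchbase result itself takes so long to arrive.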