As mentioned in the title, I have a Couchbase connector job that takes a batch of ids, does a couchbaseGet() and writes the results to Hive.
Currently I am seeing lots of timeouts and request cancelled exceptions like the ones below:
java.util.concurrent.TimeoutException
31-08-2020 18:24:42 JST my-app INFO - at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.onTimeout(OnSubscribeTimeoutTimedWithFallback.java:166)
31-08-2020 18:24:42 JST my-app INFO - at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber$TimeoutTask.call(OnSubscribeTimeoutTimedWithFallback.java:191)
31-08-2020 18:24:42 JST my-app INFO - at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
31-08-2020 18:24:42 JST my-app INFO - at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.FutureTask.run(FutureTask.java:266)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
31-08-2020 18:24:42 JST my-app INFO - at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
31-08-2020 18:24:42 JST my-app INFO - at java.lang.Thread.run(Thread.java:748)
Followed by:
Caused by: com.couchbase.client.core.RequestCancelledException: Request cancelled in-flight.
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.core.endpoint.AbstractGenericHandler.handleOutstandingOperations(AbstractGenericHandler.java:686)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.core.endpoint.AbstractGenericHandler.handlerRemoved(AbstractGenericHandler.java:667)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.callHandlerRemoved0(DefaultChannelPipeline.java:626)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.destroyDown(DefaultChannelPipeline.java:878)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.destroyUp(DefaultChannelPipeline.java:844)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.destroy(DefaultChannelPipeline.java:836)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.access$700(DefaultChannelPipeline.java:44)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelUnregistered(DefaultChannelPipeline.java:1286)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:176)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:162)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:821)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:776)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
31-08-2020 18:25:28 JST my-app INFO - at com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
31-08-2020 18:25:28 JST my-app INFO - at java.lang.Thread.run(Thread.java:748)
This batch is really big: we need to be able to process up to 100M items at a time. We had no problem doing that until we upgraded our Spark Connector to 2.3 and our Java SDK to 2.7.2. Before that, we were using Spark Connector 2.1 and Java SDK 2.4.x.
Our code to start the Spark session is below:
Builder sparkSessionBuilder = SparkSession.builder()
    .appName("my-app")
    .config("spark.couchbase.nodes", connectorConfig.getCouchbaseNodes())
    .config("com.couchbase.bucket." + connectorConfig.getBucket(), "")
    .config("spark.couchbase.username", connectorConfig.getCouchbaseUserName())
    .config("spark.couchbase.password", connectorConfig.getCouchbasePassword())
    .config("com.couchbase.kvTimeout", 75000)
    .config("com.couchbase.connectTimeout", 75000)
    .enableHiveSupport();
JavaRDD mutatedItemJson = couchbaseRDD(mutatedIds).couchbaseGet(); // mutatedIds might be 100M items
I am thinking we can rate-limit the couchbaseGet, as it seems to be causing high QPS (up to 700k), and possibly our cluster (CE 5.1.1) cannot handle it. If anyone has any advice on tuning this batch job or investigating this issue, we would greatly appreciate it.
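
To make the rate-limiting idea concrete, here is a minimal sketch in plain Java of what I have in mind: split the ids into fixed-size batches and sleep after each batch so the average rate stays under a cap. Nothing here is a Couchbase or Spark Connector API. ThrottledBatches, chunk, minNanosForBatch and processThrottled are names I made up, and the fetch callback is a placeholder for whatever actually does the lookup. The batch size and rate in main are arbitrary numbers, not tuned values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of any Couchbase or Spark library.
final class ThrottledBatches {

    /** Splits ids into consecutive chunks of at most batchSize elements. */
    public static <T> List<List<T>> chunk(List<T> ids, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < ids.size(); start += batchSize) {
            int end = Math.min(start + batchSize, ids.size());
            chunks.add(new ArrayList<>(ids.subList(start, end)));
        }
        return chunks;
    }

    /** Minimum time in nanoseconds that n operations may take at the given cap. */
    public static long minNanosForBatch(int n, double maxOpsPerSecond) {
        return (long) Math.ceil(n * 1e9 / maxOpsPerSecond);
    }

    /**
     * Calls fetch once per batch and then sleeps for whatever is left of that
     * batch's time budget, so the average rate stays at or below maxOpsPerSecond.
     * Stops early if the thread is interrupted.
     */
    public static <T> void processThrottled(List<T> ids, int batchSize,
                                            double maxOpsPerSecond,
                                            Consumer<List<T>> fetch) {
        for (List<T> batch : chunk(ids, batchSize)) {
            long started = System.nanoTime();
            fetch.accept(batch); // placeholder for the real lookup
            long remaining = minNanosForBatch(batch.size(), maxOpsPerSecond)
                    - (System.nanoTime() - started);
            if (remaining > 0) {
                try {
                    Thread.sleep(remaining / 1_000_000L, (int) (remaining % 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag set
                    return;
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("id-1", "id-2", "id-3", "id-4", "id-5");
        // Arbitrary illustration values: batches of 2, capped at 50 ids per second.
        processThrottled(ids, 2, 50.0, batch -> System.out.println("fetching " + batch));
    }
}
```

If something like this ran once per Spark partition, the cap would apply per task, so the rate seen by the cluster would be roughly the per-task cap multiplied by the number of tasks running concurrently. I have not verified how this would interact with the connector's own batching inside couchbaseGet().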