Kafka connector out of memory error

We are using the latest couchbase kafka connector and trying to connect a bucket with millions of docs. When we use the flags BEGINNING or SAVED_OFFSET_OR_BEGINNING we are seeing the below error. When running with the NOW flag it runs fine. Any ideas what is causing this?

[2018-12-12 20:41:12,421] WARN An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. (com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline)
com.couchbase.client.deps.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 6425673735, max: 6442450944)
at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:535)
at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:489)
at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:766)
at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:742)
at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
at com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:333)
at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:183)
at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:174)
at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:135)
at com.couchbase.client.deps.io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

1 Like

Hi Stephen,

What is the value of the connector’s use_snapshots setting?

There is a known issue with the connector running out of memory when the use_snapshots configuration value is true and the first snapshot is very large. When streaming from the beginning of history, the first snapshot consists of just about everything. The recommendation in that case would be to set use_snapshots to false until KAFKAC-132 is resolved.

Thanks,
David

Hi David,
Thanks for the reply. It looks like use_snapshots is is set to false. Are there any other settings that we might need to adjust?

Stephen

1 Like

Hi Stephen,

Can you try setting the couchbase.compression config property to DISABLED ? Let me know if that makes the problem go away.

Another thing to try would be to enable buffer leak detection. Set the EXTRA_ARGS environment variable before starting the Kafka Connect worker process:

export EXTRA_ARGS=-Dcom.couchbase.client.deps.io.netty.leakDetection.level=paranoid

If a leak is detected, there will be an “ERROR LEAK” message in the log with information that can help us pinpoint the location of the leak.

Thanks,
David

Hi David,
It looks like we already had couchbase.compression disabled. We initially set the leakDetection level to advanced and below is the output. Let us know if it would still be helpful to set to paranoid.

[2018-12-14 22:49:42,594] ERROR LEAK: ByteBuf.release() was not called before it’s garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.getLong(AdvancedLeakAwareByteBuf.java:176)
com.couchbase.client.dcp.message.DcpMutationMessage.bySeqno(DcpMutationMessage.java:64)
com.couchbase.client.dcp.Client$5.onEvent(Client.java:299)
com.couchbase.client.dcp.buffer.StreamEventBuffer.onSeqnoPersisted(StreamEventBuffer.java:228)
com.couchbase.client.dcp.buffer.PersistencePollingHandler$5.onSuccess(PersistencePollingHandler.java:176)
com.couchbase.client.dcp.buffer.PersistencePollingHandler$5.onSuccess(PersistencePollingHandler.java:165)
rx.internal.operators.SingleFromEmitter$SingleEmitterImpl.onSuccess(SingleFromEmitter.java:85)
com.couchbase.client.dcp.buffer.DcpOpsImpl$5$1.operationComplete(DcpOpsImpl.java:101)
com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:512)
com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486)
com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:425)
com.couchbase.client.deps.io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:95)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.channelRead(DcpMessageHandler.java:317)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#2:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:68)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:462)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:417)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:491)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:430)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#3:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.getByte(AdvancedLeakAwareByteBuf.java:128)
com.couchbase.client.dcp.message.DcpMutationMessage.is(DcpMutationMessage.java:29)
com.couchbase.client.dcp.buffer.StreamEventBuffer.onEvent(StreamEventBuffer.java:115)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.handleRequest(DcpMessageHandler.java:342)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.channelRead(DcpMessageHandler.java:291)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:299)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:415)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#4:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.getByte(AdvancedLeakAwareByteBuf.java:128)
com.couchbase.client.dcp.message.DcpMutationMessage.is(DcpMutationMessage.java:29)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.isDataMessage(DcpMessageHandler.java:380)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.handleRequest(DcpMessageHandler.java:341)
com.couchbase.client.dcp.transport.netty.DcpMessageHandler.channelRead(DcpMessageHandler.java:291)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:299)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:415)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#5:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.slice(AdvancedLeakAwareByteBuf.java:80)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.extractFrame(LengthFieldBasedFrameDecoder.java:517)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:448)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:491)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:430)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#6:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedInt(AdvancedLeakAwareByteBuf.java:170)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:475)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:417)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:491)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:430)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#7:
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.extractFrame(LengthFieldBasedFrameDecoder.java:517)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:448)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:491)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:430)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#8:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.skipBytes(AdvancedLeakAwareByteBuf.java:488)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:443)
com.couchbase.client.deps.io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:491)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:430)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
#9:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:548)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:94)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
Created at:
com.couchbase.client.deps.io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:548)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:519)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:90)
com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1304)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)
com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)
com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:921)
com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:135)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
: 1079 leak records were discarded because they were duplicates
: 1079 leak records were discarded because the leak record count is targeted to 4. Use system property com.couchbase.client.deps.io.netty.leakDetection.targetRecords to increase the limit. (com.couchbase.client.deps.io.netty.util.ResourceLeakDetector)

Hi Stephen,

That log output looks familiar – I’m assisting with your team’s support escalation. Thanks for generating and sharing the leak detection log. Using “advanced” is fine; no need to use “paranoid” at this point.

With the help of your log file, we’ve identified a few code paths where buffers could potentially leak. It’s not certain they are the same leaks you’re seeing, so we’ll also add a configuration property to disable buffer pooling as last resort.

Thanks,
David

Great, thanks for the update. Keep us posted and appreciate the help!

Version 3.4.1 of the connector is now available. It has fixes for some potential memory leaks. Can you give it a try and see if it fixes the leaks you observed?

Great, we will try it out and let you know.

Hi David,

I have the same issue using the latest version of the connector 3.4.4. The connector run with this configugation:
{
“connector.class”:“com.couchbase.connect.kafka.CouchbaseSourceConnector”,
“connection.password”:“XXXX”,
“tasks.max”:“3”,
“couchbase.compression”:“DISABLED”,
“connection.timeout.ms”:“2000”,
“connection.username”:“Administrator”,
“couchbase.stream_from”:“SAVED_OFFSET_OR_BEGINNING”,
“transforms”:“deserializeJson”,
“couchbase.flow_control_buffer”:“128m”,
“dcp.message.converter.class”:“com.couchbase.connect.kafka.handler.source.RawJsonWithMetadataSourceHandler”,
“connection.bucket”:“my-bucket”,
“event.filter.class”:“com.couchbase.connect.kafka.filter.AllPassFilter”,
“value.converter.schemas.enable”:“true”,
“name”:“couchbase-connector-source”,
“topic.name”:“bucket-couchbase-v1”,
“couchbase.persistence_polling_interval”:“100ms”,
“value.converter”:“org.apache.kafka.connect.json.JsonConverter”,
“transforms.deserializeJson.type”:“com.couchbase.connect.kafka.transform.DeserializeJson”,
“connection.cluster_address”:“10.XXXXX,10.XXXXX,10XXXX”,
“use_snapshots”:“false”
}

The stack trace:

[2019-07-11 07:08:08,209] WARN An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception. (com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline:151)
11/07/2019 09:08:08com.couchbase.client.deps.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 2130706439, max: 2147483648)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:535)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:489)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:766)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:742)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocate(PoolArena.java:226)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PoolArena.allocate(PoolArena.java:146)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:333)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:183)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:174)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:135)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
11/07/2019 09:08:08 at com.couchbase.client.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
11/07/2019 09:08:08 at java.lang.Thread.run(Thread.java:748)

Thanks,
Samir