Unable to create new native thread

I am using the Scala SDK in a Spark environment on a 4-node Dataproc cluster, inserting documents into Couchbase one by one. For every 1000 documents, the job gets a cluster, inserts the batch, and calls cluster.disconnect. While importing 5 million documents I eventually hit the error below; with 1 million documents it does not appear. I suspect the threads created by the Scala SDK are not being cleaned up.

import com.couchbase.client.scala.{Cluster, ClusterOptions}
import com.couchbase.client.scala.env.{ClusterEnvironment, TimeoutConfig}
import scala.concurrent.duration._

def getCluster(cbHost: String, cbUser: String, cbPass: String): Cluster = {
  // 600-second timeouts for every service
  val timeout = TimeoutConfig().kvTimeout(600.seconds).queryTimeout(600.seconds)
    .searchTimeout(600.seconds).connectTimeout(600.seconds)
    .managementTimeout(600.seconds).viewTimeout(600.seconds)
  val ce = ClusterEnvironment.builder.timeoutConfig(timeout).build // Try[ClusterEnvironment]
  val co = ClusterOptions.create(cbUser, cbPass).environment(ce.get)
  Cluster.connect(cbHost, co).get
}
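
The driving loop is essentially the following (the document source, bucket name, and insert logic are simplified placeholders):

import com.couchbase.client.scala.json.JsonObject

val docs: Iterator[(String, JsonObject)] = ??? // stands in for the 5M-document source

docs.grouped(1000).foreach { batch =>
  val cluster = getCluster(cbHost, cbUser, cbPass)
  val collection = cluster.bucket("myBucket").defaultCollection // placeholder bucket name
  batch.foreach { case (id, json) => collection.insert(id, json) } // one document at a time
  cluster.disconnect() // disconnect after every batch of 1000
}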

20/03/12 14:08:17 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 8247,5,main]
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1603)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:549)
at reactor.core.scheduler.Schedulers.directSchedule(Schedulers.java:1114)
at reactor.core.scheduler.ParallelScheduler.schedule(ParallelScheduler.java:159)
at reactor.core.publisher.MonoDelay.subscribe(MonoDelay.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.MonoTimeout.subscribeOrReturn(MonoTimeout.java:82)
at reactor.core.publisher.Mono.subscribe(Mono.java:4090)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
at reactor.core.publisher.Mono.subscribe(Mono.java:4105)
at reactor.core.publisher.Mono.block(Mono.java:1662)
at reactor.core.scala.publisher.SMono.block(SMono.scala:106)
at reactor.core.scala.publisher.SMono.block$(SMono.scala:105)
at reactor.core.scala.publisher.ReactiveSMono.block(SMono.scala:1587)
at com.couchbase.client.scala.Cluster.disconnect(Cluster.scala:245)

Hi @eakarsu

Apologies for the extremely late reply; I only saw this just now while scanning the Scala SDK category.

Because you’re creating a ClusterEnvironment explicitly (rather than letting Cluster.connect make a default one for you under the hood), you also need to explicitly shut it down. Please see these docs: Managing Connections | Couchbase Docs
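
A minimal sketch of what that looks like (names are illustrative; the key point is the env.shutdown() after cluster.disconnect()):

import com.couchbase.client.scala.{Cluster, ClusterOptions}
import com.couchbase.client.scala.env.ClusterEnvironment

def getClusterAndEnv(cbHost: String, cbUser: String, cbPass: String): (Cluster, ClusterEnvironment) = {
  // Keep a reference to the explicitly created environment so it can be shut down later
  val env = ClusterEnvironment.builder.build.get
  val cluster = Cluster.connect(cbHost, ClusterOptions.create(cbUser, cbPass).environment(env)).get
  (cluster, env)
}

val (cluster, env) = getClusterAndEnv(cbHost, cbUser, cbPass)
try {
  // ... insert the batch ...
} finally {
  cluster.disconnect()
  env.shutdown() // releases the environment's thread pools; disconnect alone does not
}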