We are running into issues when trying to read data from a Couchbase Capella collection using the Spark connector.
To illustrate the problem, here is what we are trying to do:
**Write data to Capella using Spark Connector 3.1.x**
- Created a new bucket in Capella and allocated 3 GB of RAM to that bucket.
- Created a new scope, say scope_dw, and a new collection, say collection_dw. We also created a primary index on the new collection.
- Used the Couchbase Spark 3.1.x connector to write the contents of a Parquet file (around 60 MB in size, a 10-column dataset with approximately 2.304 million rows).
- Data was written successfully to Couchbase Capella into collection_dw. Couchbase bucket-level metrics show 2.11 GB of MEMORY USED and 2.8 GB of DISK USED.
**Read data that was written to Capella using Spark Connector 3.1.x in the previous step**
- Read the contents of the data written into ***collection_dw*** using 5 Spark executors (each with 3 GB of RAM) and a Spark driver with 3 GB of memory. Please note that we want to read all data in the collection for our use case. Sample Java code is below:
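import com.couchbase.spark.query.QueryOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
SparkSession sparkSession = SparkSession.builder().getOrCreate(); // get the underlying Spark session
Dataset<Row> filterDataFrame = sparkSession.read()
    .format("couchbase.query").option(QueryOptions.Bucket(), "bucket_dw")
    .option(QueryOptions.Scope(), "scope_dw")
    .option(QueryOptions.Collection(), "collection_dw").load();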
- The code above tries to read the contents of the collection and throws an out-of-memory error after approximately 6 minutes.
- Details of the error thrown at the Spark connector level are available at the bottom of this post (for some reason, I cannot upload documents).
A few interesting observations made while encountering this problem:
- Although the Capella instance is appropriately sized at the bucket level, with enough disk and RAM allocated, CPU usage for Capella went up to 100% when reading 2.3 million rows of data.
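As a rough sanity check on sizing, the figures quoted in this post can be put side by side. The sketch below is only back-of-the-envelope arithmetic, not a measurement. It assumes that the bucket's MEMORY USED figure is a usable proxy for the amount of data a reader has to hold, takes the 2.0 GiB per executor from the "Granted executor" line in the logs, and takes the single read partition from the "with 1 output partitions" line. The class and method names are made up for illustration and nothing here calls Spark or Couchbase.

```java
// Back-of-the-envelope sizing using only figures quoted in this post.
// Class and method names are hypothetical; nothing here calls Spark or Couchbase.
final class ReadFootprintEstimate {

    static final long ROWS = 2_304_000L;            // approx. rows written to collection_dw
    static final double BUCKET_MEMORY_GB = 2.11;    // MEMORY USED reported at bucket level
    static final double EXECUTOR_MEMORY_GIB = 2.0;  // from the "Granted executor" log line
    static final int READ_PARTITIONS = 1;           // from the "with 1 output partitions" log line

    /** Average bytes per row, using bucket memory as a rough proxy for data size. */
    static double bytesPerRow(double memoryGb, long rows) {
        return memoryGb * 1e9 / rows;
    }

    /** Share of one executor's granted memory that each read task would need (assumes an even split). */
    static double shareOfExecutorMemory(double memoryGb, double executorGib, int partitions) {
        double bytesPerTask = memoryGb * 1e9 / partitions;
        double executorBytes = executorGib * 1024 * 1024 * 1024;
        return bytesPerTask / executorBytes;
    }

    public static void main(String[] args) {
        System.out.printf("avg bytes per row: %.0f%n",
                bytesPerRow(BUCKET_MEMORY_GB, ROWS));
        System.out.printf("share of executor memory per task: %.2f%n",
                shareOfExecutorMemory(BUCKET_MEMORY_GB, EXECUTOR_MEMORY_GIB, READ_PARTITIONS));
    }
}
```

Under these assumptions a row averages a little under 1 KB, and a single read task would need close to the whole memory granted to one executor before any JVM object overhead is counted. Passing a larger partition count to `shareOfExecutorMemory` shows how the per-task share would shrink if the read were split evenly.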
We would really appreciate it if you could look at the issue and suggest ways to fix it. If you need any additional information, please let me know and I will be happy to share it.
Logs containing error
[2022-04-02 00:58:21,847] {} INFO - ########### INITIAL SCHEMA JSON FOR DATASET ########### account_tel_test
[2022-04-02 00:58:21,850] {} INFO - root
[2022-04-02 00:58:21,850] {} INFO - |-- __META_ID: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- co: double (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- cpu_usage: double (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- device_id: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- light: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- lpg: double (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- motion: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- sfdc_account_id: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- smoke: double (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- temp: double (nullable = true)
[2022-04-02 00:58:21,850] {} INFO - |-- timestamp: string (nullable = true)
[2022-04-02 00:58:21,850] {} INFO -
[2022-04-02 00:58:22,075] {} INFO - 22/04/02 00:58:22 INFO V2ScanRelationPushDown:
[2022-04-02 00:58:22,075] {} INFO - Pushing operators to bucket_dworkz:dw_scope:account_tel_test
[2022-04-02 00:58:22,075] {} INFO - Pushed Filters:
[2022-04-02 00:58:22,075] {} INFO - Post-Scan Filters:
[2022-04-02 00:58:22,075] {} INFO - Output: __META_ID#27, co#28, cpu_usage#29, device_id#30, light#31, lpg#32, motion#33, sfdc_account_id#34, smoke#35, temp#36, timestamp#37
[2022-04-02 00:58:22,075] {} INFO -
[2022-04-02 00:58:22,212] {} INFO - 22/04/02 00:58:22 INFO CodeGenerator: Code generated in 22.190073 ms
[2022-04-02 00:58:22,237] {} INFO - 22/04/02 00:58:22 INFO SparkContext: Starting job: count at
[2022-04-02 00:58:22,238] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Got job 1 (count at with 1 output partitions
[2022-04-02 00:58:22,239] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Final stage: ResultStage 1 (count at
[2022-04-02 00:58:22,239] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Parents of final stage: List()
[2022-04-02 00:58:22,239] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Missing parents: List()
[2022-04-02 00:58:22,240] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[13] at count at, which has no missing parents
[2022-04-02 00:58:22,246] {} INFO - 22/04/02 00:58:22 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 10.4 KiB, free 1007.8 MiB)
[2022-04-02 00:58:22,248] {} INFO - 22/04/02 00:58:22 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.9 KiB, free 1007.8 MiB)
[2022-04-02 00:58:22,248] {} INFO - 22/04/02 00:58:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on dworkz-airflow-worker-0.dworkz-airflow-worker.dataworkz.svc.cluster.local:33843 (size: 4.9 KiB, free: 1007.8 MiB)
[2022-04-02 00:58:22,249] {} INFO - 22/04/02 00:58:22 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1383
[2022-04-02 00:58:22,249] {} INFO - 22/04/02 00:58:22 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[13] at count at (first 15 tasks are for partitions Vector(0))
[2022-04-02 00:58:22,250] {} INFO - 22/04/02 00:58:22 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
[2022-04-02 00:58:22,255] {} INFO - 22/04/02 00:58:22 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 14) (, executor 5, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 00:58:22,274] {} INFO - 22/04/02 00:58:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:01:09,968] {} INFO - 22/04/02 01:01:09 ERROR TaskSchedulerImpl: Lost executor 5 on Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:01:09,970] {} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/5 is now EXITED (Command exited with code 52)
[2022-04-02 01:01:09,971] {} INFO - 22/04/02 01:01:09 INFO StandaloneSchedulerBackend: Executor app-20220402005813-0010/5 removed: Command exited with code 52
[2022-04-02 01:01:09,971] {} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220402005813-0010/7 on worker-20220402001755- ( with 2 core(s)
[2022-04-02 01:01:09,971] {} INFO - 22/04/02 01:01:09 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 14) ( executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:01:09,971] {} INFO - 22/04/02 01:01:09 INFO StandaloneSchedulerBackend: Granted executor ID app-20220402005813-0010/7 on hostPort with 2 core(s), 2.0 GiB RAM
[2022-04-02 01:01:09,974] {} INFO - 22/04/02 01:01:09 INFO DAGScheduler: Executor lost: 5 (epoch 0)
[2022-04-02 01:01:09,974] {} INFO - 22/04/02 01:01:09 INFO BlockManagerMaster: Removal of executor 5 requested
[2022-04-02 01:01:09,974] {} INFO - 22/04/02 01:01:09 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 5
[2022-04-02 01:01:09,975] {} INFO - 22/04/02 01:01:09 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 15) (, executor 2, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:01:09,978] {} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
[2022-04-02 01:01:09,979] {} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(5,, 45479, None)
[2022-04-02 01:01:09,979] {} INFO - 22/04/02 01:01:09 INFO BlockManagerMasterEndpoint: Trying to remove executor 5 from BlockManagerMaster.
[2022-04-02 01:01:09,979] {} INFO - 22/04/02 01:01:09 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
[2022-04-02 01:01:09,980] {} INFO - 22/04/02 01:01:09 INFO DAGScheduler: Shuffle files lost for executor: 5 (epoch 0)
[2022-04-02 01:01:09,995] {} INFO - 22/04/02 01:01:09 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/7 is now RUNNING
[2022-04-02 01:01:09,997] {} INFO - 22/04/02 01:01:09 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:01:11,848] {} INFO - 22/04/02 01:01:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) ( with ID 7, ResourceProfileId 0
[2022-04-02 01:01:11,936] {} INFO - 22/04/02 01:01:11 INFO BlockManagerMasterEndpoint: Registering block manager with 912.3 MiB RAM, BlockManagerId(7,, 42095, None)
[2022-04-02 01:02:42,066] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on dworkz-airflow-worker-0.dworkz-airflow-worker.dataworkz.svc.cluster.local:33843 in memory (size: 6.4 KiB, free: 1007.8 MiB)
[2022-04-02 01:02:42,072] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,073] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,074] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:02:42,074] {} INFO - 22/04/02 01:02:42 INFO BlockManagerInfo: Removed broadcast_0_piece0 on in memory (size: 6.4 KiB, free: 912.3 MiB)
[2022-04-02 01:04:02,133] {} INFO - 22/04/02 01:04:02 WARN TaskSetManager: Lost task 0.1 in stage 1.0 (TID 15) ( executor 2): java.lang.OutOfMemoryError: GC overhead limit exceeded
[2022-04-02 01:04:02,133] {} INFO -
[2022-04-02 01:04:02,134] {} INFO - 22/04/02 01:04:02 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 16) (, executor 6, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:04:02,137] {} INFO - 22/04/02 01:04:02 ERROR TaskSchedulerImpl: Lost executor 2 on Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[2022-04-02 01:04:02,137] {} INFO - 22/04/02 01:04:02 INFO DAGScheduler: Executor lost: 2 (epoch 1)
[2022-04-02 01:04:02,137] {} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
[2022-04-02 01:04:02,137] {} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2,, 40099, None)
[2022-04-02 01:04:02,138] {} INFO - 22/04/02 01:04:02 INFO BlockManagerMaster: Removed 2 successfully in removeExecutor
[2022-04-02 01:04:02,138] {} INFO - 22/04/02 01:04:02 INFO DAGScheduler: Shuffle files lost for executor: 2 (epoch 1)
[2022-04-02 01:04:02,153] {} INFO - 22/04/02 01:04:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on (size: 4.9 KiB, free: 912.3 MiB)
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/2 is now EXITED (Command exited with code 52)
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO StandaloneSchedulerBackend: Executor app-20220402005813-0010/2 removed: Command exited with code 52
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220402005813-0010/8 on worker-20220402001756- ( with 2 core(s)
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO BlockManagerMaster: Removal of executor 2 requested
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO StandaloneSchedulerBackend: Granted executor ID app-20220402005813-0010/8 on hostPort with 2 core(s), 2.0 GiB RAM
[2022-04-02 01:04:02,319] {} INFO - 22/04/02 01:04:02 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 2
[2022-04-02 01:04:02,343] {} INFO - 22/04/02 01:04:02 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220402005813-0010/8 is now RUNNING
[2022-04-02 01:04:04,271] {} INFO - 22/04/02 01:04:04 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) ( with ID 8, ResourceProfileId 0
[2022-04-02 01:04:04,369] {} INFO - 22/04/02 01:04:04 INFO BlockManagerMasterEndpoint: Registering block manager with 912.3 MiB RAM, BlockManagerId(8,, 41267, None)
[2022-04-02 01:06:28,997] {} INFO - 22/04/02 01:06:28 WARN TaskSetManager: Lost task 0.2 in stage 1.0 (TID 16) ( executor 6): java.lang.OutOfMemoryError: GC overhead limit exceeded
[2022-04-02 01:06:28,997] {} INFO - at sun.nio.cs.UTF_8.newDecoder(
[2022-04-02 01:06:28,997] {} INFO - at java.lang.StringCoding.decode(
[2022-04-02 01:06:28,997] {} INFO - at java.lang.String.(
[2022-04-02 01:06:28,997] {} INFO - at java.lang.String.(
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.client.scala.codec.JsonDeserializer$Passthrough$StringConvert$.deserialize(JsonDeserializer.scala:73)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.client.scala.query.QueryResult.$anonfun$rowsAs$1(QueryResult.scala:54)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.client.scala.query.QueryResult$$Lambda$1390/1525300356.apply(Unknown Source)
[2022-04-02 01:06:28,997] {} INFO - at scala.collection.Iterator$$anon$
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.client.scala.util.RowTraversalUtil$.traverse(RowTraversalUtil.scala:12)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.client.scala.query.QueryResult.rowsAs(QueryResult.scala:53)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader.$anonfun$rows$1(QueryPartitionReader.scala:50)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader$$Lambda$1389/1436792842.apply(Unknown Source)
[2022-04-02 01:06:28,997] {} INFO - at scala.util.Success.flatMap(Try.scala:251)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader.rows$lzycompute(QueryPartitionReader.scala:50)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader.rows(QueryPartitionReader.scala:49)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader.rowIterator$lzycompute(QueryPartitionReader.scala:61)
[2022-04-02 01:06:28,997] {} INFO - at com.couchbase.spark.query.QueryPartitionReader.rowIterator(QueryPartitionReader.scala:61)
[2022-04-02 01:06:28,997] {} INFO - at
[2022-04-02 01:06:28,997] {} INFO - at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
[2022-04-02 01:06:28,998] {} INFO - at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.sql.execution.SparkPlan$$Lambda$787/1698108475.apply(Unknown Source)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
[2022-04-02 01:06:28,998] {} INFO - at org.apache.spark.rdd.RDD$$Lambda$788/805824280.apply(Unknown Source)
[2022-04-02 01:06:28,998] {} INFO -
[2022-04-02 01:06:28,998] {} INFO - 22/04/02 01:06:28 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 17) (, executor 1, partition 0, ANY, 5881 bytes) taskResourceAssignments Map()
[2022-04-02 01:06:29,011] {} INFO - 22/04/02 01:06:29 ERROR TaskSchedulerImpl: Lost executor 6 on Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.