I’m trying to ingest a large csv file (0.3M rows, 93 columns) into couchbase-server-enterprise_4.0.0, using the spark-connector-1.0.0-beta. Here is how I’m uploading data to Couchbase:
val rowRDD = sc.textFile(filename).map{row =>
val obj = JsonObject.create()
(keys zip row.split("\t")).foreach{case(k, v) => obj.put(k, v)}
JsonDocument.create(obj.get("id").asInstanceOf[String], obj)
}
.saveToCouchbase()
Here is the exception I’m seeing:
15/10/05 09:19:40 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/10/05 09:19:40 INFO TaskSchedulerImpl: Cancelling stage 0
15/10/05 09:19:40 INFO DAGScheduler: Stage 0 (foreachPartition at DocumentRDDFunctions.scala:39) failed in 2.535 s
15/10/05 09:19:40 INFO DAGScheduler: Job 0 failed: foreachPartition at DocumentRDDFunctions.scala:39, took 2.574053 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, localhost): com.couchbase.client.java.error.TemporaryFailureException
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:512)
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:493)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.observers.Subscribers$5.onNext(Subscribers.java:234)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:256)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:104)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
... 12 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
The exception seems to be caused by a Server too busy, any workaround to insert large RDD/documents into Couchbase?