Upsert failures on thousands of rows using the Couchbase Spark connector

I am using couchbase-spark-connector_2.10-1.0.1.jar to upsert records to one bucket.
The Spark version I am using is 1.5.1-cdh4.
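A minimal sketch of what the write path looks like, to show how the upserts are issued (the input path, the key function someKeyFor, and the document contents are placeholders, not the real job):

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.spark._

// Build one JsonDocument per input line and upsert the whole RDD into the
// bucket configured in the SparkConf.
val docs = sc.textFile("hdfs:///path/to/input")
  .map { line =>
    val content = JsonObject.create().put("raw", line)
    JsonDocument.create(someKeyFor(line), content) // someKeyFor: placeholder key function
  }

docs.saveToCouchbase()
```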
It complained about:
com.couchbase.client.java.error.TemporaryFailureException
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:512)
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:493)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.observers.Subscribers$5.onNext(Subscribers.java:234)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:265)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class

I searched around, and it seems to be related to https://issues.couchbase.com/browse/SPARKC-36, which is claimed to be resolved in 1.0.1/1.1.0. I tried the connector with version 1.0.1 but still hit the same issue, and after building 1.1.0 from source it is missing other dependencies, such as the JSON-related libraries.

May I ask whether Couchbase is releasing a new connector, or how I can build a jar with all the dependencies?

thanks
edward

I tried Spark 1.4 with couchbase-spark-connector_2.10-1.0.1.jar, and it resulted in the same errors.

After building spark-connector_2.10-1.1.0.jar, it contains fewer than 150 class files, whereas the official release of couchbase-spark-connector_2.10-1.0.1.jar contains more than 4000 classes. It looks like a significant number of classes are missing.

For example, when I ran the latest build of spark-connector_2.10-1.1.0.jar, it complained:
Exception in thread "main" java.lang.NoClassDefFoundError: com/couchbase/client/java/document/json/JsonObject
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2625)
at java.lang.Class.getMethod0(Class.java:2866)
at java.lang.Class.getMethod(Class.java:1676)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:657)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.couchbase.client.java.document.json.JsonObject
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Any suggestions would be appreciated!

best,

Hi,
I had this issue (not with Spark but with Oracle) and the same message. It may be an I/O speed issue; do you have the problem with fewer rows?
I tried to insert 110,000 rows at a time and got the "temporary failure" message; then I used an "interrupt" function in my code and it was OK!

Thanks!
It works fine if the JSON object is small, but when my JSON object is around 32 KB it fails.
I had to make it work by repartitioning the data and loading the partitions sequentially, roughly as in the sketch below.
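A rough sketch of the sequential load, assuming an RDD of JsonDocument and the connector's saveToCouchbase() (the slice count is arbitrary):

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._
import org.apache.spark.rdd.RDD

// Split the data into smaller partitions, then write one partition per pass so
// only a fraction of the documents hits the bucket at a time.
def saveSequentially(docs: RDD[JsonDocument], numSlices: Int = 200): Unit = {
  val partitioned = docs.repartition(numSlices).cache()
  (0 until partitioned.partitions.length).foreach { i =>
    partitioned
      .mapPartitionsWithIndex(
        (idx, it) => if (idx == i) it else Iterator.empty,
        preservesPartitioning = true)
      .saveToCouchbase()
  }
}
```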

What is the interrupt function? Do you have a link?

I meant a command like sleep (in seconds), in Python for example.

I am using the couchbase-spark-connector, which is implemented in Scala. My goal is to leverage Spark to load data into Couchbase in a distributed fashion.

I am wondering if anyone has had the same experience.

I was able to build a couchbase-spark-connector-1.1.0.jar that includes all the dependencies by running sbt assembly, roughly with the setup sketched below.
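For reference, a minimal sbt-assembly setup for producing such a fat jar looks roughly like this (the plugin version and merge rules are illustrative, not necessarily the exact ones used):

```scala
// project/plugins.sbt -- add the sbt-assembly plugin
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt -- merge strategy so duplicate META-INF entries from the
// Couchbase, Netty and RxJava jars do not abort the assembly
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

Then run `sbt assembly` and submit the assembly jar produced under target/scala-2.10/.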
However, after I ran my job, it still complains about:

16/05/16 10:38:27 WARN TaskSetManager: Lost task 16.0 in stage 3.0 (TID 440, 172.19.62.10): com.couchbase.client.java.error.TemporaryFailureException
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:569)
at com.couchbase.client.java.CouchbaseAsyncBucket$16.call(CouchbaseAsyncBucket.java:550)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
at rx.observers.Subscribers$5.onNext(Subscribers.java:234)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:222)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:101)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:265)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:109)
at rx.exceptions.Exceptions.throwOrReport(Exceptions.java:188)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:56)
… 12 more

It seems that 1.1.0 still hasn’t resolved the problem.

I was able to resolve the problem by modifying DocumentRDDFunctions.scala.
The latest source code only fixes the problem when retrieving data, but no change was made to saveToCouchbase(). I added a retry feature there and it worked; the general idea is sketched below.
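The sketch shows the general idea rather than the exact patch: wrap each async upsert in the Java SDK's RetryBuilder so that a TemporaryFailureException is retried with exponential backoff instead of failing the Spark task.

```scala
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.couchbase.client.core.time.Delay
import com.couchbase.client.java.Bucket
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.error.TemporaryFailureException
import com.couchbase.client.java.util.retry.RetryBuilder
import rx.Observable
import rx.functions.Func1

// Upsert one partition's documents, retrying each upsert on temporary failures
// with exponential backoff (capped at 5 s, at most 5 attempts) before giving up.
def upsertWithRetry(bucket: Bucket, docs: Iterator[JsonDocument]): Unit = {
  Observable
    .from(docs.toSeq.asJava)
    .flatMap(new Func1[JsonDocument, Observable[JsonDocument]] {
      override def call(doc: JsonDocument): Observable[JsonDocument] =
        bucket.async().upsert(doc).retryWhen(
          RetryBuilder
            .anyOf(classOf[TemporaryFailureException])
            .delay(Delay.exponential(TimeUnit.MILLISECONDS, 5000))
            .max(5)
            .build())
    })
    .toBlocking
    .lastOrDefault(null)
}
```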

Could you share your patch? Is it possible to push it to the official repository?

I believe the latest Couchbase Spark connector has this fixed already.

Yep, it should be fixed with the latest releases.