Error Using Java SDK in Spark Job

Hi,

I’m using Java SDK 2.2.6 in Spark 1.5.1 to perform lookups against a Couchbase cluster. I’m aware of the Spark connector and plan to use it in our project. However, in this specific case we are performing lookups on a stream of events, so I would prefer not to load the whole bucket in as an RDD, hence the choice of the Java SDK.

The code I use is very simple:

import com.couchbase.client.java.{Bucket, Cluster, CouchbaseCluster}
import com.couchbase.client.java.env.{CouchbaseEnvironment, DefaultCouchbaseEnvironment}

val ENV: CouchbaseEnvironment = DefaultCouchbaseEnvironment.builder
  .connectTimeout(60 * 1000)
  .keepAliveInterval(3600 * 1000)
  .build
val cluster: Cluster = CouchbaseCluster.create(ENV, SEED_IPS)
val bucket: Bucket = cluster.openBucket("test-bucket")
var jDoc = bucket.get(myId)

This works well when tested with a dataset of about 30 million records. However, when running an endurance test with a much larger dataset, I started to get errors like the one below. Has anyone seen a similar error? The Couchbase client jar is bundled in my application jar, so why the ClassNotFoundException? And why was the ChannelException thrown?

Any help is greatly appreciated.

WARN spark.ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: com.couchbase.client.deps.io.netty.channel.ChannelException
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at java.lang.Throwable.readObject(Throwable.java:914)
at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Hi,

Since I maintain the Spark connector, let me follow up here.

Loading the whole bucket is not needed at all; in fact, the Spark connector supports exactly what you are doing here. Your actual problem is one the connector solves for you: you need to create Couchbase connections on the workers, not on the driver. Otherwise Spark has to serialize the connection over the network, which obviously can’t work.
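As a minimal sketch of that pattern, assuming your events arrive as an RDD[String] of document IDs (called eventIds here, a hypothetical name) and reusing SEED_IPS and the bucket name from your snippet:

import com.couchbase.client.java.CouchbaseCluster

// mapPartitions runs on the executors, so the Cluster and Bucket are
// created where they are used and never serialized over the network.
val docs = eventIds.mapPartitions { ids =>
  val cluster = CouchbaseCluster.create(SEED_IPS)
  val bucket = cluster.openBucket("test-bucket")
  // Force the lookups before disconnecting, since iterators are lazy.
  val fetched = ids.map(id => bucket.get(id)).toList
  cluster.disconnect()
  fetched.iterator
}

Opening and tearing down a connection per partition is still expensive; the connector instead keeps one managed connection per executor JVM, which is another reason to prefer it.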

In your case an error was raised on an executor (a ChannelException from Netty) which then couldn’t be deserialized on the other side. The ClassNotFoundException is probably because the SDK’s shaded Netty classes are not available on every node’s classpath.

So in your case I wonder if you need to either build a fat jar with all the dependencies or add the jars to the classpath on each worker node. In addition, you need to be very careful about where you open and use the Bucket instance.
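For the classpath option, something along these lines should work; the paths, versions, and class name are hypothetical, and the shaded com.couchbase.client.deps.io.netty classes from your trace ship inside the SDK’s core-io jar:

spark-submit \
  --jars /opt/libs/java-client-2.2.6.jar,/opt/libs/core-io-1.2.6.jar \
  --class com.example.LookupJob myapp.jar

# or, in spark-defaults.conf on every node:
spark.driver.extraClassPath   /opt/libs/java-client-2.2.6.jar:/opt/libs/core-io-1.2.6.jar
spark.executor.extraClassPath /opt/libs/java-client-2.2.6.jar:/opt/libs/core-io-1.2.6.jar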

If you pull in the Spark connector version 1.1.0, check out the Couchbase SDKs documentation for an example of how to use it, and let me know if you run into issues.
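For reference, the same point lookups with the connector would look roughly like this; treat it as a sketch, assuming the couchbaseGet extension the connector adds to RDD[String], with placeholder node addresses and IDs:

import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("event-lookups")
  .set("com.couchbase.nodes", "10.0.0.1")      // your seed node(s)
  .set("com.couchbase.bucket.test-bucket", "") // bucket name -> password
val sc = new SparkContext(conf)

// The connector manages the connection on each executor for you;
// couchbaseGet fetches each document by ID.
val docs = sc.parallelize(Seq("id-1", "id-2"))
  .couchbaseGet[JsonDocument]()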

Note that for the ChannelException itself you’d need to share more logs so we can spot what’s going on; it’s not possible to tell the root cause from just this exception type.
