I’m trying to use the Couchbase Spark connector inside a Glue job, but can’t succeed. I get the error:
Exception in User Class: java.lang.NoClassDefFoundError : com/couchbase/client/core/error/InvalidArgumentException
My code is:
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import com.couchbase.spark._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import scala.collection.JavaConverters._ // needed for options.asJava below

object GlueApp {
  def main(args: Array[String]): Unit = {
    val jobParams = Seq("JOB_NAME", "CONNECTION_STRING", "USERNAME", "PASSWORD")
    val options = GlueArgParser.getResolvedOptions(args, jobParams.toArray)
    val connectionString = options("CONNECTION_STRING")
    val username = options("USERNAME")
    val password = options("PASSWORD")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.couchbase.connectionString", connectionString),
          ("spark.couchbase.username", username),
          ("spark.couchbase.password", password)
        ))

    val sparkContext = new SparkContext(conf)
    val glueContext = new GlueContext(sparkContext)
    val sparkSession = glueContext.getSparkSession
    Job.init(options("JOB_NAME"), glueContext, options.asJava)

    val ids = Seq(Get("645102691_1::coupon_codes"))
    val fromKeyspace = Keyspace(bucket = Some("bucket"))
    sparkContext
      .couchbaseGet(ids, fromKeyspace)
      .collect()
      .foreach(println)

    Job.commit()
  }
}
As far as I understand it, it’s because of wrong arguments. I’m passing only the host address as the connection string, plus the username and password.
Dependencies:
spark-connector_2.12-3.2.0.jar
scala-client_2.12-1.2.4.jar
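For reference, InvalidArgumentException is defined in the core-io jar, which scala-client depends on but which is not in the list above; since Glue does not resolve transitive dependencies for extra jars, the full set would look something like this build.sbt sketch (the core-io version is an assumption matched to scala-client 1.2.4, so verify against its POM):

// Hypothetical build.sbt fragment; the core-io version below is an assumption.
libraryDependencies ++= Seq(
  "com.couchbase.client" %% "spark-connector" % "3.2.0",
  "com.couchbase.client" %% "scala-client"    % "1.2.4",
  // core-io provides com.couchbase.client.core.error.InvalidArgumentException;
  // without it on the classpath the connector fails as in the trace below.
  "com.couchbase.client" %  "core-io"         % "2.2.4"
)

The full stack trace follows.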
2022-04-13 13:16:23,352 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
java.lang.NoClassDefFoundError: com/couchbase/client/core/error/InvalidArgumentException
at com.couchbase.spark.config.CouchbaseConnection$.connection$lzycompute(CouchbaseConnection.scala:164)
at com.couchbase.spark.config.CouchbaseConnection$.connection(CouchbaseConnection.scala:164)
at com.couchbase.spark.config.CouchbaseConnection$.apply(CouchbaseConnection.scala:166)
at com.couchbase.spark.kv.GetRDD.getPartitions(GetRDD.scala:66)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at GlueApp$.main(AWSGlueCouchbaseETLJob.scala:41)
at GlueApp.main(AWSGlueCouchbaseETLJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
Caused by: java.lang.ClassNotFoundException: com.couchbase.client.core.error.InvalidArgumentException
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 25 more
The Couchbase Spark Connector is not tested with Glue jobs, so I cannot say for sure what is going on. One obvious thing you can try is providing the implicit bucket name in the configuration:
("spark.couchbase.implicitBucket", "<bucket-name>")
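In the job above, that would mean adding the property alongside the other Couchbase settings; a minimal sketch (the bucket name is a placeholder):

val conf = new SparkConf()
  .setAll(
    Seq(
      ("spark.couchbase.connectionString", connectionString),
      ("spark.couchbase.username", username),
      ("spark.couchbase.password", password),
      // default bucket used when a command does not name one explicitly
      ("spark.couchbase.implicitBucket", "<bucket-name>")
    ))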
The CouchbaseConnection.scala file has the following code:
def bucketName(cfg: CouchbaseConfig, name: Option[String]): String = {
  name.orElse(cfg.bucketName) match {
    case Some(name) => name
    case None =>
      throw InvalidArgumentException
        .fromMessage("No bucketName provided (neither configured globally, "
          + "nor in the per-command options)")
  }
}
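So, mirroring the name.orElse(cfg.bucketName) logic, the bucket can come from two places; a short sketch (bucket names are placeholders, and which setting feeds cfg versus name is my reading of the snippet above):

// Global default: presumably what populates cfg.bucketName
conf.set("spark.couchbase.implicitBucket", "<bucket-name>")

// Per command: presumably arrives as the `name` argument,
// taking precedence over the global default
val keyspace = Keyspace(bucket = Some("<bucket-name>"))
sparkContext.couchbaseGet(ids, keyspace)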
See if it works!
I added the config and I also changed the Couchbase Spark connector to 3.1 to match the Spark 3.1 support on AWS Glue.
Now I get this error: connection refused. It must be a policy issue, I think.
2022-04-13 16:09:02,255 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:455)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:444)
at org.apache.spark.executor.CoarseGrainedExecutorBackendPlugin.launch(CoarseGrainedExecutorBackendWrapper.scala:10)
at org.apache.spark.executor.CoarseGrainedExecutorBackendPlugin.launch$(CoarseGrainedExecutorBackendWrapper.scala:10)
at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper$$anon$1.launch(CoarseGrainedExecutorBackendWrapper.scala:15)
at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper.launch(CoarseGrainedExecutorBackendWrapper.scala:19)
at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper$.main(CoarseGrainedExecutorBackendWrapper.scala:5)
at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper.main(CoarseGrainedExecutorBackendWrapper.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:475)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:473)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 19 more
Caused by: java.io.IOException: Failed to connect to /172.31.20.189:46005
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:287)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.31.20.189:46005
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:750)
Just noticed this in your exception: Failed to connect to /172.31.20.189:46005.
I’m curious whether this is the connection string you had provided or something else entirely? The connection string for Couchbase need not include a port.
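For illustration, typical Couchbase connection string forms (the host values are placeholders):

// A bare hostname or IP is enough; the SDK uses the default ports.
val conn1 = "10.0.0.1"
// An explicit scheme also works; couchbases:// enables TLS.
val conn2 = "couchbase://cb.example.com"
// Multiple seed nodes can be given as a comma-separated list.
val conn3 = "couchbase://10.0.0.1,10.0.0.2"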
This is the Glue job’s IP address, I think (or something like it). I passed only the IP address of the EC2 instance on which the database is located. I’m refreshing all access policies.
I have the same error.
Have you fixed it?