Hi,
I am trying to read Couchbase documents from a bucket through Spark using the Couchbase Spark connector, but I get an authentication error when I run the job.
The code and the error log are below.
build.sbt
name := "KafkaSparkCouchReadWrite"
organization := "my.rohit"
version := "1.0.0-SNAPSHOT"
scalaVersion := "2.11.11"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0",
  "com.couchbase.client" %% "spark-connector" % "2.1.0",
  "org.glassfish.hk2" % "hk2-utils" % "2.2.0-b27",
  "org.glassfish.hk2" % "hk2-locator" % "2.2.0-b27",
  "javax.validation" % "validation-api" % "1.1.0.Final",
  "org.apache.kafka" %% "kafka" % "0.11.0.0",
  "com.googlecode.json-simple" % "json-simple" % "1.1"
).map(_.excludeAll(ExclusionRule("org.glassfish.hk2"), ExclusionRule("javax.validation")))
SparkRead.scala
package com.*******.demo.sparkstreaming

import com.couchbase.client.java.document.JsonDocument
import org.apache.spark.sql.SparkSession
import com.couchbase.spark._

/**
 * Created by cloudera on 12/8/17.
 */
object SparkRead {
  def main(args: Array[String]): Unit = {
    // The SparkSession is the main entry point into Spark
    val spark = SparkSession
      .builder()
      .appName("KeyValueExample")
      .master("local[*]") // use the JVM as the master, great for testing
      .config("spark.couchbase.nodes", "<ip-address>") // connect to Couchbase on hostname
      .config("spark.couchbase.
      .config("spark.couchbase.bucket.travel-sample", "") // open the travel-sample bucket with empty password
      .config("com.couchbase.username", "<user>")
      .config("com.couchbase.password", "<pass>")
      .getOrCreate()

    spark.sparkContext
      .couchbaseGet[JsonDocument](Seq("airline_10123")) // load documents from Couchbase
      .collect() // collect all data from the Spark workers
      .foreach(println) // print each document's content
  }
}
Log File
17/12/11 15:14:41 INFO CouchbaseCore: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile=‘null’, sslKeystorePassword=false, sslKeystore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=4, computationPoolSize=4, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.4.2 (git: 2.4.2, core: 1.4.2), dcpEnabled=false, retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, autoreleaseAfter=2000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000, dcpConnectionBufferSize=20971520, dcpConnectionBufferAckThreshold=0.2, dcpConnectionName=dcp/core-io, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$2@73ab3aac, queryTimeout=75000, viewTimeout=75000, kvTimeout=2500, connectTimeout=5000, dnsSrvEnabled=false}
17/12/11 15:14:42 WARN Endpoint: [null][KeyValueEndpoint]: Authentication Failure.
17/12/11 15:14:42 INFO Endpoint: [null][KeyValueEndpoint]: Got notified from Channel as inactive, attempting reconnect.
17/12/11 15:14:43 WARN ResponseStatusConverter: Unknown ResponseStatus with Protocol HTTP: 401
17/12/11 15:14:43 WARN Endpoint: [null][KeyValueEndpoint]: Authentication Failure.
17/12/11 15:14:43 WARN Endpoint: Error during reconnect:
com.couchbase.client.core.endpoint.kv.AuthenticationException: Authentication Failure
at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.checkIsAuthed(KeyValueAuthHandler.java:288)
at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java:173)
at com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java:52)
at com.couchbase.client.deps.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:343)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:336)
at com.couchbase.client.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/12/11 15:14:43 INFO CouchbaseConnection: Performing Couchbase SDK Shutdown
17/12/11 15:14:43 INFO SparkContext: Invoking stop() from shutdown hook
17/12/11 15:14:43 INFO SparkUI: Stopped Spark web UI at http://192.168.33.193:4041
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown IoPool: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown kvIoPool: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown viewIoPool: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown queryIoPool: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown searchIoPool: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown Core Scheduler: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown Runtime Metrics Collector: success
17/12/11 15:14:43 INFO CoreEnvironment: Shutdown Latency Metrics Collector: success
17/12/11 15:14:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/12/11 15:14:43 INFO MemoryStore: MemoryStore cleared
17/12/11 15:14:43 INFO BlockManager: BlockManager stopped
17/12/11 15:14:43 INFO BlockManagerMaster: BlockManagerMaster stopped
17/12/11 15:14:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/12/11 15:14:43 INFO SparkContext: Successfully stopped SparkContext
17/12/11 15:14:43 INFO ShutdownHookManager: Shutdown hook called
17/12/11 15:14:43 INFO ShutdownHookManager: Deleting directory C:\Users\Rohit Vangari\AppData\Local\Temp\spark-ef2fed72-f9dd-40ea-93f8-f595cf3c262f
17/12/11 15:14:45 INFO CoreEnvironment: Shutdown Netty: success
Process finished with exit code 1
Thanks,
Rohit Vangari