The Getting Started guide for the Spark Connector gives the following steps for setting up a Couchbase RDD:
// Configure Spark
val cfg = new SparkConf()
  .setAppName("couchbaseQuickstart") // give your app a name
  .setMaster("local[*]") // set the master to local for easy experimenting
  .set("com.couchbase.bucket.travel-sample", "") // open the travel-sample bucket instead of "default"
  // .set("com.couchbase.nodes", "127.0.0.1") // ;-separated list of bootstrap nodes, 127.0.0.1 is the default

// Generate the Context
val sc = new SparkContext(cfg)
Let's assume I'd like to read documents from both 'default' and 'travel-sample'.
How should I write my code for this?
You should be able to add another bucket and then refer to it by name after that. @daschl is going to update the Spark Samples to include an example of this. In the meantime, here's a starter hint based on the KV sample:
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.spark._
import org.apache.spark.{SparkConf, SparkContext}

object KeyValueExample {

  def main(args: Array[String]): Unit = {
    // Configure Spark
    val cfg = new SparkConf()
      .setAppName("keyValueExample")
      .setMaster("local[*]")
      .set("com.couchbase.bucket.travel-sample", "")
      .set("com.couchbase.bucket.beer-sample", "foo") // adds a second bucket, with bucket password "foo"

    // Generate the Context
    val sc = new SparkContext(cfg)

    sc
      .parallelize(Seq("airline_10123", "airline_10748")) // define document IDs
      .couchbaseGet[JsonDocument](bucketName = "travel-sample") // set the bucket name explicitly (you can leave it out if only one bucket is open)
      .map(_.content()) // extract the content
      .collect() // collect all data
      .foreach(println) // print it out
  }
}
One quick note: if you open more than one bucket and forget to set the bucket name explicitly, you'll get an exception pretty quickly, telling you that the bucket name is ambiguous and it's not able to choose one.
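For completeness, here's a minimal sketch of reading from the second bucket in the same job; the document ID below is just a hypothetical beer-sample key, and the explicit bucketName is what avoids the ambiguity exception:

// Read from the second bucket opened above; substitute a document ID
// that actually exists in your beer-sample bucket.
sc
  .parallelize(Seq("21st_amendment_brewery_cafe")) // define document IDs
  .couchbaseGet[JsonDocument](bucketName = "beer-sample") // explicit name avoids the ambiguity exception
  .map(_.content())
  .collect()
  .foreach(println)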
The following is the code for opening both buckets:

SparkSession sparkSession = SparkSession.builder()
    .config(conf)
    .config("spark.couchbase.bucket.incrementaldata", "")
    .config("spark.couchbase.bucket.basedata", "")
    .getOrCreate();
Below is the code for accessing the needed bucket:

EqualTo bucket = new EqualTo("bucketName", "basedata");
EqualTo type = new EqualTo("type", "custom");
And filters = new And(bucket, type);
return couchbaseReader(sparkSession.sqlContext().read()).couchbase(filters);
However, when the code is executed, only the incrementaldata bucket opens, even after specifying the bucket as basedata.
@neeleshkumar_mannur I think you're mixing up some concepts here. The type is correctly in the filter, but the bucketName is not part of the filter (unless it's also part of the JSON document structure, which I don't think is the case).
You need to provide the bucket name as an "option" with the key "bucket".
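In Scala that could look roughly like the sketch below (a sketch, assuming the connector's DataFrame API with the implicit reader from com.couchbase.spark.sql._; the bucket and field names are taken from the snippet above):

// Sketch: pick the bucket via the "bucket" option instead of a filter,
// and keep only the type predicate in the schema filter.
import com.couchbase.spark.sql._
import org.apache.spark.sql.sources.EqualTo

val df = sparkSession.read
  .option("bucket", "basedata") // selects which open bucket to read from
  .couchbase(EqualTo("type", "custom")) // filter on the document's "type" field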
@daschl My bad, I forgot to open both buckets. I am able to query the respective buckets now. Sorry for the inconvenience, and thanks for helping out. If possible, please update the Java code examples for such cases as well.
Finally, I just wanted to know how I can increase the timeout interval when opening a bucket in Spark.
@neeleshkumar_mannur Changing the bucket connect timeout with Spark only really works by setting the system property "com.couchbase.connectTimeout" (in milliseconds). There is "spark.driver.extraJavaOptions" as well as "--driver-java-options" available in Spark to do so, I think, according to Configuration - Spark 3.5.1 Documentation.
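As a minimal sketch (assuming the SDK picks the property up before the first bucket is opened), you can set it programmatically in the driver:

// Sketch: raise the connect timeout to 30 seconds. This must run before the
// SparkContext/SparkSession opens any Couchbase bucket. Alternatively, pass
// -Dcom.couchbase.connectTimeout=30000 via spark.driver.extraJavaOptions
// or --driver-java-options on spark-submit.
System.setProperty("com.couchbase.connectTimeout", "30000")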
Reading data worked seamlessly with the code below, with one bucket in the config:

.config("com.couchbase.bucket.B1", "")

val ref_qry1 = """select * from employee limit 10"""
val df = spark.read.json(sc.couchbaseQuery(N1qlQuery.simple(ref_qry1)).map(_.value.toString()))
df.show(false)
I am facing an issue while persisting the DataFrame to Couchbase: I have to keep every bucket I am persisting to in the config, otherwise the write method is not able to recognize the bucket. But if I add more than one bucket to the configuration, the read fails, and if I don't, the write fails.
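One way around the read-side ambiguity is to keep both buckets in the config but name the bucket explicitly on each operation, as discussed earlier in this thread. A minimal sketch, assuming the connector's bucketName parameter on couchbaseQuery and the DataFrameWriter extension from com.couchbase.spark.sql._ (B1 and B2 stand in for your source and target buckets):

import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._
import com.couchbase.spark.sql._
import org.apache.spark.sql.SaveMode

// Read: name the bucket on the query so two configured buckets aren't ambiguous.
val qry = N1qlQuery.simple("select * from B1 limit 10")
val df = spark.read.json(sc.couchbaseQuery(qry, bucketName = "B1").map(_.value.toString()))

// Write: name the target bucket on the writer the same way.
df.write
  .mode(SaveMode.Overwrite)
  .couchbase(Map("bucket" -> "B2"))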