Hi,
I’m using Couchbase 4.1 and Spark 1.6.2 with spark-connector 1.2.1.
The following code ends with an OutOfMemoryError for large files (more than 50 GB):
sc.textFile(file_name) // 50 GB
  .map(line => toJsonObject(line))
  .zipWithIndex() // or zipWithUniqueId()
  .map(pair => JsonDocument.create(pair._2.toString, pair._1))
  .saveToCouchbase() // OutOfMemoryError here
This problem occurs because of the saveToCouchbase implementation.
Simplified:
rdd.foreachPartition(iter => {
  val bucket = CouchbaseConnection().bucket(cbConfig).async()
  Observable
    .from(OnceIterable(iter))
    .flatMap(doc => { bucket.upsert(doc) /* ...error handling... */ })
    .toBlocking
    .last
})
Writing to Couchbase is slower than reading from the file, so documents pile up and the whole partition eventually ends up loaded into RAM. (Sorry for my English if I made a mistake.)
A possible solution is rdd.repartition(???).foreachPartition(…), but what should I put in place of the ‘???’ when the amount of data is unknown?
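When the input is a file, its size can at least be read from the filesystem and a partition count derived from it. A rough sketch (untested; the Hadoop-compatible filesystem and the ~128 MB-per-partition target are just assumptions):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val inputBytes = fs.getContentSummary(new Path(file_name)).getLength
val targetBytesPerPartition = 128L * 1024 * 1024 // assumption: ~128 MB fits in executor memory
val numPartitions = math.max(1, (inputBytes / targetBytesPerPartition).toInt)
rdd.repartition(numPartitions).foreachPartition(iter => { /* same body as above */ })

But this does not help when the data does not come from a file.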
The following solution works better:
val asyncPartitionSize = 1000 // user defined
rdd.foreachPartition(iter => {
  val bucket = CouchbaseConnection().bucket(cbConfig).async()
  iter
    .grouped(asyncPartitionSize) // split the partition into chunks of at most 1000 docs
    .map(chunk => {
      Observable
        .from(OnceIterable(chunk.iterator))
        .flatMap(doc => { bucket.upsert(doc) /* ...error handling... */ })
        .toBlocking
        .toIterable
        .iterator
    })
    .flatten // lazily flatten Iterator[Iterator[T]] to Iterator[T]
    .last    // force the whole iterator to be consumed
})
With this approach I was able to save a 300 GB file on a cluster with a total of 20 GB of RAM.
However, the same problem shows up when reading data from Couchbase using ‘CouchbaseView’, and I have not been able to solve it there.
I do not have experience with Apache Spark, so maybe I missed something?