Hello,
I am using couchbase 4.5 with the latest java driver. I have a simple program that fetches some data using N1QL and then increments an atomic counter. The problem is that the stream completes but only processed 450000 records. Can someone explain this to me?
val sourceCounter = new AtomicLong(0) //this is only equal to 450000 after completion
val query = "select * from foo limit 1000000"
val docs = for {
b <- bucket
docs <- b.query(N1qlQuery.simple(query)).toScala.flatMap(r => r.rows().toScala.map(_.toString))
} yield docs
docs
.foreach(
onNext = {n:String => sourceCounter.incrementAndGet()},
onError={e:Throwable => logger.error("error in subscriber", e)},
() => {
logger.info("completed")
})
My task is to process 6 million records but I need to get past this. No error is being thrown…I just see the completed
log message above.
Thanks,
-K
UPDATE
I also tried using flatMap
but still encountered the same result:
docs
.flatMap(10, {Observable.just(_)})
.toBlocking
.subscribe( onNext = {n:String => sourceCounter.incrementAndGet()},
onError={e:Throwable => logger.error("error in subscriber", e)},
() => {
logger.info("completed")
})
SOLVED
I just needed to increase the serverSideTimeout
:
b.query(N1qlQuery.simple(query(count = true), N1qlParams.build().serverSideTimeout(10, TimeUnit.MINUTES))