How to achieve async (reactive) capability when using N1QL to query couchbase db. We are using Java.
All APIs in the 2.x
generation of the Java SDK are first exposed as sync APIs, but they all have an matching async implementation (using RxJava
). To access this async API, use the async()
method on the Bucket
instance.
So here goes for a reactive snippet of querying Couchbase using N1QL:
//make sure to static import Expression.* and Select.select
//having connected to the Cluster and obtained a Bucket reference named bucket:
//prepare a statement
Statement statement = select("callsign").from(i("travel-sample") //notice we escape travel-sample using i()
.where(x("type").eq("airline").and(x("country").eq(x("$1")); //$1 is a positional parameter
//construct the query and give value for the placeholder
N1qlQuery query = N1qlQuery.parameterized(statement, JsonArray.from("United States"));
//now it's on! let's query reactively
bucket.async() //switch to asyn API
.query(query)
.timeout(20, TimeUnit.SECONDS) //global timeout for the query
//I'll use Java 8 lambdas but you can convert to anonymous classes, see Action1, Func1, etc...
.flatMap(result -> result.rows()) //browse the resulting rows
.map(row -> row.value()) //extract the value of rows that were obtained
.subscribe(
value -> System.out.println(value), //or do something else with value
runtimeError -> runtimeError.printStackTrace() //or do something else with Exception
);
This snippet has a problem though: N1QL can also return errors as JSON in the response, which are fed into the AsyncN1qlQueryResult.errors()
Observable.
What if you dynamically want to return as many values as possible, but detect errors and transform them into an Exception to deal with in the subscribe? Here is a possible solution that changes the flatMap above:
.flatMap(
result -> {
//errors will be disguised as a row stream that just propagate an Exception
Observable<AsyncN1qlQueryRow> errorsAsRows = result.errors()
.toList() //collect all errors if many
.flatMap(allErrors -> {
//we'll use RuntimeException but you should probably use a custom Exception
Exception asException = new RuntimeException("N1QL errors: " + allErrors.toString());
//trick is to combine Obs.error() with flatMap to propagate an error while seeing the stream as another type
return Observable.<AsyncN1qlQueryRow>error(asException);
});
//merge the two: in best case forward the rows, but if there were errors, let them propagate
return result.rows()).merge(errorsAsRows);
})
Thanks for the information Simon