Hi Mico,
By default, the downstream operators are executed in the Netty Event Loop thread for the socket connected to the Couchbase Server node. If you had more than one Couchbase Server node, and the documents you were fetching lived on different nodes, you might see different threads being used.
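If it helps to see this without a cluster, here's a minimal sketch using plain Reactor. It assumes reactor-core is on the classpath. `fakeGet` and the `fake-event-loop` scheduler are hypothetical stand-ins for the SDK's `get` and the Netty event loop, not Couchbase APIs. Nothing in the chain switches threads, so the downstream `map` runs on whichever thread delivered the value.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class DefaultThreadingDemo {

    // Hypothetical stand-in for an async get: the value is delivered from a
    // callback running on the "I/O" thread, roughly the way a Netty event
    // loop completes a pending request.
    static Mono<String> fakeGet(Scheduler ioThread, String id) {
        return Mono.create(sink -> ioThread.schedule(() -> sink.success("doc:" + id)));
    }

    public static List<String> run() {
        Scheduler ioThread = Schedulers.newSingle("fake-event-loop");
        try {
            return Flux.just("a", "b", "c")
                    .flatMap(id -> fakeGet(ioThread, id)
                            // No publishOn: map runs on the thread that emitted the value.
                            .map(doc -> Thread.currentThread().getName()))
                    .collectList()
                    .block();
        } finally {
            ioThread.dispose();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every name printed should be the single stand-in thread's name, which mirrors what you're seeing with one Couchbase node.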
If your processing is expensive and you have more CPUs than Couchbase nodes, it might make sense to distribute the work among all CPUs. You can do this by applying the `publishOn` operator to the `get` result. This specifies the scheduler to use for executing all downstream operators.
```java
import com.couchbase.client.java.query.ReactiveQueryResult;
import reactor.core.scheduler.Schedulers;

cluster.reactive().query("SELECT META().id FROM `travel-sample`")
    .flatMapMany(ReactiveQueryResult::rowsAsObject)
    .flatMap(row -> bucket.reactive().defaultCollection().get(row.getString("id"))
        .publishOn(Schedulers.boundedElastic()) // <-- ** ADD THIS **
        .map(d -> {
            // Shows which thread runs this downstream step
            System.out.println("map: " + Thread.currentThread().getName());
            return d;
        }))
    .collectList()
    .block();
```
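That example uses the “bounded elastic” scheduler (`Schedulers.boundedElastic()`), which is geared toward blocking work. There’s also a “parallel” scheduler (`Schedulers.parallel()`) that might be more appropriate if your work is CPU-bound. Alternatively, you could create a scheduler backed by an existing `Executor` using `Schedulers.fromExecutor(...)` or `Schedulers.fromExecutorService(...)`.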
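Here's a rough sketch comparing those options, again with plain Reactor rather than the SDK. The `threadsUsed` helper, the two-thread pool, and the `my-worker-` thread names are hypothetical, chosen only for illustration. Like the example above, it applies `publishOn` to each inner `Mono`, so separate results can be handed to different workers. A single `publishOn` on the outer `Flux` would instead pin every element to one worker and process them one at a time.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerChoiceDemo {

    // Records which thread runs the downstream map for each element.
    static List<String> threadsUsed(Scheduler scheduler) {
        return Flux.range(1, 4)
                .flatMap(i -> Mono.just(i)
                        .publishOn(scheduler) // switch threads per inner Mono
                        .map(x -> Thread.currentThread().getName()))
                .collectList()
                .block();
    }

    // Wraps an executor you already own (pool size and names are hypothetical).
    static List<String> threadsUsedByCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "my-worker-" + counter.incrementAndGet()));
        Scheduler custom = Schedulers.fromExecutorService(pool);
        try {
            return threadsUsed(custom);
        } finally {
            custom.dispose();
            pool.shutdown(); // make sure the non-daemon pool threads can exit
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed(Schedulers.boundedElastic())); // geared toward blocking work
        System.out.println(threadsUsed(Schedulers.parallel()));       // sized to the CPU count by default
        System.out.println(threadsUsedByCustomPool());
    }
}
```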
Here’s an article about the Reactor threading model, which describes how the `publishOn` and `subscribeOn` operators work:
Thanks,
David