Hey there,
I’m pretty new to using the async bucket in the Couchbase SDK.
I would like to fetch a large volume of documents (in the range of 50,000–100,000) and process them one by one. I thought RxJava would lend itself to this kind of task.
Below you can see my pipeline:
String queryString = String.format(
        "SELECT CDT.*, meta().id "
      + "FROM %s AS CDT "
      + "WHERE `_class` = '%s'",
        getQuotedBucketName(),
        clazz.getCanonicalName());
N1qlParams params = N1qlParams.build().consistency(ScanConsistency.NOT_BOUNDED).pretty(false);
N1qlQuery query = N1qlQuery.simple(queryString, params);
asyncBucket
    .query(query)
    .flatMap(AsyncN1qlQueryResult::rows)
    .map(this::readTask)
    .filter(Optional::isPresent)
    .map(Optional::get)
    .forEach(consumer::accept,
             e -> LOGGER.error("Failed to delete chunk.", e));
I use this helper function to map the results:
public Optional<ChunkDeletionTask> readTask(AsyncN1qlQueryRow row) {
    try {
        return Optional.of(objectMapper.readValue(row.value().toString(), clazz));
    } catch (IOException e) {
        LOGGER.warn("Could not map ChunkDeletionTask to object.", e);
        return Optional.empty();
    }
}
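(For anyone unfamiliar with the pattern: the helper returns Optional.empty() for rows that fail to deserialize, and the filter/map pair downstream silently drops them. Here is a minimal plain-Java sketch of that pattern, with Integer.parseInt standing in for objectMapper.readValue — the class and parse function are made up for illustration:)

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class OptionalFilterSketch {

    // Hypothetical stand-in for readTask/objectMapper.readValue:
    // returns the parsed value, or empty if the raw input is malformed.
    static Optional<Integer> readTask(String raw) {
        try {
            return Optional.of(Integer.parseInt(raw));
        } catch (NumberFormatException e) {
            // In the real pipeline this is where the warning is logged.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Malformed entries are mapped to empty and dropped by the filter,
        // mirroring .map(this::readTask).filter(Optional::isPresent).map(Optional::get)
        List<Integer> tasks = List.of("1", "oops", "3").stream()
                .map(OptionalFilterSketch::readTask)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        System.out.println(tasks); // [1, 3]
    }
}
```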
The consumer in turn deletes the given entity.
When the query returns a lot of objects, I get the infamous IllegalStateException: "The content of this Observable is already released. Subscribe earlier or tune the CouchbaseEnvironment#autoreleaseAfter() setting."
Can someone give me a hint as to what I am doing wrong? I assumed that with backpressure, rows would only be fetched when the service is ready to handle them.
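To make my mental model of backpressure concrete: I expected each row to be pulled only after the previous one was processed, roughly like the JDK's Flow API sketched below (plain Java, no Couchbase or RxJava — just an illustration of the request-one-at-a-time idea I assumed was in effect):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureSketch {

    // Consumes items one at a time: the next element is requested only
    // after the previous one has been processed.
    static List<Integer> drain(List<Integer> input) throws InterruptedException {
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // pull the first item only when ready
            }

            public void onNext(Integer item) {
                received.add(item);      // process one "row"...
                subscription.request(1); // ...then ask for the next
            }

            public void onError(Throwable t) { done.countDown(); }

            public void onComplete() { done.countDown(); }
        });

        input.forEach(publisher::submit);
        publisher.close();
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain(List.of(1, 2, 3, 4, 5))); // [1, 2, 3, 4, 5]
    }
}
```

That is the behavior I assumed the pipeline above would have; apparently the Couchbase Observable releases its row buffers on its own schedule regardless.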