Hi,
I am trying to do a bulk upsert using the sub-document API and observables. For the sub-document upsert to work, both the document key and the path have to exist; otherwise it throws an exception. I am not sure what the best way to handle that exception case is. Here is the code I am working on. Any help would be appreciated.
private void createOrUpdateObjects(List someBatch) {
    AsyncBucket asyncBucket = couchbaseBucket.async();
    Observable
        .from(someBatch)
        .flatMap(t -> {
            final String key = generateId(t);
            // Synchronous lookup to decide between sub-document upsert and full insert.
            Optional<SomeObject> optionalSomeObject = repo.findById(key);
            if (optionalSomeObject.isPresent()) {
                return asyncBucket.mutateIn(key)
                    .upsert("somepath", t)
                    .execute()
                    // Retry must sit before the error fallback; once the error is
                    // replaced by an empty Observable, retryWhen never sees it.
                    .retryWhen(RetryBuilder
                        .anyOf(BackpressureException.class)
                        .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                        .max(retryCount)
                        .build())
                    .doOnError(e -> log.error("Error", e))
                    // Skip this item after logging so the rest of the batch continues.
                    .onErrorResumeNext(Observable.empty());
            } else {
                return asyncBucket.insert(converter.toDocument(t, key))
                    .retryWhen(RetryBuilder
                        .anyOf(BackpressureException.class)
                        .delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
                        .max(retryCount)
                        .build())
                    .doOnError(e -> log.error("Error", e))
                    .onErrorResumeNext(Observable.empty());
            }
        })
        .toList()
        .toBlocking()
        .single();
}
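To make the question more concrete, the behaviour I think I want is: attempt the sub-document upsert first, and fall back to a full insert only when the key turns out to be missing, instead of calling repo.findById up front. Below is a minimal sketch of that control flow in plain Java with no Couchbase calls. FakeBucket, DocumentMissingException, DocumentExistsException and createOrUpdate are all hypothetical stand-ins I made up for illustration; they are not SDK classes, and I have not confirmed which SDK exceptions would take their place.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class UpsertFallbackSketch {

    /** Hypothetical stand-in for the error raised when the document key is absent. */
    static class DocumentMissingException extends RuntimeException {
        DocumentMissingException(String key) { super("No document for key " + key); }
    }

    /** Hypothetical stand-in for the error raised when the key is already taken. */
    static class DocumentExistsException extends RuntimeException {
        DocumentExistsException(String key) { super("Document already exists for key " + key); }
    }

    /** Hypothetical in-memory stand-in for the bucket; not a Couchbase API. */
    static class FakeBucket {
        private final Map<String, Map<String, Object>> docs = new ConcurrentHashMap<>();

        // Mimics a sub-document upsert: fails when the document itself is absent.
        void mutateInUpsert(String key, String path, Object value) {
            Map<String, Object> doc = docs.get(key);
            if (doc == null) {
                throw new DocumentMissingException(key);
            }
            doc.put(path, value);
        }

        // Mimics a full-document insert: fails when the key already exists.
        void insert(String key, Map<String, Object> body) {
            if (docs.putIfAbsent(key, new ConcurrentHashMap<>(body)) != null) {
                throw new DocumentExistsException(key);
            }
        }

        Map<String, Object> get(String key) {
            return docs.get(key);
        }
    }

    // Try the sub-document upsert first; create the document only if the key is missing.
    static String createOrUpdate(FakeBucket bucket, String key, String path, Object value) {
        try {
            bucket.mutateInUpsert(key, path, value);
            return "updated";
        } catch (DocumentMissingException missing) {
            Map<String, Object> body = new HashMap<>();
            body.put(path, value);
            try {
                bucket.insert(key, body);
                return "inserted";
            } catch (DocumentExistsException raced) {
                // Another writer created the document in between, so the upsert can be repeated.
                bucket.mutateInUpsert(key, path, value);
                return "updated";
            }
        }
    }
}
```

In the real code I assume this would become an onErrorResumeNext on the mutateIn observable that switches to asyncBucket.insert only for the "document not found" error and rethrows everything else, but I would like confirmation that this is the recommended approach.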