Hi,
I am trying to do bulk upserts using observables and was expecting a big performance improvement. But for 10000 records, it takes about 55 seconds with async and observable and 60 seconds with synchronous using a loop to insert. Am I missing something? Below is the code for reference
Observable
.from(someBatch)
.flatMap(t -> {
final String key = generateId(t);
Optional<SomeObject> optionalSomeObject = repo.findById(key);
if (optionalSomeObject.isPresent()) {
return asyncBucket.mutateIn(key)
.upsert("somepath",t)
.execute()
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
.max(retryCount)
.build())
.doOnError(e -> {
log.error("Error", e);
})
.onErrorResumeNext(Observable.empty());
} else {
return asyncBucket.insert(converter.toDocument(t, key))
.retryWhen(RetryBuilder
.anyOf(BackpressureException.class)
.delay(Delay.exponential(TimeUnit.SECONDS, sleepInSeconds))
.max(retryCount)
.build())
.doOnError(e -> {
log.error("Error", e);
})
.onErrorResumeNext(Observable.empty());
}
})
.toList()
.toBlocking()
.single();
}