Hello, I have the following code that upserts a collection of pojos( to be turned to json documents) into couchbase using Observables:
import com.couchbase.client.core.BackpressureException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.N1qlQueryRow;
import com.couchbase.client.java.util.retry.RetryBuilder;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import rx.Observable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
public <T> long batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {
AtomicLong counter = new AtomicLong();
AsyncBucket asyncBucket = couchbaseBucket.async();
Observable<JsonDocument> observableFromDocs =
Observable
.from(items)
.map(elem -> docCreator.apply(elem))
.filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
.flatMap(elem -> upsertDocument(elem, asyncBucket))
.retryWhen(
RetryBuilder.anyOf(BackpressureException.class, Exception.class)
.doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
log.error("Retrying load. Attempt {} For exception {}", integer,throwable.toString())
)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
.max(MAX_RETRIES)
.build()
);
observableFromDocs.subscribe(
(elem)-> {},
elem -> log.error("Document insertion failure", elem),
() -> {counter.incrementAndGet();log.debug("Completed ASYNC load ");});
return counter.get();
}
private Observable<JsonDocument> upsertDocument(JsonDocument document, AsyncBucket asyncBucket){
Observable<JsonDocument> observableDoc = asyncBucket.upsert(document);
return observableDoc;
}
This code runs fine, the documents get created and the async call is made to upsert but no document is upserted to couchbase, it fails silently, and no exceptions are logged, almost as if the thread dies internally, can someone kindly point what am I doing wrong? I am about to pull my hairs…
I now verified it fails for collections of 1 item, and runs when there are more than one item in the iterable, can someone tell me why that’s happening?
EDIT
added requested code and imports