JavaSDK 2.3.5
Trying to debug a problem with arrayAppend, i wrote a simple test. I could not find what i need, but i’ve found a different “? bug ?”. The scenario is simple: just perform arrayAppend with multiple Observables and after that fetch the doc with bucket.get() and check the number of elements in array.
The problem (seems like) comes into play when you use more (much more ?) Observables then number of threads in ioPool (UPDATE: in fact, i must operate computationPool threads-number, but changing to that changes nothing, seems like they both have default init of the same size, which is 4 in my testing machine)
Here is the code (i removed “package arrayappend”):
import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.json.JsonArray; import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.env.CouchbaseEnvironment; import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicInteger;
public class ArrayAppend { // number of simulteneous "reactive" calls public static final int MAX_SIMULTENEOUS_OBSERVABLES = 16; // total number of insertions public static final int MAX_INSERTIONS_NUM = 1024; private static final AtomicInteger totalArrayAppendCalls; private static final Bucket bucket; private static final Phaser doneSync; private static final CouchbaseEnvironment ce;
// init couchbase static { ce = DefaultCouchbaseEnvironment .builder() .kvTimeout(10000) .build();
final List<String> nodes = new ArrayList(); nodes.add("192.168.254.80"); nodes.add("192.168.254.81"); nodes.add("192.168.254.82"); final Cluster cluster = CouchbaseCluster.create(ce, nodes); bucket = cluster.openBucket( "storage", "password" ); totalArrayAppendCalls = new AtomicInteger(MAX_INSERTIONS_NUM); doneSync = new Phaser(MAX_SIMULTENEOUS_OBSERVABLES > ce.ioPoolSize() ? ce.ioPoolSize() + 1 : MAX_SIMULTENEOUS_OBSERVABLES + 1) ; } public static final String DOCID = "doc"; public static final String PATH = "array";
public static final void next() { try { final int calls = totalArrayAppendCalls.decrementAndGet(); if(calls >= 0) { bucket.async() .mutateIn(DOCID) .arrayAppend(PATH, System.currentTimeMillis(), false) .execute() .subscribe( (d) -> { }, (e) -> { System.out.println(String.format("error for %d", calls)); next(); }, () -> { next(); } ); } else doneSync.arriveAndAwaitAdvance(); } catch(final Exception e) { System.out.println("... unexpected oops"); } } public static void main(String[] args) { bucket.upsert(JsonDocument.create(DOCID, JsonObject.empty().put(PATH, JsonArray.empty())));
for(int i = 0; i < MAX_SIMULTENEOUS_OBSERVABLES; i++) next(); doneSync.arriveAndAwaitAdvance();
System.out.println("ok, append completed"); int max = 10, imax = 0, size = 0; while(max > 0) { imax = 3; while(imax > 0) { try { size = bucket.get(DOCID).content().getArray(PATH).size(); break; } catch(RuntimeException e) { System.out.println("... hm, timed out, let's retry / " + imax + " / " + e); } imax--; } if(imax <= 0) { System.out.println("unable to fetch document / " + imax); break; } if(size != MAX_INSERTIONS_NUM) { System.out.println("not formed yet => " + size + " / " + MAX_INSERTIONS_NUM); try { Thread.currentThread().sleep(1000); } catch(final Exception e) { System.out.println("...sleep oops"); } } else { System.out.println("doc formation completed => " + size + " / " + MAX_INSERTIONS_NUM); break; } max--; } System.out.println("done"); }
Just set MAX_SIMULTENEOUS_OBSERVABLES >= 2* ioPoolSize and you’ll see: it’s unable to fetch with doc with bucket.get() after multi-observable-calls:
ok, append completed
… hm, timed out, let’s retry / 3 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
… hm, timed out, let’s retry / 2 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
… hm, timed out, let’s retry / 1 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
unable to fetch document / 0
done
it’s pretty good with 1 Observable (we can see async “doc-growing”):
ok, append completed
not formed yet => 1 / 1024
not formed yet => 159 / 1024
not formed yet => 340 / 1024
not formed yet => 514 / 1024
not formed yet => 710 / 1024
not formed yet => 915 / 1024
doc formation completed => 1024 / 1024
done
it’s also good with 4, 6 and with even 8 Observables. But it blocks AFTER using 16+ Observables on simple bucket.get().
What is that ?