Hi,
I am trying to use bulk get using reactive API but the performance I am getting is not that good. Can you look at code and suggest what I am doing wrong.
TestReactive.java
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.AsyncN1qlQueryResult;
import com.couchbase.client.java.query.AsyncN1qlQueryRow;
import com.couchbase.client.java.query.N1qlQuery;
import rx.Observable;
import rx.functions.Func1;
public class TestReactive {
public static void main(String[] args) {
/*Logger logger = Logger.getLogger("com.couchbase");
logger.setLevel(Level.FINEST);
for (Handler h : logger.getParent().getHandlers()) {
if (h instanceof ConsoleHandler) {
h.setLevel(Level.FINEST);
}
}*/
TestReactive testReactive = new TestReactive();
testReactive.testCode();
}
public void testCode() {
CouchbaseEnvironment couchbaseEnvironment = DefaultCouchbaseEnvironment.builder()
.queryTimeout(10000000)
.kvTimeout(10000000)
.searchTimeout(1000000)
.socketConnectTimeout(1000000000)
.managementTimeout(1000000000)
.connectTimeout(1000000000)
.viewTimeout(1000000000)
.autoreleaseAfter(1000000000)
.build();
Cluster cluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("pdcbcl00149.ind.test.com"));
Bucket bucket = cluster.openBucket("gpc", "G1@b@l123");
AsyncBucket asyncBucket = bucket.async();
String n1ql = MessageFormat.format("select raw meta().id from `{0}` LIMIT 500", asyncBucket.name());
//Get 500 meta() ids
long startTime = System.currentTimeMillis();
List<String> metaIds = asyncBucket.query(N1qlQuery.simple(n1ql))
.doOnNext(res -> res.info().forEach(t -> {
System.out.println("N1QlMetrics: " + t);;
}))
.flatMap(AsyncN1qlQueryResult::rows)
.flatMap(new Func1 <AsyncN1qlQueryRow, Observable <String>>() {
@Override
public Observable<String> call(AsyncN1qlQueryRow asyncN1qlQueryRow) {
return Observable.just(new String(asyncN1qlQueryRow.byteValue()).replace("\"", "").replace("[", "").replace("]", ""));
}
})
.toList()
.toBlocking()
.single();
System.out.println("Time taken in getting meta().id: " + (System.currentTimeMillis() - startTime));
// JVM warm-up
Observable.from(metaIds.subList(0, 100))
.flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String metaId) {
return asyncBucket.get(metaId).singleOrDefault(JsonDocument.create(metaId));
}
})
.toList()
.toBlocking()
.single();
// Get the actual 500 JsonDocuments
startTime = System.currentTimeMillis();
List<JsonDocument> documents = Observable.from(metaIds).flatMap(new Func1<String, Observable<JsonDocument>>() {
@Override
public Observable<JsonDocument> call(String metaId) {
return asyncBucket.get(metaId).singleOrDefault(JsonDocument.create(metaId));
}
})
.toList()
.toBlocking()
.single();
System.out.println("Time taken in getting 500 documents: " + (System.currentTimeMillis() - startTime));
bucket.close();
System.out.println("Completed.....");
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.couchbase.test</groupId>
<artifactId>standAloneApplication</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.couchbase.client</groupId>
<artifactId>java-client</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
</project>
Logs:
sh-4.2$java -cp .:/opt/app-root/src/target/lib/* TestReactive
Jul 20, 2018 7:39:37 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured query timeout is greater than the maximum request lifetime. This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured key/value timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured view timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured search timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured management timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslTruststoreFile='null', sslKeystorePassword=false, sslTruststorePassword=false, sslKeystore=null, sslTruststore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=12, computationPoolSize=12, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, configPollInterval=2500, configPollFloorInterval=50, networkResolution=NetworkResolution{name='auto'}, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, analyticsIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.6.0 (git: 2.6.0, core: 1.6.0), retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, continuousKeepAliveEnabled=true, keepAliveErrorThreshold=4, keepAliveTimeout=2500, autoreleaseAfter=1000000000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000000000, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$4@5dfcfece, certAuthEnabled=false, coreSendHook=null, forceSaslPlain=false, compressionMinRatio=0.83, compressionMinSize=32, operationTracingEnabled=true, operationTracingServerDurationEnabled=true, tracer=ThresholdLogTracer, orphanResponseReportingEnabled=true, orphanResponseReporter=DefaultOrphanResponseReporter, keyValueServiceConfig=KeyValueServiceConfig{minEndpoints=1, maxEndpoints=1, pipelined=true, idleTime=0}, queryServiceConfig=QueryServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, searchServiceConfig=SearchServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, viewServiceConfig=ViewServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, analyticsServiceConfig=AnalyticsServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, queryTimeout=10000000, viewTimeout=1000000000, searchTimeout=1000000, analyticsTimeout=75000, kvTimeout=10000000, connectTimeout=1000000000, dnsSrvEnabled=false}
Jul 20, 2018 7:39:43 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.22.69.133/pdcbcl00149ecp1.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.config.DefaultConfigurationProvider upsertBucketConfig
INFO: Selected network configuration: default
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.config.DefaultConfigurationProvider$8 call
INFO: Opened bucket gpc
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.22.69.133/pdcbcl00149ecp1.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.91/lpdospdb50750.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.92/lpdospdb50751.ind.test.com
Jul 20, 2018 7:39:45 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.93/lpdospdb50752.ind.test.com
N1QlMetrics: N1qlMetrics{resultCount=500, errorCount=0, warningCount=0, mutationCount=0, sortCount=0, resultSize=7346, elapsedTime='6.654785ms', executionTime='6.618172ms'}
Time taken in getting meta().id: 1505
Time taken in getting 500 documents: 3600
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.config.DefaultConfigurationProvider$11 call
INFO: Closed bucket gpc
Completed.....
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.93/lpdospdb50752.ind.test.com
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.91/lpdospdb50750.ind.test.com
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.92/lpdospdb50751.ind.test.com
sh-4.2$
Any guidance would be of great help.