I am using Couchbase community edition-5.0.1 cluster with two nodes. I am trying to execute a simple N1Q1Query using java-client-2.7.4 which has 8Lakh jsondocuments to return. But the following BlockingObservable code always throws TimeoutException.
String query = "SELECT * FROM health WHERE type="cardio";
healthBucket.async().query(N1qlQuery.simple(query))
.flatMap(AsyncN1qlQueryResult::rows)
.map(new Func1<AsyncN1qlQueryRow, HObj>() {
@Override
public HObj call(AsyncN1qlQueryRow row) {
HObj hObj = null;
for (String key : row.value().getNames()) {
hObj = new HObj(row.value().getObject(key).toMap(), true);
}
return hObj;
}
}).toList().toBlocking().first();
I also created index for this bucket as
CREATE INDEX cardio_idx ON health(type) WHERE type=“cardio”;
This is my custom environment used to create CouchbaseCluser
java.lang.RuntimeException java.util.concurrent.TimeoutException: {"b":"ymsconfig","r":"10.62.1.183:8093","s":"n1ql","c":"6ECD82F738A3EF6A/FFFFFFFF9185D7CB","t":240000000,"i":"6a26a67c-8cde-4f1a-904d-1a21ffc4aa50","l":"10.212.134.200:58729"}
at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
at rx.observables.BlockingObservable.first(BlockingObservable.java:166)
at com.foo.db.CouchbaseDS.getHealthRecordss(CouchbaseDS.java:1257)
at com.foo.service.webapp.HealthServlet.healthRecords(HealthServlet.java:294)
at com.foo.service.webapp.UploadPriceFloorServlet.service(UploadPriceFloorServlet.java:180)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:742)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.filters.CorsFilter.handleNonCORS(CorsFilter.java:441)
at org.apache.catalina.filters.CorsFilter.doFilter(CorsFilter.java:169)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81)
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:650)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342)
The same query works fine for smaller volume of data but for large volume it always throws an exception. How to change my envionment to execute the query effectively.
How long does the same query take if you run it from the Couchbase Admin UI, in the query tab? If it’s longer than 240s then you may need to adjust your queryTimeout higher.
I’m far from an expert on query and indexes (I work on the SDKs), but do you actually need the ‘WHERE’ clause on the index? Could you try creating another index without it, and see if that improves the speed?
As per my understanding, this solution will avoid timeout issue. So I tried this solution like this, but I couldn’t able to get the document corresponding to the docId.
Observable.from(docIds) // which has 8Lakh docIds retrieved based on your solution
.flatMap(new Func1<String, Observable<HObj>>() {
@Override
public Observable<HObj> call(String docId) {
logger.debug("docId [{}]", docId);
_configBucket.async().get(docId)
.flatMap(jdoc -> {
HObj hObj = new HObj(jdoc.content().toMap(), true);
logger.debug(hObj.toString());
return Observable.just(hObj);
}).subscribe();
return Observable.empty();
}
}).toBlocking().subscribe();
My intention is to take List from the observable for all docIds. Where I went wrong?
@Achaius_kp, so your code isn’t quite right. You have multiple subscribes() going on there, with reactive programming you want to chain everything together (e.g. with flatMaps) and then subscribe once at the end. Something like this:
Observable.from(docIds) // which has 8Lakh docIds retrieved based on your solution
.flatMap(new Func1<String, Observable<HObj>>() {
@Override
public Observable<HObj> call(String docId) {
logger.debug("docId [{}]", docId);
return _configBucket.async().get(docId)
.map(jdoc -> {
HObj hObj = new HObj(jdoc.content().toMap(), true);
logger.debug(hObj.toString());
return hObj;
})
}
}).toBlocking().subscribe();
This code returns the timeout error with following message while running …
12:02:48,481 WARN client.core.tracing.ThresholdLogReporter warn:150 - Operations over threshold: [{"top":[{"operation_name":"n1ql","last_local_id":"030958B03B3EB142/FFFFFFFFAB7828AA","last_local_address":"10.212.134.202:59900","last_remote_address":"10.62.1.11:8093","last_dispatch_us":521611,"last_operation_id":"7e32fa49-b842-4dac-8c1b-b2d528748541","total_us":33833506}],"service":"n1ql","count":1}]
12:02:51,993 ERROR my.service.MyServlet service:194 - Exception:
Backpressure, ringbuffer contains CONFIG=0 VIEW=0 SEARCH=0 QUERY=0 ANALYTICS=0 BINARY=12261 other=0 total=12261
at com.couchbase.client.core.tracing.RingBufferMonitor.createException(RingBufferMonitor.java:85)
at com.couchbase.client.core.CouchbaseCore.send(CouchbaseCore.java:239)
at com.couchbase.client.java.bucket.api.Get$1$1.call(Get.java:70)
at com.couchbase.client.java.bucket.api.Get$1$1.call(Get.java:66)
at com.couchbase.client.java.util.OnSubscribeDeferAndWatch.call(OnSubscribeDeferAndWatch.java:82)
at com.couchbase.client.java.util.OnSubscribeDeferAndWatch.call(OnSubscribeDeferAndWatch.java:57)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:45)
at rx.internal.operators.OnSubscribeFilter.call(OnSubscribeFilter.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:48)
at rx.internal.operators.OnSubscribeMap.call(OnSubscribeMap.java:33)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
@ssakthi.t looks like your threads are overwhelming the underlying SDK system to push out operations (KV in your case to the cluster). Are you performing bulk inserts?