Using this example i solve the problem.
This is the query code working
ccon.getBucket()
.async()
.query(N1qlQuery.simple(jsonQueryVO.getQuery(), ryow))
.doOnNext(res -> res.info().forEach(t -> {
try {
returnValue.setInfo(new ObjectMapper().readTree(t.asJsonObject().toString()));
} catch (Exception e1) {
LOGGER.error(e1.getMessage());
}
}))
.flatMap(result ->
result.errors()
.flatMap(e -> {
try {
JsonNode newElement = mapper.createObjectNode();
JsonNode error = new ObjectMapper().readTree(e.toString());
((ObjectNode)newElement).set("error", error);
returnValue.addError(newElement);
} catch (Exception e1) {
LOGGER.error(e1.getMessage());
}
return Observable.<AsyncN1qlQueryRow>empty();
})
.switchIfEmpty(result.rows())
)
.timeout(TIMEOUT_MINUTES, TimeUnit.MINUTES)
.map(AsyncN1qlQueryRow::value)
.toBlocking()
.subscribe(
rowContent -> {
JsonNode newElement = mapper.createObjectNode();
JsonNode element = null;
try {
element = new ObjectMapper().readTree(rowContent.toString());
} catch (IOException e1) {
LOGGER.error(e1.getMessage());
}
if(element != null) {
if(rowContent.getString("id") != null) {
((ObjectNode)newElement).set("document", element);
} else {
newElement = element;
}
}
returnValue.addDocument(newElement);
},
runtimeError -> LOGGER.error(runtimeError.getMessage())
);
I still have the doubt if the code in the first post is correct and why it isn’t working.
Regards!