Moving from Blocking to Non-blocking Code

Hi Team,

I am converting code that uses the Couchbase SDK from blocking to non-blocking.
This is the first time I am making a change towards reactive code.
Problem:

  1. Get the list of document IDs.

  2. Iterate over the document list and form the query using each document ID.

  3. Execute the query using the Couchbase SDK to get the query result.

  4. Transform the result to a POJO.

     private ObjectMapper objectMapper = new ObjectMapper();
     private static final Cluster cluster = Cluster.connect(host, userName, password);
     private final static String empSelectQuery = "select empId,name,age,email,currentlyEmployed,MILLIS_TO_STR(dateOfJoining)"
     		+ " as dateOfJoining" + " from " + bucketName + " USE KEYS $documentId " + "WHERE type='EMP_DOC'";
    
     public static void main(String[] args) {
     	EmpDbUtilites empDbUtils = new EmpDbUtilites();
     	long start = System.currentTimeMillis();
     	List<Employee> employeeList = workWithBlockingCode(empDbUtils);
     	System.out.println("employeeList : " + employeeList);
     	log.info("Time taken by workWithBlockingCode method : " + (System.currentTimeMillis() - start)
     			+ " employeeList :" + employeeList);
    
     	start = System.currentTimeMillis();
     	List<Employee> empList = reactiveStyle(empDbUtils);
     	log.info("Time taken by  reactiveStyle method: " + (System.currentTimeMillis() - start)
     			+ " employeeList :" + empList);
     }
    
    
     public static List<Employee> workWithBlockingCode(EmpDbUtilites empDbUtils) {
     	EmpCallDBStarter empMain = new EmpCallDBStarter();
     	List<String> documentIdList = empDbUtils.getDocumentInfo();
     	List<Employee> employeeList = new ArrayList<>();
     	for (String documentId : documentIdList) {
     		// One blocking round trip per document ID
     		QueryResult queryResult = empMain.n1qlQuery(empSelectQuery,
     				JsonObject.create().put("documentId", documentId));
     		// Skip IDs that return no row instead of failing on get(0)
     		List<Employee> rows = queryResult.rowsAs(Employee.class);
     		if (!rows.isEmpty()) {
     			employeeList.add(rows.get(0));
     		}
     	}
     	return employeeList;
     }
    
     public QueryResult n1qlQuery(String query, JsonObject namedParams) {
     	// Let SDK exceptions propagate: returning null on failure hides the cause
     	// and only leads to a NullPointerException in the caller
     	return cluster.query(query, QueryOptions.queryOptions().parameters(namedParams)
     			.scanConsistency(QueryScanConsistency.valueOf(scanConsistency)));
     }
    
     public static List<Employee> reactiveStyle(EmpDbUtilites empDbUtils) {
     	EmpCallDBStarter empMain = new EmpCallDBStarter();
     	List<String> documentList = empDbUtils.getDocumentInfo();
     	// flatMap does not guarantee that results keep the order of documentList;
     	// block() is used only because main() needs a plain List
     	return Flux.fromIterable(documentList)
     			.flatMap(documentId -> empMain.n1qlQueryAsync(empSelectQuery,
     					JsonObject.create().put("documentId", documentId)))
     			.flatMap(ReactiveQueryResult::rowsAsObject)
     			.map(empMain::transformJsonToPojo)
     			.collectList()
     			.block();
     }
    
     public Employee transformJsonToPojo(JsonObject value) {
     	try {
     		return objectMapper.readValue(value.toString(), Employee.class);
     	} catch (IOException e) {
     		// Reactor's map() rejects null results, so fail with the cause attached
     		throw new IllegalStateException("Could not map row to Employee: " + value, e);
     	}
     }
    
     public Mono<ReactiveQueryResult> n1qlQueryAsync(String query, JsonObject namedParams) {
     	// Never return null here: flatMap would fail with a NullPointerException.
     	// Query failures are expected to surface as an error signal on the Mono.
     	return cluster.reactive().query(query, QueryOptions.queryOptions().parameters(namedParams)
     			.scanConsistency(QueryScanConsistency.valueOf(scanConsistency)));
     }
    

The reactive style gives a noticeable difference in time taken compared with the blocking code.
Can you tell me whether this reactive code can be improved, or do you have any other suggestions?

Thanks,
Manzoor

Hi @Manzoor128
A few points & questions in no real order:

  1. What kind of time difference are you seeing numerically?
  2. Reactive does add a very small overhead, but it shouldn’t be too meaningful.
  3. I know this is only test code, but in real code, if you have the document keys, then definitely use the Key-Value SDK rather than query unless you have a very good reason. Query is excellent for multi-document lookups where you don’t have the keys, but the Key-Value SDK is the fastest possible way to fetch documents if you do.
  4. I’m not seeing any obvious issues with your reactive code; you may want to break out the profiler to explore the timing difference more.
  5. This code doesn’t really leverage reactive’s true strengths regarding performance: easy parallelism, and easy streaming via deep integrations with other frameworks and platforms. E.g. web frameworks will often allow reactive results to be streamed back easily without you having to buffer them anywhere. For one, you can trivially parallelise the query fetches (take a look at .parallel() and .runOn()).
  6. You can still use contentAs on the reactive results, no need to use Jackson directly. This is also the only real difference I can spot between the reactive and blocking variants in your code, which makes me think the performance decrease is somehow related to this.
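As a rough illustration of the fan-out idea in point 5, here is a JDK-only sketch using CompletableFuture. It is not Reactor's .parallel()/.runOn(), which are not shown here; it only demonstrates starting every lookup before waiting on any result. `fakeFetch` is a hypothetical stand-in for an asynchronous database call, and the class and method names are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelFetchSketch {

    // Hypothetical stand-in for an asynchronous lookup; a real call would hit the database.
    public static CompletableFuture<String> fakeFetch(String documentId) {
        return CompletableFuture.supplyAsync(() -> "doc:" + documentId);
    }

    // Starts all lookups first, then waits for each result in input order.
    public static <T> List<T> fetchAll(List<String> ids,
                                       Function<String, CompletableFuture<T>> fetcher) {
        // Collecting to a list forces every lookup to start before any join()
        List<CompletableFuture<T>> inFlight = ids.stream()
                .map(fetcher)
                .collect(Collectors.toList());
        // join() blocks per future, but all requests are already in flight
        return inFlight.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("emp1", "emp2", "emp3");
        System.out.println(fetchAll(ids, ParallelFetchSketch::fakeFetch));
    }
}
```

Because the results are joined in the order of the input list, the output order matches the document ID order even though the lookups may complete in any order.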

Thank you Graham

Regarding point 1:

Mainly, the DB querying part needs to be converted to reactive code.

queryResult = empMain.n1qlQuery(empSelectQuery, JsonObject.create().put("documentId", documentId));

This line needs to be done in a reactive (non-blocking) manner.

Regarding point 3:

I may need to apply some transformations to the actual data, such as MILLIS_TO_STR(dateOfJoining).

I know a projection can be applied in the get API, but can this type of transformation also be applied? If so, can you provide a sample?

Hi @Manzoor128
On point 1, I’m not sure I understand why. Aren’t you calling empMain.n1qlQuery (blocking) deliberately there, as that’s testing the blocking API?
With get-with-projections you don’t have access to N1QL transformation functions like MILLIS_TO_STR, but you do have access to the full date/time-converting power of Java of course 🙂
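A minimal sketch of doing that conversion in Java with java.time, assuming dateOfJoining is stored as epoch milliseconds (as the use of MILLIS_TO_STR suggests). The class and method names are hypothetical, and the output here is ISO-8601 in UTC, which may not match the exact format or time zone that MILLIS_TO_STR produces on a given cluster.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateOfJoiningConverter {

    // ISO-8601 date-time with offset; an assumption, adjust to the format you need
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ISO_OFFSET_DATE_TIME;

    // Converts epoch milliseconds to an ISO-8601 string in UTC.
    public static String millisToStr(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atOffset(ZoneOffset.UTC)
                .format(FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(millisToStr(0L)); // prints 1970-01-01T00:00:00Z
    }
}
```

A conversion like this could be applied to the field after fetching the document by key, before or while building the DTO.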

Regarding point 1: yes, I highlighted that code mainly to show that the code which actually fetches the data does so in a blocking manner. This is what I am trying to convert to a reactive manner.

The baseline is: the DB fetching needs to happen in a non-blocking manner. Please ignore the code I shared in the previous comment.

Regarding the next point, I have done transformations like this in Java using the Java date/time API.

I am just checking two approaches to see which is faster and better suited to the scenario.

Approach 1
1.1 Use the get API to fetch the document
1.2 Apply the transformation to the record and get the DTO object

Approach 2
2.1 Use a SELECT query with USE KEYS and apply the transformation in the query itself so the result can be mapped directly to the DTO.

Feel free to profile them, but if you have the keys then using the get API and doing the conversion in Java should be faster. The KV get request can go directly to the correct KV node using an optimised binary protocol, and there’s no overhead of parsing a query statement. I wouldn’t expect the conversion to be a significant factor either way.