Hi Team,
I am converting code that uses the Couchbase SDK from blocking to non-blocking (reactive) style.
This is my first time writing reactive code.
Problem:
- Get the list of document IDs.
- Iterate over the list and build the query for each document ID.
- Execute the query using the Couchbase SDK to get the query result.
- Transform the result to a POJO.

    private ObjectMapper objectMapper = new ObjectMapper();
    private static final Cluster cluster = Cluster.connect(host, userName, password);
    private final static String empSelectQuery =
            "select empId,name,age,email,currentlyEmployed,MILLIS_TO_STR(dateOfJoining)"
            + " as dateOfJoining"
            + " from " + bucketName
            + " USE KEYS $documentId "
            + "WHERE type='EMP_DOC'";

    public static void main(String[] args) {
        EmpDbUtilites empDbUtils = new EmpDbUtilites();
        long start = System.currentTimeMillis();
        List<Employee> employeeList = workWithBlockingCode(empDbUtils);
        System.out.println("employeeList : " + employeeList);
        log.info("Time taken by workWithBlockingCode method : "
                + (System.currentTimeMillis() - start) + " employeeList :" + employeeList);
        start = System.currentTimeMillis();
        List<Employee> empList = reactiveStyle(empDbUtils);
        log.info("Time taken by reactiveStyle method: "
                + (System.currentTimeMillis() - start) + " employeeList :" + empList);
    }

    public static List<Employee> workWithBlockingCode(EmpDbUtilites empDbUtils) {
        EmpCallDBStarter empMain = new EmpCallDBStarter();
        List<String> documentIdList = empDbUtils.getDocumentInfo();
        List<Employee> employeeList = new ArrayList<>();
        QueryResult queryResult = null;
        for (String documentId : documentIdList) {
            queryResult = null;
            queryResult = empMain.n1qlQuery(empSelectQuery,
                    JsonObject.create().put("documentId", documentId));
            employeeList.add(queryResult.rowsAs(Employee.class).get(0));
        }
        return employeeList;
    }

    public QueryResult n1qlQuery(String query, JsonObject namedParams) {
        QueryResult queryResult = null;
        try {
            queryResult = cluster.query(query,
                    QueryOptions.queryOptions().parameters(namedParams)
                            .scanConsistency(QueryScanConsistency.valueOf(scanConsistency)));
            return queryResult;
        } catch (Exception e) {
            return queryResult;
        }
    }

    public static List<Employee> reactiveStyle(EmpDbUtilites empDbUtils) {
        EmpCallDBStarter empMain = new EmpCallDBStarter();
        List<String> documentList = empDbUtils.getDocumentInfo();
        List<Employee> employeeList = new ArrayList<>();
        employeeList = (List<Employee>) Flux.fromIterable(documentList)
                .flatMap(documentId -> empMain.n1qlQueryAsync(empSelectQuery,
                        JsonObject.create().put("documentId", documentId)))
                .flatMap(ReactiveQueryResult::rowsAsObject)
                .map(data -> empMain.transformJsonToPojo(data))
                .collect(Collectors.toList())
                .block();
        return employeeList;
    }

    public Employee transformJsonToPojo(JsonObject value) {
        Employee employee = null;
        try {
            employee = objectMapper.readValue(value.toString(), Employee.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return employee;
    }

    public Mono<ReactiveQueryResult> n1qlQueryAsync(String query, JsonObject namedParams) {
        Mono<ReactiveQueryResult> queryResult = null;
        try {
            queryResult = cluster.reactive().query(query,
                    QueryOptions.queryOptions().parameters(namedParams)
                            .scanConsistency(QueryScanConsistency.valueOf(scanConsistency)));
            return queryResult;
        } catch (Exception e) {
            return queryResult;
        }
    }

The reactive version takes noticeably less time than the blocking code.
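As I understand it, the difference comes from the queries being in flight at the same time instead of one after another. Below is a minimal sketch of that idea only. It uses the JDK's CompletableFuture rather than Reactor or the Couchbase SDK, and `fetchEmployee`, the class name `FanOutSketch`, and the 100 ms delay are hypothetical stand-ins, not part of my real code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class FanOutSketch {

    // Hypothetical stand-in for one query; the sleep simulates network latency.
    static String fetchEmployee(String documentId) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "employee-" + documentId;
    }

    // Blocking style: each lookup starts only after the previous one has returned.
    static List<String> fetchSequentially(List<String> documentIds) {
        return documentIds.stream()
                .map(FanOutSketch::fetchEmployee)
                .collect(Collectors.toList());
    }

    // Concurrent style: start every lookup first, then wait for all of them.
    static List<String> fetchConcurrently(List<String> documentIds) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, documentIds.size()));
        try {
            // Collecting to a list forces all lookups to be submitted before any join.
            List<CompletableFuture<String>> inFlight = documentIds.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> fetchEmployee(id), pool))
                    .collect(Collectors.toList());
            // Joining in list order keeps the results in the same order as the input IDs.
            return inFlight.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> ids = List.of("1", "2", "3", "4", "5");

        long start = System.currentTimeMillis();
        List<String> sequential = fetchSequentially(ids);
        System.out.println("sequential: " + (System.currentTimeMillis() - start) + " ms " + sequential);

        start = System.currentTimeMillis();
        List<String> concurrent = fetchConcurrently(ids);
        System.out.println("concurrent: " + (System.currentTimeMillis() - start) + " ms " + concurrent);
    }
}
```

In this sketch the sequential version pays the simulated latency once per document ID, while the concurrent version overlaps the waits, which is roughly what I believe `flatMap` is doing for me in `reactiveStyle`.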
Can you tell me whether the reactive code (the reactiveStyle method) can be improved, or whether you have any other suggestions?
Thanks,
Manzoor