In this post we’ll go reactive all the way!
Some Couchbase customers use Vert.x
, a framework for writing fully asynchronous applications, knowing that the Couchbase Java SDK
fits well into this picture, being asynchronous from the ground up and exposing a RxJava
-based asynchronous API.
So we’ll see how to get going fast with a Couchbase Verticle
that spins up a connection to a Couchbase Cluster
and Bucket
then serves JSON documents from the database, using Java 8.
This blog post assumes that you are familiar with the basics of Vert.x. Here is a short table of content:
- Starting a new Vert.x project
- Asynchronously Obtaining a Bucket
- Gracefully tearing down the SDK
- Seeing it in action
- Going further
- Conclusion
Starting a new Vert.x project
Let’s start by creating a new Maven based project: create a root folder for your project and initialize a maven directory structure (or use your favorite Maven archetype). For instance you can use the following command: “mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/
“.
Now let’s initiate the pom.xml
at the root of the project:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml version="1.0" encoding="UTF-8"?--> 4.0.0 com.couchbase.demo cbvertx 1.0-SNAPSHOT io.vertx vertx-core 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.client java-client 2.2.2 <!-- this is actually already a transitive dependency of the Java SDK--> io.reactivex rxjava 1.0.15 log4j log4j 1.2.17 maven-compiler-plugin 3.3 1.8 1.8 |
As you can see we’ll be using Vert.x
version 3.1.0
and its extension for bindings in RxJava, Couchbase Java SDK
version 2.2.2
and RxJava
version 1.0.15
…
Skeleton of the Verticle
We’ll base our CouchbaseVerticle
on the AbstractVerticle
in io.vertx.rxjava.core
(from the vertx-rx-java extension). Let’s create its skeleton in the project:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.couchbase.demo.vertx; import java.util.ArrayList; import java.util.List; import com.couchbase.client.java.CouchbaseAsyncCluster; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.rxjava.core.AbstractVerticle; public class CouchbaseVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseVerticle.class); private CouchbaseAsyncCluster cluster; } |
The init phase that we’ll write right after this will show how to use the Vert.x configuration to determine at runtime which node(s) in the Couchbase cluster we’ll bootstrap from. Instantiation of the CouchbaseCluster
is still lightweight enough that it can be done like this during init.
Add the following init method to the CouchbaseVerticle
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Override public void init(Vertx vertx, Context context) { super.init(vertx, context); //getting the configuration JSON JsonObject config = context.config(); //getting the bootstrap node, as a JSON array (default to localhost) JsonArray seedNodeArray = config.getJsonArray("couchbase.seedNodes", new JsonArray().add("localhost")); //convert to a List List seedNodes = new ArrayList<>(seedNodeArray.size()); for (Object seedNode : seedNodeArray) { seedNodes.add((String) seedNode); } //use that to bootstrap the Cluster this.cluster = CouchbaseAsyncCluster.create(seedNodes); } |
Asynchronously Obtaining a Bucket
The main entry point to the Couchbase API is the Bucket
interface for the sync API, or AsyncBucket
for the async API. Establishing the connection to the bucket (“opening” it) is much more heavyweight, so it should be done asynchronously.
Let’s see how we can start our Verticle by first opening the bucket we’ll use throughout the lifetime of the Verticle. We want to keep a reference to it, and use the start(Future startFuture)
method to asynchronously notify Vert.x that the Verticle is ready:
1 2 3 4 5 6 7 8 9 10 11 |
private volatile AsyncBucket bucket; @Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .subscribe( openedBucket -> bucket = openedBucket, startFuture::fail, startFuture::complete); } |
Notice first we get the name of the bucket (and the associated password if relevant) dynamically, from the Vert.x configuration. We open the bucket asynchronously, establishing the connections and resources internal to the SDK.
The doOnNext
method is used to log the opening of the bucket.
Then we subscribe to our Observable
stream and describe how we want to “consume” the final data:
- upon receiving the bucket reference, we store it in a field for later use
- if there is an error along the way, we fail the startup of the Verticle using the
Future#fail
method. - otherwise we notify Vert.x that the Verticle was successfully started using the
Future#complete
method.
That’s a pretty good start!
Gracefully tearing down the SDK
When the Verticle stops, the resources created by the SDK should be properly disposed of. The Cluster
object has a disconnect
method that does this, recursively calling close
on every Bucket
it opened (close can be used to dispose of a single Bucket).
Also since 1.0.15
RxJava has a method for shutting down its internal Threads: Schedulers.shutdown
. This should be invoked only when there won’t be subsequent usage of RxJava in the application though, so it might be a better idea to do that upon Vert.x shutdown…
Once again we’ll stop the Verticle asynchronously, using a Future
to notify the framework of stop completion:
1 2 3 4 5 6 7 8 9 |
@Override public void stop(Future stopFuture) throws Exception { cluster.disconnect() .doOnNext(isDisconnectedCleanly -> LOGGER.info("Disconnected Cluster (cleaned threads: " + isDisconnectedCleanly + ")")) .subscribe( isDisconnectedCleanly -> stopFuture.complete(), stopFuture::fail, Schedulers::shutdown); } |
(we chose to shutdown RxJava upon completion of the SDK disconnection)
Note You can tune the SDK by passing in a
CouchbaseEnvironment
upon creation of theCluster
. In that case, it is up to you to also callshutdown
on the environment when tearing down the whole SDK (that is, when all Clusters you used the environment in, generally just the one, are shut down).If you didn’t create a specific environment the SDK will internally create one and shut it down properly, the result of which is seen above in the
isDisconnectedCleanly
variable.
Seeing it in action
Let’s create a quick main
that embeds Vert.x, deploys the Verticle and then stops. Note this is a pretty naive implementation with CountDownLatches, where you would usually rather use the command line or Launcher
as a main class.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static void main(String[] args) throws InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = new CountDownLatch(1); vertx.deployVerticle(new CouchbaseVerticle(), event -> { if (event.succeeded()) LOGGER.info("Verticle Deployed - " + event.result()); else LOGGER.error("Verticle deployment error", event.cause()); startLatch.countDown(); }); startLatch.await(); final CountDownLatch stopLatch = new CountDownLatch(1); vertx.close(event -> { if (event.succeeded()) LOGGER.info("Vert.x Stopped - " + event.result()); else LOGGER.error("Vert.x stopping error", event.cause()); stopLatch.countDown(); }); stopLatch.await(); } |
If you execute this, here is what you should see (notice the difference in timestamp format? 2015-12-11
ones are from the SDK while Dec 11, 2015
ones are from Vert.x):
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFO Node:135 - Connected to Node localhost 2015-12-11 16:21:20 INFO ConfigurationProvider:263 - Opened bucket default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Bucket opened default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Verticle Deployed - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFO ConfigurationProvider:284 - Closed bucket default 2015-12-11 16:21:20 INFO Node:145 - Disconnected from Node localhost Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Disconnected Cluster (cleaned threads: true) Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
How to verify the error behavior? We could simply change the password to one that is wrong, just to check the logs, which then look like:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 WARN Endpoint:283 - [null][KeyValueEndpoint]: Authentication Failure. 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle SEVERE: Verticle deployment error com.couchbase.client.java.error.InvalidPasswordException: Passwords for bucket "default" do not match. at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48) at rx.observers.Subscribers$5.onError(Subscribers.java:229) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
So we successfully deployed (and stopped) our first Couchbase Verticle!
High five!
/! don’t forget to change the password back to the correct one
Going further
Let’s try to do a little bit more with this Verticle. How about we try to prepare sample data in Couchbase and serve it in a REST endpoint managed by Vert.x?
Creating sample data in Couchbase on startup
We’ll create two sample documents in Couchbase during the starting up of the Verticle, users Alice and Bob.
One can store JSON in Couchbase using two Document
implementations:
JsonDocument
is the default one. It is based on a simple JSON representation provided by the SDK, theJsonObject
.RawJsonDocument
is useful when you already have JSON marshalling/unmarshalling in your application (or another way of representing JSON like Vert.x’s ownJsonObject
). In this implementation what you pass in is the raw JSON String representation.
Here are Alice and Bob, created using both alternatives:
1 2 |
JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)) |
and
1 |
RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) |
Now, the start method needs a little bit of adjustment. Instead of saving the reference to the bucket in the subscription, we’ll move that earlier in a doOnNext
. After that, we’ll create the documents and make an Observable out of them using Observable.just
. This can be forwarded to the SDK for insertion using flatMap
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .doOnNext(openedBucket -> bucket = openedBucket) .flatMap(nowIgnoreBucket -> Observable.just( JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)), RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) )) .flatMap(doc -> bucket.upsert(doc)) .subscribe(Actions.empty(), startFuture::fail, startFuture::complete); } |
The use of upsert
here guarantees that the documents will be created, whether the key already exist in database or not.
Serving JSON data from Couchbase
Let’s modify our verticle so that it doesn’t stop right away, instead spinning a HTTP Server that will try to retrieve a json document from the database and send it to a client when the route user/{id}
is used:
Here is a quick and dirty way of using Vert.x’s Launcher
to start the program (which will not stop the Vert.x core right away). Replace the content of our main
method with the following:
1 2 |
String[] vertxArgs = new String[] { "run", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Launcher.main(vertxArgs); |
Note: In a real application,
Launcher
would usually be made the main class of the jar and you’d pass in command line arguments directly.
Now let’s spin a HTTP Server on Verticle startup. Chain in the following code in the start
method, right after the flatMap(doc -> bucket.upsert(doc))
call:
1 2 3 4 |
.last() .flatMap(ignore -> vertx.createHttpServer() .requestHandler(this::handle) .listenObservable(8080)) |
We need to create the handle
method to set up that route:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private void handle(HttpServerRequest r) { String[] path = r.path().split("/"); if (path.length == 3 && "users".equals(path[1])) { bucket.get(path[2], RawJsonDocument.class) .switchIfEmpty(Observable.error(new DocumentDoesNotExistException(path[2]))) .subscribe(doc -> r.response() .putHeader("content-type", "application/json") .end(doc.content()), error -> { r.response() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(error.toString()) .end("{"error": "" + error.toString() + ""}"); }); } } |
Let’s test it: run the application and go to this url: http://localhost:8080/users/user1. You should see Alice’s JSON, served directly from Couchbase!
1 2 3 4 |
{ "name": "Alice", "age": 26 } |
For another key, you should see the exception in JSON form:
1 2 3 |
{ "error": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
Stopping the Verticle via a HTTP endpoint
Let’s quickly add a route that stops Vert.x, for fun and profit :)
1 2 3 4 |
//...replacing from the last line in `handle` } else if (r.path().equals("/stop")) { r.response() .end(" |
Closing Couchbase and Vertx
Note that running from a vertx Starter, this won’t kill the main thread
“); vertx.close(); }
Opening http://localhost:8080/stop will trigger the whole Vert.x application to stop, tearing down deployed Verticles.
Note: As stated in the message, this doesn’t kill the process when run from the IDE.
Conclusion
In this blog post, we’ve discovered how Vert.x
and the Couchbase Java SDK
can work together towards building a fully asynchronous application.
Happy asynchronous coding!
do you have a github repo link where the above implementation is available