In the previous blog post we discussed how Couchbase can seamlessly integrate into an Event-Driven Architecture. Using the Couchbase Eventing Service, document mutations can be published to a Solace PubSub+ queue from where the data is made available to subscribing microservices in near real-time.
This post focuses on how microservices as Topic subscribers can leverage Couchbase as a scalable & resilient datastore.
Architecture overview & sample application
In this sample application, we subscribe to a Solace PubSub+ Topic, process the data, and update the document in Couchbase.
In our example application, we store hotel details in Couchbase. Updates to the hotel configuration are published to a Solace PubSub+ topic. We build an application that subscribes to the Topic, retrieves the updated data and updates the existing hotel configuration in Couchbase.
We use the travel-sample bucket that contains sample data such as flights, routes and others. In our example, we focus on the hotel documents stored in the inventory scope in the hotel collection. All hotel documents consist of a JSON document with various attributes.
We will update only a small subset of attributes for this tutorial, such as pets_allowed and free_internet.
Caption: The sample application subscribes to a Solace Topic and updates hotel data in Couchbase. Other applications may provide the hotel configuration changes. We use a simple service that publishes updates to the Topic.
Prerequisites
Please review the prerequisites for this sample application:
Couchbase Capella
Create a free Couchbase Capella trial account and follow the instructions to provision your trial cluster. This only takes a few minutes and gives you a fully-fledged Couchbase cluster, including the Data service and the sample dataset, travel-sample, that we will use throughout this blog post.
You can use this tutorial when not using our managed Couchbase Capella offering. The travel-sample bucket is part of the Couchbase server product and can be imported using the Couchbase console.
Solace PubSub+ Event Broker Cloud
Sign up for a free Solace cloud trial and create a Service/VPN. We will use the VPN to connect to a Solace Topic.
Code review – Solace subscriber & publisher
To subscribe to messages from the Solace Topic we use the Solace Java API. This example implementation is derived from examples in the Solace documentation. For implementation details consult the Solace documentation.
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties properties = new JCSMPProperties(); properties.setProperty(JCSMPProperties.HOST, "tcps://yourhost.messaging.solace.cloud:55443"); properties.setProperty(JCSMPProperties.USERNAME, "solace-cloud-client"); properties.setProperty(JCSMPProperties.PASSWORD, "password"); properties.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties); session.connect(); final Topic topic = JCSMPFactory.onlyInstance().createTopic("tutorial/topic"); |
We set up the JCSMPProperties with the relevant access information available in the Solace web interface. We then create a JCSMPSession and a Topic.
To subscribe to the messages on the Topic we implement an XMLMessageConsumer:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Message Receiver - Subscribe to Topic ### final XMLMessageConsumer cons = session.getMessageConsumer(new XMLMessageListener() { @Override public void onReceive(BytesXMLMessage msg) { if (msg instanceof TextMessage) { System.out.printf("SUBSCRIBE: Message received: '%s'%n", ((TextMessage) msg).getText()); UpdateHotelData.getInstance().upsertHotelData(((TextMessage) msg).getText()); } else { System.out.println("Message received."); } } @Override public void onException(JCSMPException e) { System.out.printf("Consumer received exception: %s%n", e); } }); session.addSubscription(topic); System.out.println("SUBSCRIBE: Connected. Awaiting message..."); cons.start(); |
In the onReceive method, we take the message, convert it to a string and then call the upsertHotelData method of UpdateHotelData class that contains the Couchbase implementation discussed a bit later in this blog post.
We add the topic subscription to the session and start the message consumer.
In order to be able to test the implementation, we also create a message publisher. For this, we first implement an XMLMessageProducer.
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Message Producer - Send message to Topic XMLMessageProducer prod = session.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() { @Override public void responseReceivedEx(Object key) { System.out.println("Producer received response for msg: " + key.toString()); } @Override public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { System.out.printf("Producer received error for msg: %s@%s - %s%n", key.toString(), timestamp, cause); } }); |
With the message producer in place, we can now send messages as seen below.
1 2 3 4 5 6 7 8 9 |
// Send messages. Here we loop through and send the same message multiple times. for (int msgsSent = 0; msgsSent < 2; ++msgsSent) { TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); msg.setText(data); System.out.printf("PUBLISH: Sending message '%s' to topic '%s'...%n", data, topic.getName()); prod.send(msg, topic); //Gives us some time to follow the console logs Thread.sleep(3000); } |
Upserting the data to Couchbase
With the logic to subscribe and publish messages in place, let’s now focus on how to update the hotel documents in Couchbase.
For this example, we assume that the message retrieved from the Topic has the JSON format below. Including a hotel_id that we will use as the document id of the hotel documents stored in Couchbase, as well as a number of attributes that we can change:
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": true, "free_breakfast": true, "free_internet": true, "free_parking": true } |
To establish a secure connection from our application to Couchbase Capella, we need to:
-
- Create a database user and grant this user access to the hotel data.
- Whitelist the IP address of the host running the application
- Capture the secure connection URL
You can manage all these details in the Capella management plane. Login to Capella in the browser:
- Open the Trial – Cluster cluster and navigate to the Connect tab
- Scroll down to the Database Access section and click on Manage Credentials
- Click on Create Database Credentials and provide a database username and password. Configure the Bucket level access to the inventory scope in the travel-sample bucket and grant Read/Write access.
- Whitelist the application IP address by clicking on Manage Allowed IP and then Add Allowed IP. When running the application from your local machine, simply click Add My IP and your external IP address will be discovered and added automatically.
- In the Wide Area Network section copy the connection URL.
Code review – connect to Couchbase and update documents
We connect to Couchbase Capella as the first step by creating a cluster object. We provide the endpoint URL captured in the previous step as well as the database user credentials.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Update this to your cluster String endpoint = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; String username = "dbuser"; String password = "Passw0rd!"; // User Input ends here. String bucketName = "travel-sample"; // Cluster Environment configuration ClusterEnvironment env = ClusterEnvironment.builder() .securityConfig( SecurityConfig.enableTls(true).trustManagerFactory(InsecureTrustManagerFactory.INSTANCE)) .ioConfig(IoConfig.enableDnsSrv(true)).build(); // Initialize the Connection and provide database user credentials Cluster cluster = Cluster.connect(endpoint, ClusterOptions.clusterOptions(username, password).environment(env)); // Create bucket object bucket = cluster.bucket(bucketName); bucket.waitUntilReady(Duration.parse("PT10S")); // create collection object for the 'hotel' collection located in the 'inventory' scope collection = bucket.scope("inventory").collection("hotel"); |
Once the bucket object is created we can retrieve the hotel scope and use it later to upsert the document changes. The hotel scope is within the inventory scope of the travel-sample bucket.
Please note, that I use the singleton pattern in order to use the same collection object for all subsequent requests. See the sample implementation for details in my GitHub project.
Let’s now look into how we can update the hotel documents with the data received from the Topic.
The message received is a String so as the first step we need to convert it into a JSON object.
1 2 3 4 |
public void upsertHotelData(String content) { … hotelData = JsonObject.fromJson(content); … |
Since the input data from the Topic contains only a subset of all the attributes of the hotel document, we need to consider how to update the document fields most efficiently.
There are two possible options:
-
- We use the document ID to retrieve the full document from Couchbase, update the attributes and write the document back.
- We use the sub-document API in Couchbase to only update dedicated fields within the document instead of replacing the entire document.
Option one is commonly used to update or upsert documents. However, if we work with bigger documents and we only want to update a small portion of it there is no need to send the entire document over the network. In our implementation below, we will use the sub-document API to resolve and update fields in the hotel document.
1 2 3 4 5 6 7 8 9 |
//Instead of resolving the entire document we use the sub-document API //to only resolve the relevant parts of the hotel document prior to the update. //Here we can define the path to the relevant attributes of the document. //In our scenario 'pets_ok' and 'free_internet' are at the document root. LookupInResult result = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); //Display the current values prior to the document update System.out.println("Values prior to update: 'pets_ok': " + result.contentAs(0, String.class) + ", 'free_internet': " + result.contentAs(1, String.class)); |
We use the lookupIn operation to query the document for certain paths. Whereby different components in the path are separated by dots (.). The pets_ok and free_internet attributes are located at the document root in our scenario. We then traverse through the result and display the current values.
1 2 3 4 5 6 |
//Even here we use the sub-document API. //Instead of updating the entire document we only update the relevant attributes. //The UPSERT method will either update the attributes if they already exist or create them in case they do not exist in within the document MutateInResult upsertResult = collection.mutateIn(hotelData.getString("hotel_id"), Arrays.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
The mutateIn operation allows the update of one or more paths in a document. We identify the document in Couchbase by its ID and provide an array of paths and values we want to set.
Last but not least, we run the lookupIn operation to verify that the update was successful.
1 2 3 4 |
//Resolve and display the updated attributes LookupInResult result1 = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); System.out.println("Values after the update: 'pets_ok': " + result1.contentAs(0, String.class) + ", 'free_internet': " + result1.contentAs(1, String.class)); |
Next steps
This article taught us to integrate Solace event broker with Couchbase using the Couchbase & Solace Java SDKs.
- The full code example is available in my GitHub repository for this project.
- Learn to send messages from Couchbase to a Solace Topic in this blog post (Part 1).
- Read about the available Couchbase SDKs here.
- Use the Couchbase playground to start developing quickly.
- Try Solace PubSub+ Cloud with a trial.
- Start using Couchbase today with the Couchbase Capella cloud database.