Couchbase Eventing Service provides a framework for writing your own functions to process data change events (create, update, delete/expiry). Couchbase Server 6.5 introduces a set of important improvements to the Eventing Service that enable a lot of new use cases and simplify maintenance of Evening Functions.
Source Bucket Mutations
An Eventing function is defined to listen to the changes of one bucket, known as the source bucket of the function. Previous versions of Couchbase did not allow a function to mutate documents in its source bucket, because if these mutations are being fed back to the function it can potentially create an infinite recursion. This limitation is removed in Couchbase 6.5: an Eventing function can now change the documents on the source bucket, even the document of the change event. Couchbase makes such changes safe by suppressing their recursive propagation to the same Eventing function.
The possibility to change documents on the source bucket opens a lot of interesting use cases: enriching the changed documents with new attributes, performing cascade updates or deletes of dependent documents, generating new documents on the same bucket.
The example below uses Eventing to automatically generate document attributes containing the creation time of the document and the time of its last update. We are going to listen to the changes in the travel-sample bucket, and generate or update the time attributes every time a document is changed. Eventing functions access buckets over so-called bucket bindings. We create a read-write bucket binding on the travel-sample bucket, i.e. the source bucket, and give it an alias bucket:
In the next step provide the code of the function:
1 2 3 4 5 6 7 8 9 10 |
function OnUpdate(doc, meta) { log('docId', meta.id); var time = Date.now(); doc["updated"] = time; if (!("created" in doc)) { doc["created"] = time; } log('newDoc', meta, doc); bucket[meta.id] = doc; } |
In the OnUpdate handler, which is called on every insert and update event, we write the current time to the updated attribute (creating or replacing it). If the document does not yet contain created attribute, we assume it is the first change to the document, so we add created attribute containing the current time as well. After we extend the document with the new attributes we write it back to the source bucket for the same document ID.
By changing updated and created attributes, we generate new changes to the document. Eventing framework suppresses the recursion by not propagating these changes to our function. Otherwise these changes are handled in a normal way: they are replicated, indexed and even propagated to other Eventing functions.
Invoking External Functions with cURL
Eventing functions can interact with external systems by using curl function to call their REST API. The possibility to interact with external systems opens a lot of new use cases, such as propagation of data changes to other systems, notifying the application about interesting events, enriching documents with data from external systems, and so on. A preview of the curl function was already available in earlier versions of Couchbase, but with 6.5 the feature is redesigned to make it reliable and secure: the cURL calls are limited to a predefined set of URL bindings, for each binding we can specify authentication, encryption and certificate validation as necessary.
In the following example we will define an application listening to the changes in the travel-sample bucket. Save the application code below into file app.js:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
const Express = require("express"); const BasicAuth = require("express-basic-auth"); const BodyParser = require("body-parser"); var app = Express(); app.use(BodyParser.json()); app.use(BodyParser.urlencoded({ extended: true })); app.use(BasicAuth({ users: { 'couchbase': 'password' } })) app.post("/api/airline", (request, response) => { var id = request.body.id; console.log("Airline " + id + " changed"); response.send("OK"); }); var server = app.listen(3000, () => { console.log("Listening..."); }); |
The application defines an endpoint /api/airline to receive notifications about changes of airline data as POST requests with the body containing the airline ID. The application is configured with basic authentication expecting the user couchbase with the password password.
Execute the following commands to install the necessary node.js packages and run the application:
1 2 3 |
npm init -y npm install express body-parser express-basic-auth --save node app.js |
Now let’s create an Eventing Function that notifies our application about the changes of airline documents. All the external APIs that can be called by the function must be declared as URL bindings. Assuming the 192.168.61.1 is the IP of the machine where we started the application, we declare a URL binding for the URL http://192.168.61.1:3000/api and name it notifyApi.
A URL binding can use http or https protocol. We can also specify different types of authentication, and enable cookies to avoid repeated authentication. In our case, we select basic authentication with user couchbase and password password.
In the next step provide the code of the function, which listens to changes of the documents of type airline and forwards these changes to the application using curl calls:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
function OnUpdate(doc, meta) { if (doc.type == 'airline') { var request = { path: '/airline', body: { id : meta.id, value : doc } }; log('request', request); var response = curl('POST', notifyApi, request); if (response.status != 200) { log('request failed', response); } } } |
The curl function takes three parameters: the method of the HTTP request, the URL binding and the request object that can contain the following attributes:
-
- path specifies subpath of the URL binding. In our example, we append /airline to make a call to http://192.168.61.1:3000/api/airline
- the body of the HTTP request. The provided object will be encoded and marshaled as a JSON string, unless specified differently by the encoding attribute
- params contains key-value pairs to be passed as HTTP request parameters
- headers contains key-value pairs to be passed as additional HTTP headers.
The response object returned by the curl function contains the status, the body and the headers of the HTTP response. The function throws exceptions in case of various errors such as unreachable URL or invalid parameters. Wrap the call within a try-catch block if you need to log those errors.
After creating the function, go back to the Eventing tab and deploy the function NotifyChanges. If you choose to feed the entire contents of the buckets, the application will be notified about all already existing airline documents and will log their IDs.
Pausing and Resuming a Function
A running Eventing function can be paused and later resumed. When resumed, the function continues processing the events at the point where it was paused. The code and settings of a paused function can be modified, so when resumed the further events will be processed with the new version of the function.
This feature is especially useful for maintaining Eventing functions that are already in production. The definition of the function can be enhanced with new functionality or adjusted after changing the data model. However, such mid-flight code changes should be compatible with any outstanding timer callbacks scheduled by the handler
For example, if to modify the code of our NotifyChanges function, find the function in the Eventing view and click on its Pause button:
Wait until the function is shown as paused, which means that it stopped processing the events. In that state, we can modify its code and settings. After performing the necessary modifications, we can resume the function.
The function will be redeployed with the new implementation and will continue processing the events at the point where it stopped.
Resources
Download
 Documentation
Couchbase Server 6.5 Release Notes
Couchbase Server 6.5 What’s New
Blogs
Blog: Announcing Couchbase Server 6.5 – What’s New and Improved
Blog: Couchbase Brings Distributed Multi-Document ACID Transactions to NoSQL
Hey, Do we have the functionality to identify the modifications that happened in the document?