The Couchbase Eventing Service allows you to promptly act on mutations (or changes) to your data. All actions in Eventing are accomplished by executing a lambda, a small piece of business logic written in JavaScript.
Common use cases include data enrichment, document archiving, and integration with external REST services.
In this blog post, we outline how to handle errors that occur during event listener execution. By using a retry mechanism, we ensure that the intended action is performed even if the event listener fails during execution.
Sample application
As an example, we implement part of an e-commerce application that stores the customer orders in a Couchbase collection. As soon as the status of an order is changed to paid, we want to send an order confirmation to the customer.
To accomplish this we integrate a Couchbase event listener with an external email service. The Couchbase event listener will pick up any changes to the order document, verify that the order was paid and then call out to the email service to trigger the confirmation message.
The email service is a standalone microservice providing a REST endpoint. We use the cURL support built directly into Couchbase Eventing to call the microservice from the event listener.
Conceptual flow: As the orders are updated in Couchbase, events are triggered and picked up by an event listener. The event listener then calls out to the external Email Service.
Email service returning an error
The described scenario works very well if the email service is operational. However, what happens if the email service is returning an error? Requests from the event listener to the email service will fail and therefore no confirmation message is sent to the customer. Since at the time of failure the Couchbase document change event has already been processed, no new event for the same document is triggered unless there is another change to it. In order to ensure that the confirmation is sent we need to handle the error and implement a retry mechanism. By doing this we can work around any temporary issue of the external service and at the same time guarantee that the confirmation is sent.
There are different ways to approach this. In the example below I chose to create a new collection called ‘retry’ that stores references to the documents for which the event listener execution failed.
The event listener will pick up changes to the order documents (step #1) and then call the Email Service (step #2). If the call to the email service succeeds the event listener then updates the confirmation message status in the order document (step #3). However, in case of failure a retry document is created and put into the ‘retry’ collection (step #3*).
Keeping a reference to the failed documents lets us identify all failed updates and rerun them later, either through manual intervention by an operator or automatically using Couchbase Eventing timers. The automatic retry works as follows:
- We initiate the retry process by adding a document with a specified document id to the retry collection. This creates a recurring timer that fires at the configured timer interval.
- When the timer fires, all documents in the retry collection that are older than a small time quantum are updated. Setting an attribute such as fireRetry = true on each retry document triggers another update event, which the event listener picks up to execute the retry logic. Because every updated document produces its own mutation, the retries are spread across all available worker threads and run in parallel.
- A document update event is triggered for each retry document individually.
- The corresponding order document is retrieved from the inbound collection.
- The email service is called.
- If the call to the email service succeeds, the event listener updates the confirmation message status in the order document and removes the retry document.
- If the call fails, the retry document is updated (attempt counter and timestamp) and remains in the ‘retry’ collection.
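To make the retry cycle more tangible, here is a minimal sketch in plain JavaScript that simulates it outside of Couchbase. The two Map objects stand in for the inbound and retry collections, and the names QUANTUM_MS, flagOldRetryDocs, retryOne and runRetryPass are made up for this illustration; none of this is Eventing API.

```javascript
// In-memory stand-ins for the 'inbound' and 'retry' collections (illustrative only).
const inbound = new Map([
  ["order_140", { email: "customer_email", paymentStatus: "paid", confirmationEmailSent: false }],
]);
const retry = new Map([
  ["order_140", { type: "confirmation", docId: "order_140", attempt: 1, ts: 0 }],
]);

const QUANTUM_MS = 15000; // assumed value for the "small time quantum"

// Timer step: flag every retry document that is older than the quantum.
function flagOldRetryDocs(nowMs) {
  const flagged = [];
  for (const [id, retryDoc] of retry) {
    if (nowMs - retryDoc.ts > QUANTUM_MS) {
      retryDoc.fireRetry = true;
      flagged.push(id);
    }
  }
  return flagged;
}

// Per-document step: look up the order, call the service, record the outcome.
function retryOne(id, sendEmail, nowMs) {
  const retryDoc = retry.get(id);
  const order = inbound.get(retryDoc.docId);
  if (sendEmail(order)) {
    order.confirmationEmailSent = true;
    retry.delete(id); // success: the retry document is no longer needed
  } else {
    retryDoc.attempt += 1; // failure: count the attempt and wait for the next pass
    retryDoc.fireRetry = false;
    retryDoc.ts = nowMs;
  }
}

// One full pass: flag the old documents, then process each flagged document.
function runRetryPass(nowMs, sendEmail) {
  const flagged = flagOldRetryDocs(nowMs);
  flagged.forEach((id) => retryOne(id, sendEmail, nowMs));
  return flagged.length;
}

// First pass: the (simulated) email service still fails.
runRetryPass(60000, () => false);
const attemptAfterFailure = retry.get("order_140").attempt;

// Second pass, 20 seconds later: the service has recovered.
runRetryPass(80000, () => true);
```

In the real listener the per-document step runs once per mutation and in parallel, whereas this sketch processes the flagged documents sequentially.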
Code Review
Now that we have established the conceptual design, let’s have a look at the sample implementation:
Prerequisites:
- Couchbase 7 Enterprise Edition. I run Couchbase as a single node cluster on Docker on my local machine. (https://docs.couchbase.com/server/current/install/getting-started-docker.html)
- For development purposes we create a single node Couchbase Cluster running the following services:
- Index, Query, Eventing & Data Service
Please note that a single node install is not recommended for production usage.
Preparation
- Create a bucket named orders
- Create two collections in the orders bucket's _default scope:
- inbound (this will contain all the incoming orders)
- retry (this will contain the retry documents referencing the orders that have failed)
- Create bucket ‘metadata’. We will use the _default scope and _default collection. The metadata bucket is used for the Eventing metadata.
- Create an index on the retry collection. The retry listener will query any documents contained in the collection using N1QL, thus an index needs to be in place for the query to be executed.
CREATE PRIMARY INDEX idx_default_primary ON orders._default.retry USING GSI;
Data model order document
For the sake of this sample application we use a lightweight data model for the Order document, only containing the relevant fields. Many other fields that you typically would expect in an order document are omitted.
{
    "email": "customer_email",
    "paymentStatus": "initiated",
    "confirmationEmailSent": false,
    "items": [
        {
            "name": "Swedish Meatballs 500g",
            "amount": 2,
            "unitPrice": 9.95
        }
    ]
}
Data model retry document
The retry document contains a few basic attributes such as the document id of the order document, an attempt counter and a timestamp. The type attribute is not necessary in our application, but it can be useful to determine the type of email notification in case the application is extended to also send shipment and delivery updates.
{
    "type": "confirmation",
    "docId": "order_140",
    "attempt": 1,
    "ts": 1632775908319
}
Email Service MOCK
We will mock the Email Service using a simple Python script running a local web server. The script will randomly respond with HTTP 200 OK, or with HTTP 406 to indicate a failure.
- Update the IP address to the IP address of your local machine in line 31: server = ThreadedHTTPServer(('replace with your IP', 9080), Handler)
- Start the script by running: python http.py
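The Python script itself is not reproduced in this article. As a rough illustration of what such a mock does, here is a minimal sketch in JavaScript (Node.js), the language also used for the listeners. It is not the script from the repository: the function names pickStatus and createMockEmailServer, the failureRate parameter and the 405 response for non-POST requests are assumptions made for this example. Only the port 9080 and the random 200/406 behaviour are taken from the description above.

```javascript
const http = require("http");

// Decide which status to return: 200 OK, or 406 to simulate a failure.
// 'random' is injectable so the behaviour can be tested deterministically.
function pickStatus(failureRate = 0.5, random = Math.random) {
  return random() < failureRate ? 406 : 200;
}

// Build (but do not start) a mock email service.
function createMockEmailServer(failureRate = 0.5) {
  return http.createServer((req, res) => {
    let body = "";
    req.on("data", (chunk) => { body += chunk; });
    req.on("end", () => {
      // Only POST requests are treated as "send confirmation" calls.
      const status = req.method === "POST" ? pickStatus(failureRate) : 405;
      res.writeHead(status, { "Content-Type": "application/json" });
      res.end(JSON.stringify({ status: status, receivedBytes: body.length }));
    });
  });
}

// To start the mock, uncomment and adjust the address:
// createMockEmailServer().listen(9080, "replace with your IP");
```

Keeping the status decision in its own function makes it easy to force a failure (failureRate = 1) while testing the retry listener.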
Event Listeners
Now with all the preparations in place we can go ahead and add the two Event listeners:
- evt_send_confirmation_email – provides the integration with the Email Service
- evt_send_confirmation_email_retry – contains the retry logic
- The listeners are available here: https://github.com/puhhma/cb_eventing_retry_sample
- Import the listeners (json file) into the Couchbase Eventing service.
Please note that for the listeners to work you need to follow the naming conventions used in this article.
Review evt_send_confirmation_email listener
Event listener configuration:
- The event listener is listening to the inbound collection in the orders bucket.
- The metadata bucket is used to store the listener's metadata.
- The bucket aliases bkt_order_inbound and bkt_order_retry reference the corresponding inbound and retry collections in the orders bucket.
- The curlEmailServiceHost specifies the URL alias to the mock EmailService. Please make sure to update it with your IP address.
// OnUpdate is invoked for all documents created/updated in the 'inbound' collection.
// Note: debug_level is not declared in this listing; it is assumed to be provided
// through the listener settings (for example as a constant alias binding).
function OnUpdate(doc, meta) {
    // determine if document status is 'paid' & confirmation email was not previously sent
    if (doc.paymentStatus === "paid" && !doc.confirmationEmailSent) {
        SendConfirmationMail(doc, meta.id);
    } else {
        if (debug_level > 1) log("Nothing to do for: " + meta.id);
    }
}

function SendConfirmationMail(doc, docId) {
    try {
        // build the request to the EmailService
        var request = {
            path: 'sendConfirmation',
            body: doc
        };
        // perform the cURL request using the URL alias 'curlEmailServiceHost' from the settings
        var response = curl('POST', curlEmailServiceHost, request);
        if (response.status != 200) {
            // this did not work as expected
            if (debug_level > 1) {
                log("docId", docId, "cURL POST failed response.status:", response.status);
            }
            // create retry document referencing the documentId and store it in the 'retry' collection
            bkt_order_retry[docId] = { "docId": docId, "attempt": 1, "ts": Date.now() };
        } else {
            if (debug_level > 5) {
                log("cURL POST success, sent", docId, "response.body:", response.body);
            }
            // update confirmationEmailSent status
            doc.confirmationEmailSent = true;
            bkt_order_inbound[docId] = doc;
        }
    } catch (e) {
        log("ERROR cURL request had an exception:", e);
    }
}
- Please see the inline comments for details
- The OnUpdate function is triggered once an order document is updated or created
- The request is constructed and the HTTP POST request sent to the EmailService using cURL.
- The result is evaluated. If the HTTP response status is not 200, a retry document is constructed and added to the retry collection. Otherwise the confirmationEmailSent flag is set to true and the order document is written back.
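If you want to step through this logic without a running cluster, one option is to stub the pieces that the Eventing runtime normally provides. The following is a minimal sketch under that assumption: the plain objects replacing the bucket aliases, the stubbed curl function and the nextStatus variable are test scaffolding invented for this example, and the listener is condensed (logging and exception handling are left out).

```javascript
// Stubs standing in for what the Eventing runtime provides (local testing only).
const bkt_order_inbound = {};
const bkt_order_retry = {};
const curlEmailServiceHost = {}; // placeholder for the URL alias binding
let nextStatus = 406;            // status the stubbed email service returns next

function curl(method, binding, request) {
  return { status: nextStatus, body: {} };
}

// Condensed copy of the listener logic (logging and try/catch omitted).
function OnUpdate(doc, meta) {
  if (doc.paymentStatus === "paid" && !doc.confirmationEmailSent) {
    SendConfirmationMail(doc, meta.id);
  }
}

function SendConfirmationMail(doc, docId) {
  const request = { path: "sendConfirmation", body: doc };
  const response = curl("POST", curlEmailServiceHost, request);
  if (response.status != 200) {
    // failure: remember the order in the 'retry' collection
    bkt_order_retry[docId] = { docId: docId, attempt: 1, ts: Date.now() };
  } else {
    // success: mark the order and write it back
    doc.confirmationEmailSent = true;
    bkt_order_inbound[docId] = doc;
  }
}

// Case 1: paid order, service fails -> a retry document is created.
OnUpdate({ paymentStatus: "paid", confirmationEmailSent: false }, { id: "order_140" });

// Case 2: paid order, service succeeds -> the order is marked as confirmed.
nextStatus = 200;
OnUpdate({ paymentStatus: "paid", confirmationEmailSent: false }, { id: "order_141" });

// Case 3: unpaid order -> nothing happens.
OnUpdate({ paymentStatus: "initiated", confirmationEmailSent: false }, { id: "order_142" });
```

After these three calls, bkt_order_retry holds an entry for order_140, bkt_order_inbound holds the confirmed order_141, and order_142 appears in neither.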
Review evt_send_confirmation_email_retry listener
Event listener configuration:
- The event listener is listening to the retry collection in the orders bucket.
- The metadata bucket is used to store the listener's metadata.
- The bucket aliases bkt_order_inbound and bkt_order_retry reference the corresponding inbound and retry collections in the orders bucket.
- The curlEmailServiceHost specifies the URL alias to the mock EmailService.
- The retryTimerInterval specifies the timer interval in seconds.
function OnUpdate(doc, meta) {
    if (meta.id === "allow_retrys") {
        // the timer is initialized by creating a document with id = 'allow_retrys'
        CreateRetryTimer({"id": meta.id, "mode": "initial"});
    } else if (doc.fireRetry) {
        // process retry documents
        SendConfirmationMail(doc, meta.id);
    }
}

function CreateRetryTimer(context) {
    if (debug_level > 2) {
        log('From CreateRetryTimer: creating timer', context.mode, context.id);
    }
    // Create a timestamp 'retryTimerInterval' seconds (from the settings) from now
    var timerStartTime = new Date();
    // Get current time & add 'retryTimerInterval' sec. to it.
    timerStartTime.setSeconds(timerStartTime.getSeconds() + retryTimerInterval);
    // Use the document id as the timer reference and pass the context to the callback
    createTimer(RetryTimerCallback, timerStartTime, context.id, context);
}

function RetryTimerCallback(context) {
    if (debug_level > 2) {
        log('From RetryTimerCallback: timer fired', context);
    }
    // rearm the timer ASAP, to ensure the timer keeps running in the event
    // of errors or script timeouts in the later "recurring work".
    CreateRetryTimer({ "id": context.id, "mode": "via_callback" });
    // Update all retry documents in the 'retry' collection. Exclude the 'allow_retrys' document
    // and any documents that were updated less than 15 seconds ago, in order to avoid retrying too early.
    N1QL("UPDATE orders._default.retry SET fireRetry = true WHERE meta().id != 'allow_retrys' AND ts < DATE_ADD_MILLIS(NOW_MILLIS(), -15, 'second')");
}

function SendConfirmationMail(retryDoc, docId) {
    try {
        // resolve order document by id
        var doc = bkt_order_inbound[docId];
        // build the request
        var request = {
            path: 'sendConfirmation',
            body: doc
        };
        // perform the cURL request using the URL alias from the settings
        var response = curl('POST', curlEmailServiceHost, request);
        if (response.status != 200) {
            // this did not work as expected
            if (debug_level > 1) {
                log("docId", docId, "cURL POST failed response.status:", response.status);
            }
            // increment attempt count in retry document
            retryDoc.attempt = retryDoc.attempt + 1;
            // Set fireRetry = false, to avoid retry execution with this document change
            retryDoc.fireRetry = false;
            retryDoc.ts = Date.now();
            // update retry document
            bkt_order_retry[docId] = retryDoc;
        } else {
            if (debug_level > 5) {
                log("cURL POST success, sent", docId, "response.body:", response.body);
            }
            doc.confirmationEmailSent = true;
            bkt_order_inbound[docId] = doc;
            // delete the retry document
            delete bkt_order_retry[docId];
        }
    } catch (e) {
        log("ERROR cURL request had an exception:", e);
    }
}
- The timer is initiated by adding a document with the id allow_retrys to the retry collection
- The timer is then initialized and the RetryTimerCallback function associated with the timer
- Once the timer is executed the RetryTimerCallback function is called
- Before proceeding with the retry mechanism, a new timer is created as the first step to ensure that it keeps running in case of later errors
- An N1QL query is used to update all retry documents in the retry collection by setting the fireRetry attribute on each document
- Each document change results in a document update event and the retry mechanism is executed
- The order document is resolved from the inbound collection & the EmailService is called via cURL
- In case of failure the retry document is updated and the attempt counter increased
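The two time calculations in this listener are easy to misread, so here is a small sketch that restates them in plain JavaScript. The function names nextTimerFireTime, retryCutoffMillis and matchesRetryQuery are invented for this illustration; the 15 second value and the 'allow_retrys' id are taken from the listener code.

```javascript
// Mirrors CreateRetryTimer: the next fire time is 'intervalSeconds' after 'now'.
function nextTimerFireTime(now, intervalSeconds) {
  const fireTime = new Date(now.getTime());
  fireTime.setSeconds(fireTime.getSeconds() + intervalSeconds);
  return fireTime;
}

// Mirrors DATE_ADD_MILLIS(NOW_MILLIS(), -15, 'second'):
// a millisecond timestamp 15 seconds before 'nowMs'.
function retryCutoffMillis(nowMs, quantumSeconds = 15) {
  return nowMs - quantumSeconds * 1000;
}

// Mirrors the WHERE clause of the UPDATE statement: skip the control document
// and every retry document whose timestamp is newer than the cutoff.
function matchesRetryQuery(id, retryDoc, nowMs) {
  return id !== "allow_retrys" && retryDoc.ts < retryCutoffMillis(nowMs);
}

// Example: at t = 20s, a document stamped at t = 1s is picked up,
// while one stamped at t = 10s has to wait for a later timer run.
const pickedUp = matchesRetryQuery("order_140", { ts: 1000 }, 20000);
const skipped = matchesRetryQuery("order_141", { ts: 10000 }, 20000);
```

Because a failed retry resets ts to the current time, a document that just failed is skipped by the next timer run if that run happens within 15 seconds.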
Test the sample application
Now it’s time to finally test the sample application:
1. Make sure the mock EmailService is up and running.
2. Start the evt_send_confirmation_email event listener, but keep the evt_send_confirmation_email_retry listener stopped for now.
3. Create a sample order document (see the data model above) in the inbound collection using the Couchbase console. Set paymentStatus to "paid", since the listener ignores orders in any other status.
- In case of a success response, the confirmationEmailSent attribute is updated to true in the order document.
- In case of failure, a retry document is created in the retry collection. Since the EmailService randomly replies with an error, please repeat step #3 until an error occurs.
4. Now that we have captured an error, start the retry event listener evt_send_confirmation_email_retry.
5. Create a document with the id ‘allow_retrys’ in the retry collection. This initializes the retry mechanism.
6. After a short while the listener becomes active and starts to process the documents in the retry collection.
7. Observe that the ‘attempt’ attribute is increased with every failed call to the Email Service. In case of success, the order document is updated and the corresponding retry document is removed from the retry collection.
Since the response from the Email Service Mock is random you may need to repeat the above steps to be able to observe the anticipated behaviour.
Conclusion
In this article I outlined a retry mechanism for handling error conditions when integrating Couchbase Eventing with an external REST service. This or a similar solution can be used to ensure that intended actions are performed even if the external service is temporarily malfunctioning.
When designing a retry mechanism, several factors need to be taken into account, such as the volume of retries, the number of worker threads available to Couchbase Eventing, and the request load the external service can handle.
You can find more information about the internals of Couchbase Eventing here: https://docs.couchbase.com/server/current/eventing/eventing-overview.html
Many thanks to Jon Strabala (Principal Product Manager, Couchbase) for the technical insight and support with this article.