One of the many advantages of using a document-based database such as Couchbase is the ability to use a flexible data model to store data without the constraints of a rigid, predetermined schema. Many customers choose a NoSQL database to support the ingestion of data from multiple sources. But what if you need your data transformed so that it matches your existing data? Maybe you need it all in a uniform format for reporting purposes. Whatever the reason, this is a perfect use for the Eventing Service in Couchbase.

Couchbase Eventing Service

The Eventing Service is a built-in feature of Couchbase Enterprise Edition that lets you apply JavaScript functions, or lambdas, that respond to mutation events. Whenever a document is created, modified, or deleted, your JavaScript can take action by executing pure business logic. Possible actions include data enrichment, calls to external workflows (via RESTful API calls), and timer-based events (resuming execution in the future). Think of events as a legacy database post-trigger on steroids.
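To give a rough idea of the shape of such a function, here is a minimal sketch of the two handlers an Eventing function can define. It only logs each event; the message text is illustrative, and `log` is the logging call the Eventing Service makes available to function code.

```javascript
// Called by the Eventing Service when a document in the source is created or modified.
// `doc` is the document body; `meta` carries metadata such as the document key (meta.id).
function OnUpdate(doc, meta) {
    log('Document mutated', meta.id);
}

// Called when a document is deleted or expires; only metadata is available here.
function OnDelete(meta, options) {
    log('Document removed', meta.id);
}
```

Real business logic, such as the transformations described later in this post, would replace the `log` calls.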

Data consolidation

In this simplified example, imagine you have just acquired two new data sources for customer data. This data might be coming from a normalized source, or in a flat file, or both, and you want this data to match your existing customer data stored in a denormalized format.

The first data source is a normalized database (clean and accurate) containing customers in Canada. Each customer record points to a billing and a shipping address, and each address is normalized to eliminate redundancy of city and province data:

Normalized data source for eventing

The second data source is a flat file that also contains Canadian customers but does not track separate billing and shipping addresses. As you can see, only a single address data element is stored within the customer record:

Customer table, data model of single address data

We need to transform this data as it comes in so that it matches our existing customer database, which is denormalized so that billing and shipping addresses are attached to the customer data (from the second source) as subdocuments:

Creating subdocuments on denormalized data model

Let’s start with the easier case, the flat file import. Transforming this data involves copying the customer data (name, email, phone, company name) into a new document. The address data then needs to be copied into subdocuments for both the billing and mailing addresses. Because we only have one address per customer, we’ll copy this address into both the billing and the mailing address subdocuments:

Importing and transforming subdocuments

JavaScript code to consolidate the tables

The Eventing code to accomplish this is quite simple:

The above code should be easy enough to understand, even for non-JavaScript coders, but I’ll break it down by section just to be sure.

    • The function declared on line 1 is called any time a document in the source bucket is mutated (created or modified). We’ll see below how we define the source bucket.
    • Lines 2-3 create JavaScript objects to hold the new customer document and the address document that will be used as subdocuments within the customer document.
    • Lines 5-15 copy the properties from the original document into the customer and address objects.
    • Lines 18 and 19 assign the address document as customer subdocuments for both mailing and billing address properties.
    • Line 22 actually creates the document in the Couchbase destination bucket. We’ll see below how we define the destination bucket.
    • Line 25 would delete the original document, although it’s commented out here.
    • Line 27 logs the operation. You may want to comment/remove this after testing.
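The steps above can be sketched roughly as follows. This is a minimal sketch rather than the original listing, so the line numbers in the walkthrough do not map onto it. The source field names and the `dst_bucket` and `src_bucket` aliases are assumptions; only the `billing_address` and `mailing_address` properties come from this post.

```javascript
// Pure helper: builds the consolidated customer document from one flat record.
// All source field names (name, email, phone, company, address, city, province,
// postal_code) are hypothetical; adjust them to the real flat-file layout.
function buildCustomer(doc) {
    var customer = {
        name: doc.name,
        email: doc.email,
        phone: doc.phone,
        company: doc.company
    };
    var address = {
        address: doc.address,
        city: doc.city,
        province: doc.province,
        postal_code: doc.postal_code
    };
    // Only one address exists per customer, so it fills both roles.
    // Object.assign makes separate copies so the subdocuments are independent objects.
    customer.billing_address = Object.assign({}, address);
    customer.mailing_address = Object.assign({}, address);
    return customer;
}

// Eventing entry point. `dst_bucket` is an assumed read-write alias bound to
// the destination bucket in the function settings.
function OnUpdate(doc, meta) {
    dst_bucket[meta.id] = buildCustomer(doc);
    // delete src_bucket[meta.id];  // optional: remove the original (assumed alias)
    log('Transformed customer', meta.id);
}
```

Keeping the transformation in a plain helper function means it can be exercised with sample records outside the Eventing Service before the function is deployed.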

Set up environment

To test this script, we’ll need to set up our Couchbase environment. First of all, we’ll need a Couchbase cluster running the Data Service as well as the Eventing Service. This can be as small as a single-node cluster running in Docker, or a larger multi-node cluster running in Capella.

Once we have the cluster up and running, we’ll need to create the buckets for importing data, and a final destination bucket. The Eventing Service also requires a metadata bucket. Create the following four buckets, each with the minimum required quota of 100 MB:

    • schema1
    • schema2
    • customer
    • eventing_merge 

I also recommend enabling flush for this exercise.
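If you would rather script this step than click through the UI, something like the following Node.js sketch could create the buckets through the cluster's REST API on a self-managed cluster. The host, credentials, and the exact form parameters (`ramQuota`, `flushEnabled`) are assumptions to check against the REST documentation for your server version; a Capella cluster is managed differently.

```javascript
// Bucket names come from the list above; host and credentials are placeholders.
var BUCKETS = ['schema1', 'schema2', 'customer', 'eventing_merge'];

// Builds the form body for one bucket: 100 MB quota, flush enabled.
function bucketForm(name) {
    return new URLSearchParams({
        name: name,
        bucketType: 'couchbase',
        ramQuota: '100',
        flushEnabled: '1'
    }).toString();
}

// Sends one create request per bucket (needs Node.js 18+ for the global fetch).
async function createBuckets(baseUrl, user, password) {
    var auth = 'Basic ' + Buffer.from(user + ':' + password).toString('base64');
    for (var name of BUCKETS) {
        var res = await fetch(baseUrl + '/pools/default/buckets', {
            method: 'POST',
            headers: {
                'Authorization': auth,
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            body: bucketForm(name)
        });
        console.log(name, res.status);
    }
}

// Example call (placeholder values):
// createBuckets('http://localhost:8091', 'Administrator', 'password');
```

This script runs outside the Eventing Service; it is only a convenience for setting up the test environment.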

Creating an eventing function

Now we need to create the Eventing function. Select the Eventing tab, then click Add Function. Use the following settings:

Eventing function settings in Couchbase

Next, click Next: Add Code and replace the default OnUpdate function with the code from above. Click Save and Return. At this point the code has been stored, but the function must be deployed before it begins processing data. Click the function name to open its settings, and deploy the function now.

Creating test JSON documents

Now let’s test it by creating a document in the schema2 bucket. Add a document using wile.e.coyote@acmecorp.com as the document key, and with the following document body:

In just a moment, you should see the statistics for the event indicate success:

event status feedback

Now look in the customer bucket, and you should see the document above has been transformed. Notice how the address now exists as subdocuments within the properties billing_address and mailing_address:

Sample JSON document for addresses

Transforming to sub-documents

Now let’s tackle the more complex schema transformation. Migrating the relational structure to match our sub-document structure looks something like this:

Migrating the relational structure to match our sub-document structure

We’ll need to denormalize the address and postal code tables into a single subdocument, one each for the shipping/mailing and billing addresses. In this case, the data from the three tables is stored in collections within the bucket.

JavaScript code for denormalization

We’ll break down this code again as before.

    • Lines 2-4 create JavaScript Objects to store the new document as well as the subdocuments for the mailing and billing addresses.
    • Lines 6-10 copy the contact properties to the new document.
    • Lines 12 and 13 copy the address data from the address collection using the document key stored in the contact address_id property.
    • Lines 15-19 pull the city and province data out of the province collection, using the province_id from the address document as the document key.
    • Lines 21 and 22 then attach these document objects as subdocuments to the new contact document.
    • Finally, line 26 writes the new document to the destination bucket.
    • Line 29, if uncommented, will delete the original contact document from the source bucket. Note that we do not want to also delete the associated address and province documents because they may be used by other contacts.
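As with the flat file, these steps can be sketched roughly as follows. This is a minimal sketch rather than the original listing, so the line numbers above do not map onto it. Every field name is hypothetical, and `addresses`, `postal_codes`, and `dst_bucket` are assumed binding aliases for the two lookup collections and the destination bucket.

```javascript
// Pure helper: merges an address document with its city/province document
// into one subdocument. Returns null when either lookup came back empty.
function denormalizeAddress(address, region) {
    if (!address || !region) {
        return null;
    }
    return {
        address: address.address,
        city: region.city,
        province: region.province,
        postal_code: address.postal_code
    };
}

// Builds the consolidated contact. `getAddress` and `getRegion` are lookup
// functions, so the logic can be exercised outside the Eventing Service.
function buildContact(contact, getAddress, getRegion) {
    var result = {
        name: contact.name,
        email: contact.email,
        phone: contact.phone,
        company: contact.company
    };
    var billing = getAddress(contact.billing_address_id);
    var mailing = getAddress(contact.mailing_address_id);
    result.billing_address = denormalizeAddress(billing, billing && getRegion(billing.postal_code));
    result.mailing_address = denormalizeAddress(mailing, mailing && getRegion(mailing.postal_code));
    return result;
}

// Eventing entry point, reading the lookups through the assumed aliases.
function OnUpdate(doc, meta) {
    dst_bucket[meta.id] = buildContact(
        doc,
        function (id) { return addresses[id]; },
        function (id) { return postal_codes[id]; }
    );
    // A delete of the source contact could go here; shared address data stays in place.
}
```

In this sketch a missing lookup produces a `null` subdocument instead of an error, which is one possible way to handle contacts that arrive before their address data.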

Creating the eventing function for denormalization

Create this eventing function using the following settings, and paste the above code in to replace the OnUpdate function. When finished, go ahead and deploy the function.

Setting up an eventing function in Couchbase

To test this function, we’ll need to create collections in the schema1 bucket (use the default scope) with the names: addresses, customers, postal_codes. Then create the following documents (in this order, otherwise the lookups in the function will fail):

Postal_codes collection, with document ID 43062:

Postal_codes collection, with document ID 23228:

Addresses collection, with document ID 000071:

Addresses collection, with document ID 000086:

Finally, in the customers collection, with document ID blinded_by@science.com:

Fairly quickly you should see the statistics for this function show success:

Function success status

And if you look at the document in the customer bucket, you should see the following transformation, with the two addresses denormalized into subdocuments.

Automatic denormalization of JSON documents using eventing

Closing thoughts

This is a very simplistic example. In a production environment, you’ll likely add code to handle exceptions.
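One possible shape for that exception handling is sketched below: the transformation is wrapped in a try/catch so that a single malformed document is logged and skipped. The `safeTransform` helper, the `email` check, and the `dst_bucket` alias are all hypothetical and only illustrate the pattern.

```javascript
// Runs `transform` on the document and returns its result, or null if it throws.
// The failure is logged with the document key so it can be investigated later.
function safeTransform(doc, meta, transform) {
    try {
        return transform(doc);
    } catch (e) {
        log('Transform failed for', meta.id, String(e));
        return null;
    }
}

function OnUpdate(doc, meta) {
    // Hypothetical transform: rejects records without the expected `email` field.
    var result = safeTransform(doc, meta, function (d) {
        if (!d.email) {
            throw new Error('missing email');
        }
        return { email: d.email, name: d.name };
    });
    if (result !== null) {
        dst_bucket[meta.id] = result; // `dst_bucket` is an assumed bucket alias
    }
}
```

Whether to skip, retry, or route failed documents somewhere else depends on the workload; this sketch simply skips them.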

You may also want to increase the number of workers from 1 to the number of vCPUs if you have a large dataset and require greater performance.

The Eventing functions used in this blog can be found in the following GitHub repo: https://github.com/djames42/cb_eventing_merge. The repository also contains two scripts (a Bash script and a Python script) that can be used to build out a single-node cluster with the Eventing functions and larger sample datasets pre-loaded.

Author

Posted by Daniel James, Senior Solutions Engineer
