This is the first of a multi-part series on leveraging the Couchbase Eventing Service to run multiple scheduled tasks at specific recurring intervals in a cron-like fashion, completely inside the database and without additional infrastructure, via a single general-purpose Eventing Function.
In this installment, we will focus on running fixed user routines, JavaScript functions defined inside an Eventing Function.
In subsequent articles we will extend the cron-like Eventing Function to schedule and execute database-driven dynamic N1QL statements, and finally we will explore scheduling database-driven dynamic JavaScript functions.
Background
The Couchbase Eventing Service provides a framework for writing your own routines, simple JavaScript functions, to process document changes. This service provides all the needed infrastructure to create scalable and robust cloud-based functions allowing you to focus on developing pure business logic to interact in near real-time to changes in your data. Your functions are able to access the Couchbase data service (KV), the Couchbase query service (N1QL), and REST endpoints external to the Couchbase system.
The JSON data model in Couchbase came from JavaScript, thus it is only natural that the Eventing Service exposes the ability to write JavaScript code to analyze and manipulate JSON documents on any type of change events including Inserts, Updates, Merges, and Deletes (together referred to as mutations).
Eventing Functions typically allow you to deploy and execute custom code fragments that react to thousands or even millions of mutations per second in your documents. Several typical use cases are documented for developing high velocity at-scale Eventing Functions that respond to mutations of Couchbase documents.
This article will focus instead on a very low velocity use case of the Eventing Service building a reliable “in database” distributed crontab, allowing you to execute JavaScript functions that interact with Couchbase services on a regular periodic schedule.
Scheduling business logic to run at a specified date or time
Cron, named after “Chronos,” the Greek word for time, is one of the most useful utilities in a Linux system. In Linux the cron utility is driven by a crontab (cron table) file, a configuration file that specifies shell commands to run periodically on a given schedule.
One drawback in running cron is that it is not designed to be a distributed service; it runs on a single box and as such presents a single point of failure. If the system is offline for several hours, all scheduled tasks are missed.
Yes, there are some distributed cron implementations such as Google’s Cloud Service, AWS’ Scheduled Tasks, and Azure Functions / Time Triggers. But each cloud vendor’s offering has its own idioms and is not directly portable.
In addition, the methodology of configuration and control needs to be secured; for example, if you control a distributed cron system via a REST API over HTTP/S you need to account for this in your security plan.
Using Couchbase itself to run periodic commands
With a minor amount of code and planning you can leverage Couchbase’s Eventing service to provide flexible cron like functionality for your scheduled database operations or maintenance. Building the scheduler into the database allows you to achieve the following benefits:
- Portability across Cloud providers, if you rehost your Couchbase cluster your scheduler is not impacted.
- Supportability, if you utilize Couchbase you have a single vendor to provide support and other services.
- Distributed, no single point of failure and all Couchbase services support distributed replicas.
- Guaranteed execution, your task gets executed even after recovery from a node failure.
Couchbase scheduling, Timers the secret sauce
Timers are Couchbase Eventing Service constructs by which developers can specify a routine (business logic) to be triggered at a future time. We will use this functionality to implement a pure Couchbase configurable crontab system that allows you the ability to trigger repetitive tasks as part of your workflows whether you need to execute a simple N1QL query or build a complex rules engine.
In all of the subsequent designs we will limit our cron implementations to a resolution of 15 seconds or greater. We have this limitation because, although timers scale to the millions and are guaranteed to fire and execute, they are not wall-clock accurate and currently have a bounded steady-state delay of less than 14 seconds [1].
Of course, if you need a tighter schedule, i.e. less than 15 seconds, then you should merely process the mutation itself in Eventing logic without the use of a timer construct to schedule a call back in the future.
As of this writing the current Couchbase release is version 6.5.1, which has two limitations that we must work around when making a robust cron system.
- In the 5.5.x, 6.0.x and 6.5.x releases a function that is invoked by a timer callback cannot reliably create a fresh timer (a user space workaround can be done via a second cooperative Function).
- In the 6.5.x releases creating timers far in the future (as in one hour or more) in an otherwise idle system can result in a growing number of metadata bucket operations, which can eventually block mutations for a given Eventing Function (in 6.5.x a user space workaround can be accomplished via a second cooperative Function). The severity is governed by:
  - The number of vBuckets holding an active timer. Therefore, if there are only a few timers in the future the issue may not be noticeable or materialize. This is the case with just a few cron schedules, but for completeness, in case you add date functionality, I put a fix for this issue into the code supplied in this article.
  - Whether an Eventing timer has fired recently on a vBucket (which clears the issue for the given vBucket on a per function basis). Therefore, systems with lots of near-term timer activity will not experience this issue even if timers are scheduled far into the future.
Fortunately in version 6.6.0 both of the above issues or restrictions are lifted and a scheduler can be made in a single simple unified Eventing Function.
Prerequisites
In this article we will be using the latest GA version, i.e. Couchbase version 6.5.1 (you may need to make some changes to the Eventing Functions described for earlier Couchbase versions). The example in this article will run against the travel-sample data set which is delivered with the Couchbase server.
PRO TIP: For Advanced users only, if you are familiar with Couchbase Eventing and also our CLI / REST tools you can skip the bulk of this blog and download a ZIP file to quickly setup and run the scheduler system presented below. Right-click on the following link and choose Save Link As to download the file cron_impl_2func_CLI.zip, move it to an Eventing node, extract the ZIP file, and refer to the extracted README.txt file.
However, if you are not familiar with Couchbase or the Eventing service, please walk through GET STARTED and one Eventing example. Specifically, refer to the following:
- Setup a working Couchbase 6.5.1 server as per the directions in Start Here!
- Make sure you can run a N1QL query against the travel-sample data set as per the directions in Run Your First N1QL Query.
- Understand how to deploy a basic Eventing function as per the directions in the Document Archival example that also uses the travel-sample data set.
- Make sure you have the travel-sample bucket in the Buckets view of the UI.
- Make sure you have a bucket called metadata in the Buckets view of the UI; it should have the minimum size of 200MB.
- In the Buckets view of the UI create a bucket called crondata with the minimum size of 200MB. For detailed steps on how to create buckets, see Create a Bucket.
- Set allow_interbucket_recursion to true in order to allow two (2) Eventing functions to alter the same KV document [2].
    curl -X POST -u "$CB_USERNAME:$CB_PASSWORD" 'http://localhost:8091/_p/event/api/v1/config' -d '{ "allow_interbucket_recursion":true }'
Implementation #1, hard coded ‘cron’ like scheduling
For our first implementation, i.e. Part 1 of the series, we will design a simple control structure, which is merely a KV JSON document, and also two (2) Eventing Functions that will respond to and act upon the information in the control structure.
Below is a design of a JSON document, or control structure, that will allow us to have multiple scheduled “events”. Each scheduled event will have its own control document with a unique KEY such as recurring_event::1, recurring_event::2, … recurring_event::N. The JSON structure itself contains information to “reconstitute the key”, as our scheduling system will respond to changes or updates (mutations) to the control documents, such as toggling the “active” state to enable or disable the action or changing the “verbose” field, which controls the amount and style of logging.
The following is an example control document with KEY recurring_event::1 that will execute the JavaScript function doCronActionA at 14:54 (2:54 pm) every day.
JSON Control Record | Description |
---|---|
{ | |
"type":"recurring_event", | The KEY will be <<type>>::<<id>> |
"id":1, | |
"hour":14, | The hour of the day 0-23, *, *2X, *4X to trigger |
"min":54, | The minute in the hour 0-59, *, *2X, *4X to trigger |
"action":"doCronActionA", | JavaScript function to run when the timer fires |
"active":true, | Flag to enable or disable this schedule |
"verbose": { | [OPTIONAL] logging control |
"user_func":2, | Logging level for the action logic: 0=none, etc. |
"scheduler":3 | Logging level for the cron logic: 0=none, etc. |
}, | |
"dynamic": { | [DYNAMIC] system control and statistics |
"state":"arm", | "arm", "rearm", or "pending"; any value other than "pending" starts a schedule |
"next_sched": 0, | Number of seconds since epoch to next desired schedule |
"prev_sched": 0, | Number of seconds since epoch for previous schedule |
"prev_etime": 0, | Number of seconds since epoch for previous schedule actual exec time |
"prev_delay": 0, | Number of seconds that the timer was delayed from the schedule |
"prev_atime": 0 | Number of seconds taken by the user 'action' |
} | |
} | |
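The KEY convention in the table can be sketched in a few lines of plain JavaScript. The helper name controlDocKey is hypothetical (it is not part of the Functions supplied with this article); it only mirrors the <<type>>::<<id>> rule so that a control document can "reconstitute" its own key.

```javascript
// Hypothetical helper: rebuild the KEY of a control document from its own fields.
function controlDocKey(doc) {
    if (!doc || doc.type !== "recurring_event" || doc.id === undefined) {
        return null; // not a schedule control document
    }
    return doc.type + "::" + doc.id;
}

var example = {
    "type": "recurring_event", "id": 1, "hour": 14, "min": 54,
    "action": "doCronActionA", "active": true
};
console.log(controlDocKey(example)); // recurring_event::1
```

Because the key is derivable from the document body, a timer callback that only receives the document as its context can still locate and update the right KV record.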
Like traditional Linux crontab you can set hour and min to legal integers, and you can also set hour to “*” to process for all hours or set min to “*” to process for all minutes.
Although we will not support the full crontab syntax, we do support two non-standard settings: if you set both hour and min to “*4X” we will execute and re-arm four (4) times a minute, and if you set them both to “*2X” we will execute and re-arm two (2) times a minute. Below is a table of supported schedules with descriptions:
hour | min | Values can be numbers or strings |
---|---|---|
13 | 32 | Run at 13:32 (or 1:32 pm) |
* | 15 | Run every hour at 15 minutes past |
8 | 12 | Run once a day at 8:12 (or 8:12 am) |
* | * | Run once a minute |
*2X | *2X | Run twice a minute – requires both hour and min set to “*2X” |
*4X | *4X | Run four times a minute – requires both hour and min set to “*4X” |
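As a rough illustration of how the combinations in the table can be told apart, here is a minimal sketch in plain JavaScript. The function describeSchedule is a hypothetical helper, not part of the article's Functions. It also recognizes a numeric hour combined with a “*” minute, a case that is not listed in the table but is handled by the getNextRecurringDate function shown later.

```javascript
// Hypothetical helper: classify an (hour, min) pair; values may be numbers or strings.
function describeSchedule(hour, min) {
    var h = String(hour);
    var m = String(min);
    // true when s is a non-negative integer no larger than max
    var isNum = function (s, max) {
        return /^\d+$/.test(s) && parseInt(s, 10) <= max;
    };
    if (h === "*4X" && m === "*4X") return "four times a minute";
    if (h === "*2X" && m === "*2X") return "twice a minute";
    if (h === "*" && m === "*") return "once a minute";
    if (h === "*" && isNum(m, 59)) return "once an hour";
    if (isNum(h, 23) && m === "*") return "once a minute during one hour";
    if (isNum(h, 23) && isNum(m, 59)) return "once a day";
    return "unsupported";
}

console.log(describeSchedule(13, 32));       // once a day
console.log(describeSchedule("*", "15"));    // once an hour
console.log(describeSchedule("*4X", "*2X")); // unsupported
```

A check like this could be run before inserting a control document so that a mixed setting such as “*4X” with “*2X” is caught early.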
Eventually we will use the Query Workbench to insert the cron control documents, all of which must have a unique KEY of the form recurring_event::#. For example, to schedule the action doCronActionA for execution at 14:54 (2:54 pm), we could use the first N1QL statement in the table below.
Don’t worry about actually running any N1QL statements right now, we will perform the N1QL statements later after we have built and deployed our Eventing Function.
You can create a control record (or records) in the bucket crondata, and then list it, arm or disarm it, adjust the schedule it follows, change the verbosity level for logging, or delete it as follows:
Action | N1QL statement |
---|---|
Create a schedule | INSERT INTO crondata (KEY,VALUE) VALUES ("recurring_event::1", { "type":"recurring_event", "id":1, "hour":"14", "min":"54", "action":"doCronActionA", "active":true } ); |
Make an index to query data without specifying keys | CREATE primary INDEX on crondata ; |
Show all schedules order by id | SELECT * FROM crondata WHERE type="recurring_event" order by id ; |
Show specific schedule | SELECT * FROM crondata WHERE type="recurring_event" AND id=1 ; |
Arm or set active | UPDATE crondata SET active = true WHERE type="recurring_event" AND id=1 ; |
Disarm or set inactive | UPDATE crondata SET active = false WHERE type="recurring_event" AND id=1 ; |
Adjust time of trigger | UPDATE crondata SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ; |
Adjust logging of the “action” | UPDATE crondata SET verbose.user_func = 0 WHERE type="recurring_event" AND id=1 ; |
Adjust logging of the scheduler logic | UPDATE crondata SET verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ; |
Delete the schedule | DELETE FROM crondata WHERE type="recurring_event" AND id=1 ; |
Assume we have four (4) active schedules; running the following N1QL statement will list all of them:

    SELECT active, action, hour, min, type, id, verbose.user_func, verbose.scheduler
    FROM `crondata` where type="recurring_event" order by id ;
Would return something like the following output (table view in the Query Workbench):
active | action | hour | id | min | scheduler | type | user_func |
---|---|---|---|---|---|---|---|
true | “doCronActionA” | 14 | 1 | 54 | 1 | “recurring_event” | 2 |
true | “doCronActionB” | * | 2 | * | 1 | “recurring_event” | 1 |
true | “doCronActionC” | *2X | 3 | *2X | 4 | “recurring_event” | 4 |
true | “doCronActionD” | * | 4 | 0 | 0 | “recurring_event” | 1 |
In the above table we have four actions: the first runs once a day, the second runs every minute, the third runs every 30 seconds, and the fourth runs once an hour. In a future installment in this series we will add “day of week” capability.
The JSON Control Record’s nested object “verbose”, if not supplied, will default to { "user_func":1, "scheduler":1 }, indicating a low or terse logging level for both the action function and the scheduling logic. A value of 0 will suppress all log messages, e.g. the scheduler setting for doCronActionD, while larger values will be more verbose, e.g. as defined for doCronActionC.
The JSON Control Record’s nested object “dynamic” is typically never supplied and will default to { "state": "arm", "next_sched": 0, "prev_sched": 0, "prev_etime": 0, "prev_delay": 0, "prev_atime": 0 }. It is a scratch pad for the running Eventing logic schedule and also provides useful statistics on execution times; as such it should be treated as read-only.
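The defaulting rules for “verbose” and “dynamic” can be sketched as a small standalone function. The name applyControlDefaults is hypothetical; the default values are the ones stated above, and the returned flag signals that the document would need to be written back.

```javascript
// Sketch: fill in the optional "verbose" and "dynamic" objects with their
// documented defaults. Returns true if the document was changed.
function applyControlDefaults(doc) {
    var changed = false;
    if (!doc.verbose) {
        doc.verbose = { "user_func": 1, "scheduler": 1 };
        changed = true;
    }
    if (!doc.dynamic) {
        doc.dynamic = {
            "state": "arm", "next_sched": 0, "prev_sched": 0,
            "prev_etime": 0, "prev_delay": 0, "prev_atime": 0
        };
        changed = true;
    }
    return changed;
}

var ctl = { "type": "recurring_event", "id": 2, "hour": "*", "min": "*",
            "action": "doCronActionB", "active": true };
console.log(applyControlDefaults(ctl)); // true
console.log(ctl.dynamic.state);         // arm
```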
At this point we have a high-level control design, but we need logic to process our control structures; this is where the Couchbase Eventing Service, specifically an Eventing Function, comes into play.
The Eventing Functions
This design requires two (2) Eventing Functions: a main JavaScript function “cron_impl_2func_651” and a small helper JavaScript function “cron_impl_2func_651_help”. We will discuss each section of the JavaScript functions that comprise the initial implementation; the combined JavaScript code is almost 610 lines (about 44% of which are comments and whitespace).
Don’t worry about doing a cut-n-paste right now, as later I will provide a link to download (for import) the two required Eventing Functions and all the required settings in two files named “cron_impl_2func_651.json” and “cron_impl_2func_651_help.json”, and also, if you prefer, the two full unified functions that can be cut-n-pasted directly.
Our Main Eventing Function “cron_impl_2func_651” will be composed of nine (9) JavaScript functions:
- Three (3) business logic functions (two of which are empty shells).
  - doCronActionA(doc) – an N1QL example user action to execute
  - doCronActionB(doc) – an empty user action shell for experiments
  - doCronActionC(doc) – an empty user action shell for experiments
- One (1) entry point for Eventing.
  - OnUpdate(doc, meta) – the standard Eventing entry point for Inserts or Updates
- One (1) cron syntax parser to generate the next schedule.
  - getNextRecurringDate(hour_str, min_str) – cron logic to find the next scheduled Date
- Three (3) support functions to check that the business logic exists or format results.
  - verifyFunctionExistsViaEval(curDoc, id) – make sure we have a function to run
  - toNumericFixed(number, precision) – format a float to a compact style
  - toLocalISOTime(d) – format a date to a compact style
- One (1) callback function when timers are executed.
  - Callback(doc) – a callback function for scheduled timers
Our Helper Eventing Function “cron_impl_2func_651_help” will be composed of one (1) JavaScript function:
- One (1) entry point for Eventing.
  - OnUpdate(doc, meta) – the standard Eventing entry point for Inserts or Updates
In the subsequent sections we will walk through each of the above JavaScript functions individually.
We need a JavaScript function, e.g. the business logic to run on a periodic schedule.
The first thing we want is a routine or function which has our business logic that we will execute based upon our crontab rules. We will call the JavaScript function doCronActionA(doc); however, it can be called anything, for example doPeriodicLedgerBalance(doc). The only requirements for our “action” functions that implement our scheduled business logic are as follows:
- Has one parameter: doc, a control document as described above of type=”recurring_event”.
- The actual JavaScript name matches the “action” field in the control document.
- Returns true on success and false on failure
- Utilizes doc.verbose.user_func to control logging: if 0 it is silent, if 1 it emits a single line, if 2 it emits whatever log information is needed to debug the function, and so on.
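The four requirements above can be sketched outside Couchbase as follows. Note the assumptions: log() here is a stand-in for the Eventing log() built-in, runAction and the actions lookup table are hypothetical names, and the lookup table is a swapped-in technique for illustration (the article's Functions verify the action via verifyFunctionExistsViaEval instead).

```javascript
// Stand-in for the Eventing log() built-in so the sketch runs outside Couchbase.
var logLines = [];
function log() {
    logLines.push(Array.prototype.slice.call(arguments).join(" "));
}

// Hypothetical action that follows the four requirements listed above.
function doPeriodicLedgerBalance(doc) {
    try {
        if (doc.verbose.user_func >= 1)
            log(doc.action + " user action controlled by " + doc.type + "::" + doc.id);
        // business logic would go here
    } catch (e) {
        log(doc.action + " Error exception:", e);
        return false;
    }
    return true;
}

// Lookup table keyed by the "action" field of the control document.
var actions = { "doPeriodicLedgerBalance": doPeriodicLedgerBalance };

// Run the action named in the control document; false if it does not exist.
function runAction(doc) {
    if (!Object.prototype.hasOwnProperty.call(actions, doc.action)) {
        log("no such action: " + doc.action);
        return false;
    }
    return actions[doc.action](doc);
}

var ledgerDoc = { "type": "recurring_event", "id": 7,
                  "action": "doPeriodicLedgerBalance", "active": true,
                  "verbose": { "user_func": 1, "scheduler": 1 } };
console.log(runAction(ledgerDoc)); // true
```

The hasOwnProperty check matters: without it an action name such as "toString" would resolve to an inherited function rather than being rejected.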
We will write our function doCronActionA(doc) to run an embedded N1QL query to combine airline counts by country and then make a single KV document of calculated data.

    SELECT country, count( * ) AS cnt FROM `travel-sample` WHERE `type` = 'airline' GROUP BY country;
On my test system, a small single-node non-MDS server (running all Couchbase services), the above N1QL takes about 20 ms (for clarity’s sake, pretend it is super complex and takes 10 seconds to complete).
The idea here is that the final calculated and summarized KV document can be quickly loaded by 100K (or a million) Eventing mutations per second without the additional overhead of communicating with the Query service nodes and processing N1QL statements on each mutation.
It should be obvious that the goal of this particular business logic, doCronActionA(doc), is to create a semi-static cache that updates periodically on a schedule.
All we are really doing (and it’s fairly fast) is getting a count of airlines by country from the travel-sample document set. As we use N1QL we build up a document and eventually write it out to KV as a summarized document. The key point to drive home here is that we do not want to repeat the same work for millions of mutations each, especially since some calculations might take 10 seconds of Query service compute time each time we kick off an embedded N1QL query from an Eventing function.
Below we show the JavaScript function we want to run once a day (or perhaps once an hour, etc.). Note the name of the function matches the name in the control structure action field. For more details on Eventing terminology and language constructs please refer to the Couchbase documents and examples at Eventing Service: Fundamentals.
    function doCronActionA(doc) {
        try {
            // Check that doc has desired values
            if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
            if (doc.verbose.user_func >= 1)
                log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

            // this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
            // Create an embedded N1QL iterator by issuing a SELECT statement to get the
            // counts of airlines by country. Make a new document and write it out to KV

            // We will use the iterator to create a KV document representing the results of a
            // HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
            // a calculation up to date once a day such that it can be read 'quickly'
            // by other Eventing Functions, other Couchbase services or SDKs.

            // Consider if we had 1 million docs in a minute do we really want to use N1QL
            // to recalculate something that is almost static for all 1 million documents, of
            // course not, so we make an intermediate value that can be read into Eventing
            // and used via a single 'light weight' KV read.

            var q_iter = SELECT country,
                         count( * ) cnt
                         FROM `travel-sample`
                         WHERE `type` = 'airline'
                         GROUP BY country;

            // loop through the result set and update the map 'accumulate'
            var accumulate = {};
            var idx = 0;
            for (var val of q_iter) {
                if (doc.verbose.user_func >= 2)
                    log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
                accumulate[val.country] = val.cnt;
                idx++;
            }
            // close out embedded N1QL iterator
            q_iter.close();

            // Now let’s make a cached KV document representing a HARD lengthy embedded N1QL
            // query and write it back to KV, we need a KEY and a type and id and then we
            // upsert it into the `travel-sample` bucket.
            var cachedoc = {};
            cachedoc.type = "cron_cache";
            cachedoc.id = "airlines_by_country";
            cachedoc.date = new Date();
            cachedoc.data = accumulate;
            var ckey = cachedoc.type + '::' + cachedoc.id;
            ts_bkt[ckey] = cachedoc;
            if (doc.verbose.user_func >= 2) {
                log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
            }
        } catch (e) {
            log(doc.action + ' Error exception:', e);
            return false;
        }
        return true;
    }
The above function merely 1) queries the travel-sample bucket to extract data in this case the count of airlines for each country, 2) creates a new KV document and key and writes it out to travel-sample bucket for later use.
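To make the caching idea concrete, here is a minimal sketch of both sides in plain JavaScript, runnable outside Couchbase. The assumptions are loud ones: a plain object stands in for the ts_bkt bucket alias, the rows and their counts are made up, and the function names writeAirlineCache and airlineCountFor are hypothetical.

```javascript
// Plain object standing in for the `travel-sample` bucket alias (ts_bkt).
var ts_bkt = {};

// Made-up rows in the shape returned by the N1QL query (country, cnt).
var sampleRows = [
    { "country": "France", "cnt": 2 },
    { "country": "United States", "cnt": 3 }
];

// Producer side: what the scheduled action does once per schedule.
function writeAirlineCache(bucket, rows) {
    var accumulate = {};
    for (var row of rows) {
        accumulate[row.country] = row.cnt;
    }
    var cachedoc = { "type": "cron_cache", "id": "airlines_by_country",
                     "date": new Date(), "data": accumulate };
    bucket[cachedoc.type + "::" + cachedoc.id] = cachedoc;
}

// Consumer side: a single key lookup per mutation, no query needed.
function airlineCountFor(bucket, country) {
    var cached = bucket["cron_cache::airlines_by_country"];
    if (!cached || !cached.data) return null; // cache not built yet
    var data = cached.data;
    return Object.prototype.hasOwnProperty.call(data, country) ? data[country] : 0;
}

writeAirlineCache(ts_bkt, sampleRows);
console.log(airlineCountFor(ts_bkt, "France")); // 2
```

The consumer never touches the Query service; it pays for one KV read, which is the reason for building the cache on a schedule in the first place.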
In addition, as part of this example we have built logging that responds to a numeric verbosity setting which a) logs a single line if control document has a value for doc.verbose.user_func == 1 or b) emits more information if the doc.verbose.user_func value >= 2.
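The verbosity convention can be factored into a tiny helper, sketched below in plain JavaScript. Both vlog and the array-backed log() are hypothetical stand-ins (inside Eventing you would call the built-in log() directly), and the default level of 1 follows the documented default for a missing “verbose” object.

```javascript
// Stand-in for the Eventing log() built-in; collects lines for inspection.
var emitted = [];
function log(msg) {
    emitted.push(msg);
}

// Hypothetical helper: emit msg only when doc.verbose.user_func >= threshold.
function vlog(doc, threshold, msg) {
    var level = (doc.verbose && typeof doc.verbose.user_func === "number")
        ? doc.verbose.user_func
        : 1; // documented default when "verbose" is absent
    if (level >= threshold) log(doc.action + " " + msg);
}

var quiet = { "action": "doCronActionD", "verbose": { "user_func": 0 } };
var chatty = { "action": "doCronActionC", "verbose": { "user_func": 4 } };
vlog(quiet, 1, "summary line");  // suppressed
vlog(chatty, 1, "summary line"); // logged
vlog(chatty, 2, "debug detail"); // logged
console.log(emitted.length);     // 2
```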
This is a generic framework that can run one (1) cron action or even a thousand (1000) cron actions. As such I have provided two additional “empty” function shells – as pointed out before they could have been named anything.
    function doCronActionB(doc) {
        try {
            // check that doc has desired values
            if (doc.type !== "recurring_event" || doc.active !== true) return;
            if (doc.verbose.user_func >= 1)
                log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

            // YOUR LOGIC HERE

        } catch (e) {
            log(doc.action + ' Error exception:', e);
            return false;
        }
        return true;
    }

and

    function doCronActionC(doc) {
        try {
            // check that doc has desired values
            if (doc.type !== "recurring_event" || doc.active !== true) return;
            if (doc.verbose.user_func >= 1)
                log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

            // YOUR LOGIC HERE

        } catch (e) {
            log(doc.action + ' Error exception:', e);
            return false;
        }
        return true;
    }
These above functions doCronActionB and doCronActionC are trivial as they merely log information to the Eventing Application log of the Eventing function. Refer to Logging Functions for more details. Of course, you need a control document of type=”recurring_event” with active=true and an action like action = “doCronActionB” to actually enable and execute them.
We need an Eventing entry point or Handler
As of version 6.5, two entry points or handlers are supported by the Eventing Service: OnUpdate(doc, meta) and OnDelete(meta). We are only interested in OnUpdate(doc, meta) for this example.
The OnUpdate(doc,meta) handler gets called when any document in the source bucket is created or modified (mutated) and immediately filters out documents of no interest. [3]
    function OnUpdate(doc, meta) {
        // fix for 6.5.X growing bucket ops
        if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
        if (!cron_bkt["fix_timer_scan_issue::1"]) {
            cron_bkt["fix_timer_scan_issue::1"] = {};
        }

        try {
            // Check if further analysis is needed we only trigger on an active recurring_event
            if (doc.type !== "recurring_event" || doc.active !== true) return;

            var update_doc = false;
            if (!doc.dynamic) {
                // Add if missing doc.dynamic with defaults
                doc.dynamic = {
                    "state": "arm",
                    "next_sched": 0,
                    "prev_sched": 0,
                    "prev_etime": 0,
                    "prev_delay": 0,
                    "prev_atime": 0
                };
                // we need to update the document once we have the next schedule
                update_doc = true;
            }
            if (!doc.verbose) {
                // Add if missing doc.verbose with defaults
                doc.verbose = {
                    "user_func": 1,
                    "scheduler": 1
                };
                // we need to update the document once we have the next schedule
                update_doc = true;
            }
            // Do not process dynamic.state pending
            if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;

            var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
            var hour = doc.hour;
            var min = doc.min;

            // Do an eval check the JavaScript function exists. The eval occurs in a common
            // utility function shared with RecurringCallback
            if (!verifyFunctionExistsViaEval(doc, mid)) {
                // doc.action did not exist, we have already logged the issue
                return;
            }

            // Get the next valid execution time
            var date_timer = getNextRecurringDate(hour, min);
            var next_sched = Math.round(date_timer.getTime() / 1000);
            if (!update_doc && next_sched !== doc.dynamic.next_sched) {
                // the next_sched should be the same as the setting from the helper application, however
                // if we undeploy/deploy or pause/resume we might have to reschedule to the next time slot
                log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
                    (next_sched - doc.dynamic.next_sched) + ', reschedule');
                update_doc = true;
            }

            if (update_doc) {
                // this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
                doc.dynamic.next_sched = next_sched;
                // rather than the direct write below, call a function to trap and retry if there is a resource issue
                // cron_bkt[mid] = doc;
                if (!tryBucketKvWriteWithLog('OnUpdate F', mid, doc)) {
                    // Failed to write doc to cron_bkt[key] the error has been logged
                    // and there is nothing more we can do.
                    return;
                }
            }

            // Schedule an Eventing timer
            var timer_id = createTimer(Callback, date_timer, null, doc);
            if (doc.verbose.scheduler >= 1) {
                log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
                    toLocalISOTime(date_timer));
            }
            if (doc.verbose.scheduler >= 2) {
                log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
            }
        } catch (e) {
            log('OnUpdate E ' + meta.id + ', Error exception:', e);
        }
    }
The key here is that the cron logic in our handler only cares about documents that have a doc.type of “recurring_event” and a doc.active of true. In addition, in this example we have built tracing for the cron housekeeping logic, which is logged to the Application log according to the value of doc.verbose.scheduler in the control document.
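The filtering performed at the top of the handler can be isolated into a predicate, sketched here in plain JavaScript. The name shouldSchedule is hypothetical; the conditions mirror the handler above, where a missing “dynamic” object is first defaulted to the state "arm".

```javascript
// Sketch: would the OnUpdate handler go on to schedule a timer for this document?
function shouldSchedule(doc) {
    if (!doc || doc.type !== "recurring_event" || doc.active !== true) return false;
    // A missing "dynamic" object is defaulted by the handler to state "arm".
    var state = doc.dynamic ? doc.dynamic.state : "arm";
    if (!state || state === "pending") return false;
    return true;
}

console.log(shouldSchedule({ "type": "recurring_event", "active": true }));  // true
console.log(shouldSchedule({ "type": "recurring_event", "active": false })); // false
console.log(shouldSchedule({ "type": "recurring_event", "active": true,
                             "dynamic": { "state": "pending" } }));          // false
```

Rejecting the "pending" state is what keeps the handler from creating a second timer when it sees the mutation caused by its own bookkeeping update.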
If you only run a few schedules you can turn off the user space workaround, or “fix for 6.5.X growing bucket ops”, by commenting out four lines of code in the above OnUpdate block for “cron_impl_2func_651” as follows:
    function OnUpdate(doc, meta) {
        // fix for 6.5.X growing bucket ops
        // if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
        // if (!cron_bkt["fix_timer_scan_issue::1"]) {
        //     cron_bkt["fix_timer_scan_issue::1"] = {};
        // }
We need code to work around possible growing bucket ops for 6.5.X
As of version 6.5.X we need a “fix for 6.5.X growing bucket ops” which happens on idle systems with lots of timers scheduled in the future. This code ensures that an Eventing timer has fired recently on a vBucket (which clears the issue for the given vBucket on a per function basis).
    // FIXUP: ADDIN FUNCTION
    function noopTimer(context) {
        // fix for 6.5.X growing bucket ops
        try {
            if (context.type === "_tmp_vbs" && context.vb === 0) {
                // log("noopTimer timers firing, printing only for vBucket 0");
            }
        } catch (e) {
            log("OnUpdate Exception in callback noopTimer:", e);
        }
    }

    // FIXUP: ADDIN FUNCTION
    function rearmTimer(context) {
        // fix for 6.5.X growing bucket ops
        try {
            if (context.type === "_tmp_vbs" && context.vb === 0) {
                // Update/touch all docs in the helper_bucket the helper function will then
                // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
                // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

                // generate a mutation to re-arm the HELPER function: fix_scan_issue
                // which will in turn make new mutations for this Function
                var cur = cron_bkt[context.key];
                if (cur && cur.ts_millis === context.ts_millis) {
                    // log("rearmTimer update fix_timer_scan_issue::1 in helper_bucket alias only for vBucket 0");
                    var now = new Date();
                    cron_bkt["fix_timer_scan_issue::1"] = {
                        "last_update": now
                    };
                } else {
                    // NOOP we had multiple timer cycles, just let this one quietly stop.
                }
            }
        } catch (e) {
            log("OnUpdate Exception in callback rearmTimer:", e);
        }
    }

    // FIXUP: ADDIN FUNCTION
    function genNoopTimers(doc, meta, seconds) {
        // fix for 6.5.X growing bucket ops
        try {
            // redundant but play it safe
            if (doc.type === "_tmp_vbs") {
                // Since we are using a different function, create a timer on all our vBuckets immediately (can take up to 15 seconds)
                // If we used cross bucket recursion to rearm all the timers in a recurring fashion we would add a delay of at least 40 seconds.
                createTimer(noopTimer, new Date(), null, doc);
                if (doc.vb === 0) {
                    // Update/touch all docs in the helper_bucket the helper function will then
                    // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
                    // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

                    // generate a mutation to re-arm the HELPER function: fix_scan_issue
                    // which will in turn make new mutations for this Function
                    // log("genNoopTimers make timer to rearm fix_timer_scan_issue::1");
                    createTimer(rearmTimer, new Date(new Date().getTime() + seconds * 1000), null, doc);
                }
            }
        } catch (e) {
            log("OnUpdate Exception in genNoopTimers:", e);
        }
    }
We need a utility to calculate the next time in the schedule
The next function getNextRecurringDate(hour, min) will determine a time to execute the action as defined as part of our schedule. This is not a full implementation of cron, rather it contains the key standard features to execute once a day, once an hour, once a minute. It also contains some non-standard syntax to provide the ability to execute twice a minute or four times a minute.
As previously described the function getNextRecurringDate(hour, min) allows for the following (the table is duplicated below), with the last two being non-standard.[4]
hour | min | Values can be numbers or strings |
---|---|---|
13 | 32 | Run at 13:32 (or 1:32 pm) |
* | 15 | Run every hour at 15 minutes past |
8 | 12 | Run once a day at 8:12 (or 8:12 am) |
* | * | Run once a minute |
*2X | *2X | Run twice a minute – requires both hour and min set to “*2X” |
*4X | *4X | Run four times a minute – requires both hour and min set to “*4X” |
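For the sub-minute settings, the arithmetic of finding the next slot can be illustrated with a short sketch. This is not the article's function: nextBoundaryMs is a hypothetical helper that uses epoch arithmetic instead of stepping a Date object, it assumes the period divides 60 seconds evenly (15, 30, or 60), and unlike a stepping loop it returns the current instant when "now" already sits exactly on a boundary. The time is passed in explicitly so the result is easy to check.

```javascript
// Sketch: next wall-clock boundary (in ms since epoch) for a period in seconds.
function nextBoundaryMs(nowMs, periodSeconds) {
    var periodMs = periodSeconds * 1000;
    return Math.ceil(nowMs / periodMs) * periodMs;
}

var now = Date.UTC(2020, 5, 1, 14, 54, 7, 250); // 2020-06-01 14:54:07.250 UTC
var next = nextBoundaryMs(now, 15);             // "*4X": every 15 seconds
console.log(new Date(next).toISOString());      // 2020-06-01T14:54:15.000Z

// Control documents store schedules in whole seconds since epoch (next_sched).
var next_sched = Math.round(next / 1000);
console.log(next_sched - Math.floor(now / 1000)); // 8
```

With a 30 second period the same instant would map to 14:54:30, and with a 60 second period to 14:55:00.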
Below is an implementation of the required logic for determining the next time to trigger an Eventing timer in our schedule. In the event the user logic in our first example, doCronActionA(doc), doesn’t complete in a timely manner (a real-time overrun), the next quantum of the schedule will be selected. Note that both Timers and their parent Functions are subject to the Function’s execution timeout. An Eventing Function has a default execution timeout of 60 seconds; if need be, this setting can be raised.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
function getNextRecurringDate(hour_str, min_str) {
    // Note Javascript Dates are in milliseconds
    var date_now = new Date();
    var date_ret = new Date();
    var hour;
    var min;
    try {
        hour = parseInt(hour_str);
    } catch (e) {}
    try {
        min = parseInt(min_str);
    } catch (e) {}

    // Note, this is only a simplistic partial 'crontab' syntax with some slight extensions
    // it allows once a day, once an hour, once a minute. It also contains some non-standard
    // syntax to provide the ability to execute twice a minute or four times a minute.
    if (hour_str === '*4X' && min_str === '*4X') {
        // once every 15 seconds or four times a minute
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(15);
        while (date_ret.getTime() < date_now.getTime()) {
            date_ret.setSeconds(date_ret.getSeconds() + 15);
        }
        return date_ret;
    } else if (hour_str === '*2X' && min_str === '*2X') {
        // once every 30 seconds or twice a minute
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(30);
        while (date_ret.getTime() < date_now.getTime()) {
            date_ret.setSeconds(date_ret.getSeconds() + 30);
        }
        return date_ret;
    } else if (hour_str === '*' && min_str === '*') {
        // once a minute
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(0);
        date_ret.setMinutes(date_ret.getMinutes() + 1);
    } else if (hour_str !== '*' && isNaN(hour) === false && min_str === '*') {
        // once a minute only for a given hour
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(0);
        date_ret.setMinutes(date_ret.getMinutes() + 1);
        if (date_ret.getHours() !== hour) {
            // the next minute falls outside the requested hour, so move to the next
            // occurrence of hour:00 (today if it is still ahead, otherwise tomorrow)
            date_ret.setMinutes(0);
            date_ret.setHours(hour);
            if (date_ret.getTime() < date_now.getTime()) {
                date_ret.setDate(date_ret.getDate() + 1);
            }
        }
    } else if (hour_str === '*' && min_str !== '*' && isNaN(min) === false) {
        // once a hour at a given minute
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(0);
        date_ret.setMinutes(min);
        // schedule for next hour
        date_ret.setHours(date_ret.getHours() + 1);
    } else if (isNaN(hour) === false && isNaN(min) === false) {
        // once a day for a given hour and a given minute
        date_ret.setMilliseconds(0);
        date_ret.setSeconds(0);
        date_ret.setMinutes(min);
        date_ret.setHours(hour);
        if (date_ret.getTime() < date_now.getTime()) {
            // schedule for tomorrow
            date_ret.setDate(date_ret.getDate() + 1);
        }
    } else {
        log('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
        throw new Error('getNextRecurringDate illegal input hour_str <' + hour_str + '> min_str <' + min_str + '>');
    }
    return date_ret;
}
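To illustrate how an overrun selects the next quantum, here is a minimal standalone sketch. It is an assumption-laden simplification, not the handler's code: nextSlotMs is a hypothetical name, it takes the current time as a parameter so the result is reproducible, and it uses epoch arithmetic in place of the Date setters used above (the two agree for 15 and 30 second periods because whole minutes are multiples of both).

```javascript
// Hypothetical helper: first slot boundary at or after nowMs for a period in seconds.
function nextSlotMs(nowMs, periodSec) {
    var periodMs = periodSec * 1000;
    return Math.ceil(nowMs / periodMs) * periodMs;
}

// A '*4X' timer fires at 12:00:15 and the user action takes 20 seconds.
var firedAt = Date.UTC(2020, 3, 22, 12, 0, 15);
var finishedAt = firedAt + 20 * 1000;              // 12:00:35
var nextRun = nextSlotMs(finishedAt, 15);

// The 12:00:30 slot was missed, so the schedule resumes at 12:00:45.
console.log(new Date(nextRun).toISOString());      // 2020-04-22T12:00:45.000Z
```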
We need a few small utilities
The first is a common utility that merely checks whether our JavaScript function exists. It is used by both OnUpdate(doc,meta), shown above, and the timer Callback(doc), shown later. Below is verifyFunctionExistsViaEval(curDoc, id), which takes two arguments: a JSON control document and the KEY for that document.
This lets us know immediately, on deployment, if there was an issue with a naming mismatch between the JSON control record document and the actual name of the business logic function in the JavaScript code.
function verifyFunctionExistsViaEval(curDoc, id) {
    var result = false;
    try {
        // check for function if missing this is invalid return result
        result = eval("typeof " + curDoc.action + " === 'function';");
        if (result === false) {
            if (curDoc.verbose.scheduler >= 1)
                log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +
                    curDoc.action + "(doc) does not exist, id is", id);
            return result;
        }
    } catch (e) {
        log('verifyFunctionExistsViaEval Error exception:', e);
    }
    return result;
}
Note, if an attempt is made to run a non-existent function, the end user will get a warning in the Application log cron_impl_2func_651.log so the issue can be corrected.
2020-04-22T16:20:38.725-07:00 [INFO] “Warn/Disable (No Action and No Re-Arm), because required ‘action’ of doCronMyNewFunction(doc) does not exist, id is” “recurring_event::1”
The correction can be made in one of two ways: either Pause/Resume the Function to add the missing JavaScript function and then adjust the control document with the specified id or KEY (toggle active to false and then true), or adjust the control document to point to an existing function in your handler.
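The existence check can be tried outside of Eventing with a short sketch. The names below are assumptions for illustration: doCronActionA is a stand-in stub, actionExists is a hypothetical helper, and the identifier guard in front of the eval is an addition of this sketch rather than something the original handler does.

```javascript
// Stand-in for a user routine defined in the handler.
function doCronActionA(doc) { return true; }

// Hypothetical helper: true only when 'name' is a plain identifier that
// resolves to a function in the current scope.
function actionExists(name) {
    // Guard (added in this sketch): refuse anything that is not a simple identifier.
    if (typeof name !== 'string' || !/^[A-Za-z_$][A-Za-z0-9_$]*$/.test(name)) {
        return false;
    }
    try {
        // typeof on an undeclared identifier yields 'undefined' and does not throw
        return eval("typeof " + name + " === 'function'");
    } catch (e) {
        return false; // e.g. a reserved word that fails to parse
    }
}

console.log(actionExists('doCronActionA'));       // true
console.log(actionExists('doCronMyNewFunction')); // false
```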
Next the utility toNumericFixed(number, precision) just allows nice compact formatting of floats for our log messages.
function toNumericFixed(number, precision) {
    var multi = Math.pow(10, precision);
    return Math.round((number * multi).toFixed(precision + 1)) / multi;
}
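As a quick check of the rounding behaviour, the function can be run standalone; the sample values below are arbitrary and chosen only for illustration.

```javascript
function toNumericFixed(number, precision) {
    var multi = Math.pow(10, precision);
    return Math.round((number * multi).toFixed(precision + 1)) / multi;
}

console.log(toNumericFixed(1.23456, 3)); // 1.235
console.log(toNumericFixed(0.0004, 3));  // 0
console.log(toNumericFixed(59.9996, 3)); // 60
```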
Finally, the utility toLocalISOTime(d) just allows nice compact formatting of Dates for our log messages.
function toLocalISOTime(d) {
    var tzoffset = (new Date()).getTimezoneOffset() * 60000; //offset in milliseconds
    return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}
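A short standalone run shows the shape of the result. The printed value depends on the time zone of the machine running it, so no specific output is shown here. One detail visible in the code: the offset is taken from the current time rather than from d, so a date on the other side of a daylight-saving change would be shifted by the difference.

```javascript
function toLocalISOTime(d) {
    var tzoffset = (new Date()).getTimezoneOffset() * 60000; // offset in milliseconds
    return (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);
}

var sample = new Date();
// Same layout as toISOString() but in local time and without the trailing 'Z',
// i.e. YYYY-MM-DDTHH:mm:ss.sss
console.log(toLocalISOTime(sample));
```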
We need a timer callback to execute the user logic and re-arm the timer
The final JavaScript function in “cron_impl_2func_651” is the Timer callback, which is called when the scheduled timer fires. The callback function must be a top-level function that takes a single argument, the context.
In this case, our OnUpdate handler referenced the JavaScript function Callback(doc) with a context of doc (our active scheduler control document of type=”recurring_event”).
In version 6.6 we can create another timer within a timer, but in all previous versions we need to trigger a mutation via a “helper” function (we carefully avoid infinite recursion). In 6.6 the helper function is not needed and the logic is substantially simplified.
function Callback(doc) {
    try {
        var fired_at = new Date();

        // Check if further analysis is needed we only trigger on a recurring_event that is active
        if (doc.type !== "recurring_event") return;

        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;

        // process any doc.dynamic.state BUT pending
        if (doc.dynamic.state === "pending") return;

        // ==================
        // Check if still active

        // We make sure that in KV the 'doc' still exists and that it is still active if not just
        // return thus skipping the action and not Re-arming the timer. Note `crondata` is
        // aliased to the map 'cron_bkt'
        var mid = doc.type + '::' + doc.id; // make our KEY
        var curDoc = null;
        try {
            // read the current version of doc from KV, e.g. curDoc
            curDoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!curDoc || curDoc === null) {
            reason = "cron document is missing";
        } else if (!curDoc.active) {
            reason = "cron document has active = false";
        } else if (!curDoc.dynamic.state || curDoc.dynamic.state !== doc.dynamic.state) {
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(curDoc)) {
            reason = "cron document changed";
        }

        if (reason !== null) {
            if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 1) {
                log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);
            }
            if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 4) {
                log('Callback Y ' + mid + ' timer doc', doc);
                log('Callback Z ' + mid + ' KV curDoc', curDoc);
            }
            return;
        }

        // ==================
        // Verify user routine exists and if so eval it

        // Assume curDoc.action contains something like "doCronActionA" and we have a function in
        // this handler like "doCronActionA(doc)". Below we use curDoc as the end user should be
        // able to alter the eval'd JavaScript function. We will execute two (2) evals.

        // First eval check the JavaScript function exists. The eval occurs in a common
        // utility function shared with OnUpdate
        if (!verifyFunctionExistsViaEval(curDoc, mid)) {
            // curDoc.action did not exist, we have already logged the issue
            return;
        }

        // Second eval execute and process the user function we execute the defined function
        // with an argument of curDoc
        var beg_act = new Date();
        var result = null;
        eval("result = " + curDoc.action + "(curDoc);");
        var end_act = new Date();
        var atime_ms = end_act.getTime() - beg_act.getTime();

        if (curDoc.verbose.scheduler >= 2)
            log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +
                ' sec., returned ' + result);

        // ==================
        // Calculate next time and mutate the control document for our helper function
        // which will create another mutation such that OnUpdate of this function will pick
        // it up and generate the timer (avoids the MB-38554 issue).
        var hour = curDoc.hour;
        var min = curDoc.min;
        var date_timer = getNextRecurringDate(hour, min);

        curDoc.dynamic.prev_delay =
            toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);
        curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;
        curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);
        curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);

        curDoc.dynamic.state = "pending";
        curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);

        try {
            cron_bkt[mid] = curDoc;
        } catch (e) {
            log('Callback help: F ' + mid + ' FATAL could not update KV cron cycle ' + curDoc.action);
            return;
        }

        if (curDoc.verbose.scheduler >= 1) {
            log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +
                toLocalISOTime(date_timer));
        }
        if (curDoc.verbose.scheduler >= 2) {
            log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +
                ', actual ' + curDoc.dynamic.prev_etime +
                ', delay ' + curDoc.dynamic.prev_delay +
                ', took ' + curDoc.dynamic.prev_atime);
        }
        if (curDoc.verbose.scheduler >= 3) {
            log('Callback C ' + mid + ' curDoc', curDoc);
        }
    } catch (e) {
        var mid = doc.type + '::' + doc.id; // make our KEY
        log('Callback E ' + mid + ' Error exception:', e);
    }
}
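The “is this timer still valid” checks at the top of Callback can be isolated into a small sketch that runs outside of Eventing. Two assumptions are made here: staleReason and fingerprint are hypothetical names, and fingerprint uses JSON.stringify as a stand-in for the Eventing built-in crc64(), which makes it sensitive to property order in a way a real checksum of the stored document may not be.

```javascript
// Stand-in for Eventing's crc64(): any stable fingerprint works for this sketch.
function fingerprint(obj) {
    return JSON.stringify(obj);
}

// Hypothetical helper: returns null when the timer's context still matches
// the document in KV, otherwise the reason the old schedule should be ignored.
function staleReason(timerDoc, kvDoc) {
    if (!kvDoc) return 'cron document is missing';
    if (!kvDoc.active) return 'cron document has active = false';
    if (!kvDoc.dynamic || kvDoc.dynamic.state !== timerDoc.dynamic.state) {
        return 'cron document wrong dynamic.state expected ' + timerDoc.dynamic.state;
    }
    if (fingerprint(timerDoc) !== fingerprint(kvDoc)) return 'cron document changed';
    return null;
}

var timerDoc = { type: 'recurring_event', id: 1, hour: 14, min: 54,
                 action: 'doCronActionA', active: true, dynamic: { state: 'arm' } };
var unchanged = JSON.parse(JSON.stringify(timerDoc));
var edited = JSON.parse(JSON.stringify(timerDoc));
edited.hour = 15; // the user moved the schedule after the timer was created

console.log(staleReason(timerDoc, unchanged)); // null
console.log(staleReason(timerDoc, edited));    // cron document changed
```

Because any edit to the control document changes the fingerprint, a timer created before the edit quietly retires itself when it fires, and the mutation caused by the edit has already started a fresh schedule.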
We need a helper function to trigger a new mutation
Because prior to 6.6 (which is not yet released) you cannot create a timer from within an executing timer’s callback we need a second Eventing Function (along with “allow_interbucket_recursion”:true) to trigger a mutation such that we can generate all our timers in the main Eventing Function’s OnUpdate(doc,meta) entry point. We do this as follows:
1. cron_impl_2func_651 OnUpdate(doc,meta) receives a mutation and schedules a timer.
2. cron_impl_2func_651 Callback(doc) is executed when the timer matures. It first runs the desired user action and then creates mutation #1 on the control document (which is not seen by the creating Function, to prevent recursion).
3. cron_impl_2func_651_help OnUpdate(doc,meta) receives mutation #1 and makes another mutation #2 on the control document. This triggers step 1 above, in an endless cycle.
Note, in Couchbase release 6.6 we don’t need a helper function at all because you are allowed to create a timer from within an executing timer. This greatly simplifies the logic needed to make a cron system[2].
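The hand-off between the two Functions can be simulated in plain JavaScript. This is only a sketch of the state transitions under simplifying assumptions: an in-memory object stands in for the crondata bucket, the three functions are called directly instead of being driven by mutations and timers, and all names are hypothetical.

```javascript
// In-memory stand-in for the 'crondata' bucket.
var kv = { 'recurring_event::1': { active: true, dynamic: { state: 'arm' } } };
var trace = [];

// cron_impl_2func_651 OnUpdate: schedules a timer unless the state is 'pending'
function mainOnUpdate(key) {
    var doc = kv[key];
    if (!doc.active || doc.dynamic.state === 'pending') return false;
    trace.push('main: timer scheduled (state ' + doc.dynamic.state + ')');
    return true;
}

// cron_impl_2func_651 Callback: runs the action, then writes mutation #1
function mainCallback(key) {
    trace.push('main: user action ran');
    kv[key].dynamic.state = 'pending';
}

// cron_impl_2func_651_help OnUpdate: reacts only to 'pending', writes mutation #2
function helperOnUpdate(key) {
    var doc = kv[key];
    if (doc.dynamic.state !== 'pending') return false;
    doc.dynamic.state = 'rearm';
    trace.push('helper: state set to rearm');
    return true;
}

var key = 'recurring_event::1';
for (var cycle = 0; cycle < 2; cycle++) {
    if (mainOnUpdate(key)) {
        mainCallback(key);
        helperOnUpdate(key);
    }
}
console.log(trace.join('\n'));
```

After the first pass the document is left in state "rearm", which the main Function treats exactly like "arm", so the cycle repeats for as long as the document stays active.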
The sole JavaScript function in “cron_impl_2func_651_help” OnUpdate(doc,meta) is shown below.
function OnUpdate(doc, meta) {
    // fix for 6.5.X growing bucket ops
    if (meta.id.startsWith("fix_timer_scan_issue:")) upsertOneDocPerBucket(doc, meta);

    try {
        // Check that doc has desired values
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active != true) return;

        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;

        // Only process state pending this will only exist for a 'brief' time
        if (doc.dynamic.state !== "pending") return;

        var mid = doc.type + '::' + doc.id; // make our KEY
        var newdoc = null;
        try {
            // read the current version of doc from KV, e.g. curDoc
            newdoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!newdoc || newdoc == null) {
            reason = "cron document is missing";
        } else if (!newdoc.active) {
            reason = "cron document has active = false";
        } else if (!newdoc.dynamic.state || newdoc.dynamic.state !== doc.dynamic.state) {
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(newdoc)) {
            reason = "cron document changed";
        }

        if (reason != null) {
            if (!newdoc || newdoc == null || newdoc.verbose.scheduler >= 1) {
                log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
            }
            // always stop here, even when logging is disabled
            return;
        }

        newdoc.dynamic.state = "rearm";

        // cron_bkt[mid] = newdoc;
        if (!tryBucketKvWriteWithLog('OnUpdate help: F', mid, newdoc)) {
            // Failed to write newdoc to cron_bkt[key] the error has been logged
            // and there is nothing more we can do.
            return;
        }

        if (newdoc.verbose.scheduler >= 1) {
            log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
        }
        if (newdoc.verbose.scheduler >= 3) {
            log('OnUpdate help: B ' + mid + ',', newdoc);
        }
    } catch (e) {
        log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
    }
}

function tryBucketKvWriteWithLog(tag, key, doc) {
    var success = false;
    var tries = 0;
    while (tries < 10) {
        tries++;
        try {
            // critical that the below succeeds, because if it doesn't the cron cycle will break
            cron_bkt[key] = doc;
            success = true;
            break;
        } catch (e) {
            log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);
        }
    }
    if (!success) {
        // note: use doc.action here, curDoc is not defined in this scope
        log(tag + ' ' + key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stoping ' + doc.action);
    }
    return success;
}
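The bounded-retry pattern in tryBucketKvWriteWithLog can be exercised without a cluster. In the sketch below makeFlakyBucket and tryWrite are hypothetical names, and the fake bucket simply throws a fixed number of times before accepting a write; it makes no claim about how or when a real bucket write fails.

```javascript
// Fake bucket that rejects the first 'failures' writes, then succeeds.
function makeFlakyBucket(failures) {
    var store = {};
    var remaining = failures;
    return {
        write: function (key, doc) {
            if (remaining > 0) {
                remaining--;
                throw new Error('temporary failure');
            }
            store[key] = doc;
        },
        read: function (key) { return store[key]; }
    };
}

// Returns the number of attempts used, or 0 when every attempt failed.
function tryWrite(bucket, key, doc, maxTries) {
    for (var tries = 1; tries <= maxTries; tries++) {
        try {
            bucket.write(key, doc);
            return tries;
        } catch (e) {
            // swallow and retry, as the helper Function does
        }
    }
    return 0;
}

console.log(tryWrite(makeFlakyBucket(3), 'recurring_event::1', { active: true }, 10));  // 4
console.log(tryWrite(makeFlakyBucket(20), 'recurring_event::1', { active: true }, 10)); // 0
```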
The helper function needs some utilities
These utilities provide a fix for 6.5.X growing bucket ops by ensuring an Eventing timer is fired on every vBucket in a timely fashion.
// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function upsertOneDocPerBucket(doc, meta) {

    var crcTable = makeCRC32Table();

    // make one doc per bucket
    var isVerbose = 0;
    var isMacOS = false; // would be nice if this was an exposed constant in Eventing
    var numvbs = 1024; // default is linux/PC
    if (isMacOS) {
        numvbs = 64;
    }

    var beg = (new Date).getTime();
    var result = getKeysToCoverAllPartitions(crcTable, "_tmp_vbs:", numvbs);

    for (var vb = 0; vb < numvbs; vb++) {
        // brute force to fit a key prefix into a vBucket
        var tst = result[vb];
        if (isVerbose > 1 || (isVerbose == 1) && (vb < 3 || vb > numvbs - 4)) {
            log("KEY: " + tst);
        } else {
            if (vb == 5) console.log("\t*\n\t*\n\t*");
        }
        // update the items to trigger a mutation for our PRIMARY function
        cron_bkt[tst] = { "type": "_tmp_vbs", "vb": vb, "ts_millis": beg, "key": tst };
    }
    var end = (new Date).getTime();
    log("seeding one doc to each vBucket in primary_bucket alias (took " + (end - beg) + " mililis)");
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function showHex(n) {
    return n.toString(16);
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function makeCRC32Table() {
    var crcTable = [];
    var c;
    for (var n = 0; n < 256; n++) {
        c = n;
        for (var k = 0; k < 8; k++) {
            c = ((c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1));
        }
        crcTable[n] = c;
    }
    return crcTable;
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function crc32(crcTable, str) {
    var crc = 0 ^ (-1);
    for (var i = 0; i < str.length; i++) {
        crc = (crc >>> 8) ^ crcTable[(crc ^ str.charCodeAt(i)) & 0xFF];
    }
    return (crc ^ (-1)) >>> 0;
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function getKeysToCoverAllPartitions(crcTable, keyPrefix, partitionCount) {
    var result = [];
    var remaining = partitionCount;
    for (var i = 0; remaining > 0; i++) {
        var key = keyPrefix + i;
        var rv = (crc32(crcTable, key) >> 16) & 0x7fff;
        var actualPartition = rv & (partitionCount - 1);
        if (!result[actualPartition] || result[actualPartition] === undefined) {
            result[actualPartition] = key;
            remaining--;
        }
    }
    return result;
}
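The key-to-partition arithmetic can be checked outside of Eventing. The sketch below copies the CRC32 routines from the listing and adds one hypothetical helper, vbucketOf, that repeats the arithmetic from getKeysToCoverAllPartitions; treating that arithmetic as the cluster's actual vBucket mapping is an assumption taken from the listing, and the mask only works when the partition count is a power of two.

```javascript
function makeCRC32Table() {
    var crcTable = [];
    for (var n = 0; n < 256; n++) {
        var c = n;
        for (var k = 0; k < 8; k++) {
            c = (c & 1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1);
        }
        crcTable[n] = c;
    }
    return crcTable;
}

function crc32(crcTable, str) {
    var crc = 0 ^ (-1);
    for (var i = 0; i < str.length; i++) {
        crc = (crc >>> 8) ^ crcTable[(crc ^ str.charCodeAt(i)) & 0xFF];
    }
    return (crc ^ (-1)) >>> 0;
}

// Hypothetical helper: partition index for a key, same arithmetic as the listing.
function vbucketOf(crcTable, key, partitionCount) {
    return ((crc32(crcTable, key) >> 16) & 0x7fff) & (partitionCount - 1);
}

var table = makeCRC32Table();
console.log(crc32(table, "123456789").toString(16)); // cbf43926 (standard CRC-32 check value)

// Find one "_tmp_vbs:<n>" key for each of 64 partitions (the MacOS layout).
var covered = {};
var found = 0;
for (var i = 0; found < 64; i++) {
    var vb = vbucketOf(table, "_tmp_vbs:" + i, 64);
    if (covered[vb] === undefined) {
        covered[vb] = "_tmp_vbs:" + i;
        found++;
    }
}
console.log(Object.keys(covered).length); // 64
```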
Now let’s deploy the two Eventing Functions
We reviewed a lot of code and the design of the initial scheduler; now it’s time to see everything working together.
Remember that this example uses three buckets: travel-sample (a sample default data set), metadata (a scratchpad for Eventing that can be shared with other Eventing Functions), and crondata (which holds our cron schedules). The travel-sample bucket has a size of 100MB, and the other two buckets, metadata and crondata, should both have a size of 200MB and already exist as per the directions in “Prerequisites”.
- Verify your current bucket configuration by accessing the Couchbase Web Console > Buckets page:
To deploy the Eventing Function “cron_impl_2func_651” you can follow one of two methods:
- Basic complexity, Method #1 Download/Import
- Medium complexity, Method #2 Manually Add Function, Cut-n-Paste JavaScript
Method #1 Download/Import
Import the 1st Function “cron_impl_2func_651”
Download the first Eventing Function with all the required settings: right-click on the following link and choose Save Link As to download the file cron_impl_2func_651.json onto your local file system.
From the Couchbase Web Console > Eventing page, click IMPORT, navigate to the file cron_impl_2func_651.json, select it and open it. The ADD FUNCTION dialog appears.
In the ADD FUNCTION dialog, for individual Function elements provide the below information. Note the JSON file cron_impl_2func_651.json will pre-configure all settings correctly for this example:
- For the Source Bucket drop-down, verify it is set to crondata.
- For the Metadata Bucket drop-down, verify it is set to metadata.
- Verify that cron_impl_2func_651 is the name of the Function you are creating in the Function Name text-box.
- [Optional Step] Enter text A cron like scheduler part 1, in the Description text-box.
- For the Settings option, use the default values.
- For the Bindings option, verify that two bindings exist.
- For the first binding, of type “bucket alias”, specify cron_bkt as the “alias name” of the bucket, select crondata as the associated bucket, and set the mode to “read and write”.
- For the second binding, of type “bucket alias”, specify ts_bkt as the “alias name” of the bucket, select travel-sample as the associated bucket, and set the mode to “read and write”.
- Your settings in the dialog should look like the following:
- After verifying all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651 dialog appears (with the JavaScript code pre-loaded).
- To return to the Eventing screen, click the ‘< back to Eventing‘ link (below the editor) or click the Eventing tab.
Import the 2nd Function “cron_impl_2func_651_help”
Download the second Eventing Function with all the required settings: right-click on the following link and choose Save Link As to download the file cron_impl_2func_651_help.json onto your local file system.
From the Couchbase Web Console > Eventing page, click IMPORT, navigate to the file cron_impl_2func_651_help.json, select it and open it. The ADD FUNCTION dialog appears.
In the ADD FUNCTION dialog, for individual Function elements provide the below information. Note the JSON file cron_impl_2func_651_help.json will pre-configure all settings correctly for this example:
- For the Source Bucket drop-down, verify it is set to crondata.
- For the Metadata Bucket drop-down, verify it is set to metadata.
- Verify that cron_impl_2func_651_help is the name of the Function you are creating in the Function Name text-box.
- [Optional Step] Enter text A cron like scheduler helper part 1, in the Description text-box.
- For the Settings option, use the default values.
- For the Bindings option, verify that only one binding exists.
- For the binding, of type “bucket alias”, specify cron_bkt as the “alias name” of the bucket, select crondata as the associated bucket, and set the mode to “read and write”.
- Your settings in the dialog should look like the following:
- After verifying all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651_help dialog appears (with the JavaScript code pre-loaded).
- To return to the Eventing screen, click the ‘< back to Eventing‘ link (below the editor) or click the Eventing tab.
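For reference, the dialog settings of both Functions can be summarized as plain data. The object below is only an illustrative summary: its field names are hypothetical and it is not the format of the downloadable .json files. The small check confirms the one thing both Functions have in common, a read and write binding of cron_bkt to crondata.

```javascript
// Illustrative summary of the ADD FUNCTION dialog values (field names are hypothetical).
var cronFunctions = [
    {
        name: 'cron_impl_2func_651',
        sourceBucket: 'crondata',
        metadataBucket: 'metadata',
        bindings: [
            { alias: 'cron_bkt', bucket: 'crondata', mode: 'read and write' },
            { alias: 'ts_bkt', bucket: 'travel-sample', mode: 'read and write' }
        ]
    },
    {
        name: 'cron_impl_2func_651_help',
        sourceBucket: 'crondata',
        metadataBucket: 'metadata',
        bindings: [
            { alias: 'cron_bkt', bucket: 'crondata', mode: 'read and write' }
        ]
    }
];

// Both Functions must be able to read and write the control documents.
function hasCronBinding(fn) {
    return fn.bindings.some(function (b) {
        return b.alias === 'cron_bkt' && b.bucket === 'crondata' && b.mode === 'read and write';
    });
}

cronFunctions.forEach(function (fn) {
    console.log(fn.name + ' cron_bkt binding ok: ' + hasCronBinding(fn));
});
```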
Method #2 Manually Add Function, Cut-n-Paste JavaScript
Manually create “cron_impl_2func_651”
To add the first Eventing Function, go to the Couchbase Web Console > Eventing page and click ADD FUNCTION. The ADD FUNCTION dialog appears.
In the ADD FUNCTION dialog, for individual Function elements provide the below information:
- For the Source Bucket drop-down, set to crondata.
- For the Metadata Bucket drop-down, set to metadata.
- Enter cron_impl_2func_651 as the name of the Function you are creating in the Function Name text-box.
- [Optional Step] Enter text A cron like scheduler part 1, in the Description text-box.
- For the Settings option, use the default values.
- For the Bindings option, create two bindings:
- For the first binding, of type “bucket alias”, specify cron_bkt as the “alias name” of the bucket, select crondata as the associated bucket, and set the mode to “read and write”.
- For the second binding, of type “bucket alias”, specify ts_bkt as the “alias name” of the bucket, select travel-sample as the associated bucket, and set the mode to “read and write”.
- After configuring your settings your dialog should look like this:
- After providing all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651 dialog appears. The cron_impl_2func_651 dialog initially contains a placeholder code block. You will substitute your actual cron_impl_2func_651 code in this block.
- Copy the following Eventing Function JavaScript source (618 lines) and paste it in the placeholder code block of cron_impl_2func_651
/*
Function "cron_impl_2func_651" also requires "cron_impl_2func_651_help"

Create a basic cron system using Eventing allows a recurring function to execute activity at a
specified time every day, hour, min, 30 sec., and 15 sec. We use a bucket called 'crondata'
aliased to 'cron_bkt' which can hold one or more control documents of type = "recurring_event".

The following uses of timers do not work reliably in Couchbase versions 6.5 and 6.5.1
a) scheduling an Eventing timer within a timer's callback
b) overwriting an existing timer by id

In addition the ability to cancel a timer does not exist in Couchbase versions 6.5 and 6.5.1

For this example, we supply one real user function that builds a recurring 'static' cache document
from bucket `travel-sample` via an N1QL query and save the result back to `travel-sample` via
the alias 'ts_bkt'. This JavaScript function is doCronActionA(), we also provide two placeholders
doCronActionB() and doCronActionC() for additional experimentation.

Test Doc:
{
    "type":"recurring_event",   // The KEY will be <<type>>::<<id>>
    "id":1,                     //
    "hour":14,                  // The hour of the day 0-23, *, *2X, *4X to trigger
    "min":54,                   // The minute in the hour 0-59, *, *2X, *4X to trigger
    "action":"doCronActionA",   // What function to run on the trigger
    "active":false,             // Flag to arm or disable this schedule
    "verbose" : {
        "user_func":2,          // Logging level for the action logic : 0=none, etc. etc.
        "scheduler":3           // Logging level for the cron logic : 0=none, etc. etc.
    },
    "dynamic" : {
        "state":"arm",          // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
        "next_sched": 0,        // Number of seconds since epoch to next desired schedule
        "prev_sched": 0,        // Number of seconds since epoch for previous schedule
        "prev_etime": 0,        // Number of seconds since epoch for previous schedule actual exec time
        "prev_delay": 0,        // Number of seconds that the timer was delayed from the schedule
        "prev_atime": 0         // Number of seconds taken by the user 'action'
    }
}

INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1",
{
    "type":"recurring_event",
    "id":1,
    "hour":14,
    "min":54,
    "action":"doCronActionA",
    "verbose" : {
        "user_func":2,
        "scheduler":3
    },
    "active":false,
    "dynamic" : {
        "state": "arm",
        "next_sched": 0,
        "prev_sched": 0,
        "prev_etime": 0,
        "prev_delay": 0,
        "prev_atime": 0
    }
}
);

Note, you can omit verbose{} and dynamic{} as they will be auto-created by this main Eventing
Function "cron_impl_2func_651". If verbose{} is missing the logging levels will default to
"verbose" : { "user_func":1, "scheduler":1 }

INSERT INTO `crondata` (KEY,VALUE) VALUES ("recurring_event::1",
{
    "type":"recurring_event",
    "id":1,
    "hour":14,
    "min":54,
    "action":"doCronActionA",
    "active":false
}
);

N1QL : Make an index to query data without specifying keys
    CREATE primary INDEX on `crondata` ;
N1QL : Verify or inspect settings in schedule
    SELECT * FROM `crondata` WHERE type="recurring_event";
N1QL : Arm or set active
    UPDATE `crondata` SET active = true WHERE type="recurring_event" AND id=1 ;
N1QL : Disarm or set inactive
    UPDATE `crondata` SET active = false WHERE type="recurring_event" AND id=1 ;
N1QL : Adjust time of trigger
    UPDATE `crondata` SET hour = 11, min = 30 WHERE type="recurring_event" AND id=1 ;
N1QL : Adjust logging
    UPDATE `crondata` SET verbose.user_func = 1, verbose.scheduler = 0 WHERE type="recurring_event" AND id=1 ;
N1QL : Delete the schedule
    DELETE FROM `crondata` WHERE type="recurring_event" AND id=1 ;

The action field is important it 'should' exist in this Eventing Function note it could be any
JavaScript name e.g. MyFunc and you must implement like the example doCronActionA(doc) where
doc will be the currently active item of type = 'recurring_event' read from the alias bucket
‘cron_bkt’ when the timer is fired. The action JavaScript function should return either true
or false used for logging purposes. If the action does not exist it is an error and a warning
is logged and the timer is disabled.

In Couchbase version 6.5+ to add a new cron like daily function just pause the active handler
insert your new function doCronActionB(doc) {...} then Resume the eventing handler. The nice
thing is if a timer was to be fired while the function was paused it will NOT be lost, when you
resume the function it will be processed at the next available time slot.

Any change to a control structure will create a new recurring schedule or timer and cancel the
current previous schedule this includes changing the verbosity level. The previous timer will
continue to run however when executed it will do a Checksum on the current control structure
from KV against its passed context and if different the Callback will ignore the old schedule.
This logic could be altered to process immediately if the schedule has expired search for the
string "OnUpdate U" in the code below.
*/

// ==================
/* BEG USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT BELOW */

function doCronActionA(doc) {
    try {
        // Check that doc has desired values
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active !== true) return;
        if (doc.verbose.user_func >= 1)
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

        // this is a 6.5 N1QL query (feature not available in GA prior to 6.5)
        // Create an embedded N1QL iterator by issuing a SELECT statement to get the
        // counts of airlines by country. Make a new document and write it out to KV

        // We will use the iterator to create a KV document representing the results of a
        // HARD lengthy embedded N1QL query and write it back to KV, the idea is to keep
        // a calculation up to date once a day such that it that can be read 'quickly'
        // by other Eventing Functions, other Couchbase services or SDKs.

        // Consider if we had 1 million docs in a minute do we really want to use N1QL
        // to recalculate something that is almost static for all 1 million documents, of
        // course not, so we make an intermediate value that can be read into Eventing
        // and used via a single 'light weight' KV read.
        var q_iter = SELECT country,
                            count( * ) cnt
                     FROM `travel-sample`
                     WHERE `type` = 'airline'
                     GROUP BY country;

        // loop through the result set and update the map 'accumulate'
        var accumulate = {};
        var idx = 0;
        for (var val of q_iter) {
            if (doc.verbose.user_func >= 2)
                log(doc.action + ' N1QL idx ' + idx + ', country ' + val.country + " cnt " + val.cnt);
            accumulate[val.country] = val.cnt;
            idx++;
        }
        // close out embedded N1QL iterator
        q_iter.close();

        // Now let’s make a cached KV document representing a HARD length embedded N1QL
        // query and write it back to KV, we need a KEY and a type and id and then we
        // upsert it into the `travel-sample` bucket.
        var cachedoc = {};
        cachedoc.type = "cron_cache";
        cachedoc.id = "airlines_by_country";
        cachedoc.date = new Date();
        cachedoc.data = accumulate;
        var ckey = cachedoc.type + '::' + cachedoc.id;
        ts_bkt[ckey] = cachedoc;
        if (doc.verbose.user_func >= 2) {
            log(doc.action + ' upsert to KV with KEY ' + ckey + ' cachedoc ', cachedoc);
        }
    } catch (e) {
        log(doc.action + ' Error exception:', e);
        return false;
    }
    return true;
}

function doCronActionB(doc) {
    try {
        // check that doc has desired values
        if (doc.type !== "recurring_event" || doc.active !== true) return;
        if (doc.verbose.user_func >= 1)
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

        // YOUR LOGIC HERE

    } catch (e) {
        log(doc.action + ' Error exception:', e);
        return false;
    }
    return true;
}

function doCronActionC(doc) {
    try {
        // check that doc has desired values
        if (doc.type !== "recurring_event" || doc.active !== true) return;
        if (doc.verbose.user_func >= 1)
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);

        // YOUR LOGIC HERE

    } catch (e) {
        log(doc.action + ' Error exception:', e);
        return false;
    }
    return true;
}

/* END USER FUNCTIONS TO RUN ONCE A DAY, HOUR, OR MINUTE - ANYTHING YOU WANT ABOVE */
// ==================

// FIXUP: ADDIN FUNCTION
function noopTimer(context) {
    // fix for 6.5.X growing bucket ops
    try {
        if (context.type === "_tmp_vbs" && context.vb === 0) {
            // log("noopTimer timers firing, printing only for vBucket 0");
        }
    } catch (e) {
        log("OnUpdate Exception in callback noopTimer:", e);
    }
}

// FIXUP: ADDIN FUNCTION
function rearmTimer(context) {
    // fix for 6.5.X growing bucket ops
    try {
        if (context.type === "_tmp_vbs" && context.vb === 0) {
            // Update/touch all docs in the helper_bucket the helper function will then
            // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
            // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

            // generate a mutation to re-arm the HELPER function: fix_scan_issue
            // which will in turn make new mutations for this Function
            var cur = cron_bkt[context.key];
            if (cur && cur.ts_millis === context.ts_millis) {
                // log("rearmTimer update fix_timer_scan_issue::1 in helper_bucket alias only for vBucket 0");
                var now = new Date();
                cron_bkt["fix_timer_scan_issue::1"] = { "last_update": now };
            } else {
                // NOOP we had multiple timer cycles, just let this one quietly stop.
            }
        }
    } catch (e) {
        log("OnUpdate Exception in callback rearmTimer:", e);
    }
}

// FIXUP: ADDIN FUNCTION
function genNoopTimers(doc, meta, seconds) {
    // fix for 6.5.X growing bucket ops
    try {
        // redundant but play it safe
        if (doc.type === "_tmp_vbs") {
            // Since we are using a different function, create a timer on all our vBuckets immediately (can take up to 15 seconds)
            // If we used cross bucket recursion to rearm all the timers in a recurring fashion we would add a delay of at least 40 seconds.
            createTimer(noopTimer, new Date(), null, doc);
            if (doc.vb === 0) {
                // Update/touch all docs in the helper_bucket the helper function will then
                // mutate all 1024 of type == vbs_seed (64 on MacOS) to create a recurring cycle.
                // log("noopTimer timer fired all 1024 vBuckets, logging only vb 0", context);

                // generate a mutation to re-arm the HELPER function: fix_scan_issue
                // which will in turn make new mutations for this Function
                // log("genNoopTimers make timer to rearm fix_timer_scan_issue::1");
                createTimer(rearmTimer, new Date(new Date().getTime() + seconds * 1000), null, doc);
            }
        }
    } catch (e) {
        log("OnUpdate Exception in genNoopTimers:", e);
    }
}

function OnUpdate(doc, meta) {
    // fix for 6.5.X growing bucket ops
    if (doc.type === "_tmp_vbs") genNoopTimers(doc, meta, 30);
    if (!cron_bkt["fix_timer_scan_issue::1"]) {
        cron_bkt["fix_timer_scan_issue::1"] = {};
    }

    try {
        // Check if further analysis is needed we only trigger on an active recurring_event
        if (doc.type !== "recurring_event" || doc.active !== true) return;

        var update_doc = false;
        if (!doc.dynamic) {
            // Add if missing doc.dynamic with defaults
            doc.dynamic = {
                "state": "arm",
                "next_sched": 0,
                "prev_sched": 0,
                "prev_etime": 0,
                "prev_delay": 0,
                "prev_atime": 0
            };
            // we need to update the document once we have the next schedule
            update_doc = true;
        }
        if (!doc.verbose) {
            // Add if missing doc.verbose with defaults
            doc.verbose = {
                "user_func": 1,
                "scheduler": 1
            };
            // we need to update the document once we have the next schedule
            update_doc = true;
        }

        // Do not process dynamic.state pending
        if (!doc.dynamic || !doc.dynamic.state || doc.dynamic.state === "pending") return;

        var mid = doc.type + "::" + doc.id; // this is the same as meta.id or the KEY
        var hour = doc.hour;
        var min = doc.min;

        // Do an eval check the JavaScript function exists. The eval occurs in a common
        // utility function shared with Callback
        if (!verifyFunctionExistsViaEval(doc, mid)) {
            // doc.action did not exist, we have already logged the issue
            return;
        }

        // Get the next valid execution time
        var date_timer = getNextRecurringDate(hour, min);
        var next_sched = Math.round(date_timer.getTime() / 1000);
        if (!update_doc && next_sched !== doc.dynamic.next_sched) {
            // the next_sched should be the same as the setting from the helper application, however
            // if we undeploy/deploy or pause/resume we might have to reschedule to the next time slot
            log('OnUpdate U ' + mid + ' calculated next_sched !== doc.dynamic.next_sched, delta ' +
                (next_sched - doc.dynamic.next_sched) + ', reschedule');
            update_doc = true;
        }

        if (update_doc) {
            // this mutation is recursive and will be suppressed, we ensure we have a dynamic structure
            doc.dynamic.next_sched = next_sched;
            // rather than a direct write, call a function to trap and retry if there is a resource issue
            // cron_bkt[mid] = doc;
            if (!tryBucketKvWriteWithLog('OnUpdate F', mid, doc)) {
                // Failed to write doc to cron_bkt[key] the error has been logged
                // and there is nothing more we can do.
                return;
            }
        }

        // Schedule an Eventing timer
        var timer_id = createTimer(Callback, date_timer, null, doc);
        if (doc.verbose.scheduler >= 1) {
            log('OnUpdate A ' + mid + ' rcv mutation (initial or rearm) schedule timer at ' +
                toLocalISOTime(date_timer));
        }
        if (doc.verbose.scheduler >= 2) {
            log('OnUpdate B ' + mid + ' recurring timer was created, timer_id ' + timer_id);
        }
    } catch (e) {
        log('OnUpdate E ' + meta.id + ', Error exception:', e);
    }
}

function getNextRecurringDate(hour_str, min_str) {
    // Note Javascript Dates are in milliseconds
    var date_now = new Date();
    var date_ret = new Date();
    var hour;
    var min;
    try {
        hour = parseInt(hour_str);
    } catch (e) {}
    try {
        min = parseInt(min_str);
    } catch (e) {}

    // Note, this is only a simplistic partial 'crontab' syntax with some slight extensions
    // it allows once a day, once an hour, once a minute.
It also contains some non-standard// syntax to provide the ability to execute twice a minute or four times a minute.if (hour_str === '*4X' && min_str === '*4X') {// once every 15 seconds or four times a minutedate_ret.setMilliseconds(0);date_ret.setSeconds(15);while (date_ret.getTime() < date_now.getTime()) {date_ret.setSeconds(date_ret.getSeconds() + 15);}return date_ret;} elseif (hour_str === '*2X' && min_str === '*2X') {// once every 30 seconds or twice a minutedate_ret.setMilliseconds(0);date_ret.setSeconds(30);while (date_ret.getTime() < date_now.getTime()) {date_ret.setSeconds(date_ret.getSeconds() + 30);}return date_ret;} elseif (hour_str === '*' && min_str === '*') {// once a minutedate_ret.setMilliseconds(0);date_ret.setSeconds(0);date_ret.setMinutes(date_ret.getMinutes() + 1);} elseif (hour_str !== '*' && isNaN(hour) === false && min_str === '*') {// once a minute only for a given hourdate_ret.setMilliseconds(0);date_ret.setSeconds(0);date_ret.setMinutes(date_ret.getMinutes() + 1);if (date_ret.getTime() < date_now.getTime()) {date_ret.setHours(hour);}if (date_ret.getTime() > date_now.getTime()) {date_ret.setDate(date_ret.getDate() + 1);date_ret.setSeconds(0);date_ret.setMinutes(0);date_ret.setHours(hour);}} elseif (hour_str === '*' && min_str !== '*' && isNaN(min) === false) {// once a hour at a given minutedate_ret.setMilliseconds(0);date_ret.setSeconds(0);date_ret.setMinutes(min);// schedule for next hourdate_ret.setHours(date_ret.getHours() + 1);} elseif (isNaN(hour) === false && isNaN(min) === false) {// once a day for a given hour and a given minutedate_ret.setMilliseconds(0);date_ret.setSeconds(0);date_ret.setMinutes(min);date_ret.setHours(hour);if (date_ret.getTime() < date_now.getTime()) {// schedule for tomorrowdate_ret.setDate(date_ret.getDate() + 1);}} else {log('getNextRecurringDate illegal input hour_str <' +hour_str + '> min_str <' + min_str + '>');throw new Error('getNextRecurringDate illegal input hour_str <' +hour_str + '> min_str <' + 
min_str + '>');return null;}return date_ret;}function verifyFunctionExistsViaEval(curDoc, id) {var result = false;try {// check for function if missing this is invalid return resultresult = eval("typeof " + curDoc.action + " === 'function';");if (result === false) {if (curDoc.verbose.scheduler >= 1)log("Warn/Disable (No Action and No Re-Arm), because required 'action' of " +curDoc.action + "(doc) does not exist, id is", id);return result;}} catch (e) {log('verifyFunctionExistsViaEval Error exception:', e);}return result;}function toNumericFixed(number, precision) {var multi = Math.pow(10, precision);return Math.round((number * multi).toFixed(precision + 1)) / multi;}function toLocalISOTime(d) {var tzoffset = (new Date()).getTimezoneOffset() * 60000; //offset in millisecondsreturn (new Date(d.getTime() - tzoffset)).toISOString().slice(0, -1);}function tryBucketKvWriteWithLog(tag, key, doc) {var success = false;var tries = 0;while (tries < 10) {tries++;try {// critical that the below succeeds, because if it doesn't the cron cycle will breakcron_bkt[key] = doc;success = true;break;} catch (e) {log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);}}if (!success) {log(tag + ' ' + +key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stoping ' + curDoc.action);}return success;}function Callback(doc) {try {var fired_at = new Date();// Check if further analysis is needed we only trigger on a recurring_event that is activeif (doc.type !== "recurring_event") return;// doc must have 'action', 'dynamic {}', verbose {}, dynamic.stateif (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;// process any doc.dynamic.state BUT pendingif (doc.dynamic.state === "pending") return;// ==================// Check if still active// We make sure that in KV the 'doc' still exists and that it is still active if not just// return thus skipping the action and not Re-arming the timer. 
Note `travel-sample` is// aliased to the map 'cron_bktvar mid = doc.type + '::' + doc.id; // make our KEYvar curDoc = null;try {// read the current version of doc from KV, e.g. curDoccurDoc = cron_bkt[mid];} catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exceptionvar reason = null;if (!curDoc || curDoc === null) {reason = "cron document is missing";} elseif (!curDoc.active) {reason = "cron document has active = false";} elseif (!curDoc.dynamic.state || curDoc.dynamic.state !== doc.dynamic.state) {reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;} elseif (crc64(doc) !== crc64(curDoc)) {reason = "cron document changed";}if (reason !== null) {if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 1) {log('Callback X ' + mid + " ignore/stop this timer's schedule because " + reason);}if (!curDoc || curDoc === null || curDoc.verbose.scheduler >= 4) {log('Callback Y ' + mid + ' timer doc', doc);log('Callback Z ' + mid + ' KV curDoc', curDoc);}return;}// ==================// Verify user routine exists and if so eval it// Assume curDoc.action contains something like "doCronActionA" and we have a function in// this handler like "doCronActionA(doc)". Below we use curDoc as the end user should be// able to alter the eval'd JavaScript function. We will execute two (2) evals.// First eval check the JavaScript function exists. 
The eval occurs in a common// utility function shared with RecurringCallbackif (!verifyFunctionExistsViaEval(curDoc, mid)) {// curDoc.action did not exist, we have already logged the issuereturn;}// Second eval execute and process the user function we execute the defined function// with an argument of curDocvar beg_act = new Date();var result = null;eval("result = " + curDoc.action + "(curDoc);");var end_act = new Date();var atime_ms = end_act.getTime() - beg_act.getTime();if (curDoc.verbose.scheduler >= 2)log('Callback R ' + mid + ' action took ' + toNumericFixed((atime_ms / 1000), 3) +' sec., returned ' + result);// ==================// Calculate next time and mutate the control document for our our helper function// which will create another mutation such that OnUpdate of this function will pick// it up and generate the timer (avoids the MB-38554 issue).var hour = curDoc.hour;var min = curDoc.min;var date_timer = getNextRecurringDate(hour, min);curDoc.dynamic.prev_delay =toNumericFixed(((fired_at.getTime() / 1000) - curDoc.dynamic.next_sched), 3);curDoc.dynamic.prev_sched = curDoc.dynamic.next_sched;curDoc.dynamic.prev_etime = Math.round(fired_at.getTime() / 1000);curDoc.dynamic.prev_atime = toNumericFixed((atime_ms / 1000), 3);curDoc.dynamic.state = "pending";curDoc.dynamic.next_sched = Math.round(date_timer.getTime() / 1000);// rather then the call a function, to trap and retry if there is a resource issue// cron_bkt[mid] = curDoc;if (!tryBucketKvWriteWithLog('Callback F', mid, curDoc)) {// Failed to write curDoc to cron_bkt[key] the error has been logged// and there is nothing more we can do.return;}if (curDoc.verbose.scheduler >= 1) {log('Callback A ' + mid + ' gen mutation #1 to doc to force schedule rearm at ' +toLocalISOTime(date_timer));}if (curDoc.verbose.scheduler >= 2) {log('Callback B ' + mid + ' sched ' + curDoc.dynamic.prev_sched +', actual ' + curDoc.dynamic.prev_etime +', delay ' + curDoc.dynamic.prev_delay +', took ' + 
curDoc.dynamic.prev_atime);}if (curDoc.verbose.scheduler >= 3) {log('Callback C ' + mid + ' curDoc', curDoc);}} catch (e) {var mid = doc.type + '::' + doc.id; // make our KEYlog('Callback E ' + mid + ' Error exception:', e);}}
- After pasting, the screen appears as displayed below:
- Click Save.
- To return to the Eventing screen, click the ‘< back to Eventing’ link (below the editor) or click the Eventing tab.
Manually create “cron_impl_2func_651_help”
To add the second Eventing function from the Couchbase Web Console > Eventing page, click ADD FUNCTION, to add a new Function. The ADD FUNCTION dialog appears.
In the ADD FUNCTION dialog, for individual Function elements provide the below information:
- For the Source Bucket drop-down, set to crondata.
- For the Metadata Bucket drop-down, set to metadata.
- Enter cron_impl_2func_651_help as the name of the Function you are creating in the Function Name text-box.
- [Optional Step] Enter text A cron like scheduler helper part 1, in the Description text-box.
- For the Settings option, use the default values.
- For the Bindings option, create one binding:
- For the binding, the “bucket alias”, specify cron_bkt as the “alias name” of the bucket, select crondata as the associated bucket, and set the mode to “read and write”.
- After configuring your settings your dialog should look like this:
- After providing all the required information in the ADD FUNCTION dialog, click Next: Add Code. The cron_impl_2func_651_help dialog appears. The cron_impl_2func_651_help dialog initially contains a placeholder code block. You will substitute your actual cron_impl_2func_651_help code in this block.
- Copy the following Eventing Function JavaScript source (187 lines) and paste it in the placeholder code block of cron_impl_2func_651_help
/*
Function "cron_impl_2func_651_help" also requires "cron_impl_2func_651"

Test Doc:
{
    "type":"recurring_event", // The KEY will be <<type>>::<<id>>
    "id":1, //
    "hour":14, // The hour of the day 0-23, *, *2X, *4X to trigger
    "min":54, // The minute in the hour 0-59, *, *2X, *4X to trigger
    "action":"doCronActionA", // What function to run on the trigger
    "active":false, // Flag to arm or disable this schedule
    "verbose" : {
        "user_func":2, // Logging level for the action logic : 0=none, etc. etc.
        "scheduler":3 // Logging level for the cron logic : 0=none, etc. etc.
    },
    "dynamic" : {
        "state":"arm", // States "arm"|"rearm"|"pending" if any value but "pending" start a schedule
        "next_sched": 0, // Number of seconds since epoch to next desired schedule
        "prev_sched": 0, // Number of seconds since epoch for previous schedule
        "prev_etime": 0, // Number of seconds since epoch for previous schedule actual exec time
        "prev_delay": 0, // Number of seconds that the timer was delayed from the schedule
        "prev_atime": 0 // Number of seconds taken by the user 'action'
    }
}

Note, you can omit verbose{} and dynamic{} as they will be autocreated by the main Eventing
Function "cron_impl_2func_651". If verbose{} is missing the logging levels will default to
"verbose" : { "user_func":1, "scheduler":1 }
*/

function tryBucketKvWriteWithLog(tag, key, doc) {
    var success = false;
    var tries = 0;
    while (tries < 10) {
        tries++;
        try {
            // critical that the below succeeds, because if it doesn't the cron cycle will break
            cron_bkt[key] = doc;
            success = true;
            break;
        } catch (e) {
            log(tag + ' ' + key + ' WARN failed to update KV tries ' + tries, e);
        }
    }
    if (!success) {
        // use doc.action here: curDoc is not defined in this function's scope
        log(tag + ' ' + key + ' FATAL could not update KV cron cycle, tried ' + tries + ', stopping ' + doc.action);
    }
    return success;
}

function OnUpdate(doc, meta) {
    // fix for 6.5.X growing bucket ops
    if (meta.id.startsWith("fix_timer_scan_issue:")) upsertOneDocPerBucket(doc, meta);

    try {
        // Check that doc has desired values
        if (!doc.type || doc.type !== "recurring_event" || !doc.active || doc.active != true) return;

        // doc must have 'action', 'dynamic {}', verbose {}, dynamic.state
        if (!doc.action || !doc.dynamic || !doc.verbose || !doc.dynamic.state) return;

        // Only process state pending this will only exist for a 'brief' time
        if (doc.dynamic.state !== "pending") return;

        var mid = doc.type + '::' + doc.id; // make our KEY
        var newdoc = null;
        try {
            // read the current version of doc from KV, e.g. curDoc
            newdoc = cron_bkt[mid];
        } catch (e) {} // needed for pre 6.5, note pure 6.5+ deployment returns null sans exception

        var reason = null;
        if (!newdoc || newdoc == null) {
            reason = "cron document is missing";
        } else if (!newdoc.active) {
            reason = "cron document has active = false";
        } else if (!newdoc.dynamic.state || newdoc.dynamic.state !== doc.dynamic.state) {
            reason = "cron document wrong dynamic.state expected " + doc.dynamic.state;
        } else if (crc64(doc) !== crc64(newdoc)) {
            reason = "cron document changed";
        }
        if (reason != null) {
            if (!newdoc || newdoc == null || newdoc.verbose.scheduler >= 1) {
                log('OnUpdate help: X stopping schedule because ' + reason + ',', newdoc);
                return;
            }
        }

        newdoc.dynamic.state = "rearm";
        // cron_bkt[mid] = newdoc;
        if (!tryBucketKvWriteWithLog('OnUpdate help: F', mid, newdoc)) {
            // Failed to write newdoc to cron_bkt[key] the error has been logged
            // and there is nothing more we can do.
            return;
        }
        if (newdoc.verbose.scheduler >= 1) {
            log('OnUpdate help: A ' + mid + ' mutation #2 to doc to force schedule rearm');
        }
        if (newdoc.verbose.scheduler >= 3) {
            log('OnUpdate help: B ' + mid + ',', newdoc);
        }
    } catch (e) {
        log('OnUpdate help: E ' + meta.id + ', Error exception:', e);
    }
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function upsertOneDocPerBucket(doc, meta) {
    var crcTable = makeCRC32Table();

    // make one doc per bucket
    var isVerbose = 0;
    var isMacOS = false; // would be nice if this was an exposed constant in Eventing
    var numvbs = 1024; // default is linux/PC
    if (isMacOS) {
        numvbs = 64;
    }

    var beg = (new Date).getTime();
    var result = getKeysToCoverAllPartitions(crcTable, "_tmp_vbs:", numvbs);
    for (var vb=0; vb<numvbs; vb++) {
        // brute force to fit a key prefix into a vBucket
        var tst = result[vb];
        if (isVerbose > 1 || (isVerbose == 1) && (vb < 3 || vb > numvbs -4)) {
            log("KEY: " + tst);
        } else {
            if (vb == 5) console.log("\t*\n\t*\n\t*");
        }
        // update the items to trigger a mutation for our PRIMARY function
        cron_bkt[tst] = { "type": "_tmp_vbs", "vb": vb, "ts_millis": beg, "key": tst };
    }
    var end = (new Date).getTime();
    log("seeding one doc to each vBucket in primary_bucket alias (took " + (end - beg) + " mililis)");
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function showHex(n) {
    return n.toString(16);
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function makeCRC32Table() {
    var crcTable = [];
    var c;
    for(var n =0; n < 256; n++){
        c = n;
        for(var k =0; k < 8; k++){
            c = ((c&1) ? (0xEDB88320 ^ (c >>> 1)) : (c >>> 1));
        }
        crcTable[n] = c;
    }
    return crcTable;
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function crc32(crcTable,str) {
    var crc = 0 ^ (-1);
    for (var i = 0; i < str.length; i++ ) {
        crc = (crc >>> 8) ^ crcTable[(crc ^ str.charCodeAt(i)) & 0xFF];
    }
    return (crc ^ (-1)) >>> 0;
}

// FIXUP: ADDIN FUNCTION
// fix for 6.5.X growing bucket ops
function getKeysToCoverAllPartitions(crcTable,keyPrefix,partitionCount) {
    var result = [];
    var remaining = partitionCount;
    for (var i = 0; remaining > 0; i++) {
        var key = keyPrefix + i;
        var rv = (crc32(crcTable,key) >> 16) & 0x7fff;
        var actualPartition = rv & partitionCount - 1;
        if (!result[actualPartition] || result[actualPartition] === undefined) {
            result[actualPartition] = key;
            remaining--;
        }
    }
    return result;
}
- After pasting, the screen appears as displayed below:
- Click Save.
- To return to the Eventing screen, click the ‘< back to Eventing’ link (below the editor) or click the Eventing tab.
Deploy the two functions
We are now ready to start the Eventing functions. From the Couchbase Web Console > Eventing screen:
- Click on the Function name cron_impl_2func_651_help to expand and expose the Function controls.
- Click Deploy.
- In the Confirm Deploy Function dialog, select “From now” from the Feed boundary option.
Let’s start the other Eventing Function. From the Couchbase Web Console > Eventing screen:
- Click on the Function name cron_impl_2func_651 to expand and expose the Function controls.
- Click Deploy.
- In the Confirm Deploy Function dialog, select “From now” from the Feed boundary option.
Setup a cron task to run four (4) times per minute
At this point our Eventing function is waiting for a mutation, specifically any document of type="recurring_event" that has a field active=true.
From the Couchbase Web Console > Query page, we will use N1QL to create a new scheduled task in the ‘crondata’ bucket:
- Cut-n-paste the following N1QL statement into the Query Editor
INSERT INTO `crondata` (KEY,VALUE) VALUES (
    "recurring_event::1",
    {
        "type": "recurring_event",
        "id": 1,
        "hour": "*",
        "min": "0",
        "action": "doCronActionA",
        "verbose": {
            "user_func": 2,
            "scheduler": 3
        },
        "active": false
    }
);
- Click Execute
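Before inserting a control document it can help to check the hour and min fields, because the scheduler only understands a partial crontab syntax. The following is a minimal sketch of such a check in plain JavaScript. The names isValidField and validateControlDoc are hypothetical and are not part of the Eventing Functions in this article; the accepted values follow the comments in the function header (0-23, 0-59, *, *2X, *4X), and the check is slightly stricter than the parseInt based logic in getNextRecurringDate.

```javascript
// Hypothetical pre-flight check for a control document (not part of the
// article's Eventing Functions). Returns a list of problems, empty if none.
function isValidField(value, maxValue) {
    // The wildcard and the two sub-minute extensions are accepted as-is.
    if (value === '*' || value === '*2X' || value === '*4X') return true;
    // Otherwise expect a non-negative integer (number or numeric string) in range.
    if (!/^\d+$/.test(String(value))) return false;
    var n = Number(value);
    return n >= 0 && n <= maxValue;
}

function validateControlDoc(doc) {
    var problems = [];
    if (doc.type !== 'recurring_event') problems.push('type must be "recurring_event"');
    if (typeof doc.action !== 'string' || doc.action.length === 0) {
        problems.push('action must name a function');
    }
    if (typeof doc.active !== 'boolean') problems.push('active must be true or false');
    if (!isValidField(doc.hour, 23)) problems.push('hour must be 0-23, *, *2X or *4X');
    if (!isValidField(doc.min, 59)) problems.push('min must be 0-59, *, *2X or *4X');
    // getNextRecurringDate only honors *2X / *4X when both fields use the same token.
    var hourIsSub = doc.hour === '*2X' || doc.hour === '*4X';
    var minIsSub = doc.min === '*2X' || doc.min === '*4X';
    if ((hourIsSub || minIsSub) && doc.hour !== doc.min) {
        problems.push('*2X and *4X must be set on both hour and min');
    }
    return problems;
}

var controlDoc = {
    type: 'recurring_event', id: 1, hour: '*', min: '0',
    action: 'doCronActionA', verbose: { user_func: 2, scheduler: 3 }, active: false
};
console.log(validateControlDoc(controlDoc));
```

Running the check on the document inserted above reports no problems, while a document that mixes, say, hour "*4X" with min "*" is flagged, which matches the "illegal input" error path in getNextRecurringDate.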
Activate our first cron task
The control document we previously made was not activated because we specified "active":false. Furthermore, the schedule above will only run once an hour, but we want to test things and see them work in the near future.
First we need an index to be able to manipulate our control documents in N1QL. This only needs to be done once.
- Cut-n-paste the following N1QL statement into the Query Editor
CREATE primary INDEX on `crondata` ;
Now we will activate the task, but we will adjust the repeating schedule to every 15 seconds to see exactly how the system is behaving. We do this by modifying the control document with KEY recurring_event::1
- Cut-n-paste the following N1QL statement into the Query Editor
UPDATE `crondata`
SET active=TRUE, hour="*4X", min="*4X"
WHERE type="recurring_event" AND id=1 ;
- Click Execute
We used the non-standard syntax of "*4X" for both hour and min to schedule a recurring item four times a minute. We can see our work function doCronActionA executing, and also the housekeeping logic that schedules the function, via log statements because we set verbose.scheduler=3.
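To make the "*4X" and "*2X" behavior concrete, here is a small standalone sketch of the slot calculation, modeled on the corresponding branches of getNextRecurringDate. The helper name nextSubMinuteSlot and its stepSeconds parameter are assumptions introduced for illustration; the article's function hard-codes 15 and 30 seconds.

```javascript
// Sketch of the sub-minute slot calculation; `nextSubMinuteSlot` is a
// hypothetical helper name, not a function in the article's code.
function nextSubMinuteSlot(now, stepSeconds) {
    var slot = new Date(now.getTime());
    slot.setMilliseconds(0);
    slot.setSeconds(stepSeconds);
    // Advance one step at a time until the slot is no longer in the past.
    // setSeconds() rolls over into the next minute when the value reaches 60.
    while (slot.getTime() < now.getTime()) {
        slot.setSeconds(slot.getSeconds() + stepSeconds);
    }
    return slot;
}

// A mutation seen at 18:34:33.340 local time.
var now = new Date(2020, 4, 20, 18, 34, 33, 340);
var every15 = nextSubMinuteSlot(now, 15); // "*4X": next slot is 18:34:45.000
var every30 = nextSubMinuteSlot(now, 30); // "*2X": next slot is 18:35:00.000
console.log(every15.toTimeString(), every30.toTimeString());
```

The slots are aligned to the wall clock (seconds 0, 15, 30, 45) rather than to the moment the document was activated, which is why the timers in the log output land on round seconds.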
The scheduler is now running four times a minute. You can see the activity in the statistics and in the Application log files for the Eventing Functions cron_impl_2func_651 and cron_impl_2func_651_help.
- Access the Couchbase Web Console > Dashboard. You will see a burst of activity every 15 seconds:
- Access the Couchbase Web Console > Eventing and click the Log link of the deployed cron_impl_2func_651 Eventing function. This Function Log dialog lists log statements in reverse order (newest items first). The initial output should be similar to the following:
2020-05-20T18:34:33.340-07:00 [INFO] "OnUpdate B recurring_event::1 recurring timer was created, timer_id 570927555481258455388"
2020-05-20T18:34:33.340-07:00 [INFO] "OnUpdate A recurring_event::1 rcv mutation (initial or rearm) schedule timer at 2020-05-20T18:34:45.000"
2020-05-20T18:34:33.233-07:00 [INFO] "doCronActionA upsert to KV with KEY cron_cache::airlines_by_country cachedoc " {"type":"cron_cache","id":"airlines_by_country","date":"2020-05-21T01:34:33.232Z","data":{"United States":127,"United Kingdom":39,"France":21}}
2020-05-20T18:34:33.233-07:00 [INFO] "Callback R recurring_event::1 action took 0.013 sec., returned true"
2020-05-20T18:34:33.233-07:00 [INFO] "Callback C recurring_event::1 curDoc" {"action":"doCronActionA","active":true,"hour":"*4X","id":1,"min":"*4X","type":"recurring_event","verbose":{"scheduler":3,"user_func":2},"dynamic":{"state":"pending","next_sched":1590024885,"prev_sched":1590024870,"prev_etime":1590024873,"prev_delay":3.218,"prev_atime":0.013}}
2020-05-20T18:34:33.233-07:00 [INFO] "Callback B recurring_event::1 sched 1590024870, actual 1590024873, delay 3.218, took 0.013"
2020-05-20T18:34:33.233-07:00 [INFO] "Callback A recurring_event::1 gen mutation #1 to doc to force schedule rearm at 2020-05-20T18:34:45.000"
2020-05-20T18:34:33.232-07:00 [INFO] "doCronActionA N1QL idx 2, country France cnt 21"
2020-05-20T18:34:33.232-07:00 [INFO] "doCronActionA N1QL idx 1, country United Kingdom cnt 39"
2020-05-20T18:34:33.232-07:00 [INFO] "doCronActionA N1QL idx 0, country United States cnt 127"
2020-05-20T18:34:33.220-07:00 [INFO] "doCronActionA user action controlled by recurring_event::1"
2020-05-20T18:34:19.340-07:00 [INFO] "OnUpdate B recurring_event::1 recurring timer was created, timer_id 381384185845112994486"
2020-05-20T18:34:19.340-07:00 [INFO] "OnUpdate A recurring_event::1 rcv mutation (initial or rearm) schedule timer at 2020-05-20T18:34:30.000"
The oldest lines at the bottom are from the mutation that started the schedule (or re-armed the schedule), i.e. the OnUpdate messages. Above them we see a complete execution of the business logic we coded up in doCronActionA, followed by the OnUpdate messages that re-arm the timer for the next slot.
- There will also be some messaging related to the “fix for 6.5.X growing bucket ops”, but that is logged to cron_impl_2func_651_help. You will see messages like the following about every 30 seconds:
2020-05-20T18:34:49.185-07:00 [INFO] "seeding one doc to each vBucket in primary_bucket alias (took 221 mililis)"
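The "mutation #1" and "mutation #2" log messages reflect a hand-off between the two functions through the dynamic.state field of the control document. The sketch below is a deliberately simplified model of that hand-off in plain JavaScript, based on the state checks visible in the two functions' source; the names mainOnUpdate, mainCallback and helperOnUpdate are illustrative stand-ins, and the real handlers also create timers, verify the document against KV and write it back to the bucket.

```javascript
// Simplified model of the two-function hand-off. Names are illustrative only.
function mainOnUpdate(doc) {
    // Main function: ignore inactive docs and docs still marked "pending".
    if (doc.type !== 'recurring_event' || doc.active !== true) return false;
    if (doc.dynamic.state === 'pending') return false;
    return true; // the real function would create a timer here
}

function mainCallback(doc) {
    // Timer fired: the action runs, then the doc is written back with
    // state "pending" (logged as "gen mutation #1").
    doc.dynamic.state = 'pending';
}

function helperOnUpdate(doc) {
    // Helper function: reacts only to "pending" and flips it to "rearm"
    // (logged as "mutation #2"), which the main OnUpdate then picks up.
    if (doc.dynamic.state !== 'pending') return false;
    doc.dynamic.state = 'rearm';
    return true;
}

var doc = { type: 'recurring_event', active: true, dynamic: { state: 'arm' } };
var trace = [];
for (var cycle = 0; cycle < 2; cycle++) {
    if (mainOnUpdate(doc)) {
        trace.push(doc.dynamic.state); // state when the timer is scheduled
        mainCallback(doc);
        trace.push(doc.dynamic.state); // state after the action ran
        helperOnUpdate(doc);
    }
}
trace.push(doc.dynamic.state);
console.log(trace.join(' -> ')); // arm -> pending -> rearm -> pending -> rearm
```

The source comments indicate this indirection exists so that the timer is always created from OnUpdate of the main function rather than from inside the timer callback (the code refers to issue MB-38554).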
Let’s adjust both the frequency and the verbosity of this particular task. We will use the standard cron syntax of ‘*’ for both hour and min to get a recurring schedule of once per minute, 4X slower than the currently running frequency. In addition, we will lower the verbosity level of the scheduler logic to zero and the user function to 1 so we only see one message per invocation.
From the Couchbase Web Console > Query page, we will use N1QL to update the scheduled task in the ‘crondata’ bucket:
- Cut-n-paste the following N1QL statement into the Query Editor
UPDATE `crondata`
SET verbose.scheduler = 0, verbose.user_func = 1,
active=true, hour="*", min="*"
WHERE type="recurring_event" AND id=1 ;
- Click Execute
After two or three minutes, access the Couchbase Web Console > Eventing and click the Log link of the deployed cron_impl_2func_651 Eventing function.
- The recurring schedule is now one minute and much less verbose. Only one log message or line is emitted for each function execution (once again in reverse time order).
2020-05-20T18:43:04.231-07:00 [INFO] "doCronActionA user action controlled by recurring_event::1"
2020-05-20T18:42:08.233-07:00 [INFO] "doCronActionA user action controlled by recurring_event::1"
Let’s look at the work being performed
This code provides a practical framework for executing any JavaScript function on a recurring schedule and our function doCronActionA is upserting (insert or update) a calculated cache KV document once per minute.
To check the results of the deployed Eventing Function, access the Couchbase Web Console > Buckets page and click the Documents link of the travel-sample bucket.
- In the text box “N1QL WHERE” paste the following text.
type="cron_cache"
- Click Retrieve Docs
You should now see one document, a cache document cron_cache::airlines_by_country, which is being updated once per minute by the scheduled function doCronActionA.
- Click on the id “cron_cache::airlines_by_country”. You will see your cached document, which is being updated by the business logic of doCronActionA.
Wait a minute and edit the document again and you will see the “date” field updating. Of course the source data is “static” right now, so the counts will remain the same.
Let’s look at the control document
This code provides a framework for keeping a bit of statistics on each running schedule.
To check the statistics of the deployed Eventing Function, access the Couchbase Web Console > Buckets page and click the Documents link of the crondata bucket.
- In the text box “N1QL WHERE” paste the following text.
type="recurring_event"
- Click Retrieve Docs
You should now see one document, your control document recurring_event::1, which is driving the scheduled function doCronActionA.
- Click on the id “recurring_event::1”. You will see your control document, which is being updated by the scheduler logic with some statistics in the JSON object dynamic.
The “dynamic” portion of the document, which is added automatically, keeps some debug statistics:
- prev_sched: is the previous UNIX timestamp of the schedule that last ran
- prev_etime: is the actual UNIX timestamp when the schedule last ran
- prev_delay: is the delay in seconds from the prev_sched to the prev_etime
- prev_atime: is the time in seconds it took the action, i.e. doCronActionA, to execute.
- next_sched: is the next scheduled execution for this action
These statistics kept in the JSON dynamic sub-object for each schedule are useful for determining that your scheduling system is healthy and your action being executed is finishing in a timely fashion.
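As an illustration of how these statistics might be used, the sketch below checks the dynamic sub-object of a control document against two thresholds. The function name checkSchedule and the threshold values are assumptions made for this example, not Couchbase defaults; the sample values are taken from the “Callback C” log line shown earlier.

```javascript
// Hypothetical health check over the statistics in dynamic{}.
// maxDelaySec and maxActionSec are thresholds you would choose yourself.
function checkSchedule(dynamic, maxDelaySec, maxActionSec) {
    var issues = [];
    if (dynamic.prev_delay > maxDelaySec) {
        issues.push('timer fired ' + dynamic.prev_delay + ' sec. late');
    }
    if (dynamic.prev_atime > maxActionSec) {
        issues.push('action took ' + dynamic.prev_atime + ' sec.');
    }
    return issues;
}

// Values from the "Callback C" log line of the 15 second schedule.
var dynamic = {
    state: 'pending',
    next_sched: 1590024885,
    prev_sched: 1590024870,
    prev_etime: 1590024873,
    prev_delay: 3.218,
    prev_atime: 0.013
};

// Seconds between two consecutive scheduled slots.
var interval = dynamic.next_sched - dynamic.prev_sched;
console.log('interval', interval, 'issues', checkSchedule(dynamic, 5, 1));
```

With a tolerance of 5 seconds for the timer delay and 1 second for the action, the sample document reports no issues; tightening the delay tolerance to 2 seconds would flag the 3.218 second delay.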
Verify that the cache updates on data changes
The entire purpose of doCronActionA is to perform work at or near a scheduled time and update a cache document with the KEY “cron_cache::airlines_by_country”.
Let’s validate in the Query Workbench that our cache is being updated by 1) looking at the cache document, 2) deleting some airlines from the travel-sample document set, and 3) verifying the cache document gets updated by the function doCronActionA.
From the Couchbase Web Console > Query page, we will use N1QL to view and manipulate data in the ‘travel-sample’ bucket:
- Cut-n-paste the following N1QL statement into the Query Editor
SELECT data FROM `travel-sample`
WHERE `type` = 'cron_cache' AND id == 'airlines_by_country';
- Click Execute
In the JSON view of the Query Workbench you should see:
[
  {
    "data": {
      "France": 21,
      "United Kingdom": 39,
      "United States": 127
    }
  }
]
- Cut-n-paste the following N1QL statement into the Query Editor
DELETE FROM `travel-sample`
WHERE `type` = 'airline' AND callsign LIKE 'U%'
- Click Execute
In the JSON view of the Query Workbench you should see (we just deleted some data):
{
  "results": []
}
- Wait a bit over a minute
- Cut-n-paste the following N1QL statement into the Query Editor
SELECT data FROM `travel-sample`
WHERE `type` = 'cron_cache' AND id == 'airlines_by_country';
- Click Execute
In the JSON view of the Query Workbench you should see that four (4) airlines are no longer present:
[
  {
    "data": {
      "France": 21,
      "United Kingdom": 39,
      "United States": 123
    }
  }
]
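The point of the cache document is that consumers can read the pre-computed counts with a single KV lookup instead of re-running the N1QL query. The following sketch shows what such a consumer might look like. A plain object stands in for the bucket alias so the example runs outside Eventing, and the function name airlineCountFor, the maxAgeMs staleness check and the fallback values are assumptions made for illustration.

```javascript
// Stand-in for a bucket alias such as ts_bkt; in Eventing this would be the
// bound bucket, here it is a plain object so the sketch is self-contained.
var bucket = {
    'cron_cache::airlines_by_country': {
        type: 'cron_cache',
        id: 'airlines_by_country',
        date: '2020-05-21T01:34:33.232Z',
        data: { 'France': 21, 'United Kingdom': 39, 'United States': 123 }
    }
};

// Hypothetical consumer: one KV read, with a staleness guard.
function airlineCountFor(bkt, country, maxAgeMs, now) {
    var cached = bkt['cron_cache::airlines_by_country'];
    if (!cached) return null; // cache not built yet
    var ageMs = now.getTime() - new Date(cached.date).getTime();
    if (ageMs > maxAgeMs) return null; // cache too old, caller decides what to do
    var cnt = cached.data[country];
    return cnt === undefined ? 0 : cnt;
}

var readAt = new Date('2020-05-21T01:35:00.000Z');
console.log(airlineCountFor(bucket, 'United States', 120000, readAt));
```

Returning null for a missing or stale cache leaves the decision (fall back to N1QL, skip, or log) to the caller; a schedule that runs once a minute makes a two minute tolerance a reasonable starting point, but that value is only an example.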
Startup a second scheduled task
This code provides a practical framework for executing 1 to N JavaScript functions on recurring schedules.
We will let the function doCronActionA continue to run on a one-minute schedule but now we will enable doCronActionB on a 30 second schedule (two times per minute). This function is an empty shell and it will only log that it was invoked.
From the Couchbase Web Console > Query page, we will use N1QL to view and manipulate data in the ‘crondata’ bucket:
- Cut-n-paste the following N1QL statement into the Query Editor
INSERT INTO `crondata` (KEY,VALUE) VALUES (
    "recurring_event::2",
    {
        "type": "recurring_event",
        "id": 2,
        "hour": "*2X",
        "min": "*2X",
        "action": "doCronActionB",
        "verbose": {
            "user_func": 1,
            "scheduler": 0
        },
        "active": true
    }
);
- Click Execute
At this point you are executing two (2) different tasks, each running on its own schedule. To verify, wait two to three minutes and inspect the log files again.
- Access the Couchbase Web Console > Eventing and click the Log link of the deployed cron_impl_2func_651 Eventing function.
Only one log message or line is emitted for each function execution (once again in reverse time order). We see that doCronActionA fires once a minute while doCronActionB fires twice as often, i.e. once every 30 seconds.
2020-05-20T19:16:05.259-07:00 [INFO] "doCronActionA user action controlled by recurring_event::1"
2020-05-20T19:16:05.255-07:00 [INFO] "doCronActionB user action controlled by recurring_event::2"
2020-05-20T19:15:37.253-07:00 [INFO] "doCronActionB user action controlled by recurring_event::2"
2020-05-20T19:15:09.250-07:00 [INFO] "doCronActionA user action controlled by recurring_event::1"
2020-05-20T19:15:09.249-07:00 [INFO] "doCronActionB user action controlled by recurring_event::2"
2020-05-20T19:14:34.255-07:00 [INFO] "doCronActionB user action controlled by recurring_event::2"
[OPTIONAL] Pause / Edit JavaScript / Resume
We are essentially done with this installment. Feel free to experiment and modify things, for example:
From the Couchbase Web Console > Eventing screen:
- Click on the Function name cron_impl_2func_651 to expand and expose the Function controls.
- Click Pause.
- In the Confirm Pause Function dialog, select “Pause Function”.
- Click “Edit JavaScript”
- If you’re feeling confident, modify the doCronActionB function to perform some KV operations or integrate with cURL.
If you just want to see a change, add something simple to the function. Try something like the following:
function doCronActionB(doc) {
    try {
        // check that doc has desired values
        if (doc.type !== "recurring_event" || doc.active !== true) return;
        if (doc.verbose.user_func >= 1)
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
        // YOUR LOGIC HERE
        var a = 1 + 7;
        log('this is my logic, a = 1 +7 = ' + a);
    } catch (e) {
        log(doc.action + ' Error exception:', e);
        return false;
    }
    return true;
}
- Click Save.
- To return to the Eventing screen, click the ‘< back to Eventing‘ link (below the editor) or click the Eventing tab.
- Click Resume.
- In the Confirm Resume Function dialog, select “Resume Function”.
- Wait about a minute for the function cron_impl_2func_651 to deploy.
- Click the Log link of the deployed cron_impl_2func_651 Eventing function.
2020-05-20T19:20:41.343-07:00 [INFO] “this is my logic, a = 1 +7 = 8”
2020-05-20T19:20:41.343-07:00 [INFO] “doCronActionB user action controlled by recurring_event::2”
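For the more adventurous option of performing a KV operation, the following is a minimal sketch of a doCronActionB that records when it last ran. It assumes a read-write bucket alias named dst_bkt has been added to the Function's bindings; that alias name and the 'last_run::' key scheme are hypothetical and not part of the original function. The two stand-ins at the top exist only so the sketch also runs under plain Node.js and must be left out inside Eventing.

```javascript
// Stand-ins for running outside Eventing only. Inside Eventing, omit both:
// log() is built in and dst_bkt would be a bucket alias (hypothetical name)
// defined in the Function settings.
var dst_bkt = {};
function log() { console.log.apply(console, arguments); }

function doCronActionB(doc) {
    try {
        // check that doc has desired values
        if (doc.type !== "recurring_event" || doc.active !== true) return;
        if (doc.verbose.user_func >= 1)
            log(doc.action + ' user action controlled by ' + doc.type + '::' + doc.id);
        // KV write through the bucket alias: remember when this action last ran.
        var key = 'last_run::' + doc.action;   // hypothetical key scheme
        dst_bkt[key] = {
            type: 'last_run',
            action: doc.action,
            at: new Date().toISOString()
        };
    } catch (e) {
        log(doc.action + ' Error exception:', e);
        return false;
    }
    return true;
}
```

Writing through a bucket alias looks like a plain JavaScript property assignment, which is why the rest of the function keeps the same shape as the empty shell.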
Cleanup
Cleanup involves undeploying and deleting both functions and then removing the buckets used in this example. This concludes the example.
Remove Functions
From the Couchbase Web Console > Eventing screen:
- Click on the Function name cron_impl_2func_651 to expand and expose the Function controls.
- Click Undeploy.
- In the Confirm Undeploy Function dialog, select “Undeploy Function”.
- Wait for the function cron_impl_2func_651 to undeploy.
- Click Delete.
- In the Confirm Delete Function dialog, select “Delete Function”.
From the Couchbase Web Console > Eventing screen:
- Click on the Function name cron_impl_2func_651_help to expand and expose the Function controls.
- Click Undeploy.
- In the Confirm Undeploy Function dialog, select “Undeploy Function”.
- Wait for the function cron_impl_2func_651_help to undeploy.
- Click Delete.
- In the Confirm Delete Function dialog, select “Delete Function”.
Remove Buckets
Next, drop the buckets ‘metadata’, ‘crondata’, and ‘travel-sample’ (they can always be recreated).
From the Couchbase Web Console > Buckets page:
- Click on the bucket name “metadata” to expand and expose the controls
- Click Delete
- In the Confirm Delete Bucket dialog, select “Delete Bucket”.
- Click on the bucket name “crondata” to expand and expose the controls
- Click Delete
- In the Confirm Delete Bucket dialog, select “Delete Bucket”.
- Click on the bucket name “travel-sample” to expand and expose the controls
- Click Delete
- In the Confirm Delete Bucket dialog, select “Delete Bucket”.
Final Thoughts
I hope you found this walkthrough educational and have developed more appreciation for the Couchbase Eventing Service as a whole.
Earlier I noted that Eventing is designed to process high velocity mutations (in millions per second) from the DCP stream of the function’s associated source bucket. This Eventing Function or scheduler needs only to react to minimal changes in scheduler documents.
I have started 5,000 schedules with this code unchanged just by adding control documents. I have even run 120,000 schedules every minute just to test this implementation (yes, 120K is a fairly ludicrous number of independent cron schedules to run). In addition, I put 120,000 schedules 2 days in the future to ensure we had no degenerate resource issues.
Being forced to use two Eventing Functions to create the scheduling system, the main function cron_impl_2func_651 and a simple helper cron_impl_2func_651_help, was not as elegant as I had hoped. In addition, having to work around a bucket op leak by firing a timer on every vBucket was disappointing to say the least. Fortunately, this effort led to changes being implemented, and in the upcoming 6.6.0 release I can make a cleaner scheduler using a single Eventing Function. The key enhancements in 6.6 are 1) the ability to generate a new timer from within a timer callback, 2) the ability to cancel or overwrite an existing timer by reference, and 3) the elimination of growing resource usage on idle systems with timers scheduled in the future.
I utilized a standalone bucket ‘crondata’ to hold the schedule or control documents (I do not count the use of the ‘metadata’ bucket as it is a required system scratchpad for Eventing and reserved for Eventing itself) to give the most flexibility in manipulating data in other buckets. If I had collocated the control documents in another bucket, I would not be able to run N1QL operations on that bucket (since it would be a source bucket to the Eventing Function) and would have been limited to only KV operations to manipulate data in the collocated bucket.
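As a rough illustration of the first two 6.6 enhancements, the sketch below re-arms a timer from inside its own callback and cancels it by reference. It assumes the createTimer(callback, date, reference, context) and cancelTimer(callback, reference) calls of Eventing 6.6. The names CronCallback, startSchedule and stopSchedule and the context fields are hypothetical, and this is not the actual 6.6 scheduler. The stand-ins at the top only mimic timer bookkeeping so the sketch runs under plain Node.js; they must be omitted inside Eventing.

```javascript
// Stand-ins for running outside Eventing only (omit inside Eventing 6.6).
var pending = {};   // reference -> { callback, date, context }
function createTimer(callback, date, reference, context) {
    // Same reference overwrites any existing entry.
    pending[reference] = { callback: callback, date: date, context: context };
}
function cancelTimer(callback, reference) {
    delete pending[reference];
}
function log() { console.log.apply(console, arguments); }

// Timer callback that does its work and then schedules its own next run.
function CronCallback(context) {
    log('running ' + context.action + ' due at ' + context.due);
    var next = new Date(Date.parse(context.due) + context.everyMs);
    createTimer(CronCallback, next, context.ref, {
        ref: context.ref,
        action: context.action,
        everyMs: context.everyMs,
        due: next.toISOString()   // keep the context JSON-serializable
    });
}

// Arm the first run of a recurring schedule.
function startSchedule(ref, action, first, everyMs) {
    createTimer(CronCallback, first, ref, {
        ref: ref,
        action: action,
        everyMs: everyMs,
        due: first.toISOString()
    });
}

// Stop a schedule by cancelling its timer by reference.
function stopSchedule(ref) {
    cancelTimer(CronCallback, ref);
}
```

With a self-re-arming callback like this, no helper function is needed just to create the next timer, which is the main reason a single-function scheduler becomes possible.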
I challenge you to try your hand at other cron use cases and also to think of other ways to leverage a scheduling service:
- Checking an item count in a large dataset during “off peak hours” and performing incremental purging
- Performing scheduled document enrichment via N1QL.
- Recalculating stock portfolios on a regular schedule.
- Managing TTL or Expiry times via N1QL on a recurring schedule, refer to How To Manage Time-To-Live (TTL) Documents with Couchbase N1QL.
- Integrating with external REST endpoints on a repeating schedule, refer to Using cURL with the Eventing Service: Update.
- Update the JavaScript of cron_impl_2func_651 to add a new field “prev_astatus” to the JSON dynamic object to save the true/false result flag returned from the previous executed user action.
Updates
This blog was updated on July 24, 2020 to add a workaround for an issue in the 6.5.x releases: a growing number of metadata bucket operations, which can eventually block mutations for a given Eventing function when timers are created in the future (as in one hour or more) in an otherwise idle system.
Next Steps
In a few weeks “Implementing a robust portable cron like scheduler via Couchbase Eventing (Part 2)” will be released, where we will explore executing a sequence of database driven dynamic N1QL statements without the requirement to edit the Eventing Function or to define a hardcoded “action” script inside the Eventing function.
Resources
- Download: Download Couchbase Server 6.5.1
- Eventing Function: cron_impl_2func_651.json
- Eventing Helper Function: cron_impl_2func_651_help.json
References
- Couchbase Eventing documentation: https://docs.couchbase.com/server/current/eventing/eventing-overview.html
- Couchbase Server 6.5 What’s New: https://docs.couchbase.com/server/6.5/introduction/whats-new.html
- Couchbase blogs on Eventing: https://www.couchbase.com/blog/tag/eventing/
We would love to hear from you on how you liked the 6.5 features and how it’ll benefit your business going forward. Please share your feedback via the comments or in the Couchbase forum.
Footnotes
[1] The timer implementation in the Eventing Service is designed to handle large numbers of distributed timers, in the millions, at high velocity. A single Eventing node can handle over 100K timers per second, and the only promise is to run timers as soon as possible, i.e. no timers are lost. Consider that the current scan interval to harvest timers that are ready to fire is seven (7) seconds; as such, you should expect some delays. For more details on timer scheduling, refer to Timers: Wall-clock Accuracy in the Couchbase documentation.
[2] By adjusting allow_interbucket_recursion to true you are removing guards that were put in place in the Couchbase server to protect against accidentally writing Eventing logic that can initiate infinite recursive loops. There is nothing wrong with this, but it is easy to make a mistake when leveraging recursion. In Couchbase version 6.6 the Eventing logic can be collapsed from two (2) Eventing Functions into one (1) simplified Eventing Function with no need to adjust the allow_interbucket_recursion setting.
[3] Two major limitations exist. First, if a document is modified several times in a short duration, the calls may be coalesced into a single event due to deduplication. Second, it is not possible to discern between Create and Update operations. For the purposes of a cron function neither limitation presents an issue.
[4] Why didn’t I implement exact crontab semantics? I could have, but the amount of code is excessive. I refer you to the https://github.com/kelektiv/node-cron package along with its dependencies moment and moment-timezone (all very large packages). The getNextRecurringDate(hour_str, min_str) function may not be as flexible, but it is simple and covers our use case.
Highly interested in the 6.6.x updates to use a single function! I love this implementation for a cron-like scheduler.
I just realized this is available on GitHub! I’ll be taking a look at this! Recommend updating this blog post with the link :) Thanks!
Hi Alex,
I will be updating the “cron-like scheduler” soon with both a 6.6.0 version and 6.6.1 version (using advanced bucket accessors).
However, this will most likely be in a follow-up blog (Part 2), but I will of course cross-link them. Glad you found my 6.6.0 prototype on GitHub.