Not too long ago I had written about using the Couchbase Server subdocument API with the Go SDK. Doing subdocument operations is incredibly useful if you’d like to change or access a part of a potentially huge NoSQL document. Subdocument operations save network resources and are great on performance.
A colleague of mine asked how one might do bulk subdocument mutations by key similar to how I demonstrated in my tutorial titled, Using Golang to get Multiple Couchbase Documents by Key in a Single Operation. The short answer is that you can’t with a single operation, but because Golang is so incredibly fast and awesome, you could do things in parallel and get the same results.
We’re going to see how to asynchronously perform subdocument mutations based on a list of document ids with the Go programming language and Couchbase.
Let’s figure out a real-world scenario that we want to accomplish. Say we have a professional networking website that receives tens of millions of requests a second. Performance is a must, so we decide to batch some of those requests together as best we can. Say 100 people just updated their profiles to include Golang as one of their skills. We want to append Golang to each of their lists of skills.
Going forward, we’re going to be doing everything in a main.go file somewhere in our $GOPATH. Open this file and include the following:
```go
package main

import (
	"fmt"
	"sync"

	"github.com/couchbase/gocb"
)

var waitGroup sync.WaitGroup
var data chan string
var bucket *gocb.Bucket

func worker() {}

func main() {
	fmt.Println("Starting the application...")
	documentIds := []string{"nraboy", "jmichaels", "tgreenstein"}
	_ = documentIds // we'll put these to use shortly
	cluster, _ := gocb.Connect("couchbase://localhost")
	bucket, _ = cluster.OpenBucket("default", "")
	fmt.Println("The application has completed!")
}
```
Let’s break down what we have so far.
Because we plan to do things in parallel with goroutines, we need to know when it is safe to terminate our application. The WaitGroup allows us to keep track of our asynchronous tasks and wait until they’ve all finished before continuing. Since we’re going to be processing data with goroutines, we’ll need a channel that all goroutines can read from at the same time. Each of our goroutines will be an instance of the worker function.
In the main function we declare all the keys that will receive the mutation. In a production scenario, the application’s business logic would probably aggregate this list of keys. We’re also establishing a connection to Couchbase.
With the foundation in place, let’s take a closer look at the main function.
```go
func main() {
	fmt.Println("Starting the application...")
	documentIds := []string{"nraboy", "jmichaels", "tgreenstein"}
	data = make(chan string)
	cluster, _ := gocb.Connect("couchbase://localhost")
	bucket, _ = cluster.OpenBucket("default", "")
	for i := 0; i < 2; i++ {
		waitGroup.Add(1)
		go worker()
	}
	for i := 0; i < len(documentIds); i++ {
		data <- documentIds[i]
	}
	close(data)
	waitGroup.Wait()
	fmt.Println("The application has completed!")
}
```
In Go, we can spin up a ridiculously large number of goroutines that will run in parallel. Of course, the real number you can spin up is dependent on your hardware, but for now, let’s be conservative with two. For every worker that we start, we increment the WaitGroup. As these goroutines stop, the WaitGroup count decreases, which will eventually unblock the application and allow it to terminate.
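This fan-out pattern can be exercised without Couchbase at all. Here is a minimal, self-contained sketch of the same Add/Done/Wait flow; the processAll function, worker count, and ids are made up for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// processAll fans ids out to a pool of goroutines and blocks until all finish.
func processAll(ids []string, workers int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	processed := 0
	ch := make(chan string)
	for i := 0; i < workers; i++ {
		wg.Add(1) // one Add per goroutine, before it starts
		go func() {
			defer wg.Done() // decrement when this goroutine returns
			for range ch {   // a real worker would mutate a document here
				mu.Lock()
				processed++
				mu.Unlock()
			}
		}()
	}
	for _, id := range ids {
		ch <- id
	}
	close(ch)
	wg.Wait() // unblocks once every worker has called Done
	return processed
}

func main() {
	fmt.Println(processAll([]string{"nraboy", "jmichaels", "tgreenstein"}, 2))
}
```

The mutex-guarded counter stands in for the real work so the example is verifiable; the actual tutorial workers don’t need it because each Couchbase mutation is independent.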
You’ll also notice that we’ve added a channel for our string data. Each of our desired document ids is added to the channel, and then the channel is closed. You’ll see why we do this when we define the worker logic.
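Closing the channel is what lets the workers know there is nothing left to receive. A small standalone sketch of that behavior (the drain helper is made up for illustration):

```go
package main

import "fmt"

// drain receives from data until the channel is closed, collecting every id.
func drain(data chan string) []string {
	var ids []string
	for {
		id, ok := <-data // ok is false once the channel is closed and empty
		if !ok {
			break
		}
		ids = append(ids, id)
	}
	return ids
}

func main() {
	data := make(chan string, 2)
	data <- "nraboy"
	data <- "jmichaels"
	close(data) // signals receivers that no more ids are coming
	fmt.Println(drain(data)) // prints [nraboy jmichaels]
}
```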
```go
func worker() {
	defer waitGroup.Done()
	for {
		id, ok := <-data
		if !ok {
			break
		}
		_, err := bucket.MutateIn(id, 0, 0).ArrayAppend("skills", "Golang", true).Execute()
		if err != nil {
			fmt.Printf("%s - %v\n", id, err)
		}
	}
}
```
The above snippet is our worker logic. When the function returns, the deferred call executes, which subtracts from the WaitGroup.
Each worker will loop, taking ids from the data channel on each iteration. If the receive is not ok, it means the channel has been closed and drained, so we end the loop. If we do get an id, we perform a mutation on that document, appending a new string at the skills path, which we assume to be an array. Because the create flag is set to true, the array will be created if it does not already exist within the document.
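As an aside, the receive-and-check loop above is equivalent to Go’s for-range over a channel, which handles the closed-channel check for us. A minimal sketch with plain strings instead of Couchbase calls, so it runs standalone (countIds is a made-up stand-in for the real worker body):

```go
package main

import "fmt"

// countIds consumes ids with for-range, which exits once the channel is closed.
func countIds(data chan string) int {
	count := 0
	for id := range data { // same effect as: id, ok := <-data; if !ok { break }
		_ = id // a real worker would call bucket.MutateIn(id, ...) here
		count++
	}
	return count
}

func main() {
	data := make(chan string, 3)
	for _, id := range []string{"nraboy", "jmichaels", "tgreenstein"} {
		data <- id
	}
	close(data)
	fmt.Println(countIds(data)) // prints 3
}
```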
If there is an error for any reason, perhaps because the key doesn’t exist, we print it along with the offending id.
The full code to this simple demo is as follows:
```go
package main

import (
	"fmt"
	"sync"

	"github.com/couchbase/gocb"
)

var waitGroup sync.WaitGroup
var data chan string
var bucket *gocb.Bucket

func worker() {
	defer waitGroup.Done()
	for {
		id, ok := <-data
		if !ok {
			break
		}
		_, err := bucket.MutateIn(id, 0, 0).ArrayAppend("skills", "Golang", true).Execute()
		if err != nil {
			fmt.Printf("%s - %v\n", id, err)
		}
	}
}

func main() {
	fmt.Println("Starting the application...")
	documentIds := []string{"nraboy", "jmichaels", "tgreenstein"}
	data = make(chan string)
	cluster, _ := gocb.Connect("couchbase://localhost")
	bucket, _ = cluster.OpenBucket("default", "")
	for i := 0; i < 2; i++ {
		waitGroup.Add(1)
		go worker()
	}
	for i := 0; i < len(documentIds); i++ {
		data <- documentIds[i]
	}
	close(data)
	waitGroup.Wait()
	fmt.Println("The application has completed!")
}
```
Again, these subdocument mutations to the skills array happen in parallel through goroutines. For more information on using goroutines to do things concurrently, check out a previous tutorial I wrote on the subject titled, Concurrent Golang Applications with Goroutines and Channels.
Conclusion
You just saw another demo of doing subdocument mutations with Couchbase and Golang. This time we explored doing things in parallel instead of trying to use one of the bulk operators. By doing things in parallel, we get nearly the same performance as doing bulk operations on a list of keys.
Hi Nic, great article. My use case is to perform bulk CRUD operations on around a hundred sub-documents, given a list of document keys. Like you said and concluded in the article:

"A colleague of mine asked how one might do bulk subdocument mutations by key similar to how I demonstrated in my tutorial titled, Using Golang to get Multiple Couchbase Documents by Key in a Single Operation. The short answer is that you can’t with a single operation, but because Golang is so incredibly fast and awesome, you could do things in parallel and get the same results."

"This time we explored doing things in parallel instead of trying to use one of the bulk operators. By doing things in parallel we get nearly the same performance as doing bulk operations on a list of keys."

I want to confirm: since we will be performing hundreds of network calls anyway, how is this efficient?