Introduction
The Couchbase Mobile Sync Gateway changes feed provides a way to monitor events in a mobile deployment. The feed makes it feasible to write sophisticated business logic. I wrote a tool to help examine and understand the feed. You can read an introduction and description in part one of this two part series. The code also serves as an example of listening to the feed.
The code
I’ve included the major classes from the app code here. This is the first version, so it can use plenty of enhancements. The parameters are all hard-wired. Check the project here on GitHub for updates. You can also find instructions for building, running, and packaging the app there.
JavaFX: The Controller class
JavaFX breaks simple apps into a controller class and a declarative UI. Let’s walk through the controller.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
package com.couchbase.mobile; import com.couchbase.lite.*; import com.fasterxml.jackson.core.JsonProcessingException; import javafx.application.Platform; import javafx.beans.value.ChangeListener; import javafx.beans.value.ObservableValue; import javafx.collections.FXCollections; import javafx.collections.ObservableList; import javafx.event.ActionEvent; import javafx.fxml.FXML; import javafx.scene.control.*; import javafx.scene.control.TextField; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.Map; import static com.couchbase.mobile.Runtime.mapper; public class Controller implements LiveQuery.ChangeListener, ChangeListener, SGMonitor.ChangesFeedListener, DBService.ReplicationStateListener { private static final String SYNC_GATEWAY_HOST = "http://localhost"; private static final String SG_PUBLIC_URL = SYNC_GATEWAY_HOST + ":4984/" + DBService.DATABASE; private static final String SG_ADMIN_URL = SYNC_GATEWAY_HOST + ":4985/" + DBService.DATABASE; private static final String TOGGLE_INACTIVE = "-fx-background-color: #e6555d;"; private static final String TOGGLE_ACTIVE = "-fx-background-color: #ade6a6;"; private static final String TOGGLE_DISABLED = "-fx-background-color: #555555;"; @FXML private ListView documentList; private ObservableList documents = FXCollections.observableArrayList(); @FXML private TextArea contentsText; @FXML private TextArea changesFeed; @FXML private TextField usernameText; @FXML private TextField passwordText; @FXML private ToggleButton applyCredentialsBtn; @FXML private ToggleButton syncBtn; private DBService service = DBService.getInstance(); private Database db = service.getDatabase(); private SGMonitor changesMonitor; private LiveQuery liveQuery; |
This first listing shows a bunch of boiler plate code. I implement several listeners for the UI within the class itself to cut down of files. This is for illustration purposes.
The @FXML
annotations mark all the fields that the framework will automatically bind to portions of the UI.
Next comes initialization. JavaFX calls this method as part of its standard lifecycle.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@FXML private void initialize() { documentListInitialize(); documentList.setItems(documents); setState(applyCredentialsBtn, false); setState(syncBtn, false); service.addReplicationStateListener(this); changesMonitor = new SGMonitor(SG_ADMIN_URL, "false", "true", "0", "all_docs", this); changesMonitor.start(); } private void documentListInitialize() { Query query = db.createAllDocumentsQuery(); query.setAllDocsMode(Query.AllDocsMode.INCLUDE_DELETED); liveQuery = query.toLiveQuery(); liveQuery.addChangeListener(this); liveQuery.start(); documentList.getSelectionModel().selectedItemProperty().addListener(this); } |
I’ve broken out the document list initialization into its own routine. The document list gets bound to the documentList
variable. In turn documentList
will update the UI whenever the item list we pass in changes.
I set up a live query to monitor the client database for any changes. This happens through an “all docs” query. An all docs query doesn’t require an associated view. I set INCLUDE_DELETED
so the tool can show what a deleted document looks like in the database.
With the other bindings in place, we just have to update the documents
list. We’ll see the live query listener that does that further along.
The next few lines set the initial state of a couple of toggle buttons. I need an extra listener to keep the Sync
button consistent with the actual state of the replications. More on this further along in the article.
I wrote a separate class to monitor Sync Gateway. The initialization code finished by creating a new monitor instance and kicking it off.
The next section contains several listeners.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
// LiveQuery.ChangeListener @Override public void changed(LiveQuery.ChangeEvent event) { if (event.getSource().equals(liveQuery)) { Platform.runLater(() -> { QueryEnumerator rows = event.getRows(); documents.clear(); rows.forEach(queryRow -> documents.add(queryRow.getDocumentId())); }); } } |
Here’s the live query listener that gets called whenever the local database changes. I didn’t design the tool for working with massive databases. So, whenever the data changes, I just took the brute force approach of rereading every document. The getRows
method returns an enumerator that will index doing just that. JavaFX takes care of updating the UI when documents
changes.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
// ListView ChangeListener @Override public void changed(ObservableValue observable, String oldId, String newId) { if (null == newId) return; Map properties = db.getDocument(newId).getProperties(); try { String json = mapper.writeValueAsString(properties); contentsText.setText(prettyText(json)); } catch (JsonProcessingException ex) { ex.printStackTrace(); Dialog.display(ex); } } |
This listener takes care of tracking when a user clicks on an entry in the document list. The entries are the document IDs, so we can use a selection to pull the document directly from the database.
1 2 3 4 5 |
// SGMonitor.ChangesFeedListener @Override public void onResponse(String body) { changesFeed.appendText(prettyText((String) body)); } |
I used a callback approach to get the results of the changes feed. The interface is defined in the SGMonitor
class. It has just the one method. In this implementation I simply take the body of the feed response and tack it on to the existing text in the changes feed text pane. There’s a little formatting done to make it easier to read, too.
1 2 3 4 5 |
// DBService.ReplicationStateListener @Override public void onChange(boolean isActive) { setState(syncBtn, isActive); } |
Finally I added a listener for replication activity. The interface comes from the DBService helper class. I wrote a bit about detecting the state of a replication here. For this app I just need to know whether a replication is running or not to keep the Sync
button state consistent. This handles cases where a user tries to start a sync but it fails. This can happen if they need to provide authentication credentials but haven’t, for example.
Next we have several methods bound to UI elements. JavaFX handles much of the wiring.
1 2 3 4 5 6 7 8 9 10 11 12 |
@FXML private void applyCredentialsToggled(ActionEvent event) { String username = null; String password = null; if (applyCredentialsBtn.isSelected()) { username = usernameText.getText(); password = passwordText.getText(); } DBService.getInstance().setCredentials(username, password); applyCredentialsBtn.setStyle(applyCredentialsBtn.isSelected() ? TOGGLE_ACTIVE : TOGGLE_INACTIVE); } |
Here I set the use of authentication credentials whenever the corresponding button gets toggled.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
@FXML private void saveContentsClicked(ActionEvent event) { Map properties = null; Document document; try { properties = mapper.readValue(contentsText.getText(), Map.class); } catch (IOException ex) { ex.printStackTrace(); Dialog.display(ex); } if (properties.containsKey("_id")) { document = db.getDocument((String) properties.get("_id")); } else { document = db.createDocument(); } try { document.putProperties(properties); } catch (CouchbaseLiteException ex) { ex.printStackTrace(); Dialog.display(ex); } } |
This code shows a couple of interesting items. I use a Jackson ObjectMapper
instance to convert the text in the content pane to a property map.
Next I check for an entry _id
. Couchbase Mobile reserves most properties starting with an “_” for system use (with special exceptions). If the text we’re trying to convert contains _id
, I assume this is an edit to an existing document. Otherwise I create a new document.
So, in a nutshell, we have an example of both creating and updating documents. This isn’t the preferred way to update, although it suffices in many cases. You can read more about updates here.
1 2 3 4 5 6 7 8 9 10 11 |
@FXML private void syncToggled(ActionEvent event) { try { syncBtn.setDisable(true); syncBtn.setStyle(TOGGLE_DISABLED); service.toggleReplication(new URL(SG_PUBLIC_URL), true); } catch (Exception ex) { ex.printStackTrace(); Dialog.display(ex); syncBtn.setDisable(false); } } |
This reacts to toggling the Sync
button. Recall though that we use a listener to verify the state elsewhere.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
@FXML private void exitClicked(ActionEvent event) { // Try to shut everything down gracefully changesMonitor.stop(); liveQuery.stop(); service.stopReplication(); db.close(); db.getManager().close(); Platform.exit(); } private void setState(ToggleButton btn, boolean active) { btn.setSelected(active); btn.setStyle(active ? TOGGLE_ACTIVE : TOGGLE_INACTIVE); btn.setDisable(false); } private String prettyText(String json) { String out = null; try { Object object = mapper.readValue(json, Object.class); out = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(object); } catch (Exception ex) { ex.printStackTrace(); } return out; } } |
The rest of the code here are just helper bits and a piece to shutdown everything before exiting.
The Database Helper class
This shows the code for a straight-forward database helper class. For the most part I just find this class a nice packaging of the typical operations needed for managing a database and starting a standard bidirectional set of replications. I’m including it here because I find it useful and for clarity.
I do implement the Replication.ChangeListener
interface. That’s maybe a little unusual. I mentioned the reason earlier on. This link takes you to the blog post about it.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
package com.couchbase.mobile; import com.couchbase.lite.Database; import com.couchbase.lite.JavaContext; import com.couchbase.lite.Manager; import com.couchbase.lite.auth.Authenticator; import com.couchbase.lite.auth.AuthenticatorFactory; import com.couchbase.lite.replicator.Replication; import com.couchbase.lite.replicator.ReplicationState; import java.net.URL; import java.util.ArrayList; import java.util.List; public class DBService implements Replication.ChangeListener { public static final String DATABASE = "db"; private static final String DB_DIRECTORY = "data"; private Manager manager; private Database database; private Replication pushReplication = null; private Replication pullReplication = null; private boolean replicationActive = false; private List stateListeners = new ArrayList(); private String username = null; private String password = null; private DBService() { try { manager = new Manager(new JavaContext(DB_DIRECTORY), Manager.DEFAULT_OPTIONS); database = manager.getDatabase(DATABASE); } catch (Exception ex) { ex.printStackTrace(); } } private static class Holder { private static DBService INSTANCE = new DBService(); } public interface ReplicationStateListener { void onChange(boolean isActive); } public static DBService getInstance() { return Holder.INSTANCE; } public Database getDatabase() { return database; } public void setCredentials(String username, String password) { this.username = username; this.password = password; } public void toggleReplication(URL gateway, boolean continuous) { if (replicationActive) { stopReplication(); } else { startReplication(gateway, continuous); } } public void startReplication(URL gateway, boolean continuous) { if (replicationActive) { stopReplication(); } pushReplication = database.createPushReplication(gateway); pullReplication = database.createPullReplication(gateway); pushReplication.setContinuous(continuous); pullReplication.setContinuous(continuous); if (username != null) { Authenticator auth = AuthenticatorFactory.createBasicAuthenticator(username, password); pushReplication.setAuthenticator(auth); pullReplication.setAuthenticator(auth); } pushReplication.addChangeListener(this); pullReplication.addChangeListener(this); pushReplication.start(); pullReplication.start(); } public void stopReplication() { if (!replicationActive) return; pushReplication.stop(); pullReplication.stop(); pushReplication = null; pullReplication = null; } public void addReplicationStateListener(ReplicationStateListener listener) { stateListeners.add(listener); } public void removeReplicationStateListener(ReplicationStateListener listener) { stateListeners.remove(listener); } // Replication.ChangeListener @Override public void changed(Replication.ChangeEvent changeEvent) { if (changeEvent.getError() != null) { Throwable lastError = changeEvent.getError(); Dialog.display(lastError.getMessage()); return; } if (changeEvent.getTransition() == null) return; ReplicationState dest = changeEvent.getTransition().getDestination(); replicationActive = ((dest == ReplicationState.STOPPING || dest == ReplicationState.STOPPED) ? false : true); stateListeners.forEach(listener -> listener.onChange(replicationActive)); } } |
The Sync Gateway Monitor class
Finally, let’s take a look at the helper class for monitoring Sync Gateway. I’ll walk through this in pieces, too.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package com.couchbase.mobile; import com.fasterxml.jackson.databind.JsonNode; import okhttp3.*; import java.io.IOException; import java.net.SocketException; import java.util.concurrent.TimeUnit; import static com.couchbase.mobile.Runtime.mapper; public class SGMonitor { private static final OkHttpClient client = new OkHttpClient.Builder() .readTimeout(1, TimeUnit.DAYS) .build(); private ChangesFeedListener listener; private HttpUrl.Builder urlBuilder; private Thread monitorThread; private String since = "0"; private Call call; SGMonitor(String url, String activeOnly, String includeDocs, String since, String style, ChangesFeedListener listener) { this.since = since; urlBuilder = HttpUrl.parse(url).newBuilder() .addPathSegment("_changes") .addQueryParameter("active_only", activeOnly) .addQueryParameter("include_docs", includeDocs) .addQueryParameter("style", style) .addQueryParameter("since", since) .addQueryParameter("feed", "longpoll") .addQueryParameter("timeout", "0"); this.listener = listener; } |
I use the OkHttp library from Square. Currently Couchbase Lite uses this library too, internally. OkHttp uses a builder pattern. I prepare a builder instance I’ll use through the rest of the code in the class constructor. You can read about the meaning of all the parameters in the Sync Gateway documentation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
public interface ChangesFeedListener { void onResponse(String body); } public void start() { monitorThread = new Thread(() -> { while (!Thread.interrupted()) { Request request = new Request.Builder() .url(urlBuilder.build()) .build(); call = client.newCall(request); try (Response response = call.execute()) { if (!response.isSuccessful()) throw new IOException("Unexpected code " + response); String body = response.body().string(); JsonNode tree = mapper.readTree(body); since = tree.get("last_seq").asText(); urlBuilder.setQueryParameter("since", since); listener.onResponse(body); } catch (SocketException ex) { return; } catch (IOException ex) { ex.printStackTrace(); Dialog.display(ex); } } }); monitorThread.setDaemon(true); monitorThread.start(); } |
The start
method has the most interesting part of the code. It spins up a background thread. Underneath the thread setup and control code I run a continuous loop. The loop does synchronous network calls. The error handling is simple. Just throw an exception if anything goes wrong.
Sync Gateway responds with JSON strings. You can see the code pulls apart the response and parses the JSON into a JsonNode
object. This is all to get at the last_seq
value in the response.
In order to track what to send next, the changes feed relies on a simple sequence mechanism. You should treat this as an opaque object. Take the value of last_seq
from the previous response, and set the since
parameter to that same value for the next request.
There’s no real harm in not supplying the since
parameter. Sync Gateway will just replay all changes from the start if it’s missing. That’s why you’ll see in this example, I cheat a little and always create the class instance with since
set to the string “0”.
In a real world application, you might want to have some way to save the last sequence string your app has processed, rather than churning through the change history every time.
The rest of the code is just a couple of short methods.
1 2 3 4 5 6 7 8 9 |
public void stop() { monitorThread.interrupt(); call.cancel(); } public String getSince() { return since; } } |
And that’s it for the main classes. There are others needed for the complete app.
Check out the GitHub repo to see all the code and instructions to build it.
Read a discussion of the app and how to use it in part one.
Postscript
You can find more resources on our developer portal and follow us on Twitter @CouchbaseDev.
You can post questions on our forums. And we actively participate on Stack Overflow.
Hit me up on Twitter with any questions, comments, topics you’d like to see, etc. @HodGreeley