In this post, I’ll show how to convert a callback-based, listener-style API to a reactive one with RxJava 2.
Introduction
Reactive programming and RxJava have become pretty hot topics, especially over the last couple of years. I got a taste of RxJava while trying it out in an Android project a little while ago. I had a simple threading problem, which I could have solved easily enough other ways. Since I had been reading about RxJava, though, I decided to try it out. I was immediately impressed with how much simpler and more understandable the code was.
Despite that success, RxJava has a bit of a reputation for being difficult to learn. In a more recent project, I wanted to handle live updates from a database. The database (Couchbase Lite) has a callback-based system for monitoring changes. I wanted to wrap that callback into a reactive structure. (This could have been an Observable or a Flowable. Look for a follow-on article to talk about choosing between them.)
The first thing I discovered was that I couldn’t find a good example of a general version of what I wanted. There’s a simple example in the RxJava documentation, but it has some drawbacks I wanted to avoid. For instance, in the example, the Event object is assumed to carry a method to determine if a given event is the last in the stream. A lot of callbacks in Android don’t have such a method.
Although I did later discover a Stack Overflow post that covers the basics pretty well, I wanted to understand more.
A full discussion of the internals could, not surprisingly, fill a book. In this post I’ll just cover the core. There’s code with several experiments to help understand details. That’s too much for one write-up, so we’ll save that for another time.
Objective
To be more explicit, we will look at taking a listener callback interface, common in event-driven programming, and wrapping it in an Observable.
That is, how do we go from

    public interface OnItemListener<T> {
      void onItem(T item);
    }

to

    Observable<T>
Android’s OnClickListener is one example of this type of API. OnClickListener is an interface with one method, onClick, which takes an Android View as its parameter. The Android system uses this to deliver streams of events like button presses.
Getting Started
The source for this work can be found on GitHub here.
We’ll only look at a small part of that code in this post. Other parts of the code are designed to try out various experiments. Those may be something for future articles. For this, we’ll just focus on the central topic.
To follow along, clone the repo. The code is configured to build with Gradle, so you can run it from the command line or import it into your favorite environment.
Creating a Source
The object is to convert an API for listening to some kind of source of events. The first thing we need is an actual event stream to test with. There are a few sources built to experiment, so we’ll start with a base class.
Listing: BasicSource.java
    // File: src/main/java/com/couchbase/rx/BasicSource.java
    package com.couchbase.rx;

    public class BasicSource<T> {
      protected volatile OnItemListener<? super T> listener;

      public static interface OnItemListener<T> { void onItem(T item); }

      public void setOnItemListener(OnItemListener<? super T> listener) { this.listener = listener; }
    }
This defines the interface, and has the common field and setter method needed by all implementations.
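To see the listener contract on its own before adding RxJava, here is a minimal sketch in plain Java. BasicSource is re-declared so the block compiles by itself. CountingSource and ListenerDemo are hypothetical names made up for this illustration and are not part of the repo.

```java
import java.util.ArrayList;
import java.util.List;

// Re-declared from the listing above so this sketch compiles on its own.
class BasicSource<T> {
  protected volatile OnItemListener<? super T> listener;

  interface OnItemListener<T> { void onItem(T item); }

  public void setOnItemListener(OnItemListener<? super T> listener) { this.listener = listener; }
}

// Hypothetical subclass (not in the repo): emits 0..count-1, then returns.
class CountingSource extends BasicSource<Integer> {
  private final int count;

  CountingSource(int count) { this.count = count; }

  void start() {
    for (int i = 0; i < count; i++) {
      OnItemListener<? super Integer> current = listener;
      if (current != null) current.onItem(i);
    }
  }
}

class ListenerDemo {
  // Registers a method reference as the listener and collects the emitted items.
  static List<Integer> collect(int count) {
    List<Integer> received = new ArrayList<>();
    CountingSource source = new CountingSource(count);
    source.setOnItemListener(received::add);
    source.start();
    return received;
  }

  public static void main(String[] args) {
    System.out.println(collect(3)); // prints [0, 1, 2]
  }
}
```

Because OnItemListener has a single method, a lambda or method reference can stand in for it, which is what the later listings rely on.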
Next, derive a source that mimics an unbounded stream of events as follows.
Listing: UnboundSource.java
    // File: src/main/java/com/couchbase/rx/UnboundSource.java
    package com.couchbase.rx;

    import java.util.function.Supplier;

    import static com.couchbase.Util.sleep;

    public class UnboundSource<T> extends BasicSource<T> {
      private Supplier<T> supplier;
      private OnItemListener<? super T> current;

      public UnboundSource(Supplier<T> supplier) {
        this.supplier = supplier;
      }

      public void start() {
        System.out.println("Source emitting on thread " + Thread.currentThread().getName());

        for (;;) {
          System.out.println("Emitting item on thread " + Thread.currentThread().getName());

          current = listener;

          if (null != current) current.onItem(supplier.get());

          sleep(100);
        }
      }
    }
This version generates new items using a Supplier function, passed in to the constructor. This just shows there’s nothing special about the actual objects fed out, since the supplier could create anything.
We have a method to explicitly start creating items. Here we use an infinite loop to generate them indefinitely.
The assignment of listener to current works around a race condition where disposing the subscription can happen between the check for null and the actual invocation of the onItem callback. Copying the field first means the null check and the call both use the same reference.
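The snapshot idiom is easier to see in isolation. The sketch below is plain Java with no RxJava dependency. SnapshotSource and SnapshotDemo are hypothetical names for this illustration, and java.util.function.Consumer stands in for the listener interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a source with a replaceable listener.
class SnapshotSource<T> {
  // volatile so a write from another thread is visible to the emitting thread
  private volatile Consumer<? super T> listener;

  void setListener(Consumer<? super T> listener) { this.listener = listener; }

  // Reads the field once. The null check and the call use the same reference,
  // so a concurrent setListener(null) cannot cause a NullPointerException here.
  // Returns true if a listener was invoked.
  boolean emit(T item) {
    Consumer<? super T> current = listener;
    if (current == null) return false;
    current.accept(item);
    return true;
  }
}

class SnapshotDemo {
  static List<String> run() {
    List<String> received = new ArrayList<>();
    SnapshotSource<String> source = new SnapshotSource<>();
    source.emit("dropped: no listener yet");
    source.setListener(received::add);
    source.emit("a");
    source.emit("b");
    source.setListener(null); // simulate unsubscribing
    source.emit("dropped: listener removed");
    return received;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [a, b]
  }
}
```

The trade-off is that a listener removed in the middle of an emit can still receive one last item, so the callback itself should tolerate a late call.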
Converting to an Observable
Great, so now we have a source that mimics, say, an open-ended series of button clicks. Next, let’s create a custom Observable.
We’ll use the method recommended in the RxJava documentation. This uses the Observable.create method, rather than subclassing Observable directly. (The project includes code to do the latter, too, for comparison.)
Take a look at the listing.
Listing: Observables.java
    // File: src/main/java/com/couchbase/rx/Observables.java
    package com.couchbase.rx;

    import io.reactivex.Observable;
    import io.reactivex.ObservableOnSubscribe;

    import com.couchbase.Util.ComputeFunction;

    public class Observables {
      public static void main(String[] args) {
        UnboundSource<Object> source = new UnboundSource<>(Object::new);

        ObservableOnSubscribe<Object> handler = emitter -> {
          System.out.println("Create on thread - " + Thread.currentThread().getName());

          source.setOnItemListener(item -> {
            System.out.println("Listen on thread - " + Thread.currentThread().getName());
            if (emitter.isDisposed()) return;
            emitter.onNext(item);
          });

          emitter.setCancellable(() -> source.setOnItemListener(null));

          source.start();
        };

        Observable.create(handler)
            .subscribe(ComputeFunction::compute, Throwable::printStackTrace,
                () -> System.out.println("Done"),
                t -> System.out.println("onSubscribe on thread " + Thread.currentThread().getName()));
      }
    }
First, we create an instance of our UnboundSource.
Next, we create an instance of ObservableOnSubscribe using a lambda expression. The method we’re implementing, subscribe, has one parameter, an ObservableEmitter object.
This provides the connection between our source’s listener callback and a subscriber, through a second lambda expression. This second expression just checks emitter.isDisposed to make sure the subscription is still active, then pushes an item downstream by calling emitter.onNext. That’s the key line this has been building up to.
Having wired up our original callback, we want to provide a way to stop the flow. We use a Cancellable here for simplicity. The lambda expression breaks the flow by nulling out the listener callback.
A Disposable would also work. This Stack Overflow answer gives a good idea of the difference between them, and how to choose which to use.
Disposables are the RxJava 2 solution to unsubscribing from a stream. This post by Kaushik Gopal explains some of the reasoning around the use of disposables in general.
With everything interconnected, we fire up the source to start generating events.
Instantiation and Subscription
With our ObservableOnSubscribe instance in hand, we can now create our Observable instance with the call to create.
The Observable class provides a fluent interface with quite a few methods available. We subscribe to the resulting observable using a method that breaks out the ‘onXXX’ functions into individual pieces.
Output
If you run the example as shown, you should get something like the following output.
    onSubscribe on thread main
    Create on thread - main
    Source emitting on thread main
    Emitting item on thread main
    Listen on thread - main
    Compute: object java.lang.Object@d8355a8 on thread main
    Emitting item on thread main
    Listen on thread - main
    Compute: object java.lang.Object@28d25987 on thread main
    ...
Everything is happening in series on the main thread. Not super interesting, for all that effort. We haven’t leveraged the full power of RxJava yet. If you’re familiar with RxJava, you’ll know how easy it is to make the code run asynchronously. Doing so turned up some interesting quirks I didn’t expect. Again, something to explore in another post.
Commentary: Learning RxJava
When I originally decided to write this post, I wanted to talk about the things I learned while studying some of the internals of RxJava. In the end, it became far too much for one article.
Part of what makes RxJava challenging is the sheer number of APIs, but even the basics involve a web of intertwined interfaces.
For example, there’s only one version of Observable.create. It takes an ObservableOnSubscribe instance as an argument, as we’ve seen. Simple enough.
But then we dig down a bit. ObservableOnSubscribe is an interface with only one method, subscribe. Again, simple, yet this begins to reveal part of what makes understanding RxJava tricky.
Going further, we see ObservableOnSubscribe.subscribe supplies an ObservableEmitter as an argument. An ObservableEmitter extends an Emitter, and adds some methods for managing a Disposable. An Emitter, it turns out, has almost the same interface as an Observer. It only lacks the onSubscribe method.
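To make that web of interfaces a little more concrete, here is a toy model in plain Java. The Mini* types, ForwardingEmitter, and EmitterDemo are hypothetical, trimmed-down stand-ins written for this sketch. They borrow method names from RxJava 2’s Emitter, ObservableEmitter, and Observer, and use Runnable where RxJava uses Cancellable and Disposable. This is not how RxJava implements them.

```java
import java.util.ArrayList;
import java.util.List;

// Shape of Emitter: the three "push" signals.
interface MiniEmitter<T> {
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

// Shape of ObservableEmitter: an Emitter plus cancellation plumbing.
interface MiniObservableEmitter<T> extends MiniEmitter<T> {
  void setCancellable(Runnable onCancel);
  boolean isDisposed();
}

// Shape of Observer: the same three signals plus onSubscribe,
// which hands the subscriber a handle for cancelling.
interface MiniObserver<T> {
  void onSubscribe(Runnable dispose);
  void onNext(T value);
  void onError(Throwable error);
  void onComplete();
}

// Toy bridge: forwards emitter calls to an observer until disposed or terminated.
class ForwardingEmitter<T> implements MiniObservableEmitter<T> {
  private final MiniObserver<T> downstream;
  private Runnable onCancel = () -> {};
  private boolean disposed;

  ForwardingEmitter(MiniObserver<T> downstream) {
    this.downstream = downstream;
    downstream.onSubscribe(this::dispose);
  }

  void dispose() {
    if (disposed) return;
    disposed = true;
    onCancel.run(); // e.g. detach the listener from the source
  }

  @Override public void setCancellable(Runnable onCancel) { this.onCancel = onCancel; }
  @Override public boolean isDisposed() { return disposed; }
  @Override public void onNext(T value) { if (!disposed) downstream.onNext(value); }
  @Override public void onError(Throwable error) {
    if (!disposed) { downstream.onError(error); dispose(); }
  }
  @Override public void onComplete() {
    if (!disposed) { downstream.onComplete(); dispose(); }
  }
}

class EmitterDemo {
  static List<String> run() {
    List<String> log = new ArrayList<>();
    Runnable[] handle = new Runnable[1];
    MiniObserver<Integer> observer = new MiniObserver<Integer>() {
      @Override public void onSubscribe(Runnable dispose) { handle[0] = dispose; log.add("subscribed"); }
      @Override public void onNext(Integer value) { log.add("next " + value); }
      @Override public void onError(Throwable error) { log.add("error"); }
      @Override public void onComplete() { log.add("complete"); }
    };
    ForwardingEmitter<Integer> emitter = new ForwardingEmitter<>(observer);
    emitter.setCancellable(() -> log.add("cancelled"));
    emitter.onNext(1);
    handle[0].run();   // the subscriber disposes
    emitter.onNext(2); // ignored: already disposed
    return log;
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints [subscribed, next 1, cancelled]
  }
}
```

Seen this way, the emitter is the producer-facing half of the conversation and the observer is the consumer-facing half, with the disposal handle tying the two together.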
These are mostly interface definitions. We haven’t touched on the implementations, and I’m only scratching the surface here.
The code is fascinating. Explore more, and you’ll get into how operators can modify what’s going on in a whole chain of calls, how RxJava does buffering, how some operations signal how many items to pass on, and more.
I’m looking forward to exploring further and passing on what I learn.
Postscript
Couchbase is open source and free to try out.
Get started with sample code, example queries, tutorials, and more.
Find more resources on our developer portal.
Follow us on Twitter @CouchbaseDev.
You can post questions on our forums.
We actively participate on Stack Overflow.
Hit me up on Twitter with any questions, comments, topics you’d like to see, etc. @HodGreeley
ReactiveX logo used courtesy of the ReactiveX projects under terms of the Creative Commons Attribution 3.0 License.