In the previous blog post we saw how Observable was used to create an abstraction over the Apache Commons Tailer. In this article I am going to use Observable to abstract over some Redis operations.
I chose lettuce for this exercise. It has good support for the scan-based operations (SCAN, HSCAN, etc.) in Redis, and I was interested in exposing these operations via Observable.
Let's first see how to expose a PubSub subscription as an Observable.
Pretty straightforward. The lettuce library provides a nice listener based API to subscribe to a given channel and I just wrapped that in the Observable’s factory method. I have exposed the stream from the pubsub system as a tuple stream of channel & message.
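The wrapping idea can be sketched without any Redis dependency. The snippet below is only an illustration under stated assumptions: `Observable` is a minimal stand-in written for this example (not the RxScala class), and `FakePubSub` and `PubSubListener` are hypothetical in-memory substitutes for the listener-based client, not lettuce's API.

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for an Rx-style Observable (NOT the RxScala API):
// subscribing simply registers a callback that receives each element.
final class Observable[A](onSubscribe: (A => Unit) => Unit) {
  def subscribe(onNext: A => Unit): Unit = onSubscribe(onNext)
}

// Hypothetical listener interface, standing in for the real client's listener.
trait PubSubListener {
  def message(channel: String, message: String): Unit
}

// Hypothetical in-memory pub/sub bus, used here instead of a Redis connection.
final class FakePubSub {
  private val listeners = ListBuffer.empty[(String, PubSubListener)]

  def subscribe(channel: String, listener: PubSubListener): Unit =
    listeners += ((channel, listener))

  def publish(channel: String, msg: String): Unit =
    listeners.foreach { case (c, l) => if (c == channel) l.message(channel, msg) }
}

object PubSubObservable {
  // The listener registration lives inside the Observable's factory function,
  // so each subscriber gets its own listener that forwards (channel, message) tuples.
  def messages(client: FakePubSub, channel: String): Observable[(String, String)] =
    new Observable[(String, String)](onNext =>
      client.subscribe(channel, new PubSubListener {
        def message(ch: String, msg: String): Unit = onNext((ch, msg))
      })
    )
}
```

A subscriber then only sees a stream of tuples, for example `PubSubObservable.messages(bus, "news").subscribe(println)`, and never touches the listener API directly.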
Similarly, we can expose iterating over all the keys in Redis as an Observable. Note that it uses the non-blocking SCAN API instead of the blocking KEYS operation.
Lettuce again provides a decent streaming API (KeyStreamingChannel) that I have wrapped in an Observable.
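To make the cursor handling concrete, here is a rough sketch of paging through keys and pushing them to a subscriber. It does not use lettuce's KeyStreamingChannel: `ScanPage` is a hypothetical function type, `inMemoryScan` is an in-memory stand-in for Redis, and `Observable` is again a minimal class written for this example. The one thing taken from Redis itself is the SCAN convention that iteration starts at cursor 0 and ends when the server returns cursor 0.

```scala
// Minimal stand-in for an Rx-style Observable with a completion signal
// (NOT the RxScala API).
final class Observable[A](onSubscribe: (A => Unit, () => Unit) => Unit) {
  def subscribe(onNext: A => Unit, onCompleted: () => Unit = () => ()): Unit =
    onSubscribe(onNext, onCompleted)
}

object ScanObservable {
  // Hypothetical page fetcher: given a cursor, returns (nextCursor, keysInThisPage).
  type ScanPage = Long => (Long, Seq[String])

  // Emit every key page by page, then signal completion once the cursor returns to 0.
  def keys(scan: ScanPage): Observable[String] =
    new Observable[String]((onNext, onCompleted) => {
      var cursor = 0L
      var done = false
      while (!done) {
        val (next, page) = scan(cursor)
        page.foreach(onNext)
        cursor = next
        done = cursor == 0L
      }
      onCompleted()
    })

  // In-memory stand-in for Redis: the cursor is just an offset into `all`.
  def inMemoryScan(all: Vector[String], pageSize: Int): ScanPage = cursor => {
    val start = cursor.toInt
    val end = math.min(start + pageSize, all.length)
    val next = if (end >= all.length) 0L else end.toLong
    (next, all.slice(start, end))
  }
}
```

The subscriber never sees cursors or pages, only keys, which is the same shape of abstraction the streaming channel gives.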
The code for wrapping HSCAN in an Observable looks almost identical to the code for SCAN.
The full source code for this article can be found in this gist
I have been taking the enlightening Coursera course Principles of Reactive Programming and it has been fun so far. I have been using Scala's Future for some time and really enjoy working with it. But I hadn't used Observable (Rx) until now and found it intriguing, though I was not sure when to use one API over the other. I thought implementing a few use cases in both APIs would help me understand the differences and see where the strength of each lies in writing good async code that is also composable.
The following is a naive attempt to wrap the Apache Commons Tailer in the Future and Rx APIs.
The following is a helper class that lets us create a tailer using a simple handler function.
Let's first look at the implementation using Future.
The Future-based solution is implemented in a pull-based manner. The library provides a next() method that, when called, gives you back a Future[String] that completes with the next line from the file once it is available.
To achieve this I had to introduce a queue that the handler populates as and when a new line is available. I then block on the queue to get a line, which is used to fulfill the Future that next() had previously returned.
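The queue-plus-Future arrangement described above might look roughly like this. It is a sketch using only the standard library: the class name `FutureLineSource` and the `handle` method are hypothetical names chosen for this example, and the wiring from the actual tailer to `handle` is left out.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future, blocking}

// Pull-based line source: a handler pushes lines into a queue, and next()
// returns a Future that completes once a line can be taken from that queue.
final class FutureLineSource()(implicit ec: ExecutionContext) {
  private val queue = new LinkedBlockingQueue[String]()

  // Meant to be called by the tailer's handler for every new line (wiring assumed).
  def handle(line: String): Unit = queue.put(line)

  // take() blocks until a line is available; `blocking` hints to the
  // execution context that this thread may be parked for a while.
  def next(): Future[String] = Future(blocking(queue.take()))
}
```

Note that every pending next() call occupies a thread while it waits on the queue, which is part of why this version needs care on the calling side.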
I am not happy with the Future based version of this library for the following reasons:
I have to explicitly maintain a queue
The calling code is big and does not convey the intent quickly
The way next() is called recursively is confusing, and it's easy to get wrong: if you are not careful you can create a lot of parallel Futures, ultimately causing an OOM.
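One way to keep the recursion under control is to request the next line only from inside the callback of the previous one, so that at most one Future is pending at a time. The sketch below assumes a `next` function with the shape described earlier; the `consume` helper and its `remaining` counter are hypothetical and exist only to keep the example finite.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SequentialPull {
  // Ask for the next line only after the previous Future has completed.
  // Calling next() eagerly in a loop instead would create many Futures at once.
  def consume(next: () => Future[String], remaining: Int)(handle: String => Unit)
             (implicit ec: ExecutionContext): Future[Unit] =
    if (remaining <= 0) Future.successful(())
    else next().flatMap { line =>
      handle(line)
      consume(next, remaining - 1)(handle)
    }
}
```

Because each recursive call happens inside flatMap, it runs as a new asynchronous step rather than growing the call stack.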
Now let's look at the Observable-based implementation.
As you can see, the Observable-based implementation is more compact and easier to comprehend.
In summary, I think this use case is better suited to the push-based approach implemented using Observable than to the pull-based approach using Future.