In the previous blog post we saw how Observable was used to create an abstraction over the Apache Commons Tailer. In this article I am going to use Observable to abstract over some Redis operations.
I chose lettuce for this exercise. It has good support for the scan-based operations (SCAN, HSCAN, etc.) in Redis, and I was interested in exposing these operations via Observable.
Let's first see how to expose a PubSub subscription as an Observable.
Pretty straightforward. The lettuce library provides a nice listener-based API for subscribing to a given channel, and I just wrapped that in the Observable’s factory method. The stream from the pubsub system is exposed as a stream of (channel, message) tuples.
Similarly, we can expose iteration over all the keys in Redis as an Observable. Note that it uses the cursor-based SCAN command, which returns keys in small batches, instead of the KEYS command, which blocks the server while it walks the whole keyspace.
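To make the cursor idea concrete, here is a minimal sketch of SCAN-style iteration using only the Scala standard library. The `Scan` type and the in-memory `fakeScan` are hypothetical stand-ins for a real client call and are not part of lettuce; the only Redis behaviour assumed is that iteration starts at cursor 0 and is finished when the server returns cursor 0 again.

```scala
object ScanSketch {
  // Hypothetical stand-in for one SCAN call: cursor in, (next cursor, batch of keys) out.
  // As with Redis SCAN, a returned cursor of 0 means the iteration is complete.
  type Scan = Long => (Long, Seq[String])

  // In-memory fake for illustration only; it pages through a fixed list of keys.
  def fakeScan(keys: Vector[String], pageSize: Int): Scan = { cursor =>
    val start = cursor.toInt
    val end = math.min(start + pageSize, keys.length)
    val nextCursor = if (end >= keys.length) 0L else end.toLong
    (nextCursor, keys.slice(start, end))
  }

  // Lazily walks the cursor chain, fetching one batch only when it is needed.
  def allKeys(scan: Scan): Iterator[String] = new Iterator[String] {
    private var cursor = 0L
    private var started = false
    private var buffer: Iterator[String] = Iterator.empty

    // Fetch batches until one is non-empty or the cursor has come back to 0.
    private def fill(): Unit =
      while (!buffer.hasNext && !(started && cursor == 0L)) {
        val (nextCursor, batch) = scan(cursor)
        started = true
        cursor = nextCursor
        buffer = batch.iterator
      }

    def hasNext: Boolean = { fill(); buffer.hasNext }
    def next(): String = { fill(); buffer.next() }
  }
}
```

Each batch is fetched on demand, so a consumer that stops early never asks for the remaining pages. An Observable wrapper could drive the same loop and push every key to its subscriber.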
I have been taking the enlightening Coursera course Principles of Reactive Programming, and it has been fun so far. I have been using Scala's Future for some time and really enjoy working with it, but I hadn’t used Observable (Rx) until now and found it intriguing. I was not sure, though, when to use one API over the other. I thought implementing a few use cases with both APIs would help me understand the differences and see where the strength of each lies in writing good async code that is also composable.
Following is a naive attempt to wrap the Apache Commons Tailer into the Future and Rx APIs.
Following is a helper class that lets us create a tailer using a simple handler function:
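import java.io.File
import org.apache.commons.io.input.{Tailer, TailerListenerAdapter}
import scala.concurrent.duration.Duration

// The following class gives a way to pass a simple function
// from String => Unit as a handler for each log line
class MyTailer(filePath: String,
               handler: String => Unit,
               pollInterval: Duration,
               tailFromEnd: Boolean = true) {

  // Forward every tailed line to the supplied handler
  val listener = new TailerListenerAdapter {
    override def handle(line: String): Unit = handler(line)
  }

  val tailer = Tailer.create(new File(filePath), listener, pollInterval.toMillis, tailFromEnd)

  def stop(): Unit = tailer.stop()
}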
Let's first look at the implementation using Future.
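import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

class FutureTailer(filePath: String, pollInterval: Duration) {
  // Create a queue to collect the tailed lines
  val queue = new LinkedBlockingQueue[String]
  val tailer = new MyTailer(filePath, line => queue.add(line), pollInterval)

  // Each call returns a Future that completes with the next queued line
  def next(): Future[String] = Future { blocking { queue.take() } }
}

// Usage
val fTailer = new FutureTailer("/path/to/file", 100.milliseconds)

def onCompleteHandler(tryLine: Try[String]): Unit = {
  tryLine match {
    case Success(line) =>
      // Ask for the next line before handling the current one
      fTailer.next().onComplete(onCompleteHandler)
      println(s"---------- $line ------------")
    case Failure(e) => // ignore the failure; no further lines are requested
  }
}

fTailer.next().onComplete(onCompleteHandler)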
The Future-based solution is pull based. The library provides a next() method that returns a Future[String], which completes with the next line from the file once it is available.
To achieve this I had to introduce a queue that the handler populates whenever a new line is available. I then block on the queue to get a line, which is used to fulfill the Future that next() had previously returned.
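The queue-plus-blocking-take mechanism can be tried in isolation, without a file or a tailer. The sketch below is a self-contained illustration using only the standard library; `LineSource`, `push` and `demo` are hypothetical names introduced here, with `push` playing the role of the tailer's handler.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object QueueBackedNext {
  // Hypothetical line source: a producer pushes lines, a consumer pulls Futures.
  final class LineSource {
    private val queue = new LinkedBlockingQueue[String]()

    // Called by the producer (the tailer's handler in the article).
    def push(line: String): Unit = queue.add(line)

    // Each call returns a Future that completes with the next queued line.
    // `blocking` tells the execution context that this thread may be parked.
    def next()(implicit ec: ExecutionContext): Future[String] =
      Future { blocking { queue.take() } }
  }

  def demo(): String = {
    import ExecutionContext.Implicits.global
    val source = new LineSource
    val pending = source.next()  // requested before any line has been queued
    source.push("first line")    // the producer delivers a line afterwards
    Await.result(pending, 2.seconds)
  }
}
```

Note that every pending Future holds a thread parked in `queue.take()` until a line arrives, which is part of why this design feels heavier than it should.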
I am not happy with the Future based version of this library for the following reasons:
I have to explicitly maintain a queue
The calling code is big and does not convey the intent quickly
The way next() is called recursively is confusing, and it's easy to screw up if you are not careful and create a lot of parallel Futures, ultimately causing an OOM.
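One way to reduce the risk in the last point is to hide the recursion behind a small helper that only requests the next element after the previous one has been handled. This is a sketch of the idea rather than part of the original library; `consume` and its `continue` parameter are names made up for this illustration.

```scala
import scala.concurrent.{ExecutionContext, Future}

object SequentialPull {
  // Requests the next element only after the previous one has been handled,
  // so at most one Future is outstanding at any time. The returned Future
  // completes when `continue` returns false, and fails if any pulled Future fails.
  def consume[A](next: () => Future[A])(continue: A => Boolean)
                (implicit ec: ExecutionContext): Future[Unit] =
    next().flatMap { a =>
      if (continue(a)) consume(next)(continue) else Future.successful(())
    }
}
```

With the FutureTailer above, a call such as `SequentialPull.consume(() => fTailer.next()) { line => println(line); true }` would keep exactly one pending Future while printing every line. The caller still has to carry this helper around, which is the kind of plumbing a push-based API avoids.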
Now let's look at the Observable-based implementation.