Parth's Blog

© 2021. Parth Patil All rights reserved.

Using Redis via Observable

In the previous blog post we saw how Observable was used to create an abstraction over the Apache Commons Tailer. In this article I am going to use Observable to abstract over some Redis operations.

I chose lettuce for this exercise. It has good support for the scan-based operations (SCAN, HSCAN, etc.) in Redis, and I was interested in exposing these operations via Observable.

Let's first see how to expose a PubSub subscription as an Observable.

val obs = Observable[(String, String)] { subscriber =>
  val connection: RedisPubSubConnection[String, String] = client.connectPubSub()
  connection.addListener(new RedisPubSubAdapter[String, String]() {
    override def message(chan: String, msg: String): Unit = {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext((chan, msg))
    }
  })
  connection.subscribe("test-channel")
}

obs subscribe { r => println(s"chan: ${r._1}, msg: ${r._2}") }

Pretty straightforward. The lettuce library provides a nice listener-based API to subscribe to a given channel, and I just wrapped that in the Observable’s factory method. The stream from the pubsub system is exposed as a stream of (channel, message) tuples.
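To make the wrapping pattern easier to see in isolation, here is a minimal sketch that uses only the Scala standard library. FakePubSub, Listener and MiniSubscriber are hypothetical stand-ins invented for this illustration; they are not lettuce or RxScala types. The sketch only shows how a listener callback is forwarded to a subscriber and how the isUnsubscribed guard stops delivery.

```scala
import scala.collection.mutable.ArrayBuffer

object ListenerWrapSketch {
  // Hypothetical stand-in for a listener-based pub/sub connection (not lettuce).
  trait Listener { def message(chan: String, msg: String): Unit }

  class FakePubSub {
    private val listeners = ArrayBuffer.empty[Listener]
    def addListener(l: Listener): Unit = listeners += l
    // Delivers a message to every registered listener.
    def publish(chan: String, msg: String): Unit =
      listeners.foreach(_.message(chan, msg))
  }

  // Hypothetical stand-in for an Rx subscriber: a callback plus an unsubscribed flag.
  class MiniSubscriber[T](val onNext: T => Unit) {
    @volatile private var unsubscribed = false
    def isUnsubscribed: Boolean = unsubscribed
    def unsubscribe(): Unit = unsubscribed = true
  }

  // Bridges the listener API to the subscriber, emitting (channel, message) tuples.
  def subscribe(conn: FakePubSub)(
      onNext: ((String, String)) => Unit): MiniSubscriber[(String, String)] = {
    val subscriber = new MiniSubscriber[(String, String)](onNext)
    conn.addListener(new Listener {
      def message(chan: String, msg: String): Unit =
        if (!subscriber.isUnsubscribed) subscriber.onNext((chan, msg))
    })
    subscriber
  }

  // Publishes one message before and one after unsubscribing,
  // and returns what the subscriber actually received.
  def demo(): List[(String, String)] = {
    val conn = new FakePubSub
    val received = ArrayBuffer.empty[(String, String)]
    val sub = subscribe(conn)(received += _)
    conn.publish("test-channel", "hello")
    sub.unsubscribe()
    conn.publish("test-channel", "ignored")
    received.toList
  }
}
```

Note that the guard only stops delivery. The listener stays registered on the connection, so a fuller version would probably also remove the listener or close the connection when the subscriber unsubscribes.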

Similarly, we can expose iterating over all the keys in Redis as an Observable. Note that this uses the non-blocking SCAN API instead of the blocking KEYS operation.

Observable[String] { subscriber =>
  asyncConnection.scan(new KeyStreamingChannel[String]() {
    override def onKey(key: String): Unit = {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext(key)
    }
  })
} subscribe { key => println(s"key => $key") }

Lettuce again provides a decent streaming API (KeyStreamingChannel) that I have wrapped in an Observable.
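For readers unfamiliar with SCAN: unlike KEYS, it is cursor based. Each call returns one batch of keys plus a cursor, and the iteration is complete once the returned cursor is 0. The sketch below models that loop against an in-memory Vector using only the standard library. ScanSketch, keyspace, scan and scanAll are hypothetical names made up for this illustration, and the cursor here is a plain index, which is a simplification (a real Redis cursor is an opaque value).

```scala
object ScanSketch {
  // Hypothetical in-memory stand-in for a Redis keyspace; not a real client.
  val keyspace: Vector[String] =
    Vector("user:1", "user:2", "user:3", "job:1", "job:2")

  // Models a single SCAN call: takes a cursor, returns (nextCursor, batch).
  // A returned cursor of 0 signals that the iteration is complete.
  def scan(cursor: Int, count: Int): (Int, Vector[String]) = {
    val batch = keyspace.slice(cursor, cursor + count)
    val next = cursor + count
    (if (next >= keyspace.length) 0 else next, batch)
  }

  // Drives the cursor loop and pushes every key to `onKey`, which plays
  // the role of the onKey callback in the streaming channel above.
  def scanAll(count: Int)(onKey: String => Unit): Unit = {
    @annotation.tailrec
    def loop(cursor: Int): Unit = {
      val (next, batch) = scan(cursor, count)
      batch.foreach(onKey)
      if (next != 0) loop(next)
    }
    loop(0)
  }
}
```

A call such as ScanSketch.scanAll(2)(key => println(s"key => $key")) visits every key in small batches, which is the property that makes SCAN friendlier to a busy server than one large KEYS call.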

The code for wrapping HSCAN in an Observable looks almost identical to the code for SCAN:

Observable[(String, String)] { subscriber =>
  asyncConnection.hscan(new KeyValueStreamingChannel[String, String]() {
    override def onKeyValue(key: String, value: String): Unit = {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext((key, value))
    }
  }, "my-key")
} subscribe { kv => println(s"key => ${kv._1}, value = ${kv._2}") }

The full source code for this article can be found in this gist

Scala Future vs Observable Part 1

I have been taking the enlightening Coursera course Principles of Reactive Programming and it has been fun so far. I have been using Scala's Future for some time and really enjoy working with it. But I hadn’t used Observable (Rx) until now and found it intriguing, though I was not sure when to use one API over the other. I thought implementing a few use cases in both APIs would help me understand the differences and see where each API's strength lies in helping write good async code that is also composable.

Following is a naive attempt to wrap the Apache Commons Tailer into the Future and Rx APIs.

First, a helper class that lets us create a tailer using a simple handler function:

// The following class gives a way to pass a simple function 
// from String => Unit as a handler for each log line
class MyTailer(
  filePath: String,
  handler: String => Unit,
  pollInterval: Duration,
  tailFromEnd: Boolean = true) {

  val listener = new TailerListenerAdapter {
    override def handle(line: String): Unit = { handler(line) }
  }

  val tailer = Tailer.create(
    new File(filePath),
    listener,
    pollInterval.toMillis,
    tailFromEnd)

  def stop(): Unit = { tailer.stop() }
}

Let's first look at the implementation using Future.

class FutureTailer(filePath: String, pollInterval: Duration) {
  // Create a queue to collect the tailed lines
  val queue = new LinkedBlockingQueue[String]

  val tailer = new MyTailer(filePath, { queue.add(_) }, pollInterval)

  def next(): Future[String] = Future { blocking { queue.take() } }
}

// Usage
val fTailer = new FutureTailer("/path/to/file", 100 millisecond)

def onCompleteHandler(tryLine: Try[String]): Unit = {
  tryLine match {
    case Success(line) =>
      fTailer.next() onComplete onCompleteHandler
      println(s"---------- $line ------------")

    case Failure(e) => // failures are ignored in this naive version; the chain of next() calls stops here
  }
}

fTailer.next() onComplete onCompleteHandler

The Future-based solution is pull based. The library provides a next() method that returns a Future[String], which completes with the next line from the file once it's available.

To achieve this I had to introduce a queue that the handler populates whenever a new line is available. I then block on the queue to get a line, which is used to fulfill the Future that next() had previously returned.

I am not happy with the Future based version of this library for the following reasons:

  • I have to explicitly maintain a queue
  • The calling code is big and does not convey the intent quickly
  • The way next() is called recursively is confusing, and it's easy to slip up and create a lot of parallel Futures, ultimately causing an OutOfMemoryError.
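One way to keep the recursion under control is to request the following line only from inside the previous Future's callback, so that at most one queue.take() is pending at any time. The sketch below is a minimal, standard-library-only illustration of that idea. PullLoopSketch, drain and the "EOF" sentinel are hypothetical and exist only so the demo terminates; a real tailer has no natural end marker, and the queue here is filled by hand rather than by a Tailer.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, ExecutionContext, Future, blocking}
import scala.concurrent.duration._

object PullLoopSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-in for the queue that the tailer handler fills.
  val queue = new LinkedBlockingQueue[String]

  // Same shape as FutureTailer.next(): one Future per requested line.
  def next(): Future[String] = Future { blocking { queue.take() } }

  // Pulls lines one at a time. The following next() is only requested
  // inside flatMap, after the previous line has arrived, so requests
  // never pile up in parallel. `stopAt` is an assumed sentinel.
  def drain(stopAt: String)(handle: String => Unit): Future[Unit] =
    next().flatMap { line =>
      if (line == stopAt) Future.unit
      else { handle(line); drain(stopAt)(handle) }
    }

  // Feeds a few lines by hand and returns them in the order they were handled.
  def run(): List[String] = {
    val seen = ArrayBuffer.empty[String]
    List("a", "b", "c", "EOF").foreach(queue.add)
    Await.result(drain("EOF")(l => seen.synchronized { seen += l }), 5.seconds)
    seen.synchronized(seen.toList)
  }
}
```

This keeps the pull model but still needs the explicit queue and a blocking take(), so it addresses only the third complaint above, not the first two.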

Now let's look at the Observable-based implementation:

def createObservableTailer(
  filePath: String,
  pollInterval: Duration): Observable[String] = {
  Observable { subscriber =>
    def handler(line: String): Unit = {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext(line)
    }
    new MyTailer(filePath, handler, pollInterval)
  }
}

// Usage
val oTailer = createObservableTailer(file, 100 millisecond)
oTailer foreach { line =>
  println(s"---------- $line ------------")
}

As you can see, the Observable-based implementation is more compact and easier to comprehend.

In summary, I think this use case is better suited to the push-based approach implemented using Observable than to the pull-based approach using Future.

The full code can be found here: Full Code