Parth's Blog

About Programming etc ..

© 2014. Parth Patil All rights reserved.

Optimizing dedupe cache using bloom filters

I have been working on an early-stage product in which we recommend URLs to users, and we don't want to show duplicate recommendations. So we want to keep a per-user "seen" cache that stores the URLs the user has already been recommended.

The straightforward thing to do is to store the seen cache on the backend as a set keyed by userId. This cache could live in Redis or memcached. But we are doing a proof of concept for our idea and didn't want to spend too much time on backend work up front. So we decided to store the seen cache on the browser side.

The first option we considered was to store the seen cache in the browser's Local Storage, which would have let us store a lot of data on the client side. But then the browser would have to send the seen cache explicitly with every request for recommendations. We wanted the seen cache to be transparent to the client, so that when we later build the distributed server-side seen cache the client would not have to change. To that end we decided to store the seen cache in a browser cookie, which travels back and forth between the server and the client without the client having to do anything special.

But, as you might have guessed, a browser cookie gives you very little storage. The spec (RFC 6265) only requires browsers to support at least 4096 bytes per cookie, and the actual limits vary by browser; some browsers allow bigger cookies. At any rate it's a very limited amount of storage, and we have to share it with the other cookies on our domain, all of which are sent with every request.

Our urlIds are MD5 hashes of the full canonical URL. As a hex string, an MD5 hash is 32 characters long (i.e. 32 bytes).
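To make the 32-byte figure concrete, here is a minimal sketch of computing such an id with the JDK's `MessageDigest`. The `UrlId` object and the UTF-8 encoding are assumptions for illustration, not the author's code: the 16-byte digest is rendered as 32 hex characters.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object UrlId {
  // Hypothetical helper: hex-encoded MD5 of a canonical URL.
  // 16 digest bytes -> 2 hex characters each -> 32 characters.
  def urlId(canonicalUrl: String): String = {
    val digest = MessageDigest.getInstance("MD5")
      .digest(canonicalUrl.getBytes(StandardCharsets.UTF_8))
    digest.map(b => f"${b & 0xff}%02x").mkString
  }
}
```

Storing the raw 16 digest bytes would halve this, but the post works with the 32-character string form.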

My first attempt was to store the seen cache as the concatenated MD5 hashes of the URLs. This approach, though simple, was not very space-efficient. We wanted to store at least 100 URLs in the seen cache, which would take 100 * 32 = 3200 bytes. Budgeting roughly 4 KB for the domain, that would leave only ~800 bytes for all the other cookies on our domain, which was not acceptable.
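A minimal sketch of that naive format, assuming the cookie value is just the ids appended one after another (`NaiveSeenCache` is a hypothetical name). Lookups scan the value in 32-character chunks, and the value grows by 32 bytes per URL, so 100 URLs cost 3200 bytes.

```scala
object NaiveSeenCache {
  val IdLength = 32 // length of a hex MD5 string

  // The cookie value is simply every seen id concatenated
  def add(cache: String, urlId: String): String = cache + urlId

  // Membership check: split into 32-character chunks and compare each
  def contains(cache: String, urlId: String): Boolean =
    cache.grouped(IdLength).contains(urlId)
}
```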

So after some thinking I realized that I needed a probabilistic data structure for set membership, one that would let me configure the space vs. accuracy tradeoff. For the seen cache we could tolerate false positives (i.e. the seen cache reporting that a URL has been seen when in fact it has not) but not false negatives. A Bloom filter can return false positives but never false negatives, so for this use case it fit the bill pretty well.
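The space vs. accuracy tradeoff can be made concrete with the standard Bloom filter sizing formulas (textbook approximations; Guava sizes its filters with these same formulas). For n expected items and a target false-positive rate p, the optimal number of bits m and hash functions k are shown below, worked out for 100 URLs at 1%:

```latex
\begin{aligned}
m &= -\frac{n \ln p}{(\ln 2)^2}, \qquad k = \frac{m}{n}\ln 2, \qquad p \approx \left(1 - e^{-kn/m}\right)^{k} \\
n = 100,\ p = 0.01:\quad m &= \frac{100 \times 4.605}{0.4805} \approx 958 \text{ bits} \ (\approx 120 \text{ bytes}), \qquad k \approx 9.58 \times 0.693 \approx 6.6 \to 7
\end{aligned}
```

Because m grows with ln(1/p), each halving of p costs about n / ln 2 ≈ 1.44 n extra bits.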

On the backend I decided to use the Bloom filter implementation in Google Guava. The high-level idea is to add the urlIds to the Bloom filter, serialize the filter (essentially its underlying bit vector), Base64-encode it and set it in the cookie. When the client sends the cookie back with a request, we reverse the process to reconstruct the Bloom filter.
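To show the idea without Guava, here is a tiny Bloom filter built on `java.util.BitSet` whose bit vector is Base64-encoded and decoded again. It is an illustrative stand-in, not Guava's implementation. `TinyBloom` is a hypothetical name, and deriving the k bit positions by double hashing an MD5 digest is an assumption made for brevity.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import java.util.{Base64, BitSet}

// Illustrative only: a minimal Bloom filter, NOT Guava's implementation.
final class TinyBloom(val numBits: Int, val numHashes: Int, bits: BitSet) {
  def this(numBits: Int, numHashes: Int) = this(numBits, numHashes, new BitSet(numBits))

  // k bit positions via double hashing (h1 + i * h2) on the two halves of an MD5 digest
  private def positions(item: String): Seq[Int] = {
    val buf = ByteBuffer.wrap(
      MessageDigest.getInstance("MD5").digest(item.getBytes(StandardCharsets.UTF_8)))
    val h1 = buf.getLong
    val h2 = buf.getLong
    (0 until numHashes).map(i => Math.floorMod(h1 + i * h2, numBits.toLong).toInt)
  }

  def put(item: String): Unit = positions(item).foreach(p => bits.set(p))

  // false => definitely never added; true => probably added
  def mightContain(item: String): Boolean = positions(item).forall(p => bits.get(p))

  // The raw bit vector as Base64, e.g. for a cookie value
  def toBase64: String = Base64.getEncoder.encodeToString(bits.toByteArray)
}

object TinyBloom {
  // numBits and numHashes are not stored in the string, so the reader must already know them
  def fromBase64(s: String, numBits: Int, numHashes: Int): TinyBloom =
    new TinyBloom(numBits, numHashes, BitSet.valueOf(Base64.getDecoder.decode(s)))
}
```

Unlike this sketch, Guava's serialized form also records the number of hash functions and the size of the bit array, which is why `MyBloomFilter.fromBase64` only needs the string.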

Here is what the code looks like:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.nio.charset.StandardCharsets
    import java.security.MessageDigest

    import com.google.common.hash.{BloomFilter, Funnels}
    import org.apache.commons.codec.binary.Base64

    class MyBloomFilter(bf: BloomFilter[Array[Byte]]) {
      // falsePositiveRate is a fraction: 0.01 means a 1% false-positive rate
      def this(numItems: Int, falsePositiveRate: Double) = {
        this(BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), numItems, falsePositiveRate))
      }

      // Number of put() calls on this instance; it is not restored by fromBase64
      private var cardinality = 0

      // MessageDigest is not thread-safe, so don't share one instance across threads
      private val md5 = MessageDigest.getInstance("MD5")

      def genHash(str: String): Array[Byte] = md5.digest(str.getBytes(StandardCharsets.UTF_8))

      def size: Int = cardinality

      def put(item: String): Unit = {
        bf.put(genHash(item))
        cardinality += 1
      }

      def mightContain(item: String): Boolean = bf.mightContain(genHash(item))

      // Serialize with Guava's writeTo, then Base64-encode for the cookie
      def serializeToBase64(): String = {
        val bs = new ByteArrayOutputStream
        bf.writeTo(bs)
        bs.close()
        Base64.encodeBase64String(bs.toByteArray)
      }
    }

    object MyBloomFilter {
      // Reverse of serializeToBase64: decode the string and rebuild the Guava filter
      def fromBase64(serialized: String): MyBloomFilter = {
        val bfByteArray = Base64.decodeBase64(serialized)
        val inputStream = new ByteArrayInputStream(bfByteArray)
        val bf = BloomFilter.readFrom(inputStream, Funnels.byteArrayFunnel())
        new MyBloomFilter(bf)
      }
    }

Here is an example of how you would use it:

    object BloomFilterTest extends App {
      // Sized for 10 items at a 1% false-positive rate
      val myBloom = new MyBloomFilter(10, 0.01)
      myBloom.put("")
      myBloom.put("")

      val encodedStr = myBloom.serializeToBase64()

      // Recreate the Bloom filter from the Base64-encoded string
      val myBloom2 = MyBloomFilter.fromBase64(encodedStr)

      println("google exists -> " + myBloom2.mightContain(""))
      println("yahoo exists -> " + myBloom2.mightContain(""))
      println("microsoft exists -> " + myBloom2.mightContain(""))
    }

Here is what I get when I run it:

    google exists -> true
    yahoo exists -> true
    microsoft exists -> false

This technique resulted in a huge space saving. The naive technique would have required 32 * 100 = 3200 bytes, whereas the new technique yields a Base64-encoded string of just 168 bytes. That's a ~95% reduction in space!
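As a sanity check on the 168-byte figure, here is the arithmetic. It assumes the filter was created for 100 items at a 1% false-positive rate. It also assumes the layout of Guava's `BloomFilter.writeTo` as read from Guava's source (a strategy byte, a hash-count byte, a 4-byte word count, then the 64-bit words), which is not a documented, stable contract. `GuavaCookieSize` is a hypothetical name.

```scala
object GuavaCookieSize {
  // Optimal bit count for n items at false-positive rate p (truncated, as Guava does)
  def optimalNumBits(n: Long, p: Double): Long =
    (-n * math.log(p) / (math.log(2) * math.log(2))).toLong

  // ASSUMED writeTo layout: 1 byte strategy + 1 byte hash count
  // + 4-byte word count, then 8 bytes per 64-bit word of the bit array
  def serializedBytes(numBits: Long): Long = {
    val words = (numBits + 63) / 64
    1 + 1 + 4 + 8 * words
  }

  // Padded Base64 emits 4 characters per started 3-byte group
  def base64Length(bytes: Long): Long = 4 * ((bytes + 2) / 3)
}
```

For 100 items this gives 958 bits, 15 words, 126 serialized bytes and a 168-character Base64 string. The 10-item filter from the example above works out to a 32-character string.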

Redis Job Queue With Retry Ability and Observable Interface

In this blog post I will explain how I implemented a simple job queue in Redis that can have multiple workers pulling work from it and processing it. The job queue also offers a way to retry failed jobs up to a certain number of times.

I use the Redis sorted set data structure to hold the Tasks to be processed. The time at which a Task should be processed is stored as its score, and the Task itself is the member. When a Task is added to the queue for the first time, its processing time is the current timestamp. When the job fails, a future timestamp is calculated and the failed Task's score is updated to this future timestamp. As a consequence, Tasks whose processing should be attempted farther in the future sit farther from the front of the queue.
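The scheduling idea can be modeled in a few lines. This is a toy in-memory stand-in (`ToySortedSet` and its methods are hypothetical, not a Redis client API) that mimics two sorted-set behaviors the design relies on. ZADD on an existing member only updates its score, and ZRANGEBYSCORE returns members by ascending score, ties broken by member. Plain millisecond timestamps serve as scores here.

```scala
import scala.collection.mutable

// In-memory stand-in for a Redis sorted set (member -> score); illustration only
final class ToySortedSet {
  private val scores = mutable.Map.empty[String, Long]

  // Like ZADD: inserts the member, or updates its score if it already exists
  def zadd(score: Long, member: String): Unit = scores(member) = score

  // Like ZRANGEBYSCORE min max: members with score in [min, max], lowest score first
  def rangeByScore(min: Long, max: Long): Seq[String] =
    scores.toSeq
      .filter { case (_, s) => s >= min && s <= max }
      .sortBy { case (m, s) => (s, m) }
      .map(_._1)
}
```

Usage: rescheduling a failed Task is just another `zadd` with a later timestamp, which moves it behind Tasks that are due sooner.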

Redis keeps sorted-set members ordered by ascending score (ZRANGE returns the lowest score first). For the score I don't use the epoch time as is but MAX_EPOCHTIME - CURRENT_TS, so a Task that became due earlier gets a higher score, and every Task that is due now has a score between MAX_EPOCHTIME - now and MAX_EPOCHTIME. (With plain epoch times as scores, the range -inf to now would select the same Tasks.)
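A minimal sketch of the reversed score, assuming millisecond timestamps. The `MaxEpochTime` constant is a made-up stand-in for MAX_EPOCHTIME, whose actual value the post does not give. The sketch shows that an earlier-due Task gets a higher score and which score range counts as due. Values this size stay below 2^53, so they are exactly representable as Redis (double) scores.

```scala
object ReverseScore {
  // Hypothetical MAX_EPOCHTIME: 2100-01-01T00:00:00Z in milliseconds
  val MaxEpochTime: Long = 4102444800000L

  // Score stored in the sorted set for a Task due at nextAttemptTs
  def score(nextAttemptTs: Long): Long = MaxEpochTime - nextAttemptTs

  // Due when nextAttemptTs <= now, i.e. score in [MaxEpochTime - now, MaxEpochTime]
  def isDue(score: Long, now: Long): Boolean =
    score >= MaxEpochTime - now && score <= MaxEpochTime
}
```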

When a Task fails, it is the client's responsibility to put it back on the queue with the right next-attempt timestamp and an incremented numFailures. That way, the next time the Task fails, the processing worker can decide either to discard it, if it has crossed the maximum-attempts threshold, or to requeue it with a future timestamp at which processing should be reattempted.
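The requeue-or-discard decision can be written as a pure function, which makes it easy to test. This is a sketch: `RetryPolicy`, `Decision`, `Requeue` and `Discard` are hypothetical names, the nested `Task` mirrors the post's case class without its JSON methods, and the backoff (2 seconds per failure so far) copies the client example at the end of the post.

```scala
object RetryPolicy {
  final case class Task(created: Long, numFailures: Int, payload: String)

  sealed trait Decision
  final case class Requeue(task: Task, nextAttemptTs: Long) extends Decision
  final case class Discard(task: Task) extends Decision

  // Record one more failure, then either schedule a retry or give up
  def onFailure(task: Task, now: Long, maxAllowedFailures: Int): Decision = {
    val failed = task.copy(numFailures = task.numFailures + 1)
    if (failed.numFailures < maxAllowedFailures)
      Requeue(failed, now + 2L * failed.numFailures * 1000) // linear backoff
    else
      Discard(failed)
  }
}
```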

Again I am using lettuce, the Java client for Redis that I used in the previous blog post. This library returns Guava futures for most of its operations. Guava's ListenableFuture is much nicer than the JDK Future, but it is still not as easy to work with as a Scala Future. So I wrote the following implicit conversion from a Guava future to a Scala Future.

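    import scala.concurrent.{Future, Promise}
    import scala.language.implicitConversions
    import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, ListeningExecutorService}

    // Bridge a Guava ListenableFuture to a Scala Future by completing a Promise
    // from a callback that runs on the given executor
    implicit def guavaFutureToScalaFuture[T](gFuture: ListenableFuture[T])
                                            (implicit executor: ListeningExecutorService): Future[T] = {
      val p = Promise[T]()
      Futures.addCallback[T](gFuture, new FutureCallback[T] {
        def onSuccess(s: T): Unit = p.success(s)
        def onFailure(e: Throwable): Unit = p.failure(e)
      }, executor)
      p.future
    }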
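For readers without Guava on the classpath, here is the same Promise-bridging pattern applied to the JDK's `CompletableFuture`, a swapped-in type rather than what the post uses. `JavaFutureBridge` is a hypothetical name. On Scala 2.13, `scala.jdk.FutureConverters` already provides this conversion for `CompletionStage`.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object JavaFutureBridge {
  // Complete a Scala Promise when the Java future completes, then hand out its Future
  def toScala[T](cf: CompletableFuture[T]): Future[T] = {
    val p = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      def accept(value: T, error: Throwable): Unit =
        if (error == null) p.success(value) else p.failure(error)
    })
    p.future
  }
}
```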

Second, I need a Java ExecutorService to run the Guava callbacks and a Scala ExecutionContext to run the Scala Futures, and I wanted both to share the same thread pool. So I create one Java ExecutorService and wrap it twice: in an ExecutionContext for the Scala Futures and in a ListeningExecutorService for the Guava futures. Here is the code:

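    import java.util.concurrent.{ExecutorService, Executors}
    import scala.concurrent.ExecutionContext
    import com.google.common.util.concurrent.MoreExecutors
    val executorService: ExecutorService = Executors.newFixedThreadPool(4) // shared pool
    implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
    implicit val executor = MoreExecutors.listeningDecorator(executorService)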
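A stdlib-only sketch of the pool sharing. `SharedPoolDemo` is hypothetical, and a plain `Callable` submitted to the pool stands in for the Guava side. One fixed pool runs both a Scala Future and a Java task, and it is shut down once at the end.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SharedPoolDemo {
  def run(): (Int, Int) = {
    val pool = Executors.newFixedThreadPool(4)
    val ec = ExecutionContext.fromExecutorService(pool)
    try {
      // A Scala Future and a plain Java task, both executed by the same 4 threads
      val scalaResult = Future(20 + 1)(ec)
      val javaResult = pool.submit(new Callable[Int] { def call(): Int = 2 * 21 })
      (Await.result(scalaResult, 5.seconds), javaResult.get(5, TimeUnit.SECONDS))
    } finally {
      pool.shutdown() // shut the shared pool down once, when you are done with it
    }
  }
}
```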

To know whether the Task at the front of the queue is ready to be (re)processed, we have to check its score, which, if you remember, is the timestamp (MAX_EPOCHTIME - TIMESTAMP) indicating when to attempt processing this Task. One way to do this is to fetch the Task at the front of the queue and, if its next-attempt timestamp is earlier than the current timestamp, process it and delete it from the queue. If the timestamp is in the future, put the Task back on the queue.

To make this set of operations atomic and cheap (saving network round trips to Redis), I implemented the queue's pop operation as a Lua script. Redis runs a Lua script atomically on the server side, which is exactly what I want for the pop operation (fetch and delete from the queue). Here is what the script looks like:

    local zset_key = KEYS[1]
    local reverse_current_ts = ARGV[1]
    local max_ts = ARGV[2]

    -- Get all Tasks whose next-attempt time is at or before now,
    -- i.e. whose reversed score is in [reverse_current_ts, max_ts]
    local arr = redis.call('ZRANGEBYSCORE', zset_key, reverse_current_ts, max_ts)

    if #arr > 0 then
      -- Delete the same range from the zset; the script runs atomically,
      -- so no other client can pop these Tasks in between
      redis.call('ZREMRANGEBYSCORE', zset_key, reverse_current_ts, max_ts)
      return arr
    else
      return {}
    end

I pass the Lua script the key of the Redis sorted set, the reverse current timestamp (MAX_EPOCHTIME - current ts) and the max epoch timestamp. The script checks whether any Tasks have a next-attempt timestamp in the past; if so, it fetches them, deletes them from the queue and returns them. Make sure to declare variables with local: Redis raises an error when a script tries to create a global variable.
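To make the pop semantics testable outside Redis, here is an in-memory model of what the script does. It is a sketch, not Redis: `ToyPopQueue` is hypothetical, and a JVM lock stands in for Redis running the script atomically. It selects members whose reversed score falls in [maxEpochTime - now, maxEpochTime], removes them, and returns them in one step.

```scala
import scala.collection.mutable

// In-memory model of the pop script (illustration only): member -> reversed score
final class ToyPopQueue(maxEpochTime: Long) {
  private val scores = mutable.Map.empty[String, Long]

  def enqueue(member: String, nextAttemptTs: Long): Unit = synchronized {
    scores(member) = maxEpochTime - nextAttemptTs
  }

  // Like ZRANGEBYSCORE + ZREMRANGEBYSCORE over the same range,
  // done under one lock so no other caller can pop the same members
  def popDue(now: Long): Seq[String] = synchronized {
    val min = maxEpochTime - now
    val due = scores.toSeq
      .filter { case (_, s) => s >= min && s <= maxEpochTime }
      .sortBy { case (m, s) => (s, m) }
      .map(_._1)
    due.foreach(m => scores.remove(m))
    due
  }
}
```

Note that ascending reversed score puts the most recently due Task first in each batch, which is also the order the script's ZRANGEBYSCORE returns.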

Now let's look at the interesting part: exposing an Observable queue over this Redis sorted set. My first attempt followed the way I had used Observable before, explicitly calling Observable's apply method. It uses the getTasks method, which returns a Future[Seq[Task]]. Here is how the code looks:

    Observable.interval(pollInterval) flatMap { _ =>
      Observable[Task] { subscriber =>
        getTasks foreach { tasks =>
          tasks foreach { subscriber.onNext(_) }
          subscriber.onCompleted() // let flatMap release this inner Observable
        }
      }
    }

Note that I have left out error handling to keep the examples small. There is an outer Observable that emits an event every pollInterval; inside it, the call to getTasks is wrapped in another Observable. I use flatMap at the top level; otherwise I would end up with an Observable[Observable[Task]].
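The flatMap-versus-map point is easiest to see with plain Scala collections standing in for Observables. The shape of the types is the same, and no RxScala is needed. `FlattenDemo` and `fetchBatch` are hypothetical, with `fetchBatch` playing the role of one poll of the queue.

```scala
object FlattenDemo {
  final case class Task(id: Int)

  // Stand-in for one poll of the queue: each tick yields a batch of tasks
  def fetchBatch(tick: Int): Seq[Task] = Seq(Task(tick * 10), Task(tick * 10 + 1))

  val ticks: Seq[Int] = Seq(1, 2)

  // map keeps one nested batch per tick: Seq[Seq[Task]]
  val nested: Seq[Seq[Task]] = ticks.map(fetchBatch)

  // flatMap splices the batches into a single stream of tasks: Seq[Task]
  val flat: Seq[Task] = ticks.flatMap(fetchBatch)
}
```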

The above way of constructing an Observable is not bad for this small example, but in the Reactive Programming course Erik Meijer strongly urged us not to construct Observables directly via the apply method or via create, and to use the combinator methods provided by Observable to build new ones instead. So here is what I came up with:

    for {
      _         <- Observable.interval(pollInterval)
      tasks     <- Observable.from(getTasks) // Observable from Future[Seq[Task]]
      flattened <- Observable.from(tasks)    // Observable from Seq[Task]
    } yield flattened

Note that in the last generator I call Observable.from() again, because I don't want to end up with an Observable[Seq[Task]]; I want an Observable[Task].

Here is a simple case class to represent a Task:

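    import org.json4s.JValue
    import org.json4s.JsonDSL._
    import org.json4s.native.JsonMethods._ // or org.json4s.jackson.JsonMethods._

    case class Task(created: Long, numFailures: Int, payload: String) {
      // JSON representation built with the json4s DSL
      def toJValue(): JValue = {
        ("created" -> created) ~
        ("numFailures" -> numFailures) ~
        ("payload" -> payload)
      }

      override def toString(): String = {
        compact(render(toJValue))
      }
    }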

Here is how the client uses the job queue:

    import scala.concurrent.duration._
    import scala.util.{Failure, Success}
    import com.lambdaworks.redis.RedisClient

    val client = new RedisClient("")
    val asyncConnection = client.connectAsync()

    val rcq = new RedisConditionalQueue(
      asyncConnection = asyncConnection,
      conditionCheckingLuaScript = None,
      zsetKey = "sorted1",
      executorService = executorService)

    rcq.getObservableQueue(pollInterval = 1.second) subscribe { task =>
      println(s"received task -> $task")

      // If the task fails, enqueue it back with a timestamp in the future
      processTask(task) onComplete {
        case Success(_) =>
          println(s"Job Success!, task = $task")
        case Failure(e) =>
          println(s"Job Failed!, task = $task")
          val totalFailures = task.numFailures + 1
          val newTask = task.copy(numFailures = totalFailures)
          if (totalFailures < MAX_ALLOWED_FAILURES) {
            // Linear backoff: wait 2 seconds per failure so far
            val nextAttemptTs = System.currentTimeMillis + 2 * totalFailures * 1000
            println(s"Reenqueued -> $newTask")
            rcq.enqueue(newTask, nextAttemptTs)
          } else {
            println(s"Discarding task -> $newTask, totalFailures = $totalFailures")
          }
      }
    }

The entire code for this example is in this gist