15 May 2015
Sometimes you want to defer the evaluation of log lines until after the point where they are logged. This avoids paying the cost of building log messages when logging has been turned off.
The following is a simple logger that lets you do that. This code is for demonstration purposes only and may have bugs or performance issues.
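package com.parthpatil

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable

class LightweightLogger {
  // A deferred log line: a function that builds the String only when invoked
  type DeferredString = () => String

  private val buffer = new ConcurrentLinkedQueue[DeferredString]()

  // By-name parameter: `expression` is captured here, not evaluated
  def log(expression: => String): Unit = {
    buffer.add(() => expression)
  }

  def getLinesAndFlush(): Seq[String] = {
    // Drain with poll() so each entry is removed exactly once. Iterating and
    // then calling clear() could drop lines added by other threads in between,
    // because add() does not take the buffer's monitor.
    val drained = mutable.ArrayBuffer[DeferredString]()
    var next = buffer.poll()
    while (next != null) {
      drained += next
      next = buffer.poll()
    }
    // Only now are the deferred expressions evaluated
    drained.map(function => function()).toList
  }
}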
object LightweightLogger extends App {
  val logger = new LightweightLogger
  logger.log(s"Expensive compute -> ${100 * 200}")
  logger.log(s"Expensive compute -> ${100.0 / 200}")
  val lines = logger.getLinesAndFlush()
  println(lines mkString ",")
}
The log method has a by-name parameter that captures the expression that will generate a String. This by-name parameter (expression) is not evaluated when log is called; it is wrapped in a function and put on the queue. Each item on the queue is evaluated only when getLinesAndFlush() is invoked.
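To make the deferral visible, here is a minimal sketch of the same by-name pattern with a counter that records when the expression actually runs. The names ByNameDemo, expensive, defer, flush and evaluations are made up for this illustration and are not part of the logger above.

```scala
import scala.collection.mutable

object ByNameDemo {
  // Hypothetical counter, used only to observe when evaluation happens
  var evaluations = 0

  def expensive(): String = {
    evaluations += 1
    "expensive result"
  }

  // Deferred expressions waiting to be evaluated
  private val pending = mutable.Queue[() => String]()

  // Same idea as log(): the by-name argument is wrapped, not evaluated
  def defer(expression: => String): Unit = {
    pending += (() => expression)
  }

  // Evaluate everything that was deferred, then empty the queue
  def flush(): Seq[String] = {
    val lines = pending.map(f => f()).toList
    pending.clear()
    lines
  }
}
```

Calling ByNameDemo.defer(s"value: ${ByNameDemo.expensive()}") leaves evaluations at 0; it only becomes 1 once flush() is called.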
This approach has a downside, though: a deferred line can close over large objects and keep them from being garbage collected for longer than they are needed, which puts pressure on memory. Worse, by the time the line is evaluated, the objects you have closed over might no longer be in a usable state.
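The stale-state problem can be sketched as follows. Request is a hypothetical mutable class invented for this example, and copying the value into a local val before deferring is just one possible way to sidestep the issue when the value itself is cheap to read.

```scala
import scala.collection.mutable

object ClosureCaptureDemo {
  // Hypothetical mutable object that a log line might close over
  final class Request(var status: String)

  def run(): List[String] = {
    val pending = mutable.ArrayBuffer[() => String]()
    def defer(expression: => String): Unit = {
      pending += (() => expression)
    }

    val request = new Request("started")

    // Closes over `request`: the status is read only at flush time
    defer(s"deferred read: ${request.status}")

    // Copies the value first, so the deferred line holds only a String
    val snapshot = request.status
    defer(s"snapshot: $snapshot")

    // The object changes before the lines are evaluated
    request.status = "finished"

    pending.map(f => f()).toList
  }
}
```

Here the first line reports the status at flush time rather than at log time, while the second line keeps the value it had when it was logged.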
16 Sep 2014
I was looking for a way to do non-blocking recursive scans on HBase in Scala using the excellent asynchbase Java library. I wanted to create Scala Futures from the Deferred that asynchbase returns, as Futures are much easier to work with in Scala. Following is what I came up with; it's not perfect and lacks a lot of error handling. It is inspired by the following gist
To keep the example simple, I am going to assume that we are only interested in returning the row keys of our HBase table.
import com.stumbleupon.async.Callback
import org.hbase.async._
import org.slf4j.LoggerFactory
import scala.concurrent.Promise
import java.util
import scala.collection.mutable
import scala.util.Try

class RecursiveResultHandler(
    scanner: Scanner,
    promise: Promise[Seq[String]]) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {

  val logger = LoggerFactory.getLogger(getClass)
  val startTime = System.currentTimeMillis()

  // Initialize the collection to hold our results
  val results = mutable.ArrayBuffer[String]()
  var numRows: Int = 0

  def call(rawRows: util.ArrayList[util.ArrayList[KeyValue]]): Object = {
    try {
      // Once result of nextRows is null, we have reached the end of scan
      if (rawRows == null) {
        // Hand out an immutable copy of the accumulated keys
        promise.success(results.toList)
        val timeTaken = System.currentTimeMillis - startTime
        logger.info(s"Num Rows = $numRows, Total time = $timeTaken ms")
        Try { scanner.close() } // close scanner & ignore exceptions
      } else {
        numRows += rawRows.size
        val rowsIterator = rawRows.iterator()
        while (rowsIterator.hasNext()) {
          val row = rowsIterator.next()
          // Every KeyValue in a row carries the row key; decoded here with
          // the platform default charset
          val key = new String(row.get(0).key)
          results += key
        }
        // Ask for the next batch and handle it with this same callback
        scanner.nextRows().addCallback(this)
      }
    } catch {
      case e: Throwable =>
        // tryFailure does not throw if the promise was already completed
        promise.tryFailure(e)
        Try { scanner.close() } // close scanner & ignore exceptions
    }
  }
}
Following is how you would use the above class:
val promise = Promise[Seq[String]]()
try {
  val hbaseClient = new HBaseClient("zookeeperHost", "/zookeeper/hbase/path")
  val scanner: Scanner = hbaseClient.newScanner("my_table")
  val rrh = new RecursiveResultHandler(
    scanner = scanner,
    promise = promise
  )
  scanner.nextRows().addCallback(rrh)
} catch {
  case e: Throwable =>
    promise.failure(e)
}
val fut = promise.future
// use the future and profit !
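To show the shape of the recursion and of consuming the resulting future without needing an HBase cluster, here is a rough sketch that uses only the Scala standard library. FakeScanner is a made-up stand-in that returns pages through Scala Futures and signals the end of the scan with None; it is not the asynchbase API, and scanAll and demo are hypothetical names as well.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RecursiveScanSketch {
  // Hypothetical stand-in for a scanner: one page per call, then None
  final class FakeScanner(pages: List[Seq[String]]) {
    private var remaining = pages

    def nextRows(): Future[Option[Seq[String]]] = Future {
      this.synchronized {
        remaining match {
          case page :: rest =>
            remaining = rest
            Some(page)
          case Nil => None
        }
      }
    }
  }

  // Keep requesting pages until the scanner is exhausted, then complete
  // the promise with everything that was collected
  def scanAll(scanner: FakeScanner): Future[Seq[String]] = {
    val promise = Promise[Seq[String]]()
    val results = mutable.ArrayBuffer[String]()

    def step(): Unit = scanner.nextRows().onComplete {
      case Success(Some(page)) =>
        results ++= page
        step() // request the next page from inside the callback
      case Success(None) =>
        promise.trySuccess(results.toList)
      case Failure(e) =>
        promise.tryFailure(e)
    }

    step()
    promise.future
  }

  // Blocking with Await is only for the demo; real code would normally
  // map or flatMap over the future instead
  def demo(): Seq[String] = {
    val scanner = new FakeScanner(List(Seq("row1", "row2"), Seq("row3")))
    Await.result(scanAll(scanner), 5.seconds)
  }
}
```

Unlike the handler above, this sketch also fails the promise when a page request itself fails, which is one of the error-handling gaps mentioned earlier.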