Parth's Blog

About Programming etc ..

© 2014. Parth Patil All rights reserved.

Lightweight logging in Scala

Sometimes you want to defer the evaluation of log lines from the point where they are logged. This helps avoid paying the cost of logging in the case when logging has been turned off. Following is a simple logger that lets you do that. This code is for demonstration purposes only and might have potential bugs/performance issues.

package com.parthpatil

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

class LightweightLogger {
  type DeferedString = () => String

  private val buffer = new ConcurrentLinkedQueue[DeferedString]()

  def log(expression: => String): Unit = {
    buffer.add(() => expression)
  }

  def getLinesAndFlush(): Seq[String] = {
    def getBufferContents(): Seq[DeferedString] = buffer.synchronized {
      val output = buffer.iterator.asScala.toSeq
      buffer.clear()
      output
    }

    val contents = getBufferContents()
    contents map { function => function() }
  }
}

object LightweightLogger extends App {
  val logger = new LightweightLogger
  logger.log(s"Expensive compute -> ${100 * 200}")
  logger.log(s"Expensive compute -> ${100.0 / 200}")
  val lines = logger.getLinesAndFlush()
  println(lines mkString ",")
}

The log method has a by name parameter to capture the expression that will generate a String. This by name parameter(expression) is not evaluated but is wrapped in a function and put on the queue. Only when the getLinesAndFlush() is invoked is when each item on the queue is evaluated.

Though this approach has the downside that you could end up closing over big objects and hence cause them to not get garbage collected for longer than they are needed and could put pressure on your memory or worse the objects you have closed over might no longer be in a usable state.

HBase scanning in Scala using asynchbase

I was looking for a way to do non-blocking recursive scans on HBase in Scala using the excellent asynchbase Java library. I wanted to create Scala Futures from the Deferred that asynchbase returns as Futures are much easier to work with in Scala. Following is what I came up with, its not perfect and lacks a lot of error handling. It is inspired by the follwing gist

To keep the example simple I am going to assume that we are only interested in returning the keys of our hbase table.

import com.stumbleupon.async.Callback
import org.hbase.async._
import org.slf4j.LoggerFactory
import scala.concurrent.{Promise, Await, ExecutionContext, Future}
import java.util
import scala.collection.mutable
import scala.util.Try

class RecursiveResultHandler(
  scanner: Scanner,
  promise: Promise[Seq[String]]) extends Callback[Object, util.ArrayList[util.ArrayList[KeyValue]]] {

  val logger = LoggerFactory.getLogger(getClass)
  val startTime = System.currentTimeMillis()
  
  // Initialize the collection to hold our results
  val results = mutable.ArrayBuffer[String]()
  var numRows: Int = 0

  def call(rawRows: util.ArrayList[util.ArrayList[KeyValue]]) = {
    try {
      // Once result of nextRows is null, we have reached the end of scan
      if (rawRows == null) {
        promise.success(results.toSeq)
        val timeTaken = System.currentTimeMillis - startTime
        logger.info(s"Num Rows = $numRows, Total time = $timeTaken ms")
        Try { scanner.close() } // close scanner & ignore exceptions
      } else {
        numRows += rawRows.size
        val rowsIterator = rawRows.iterator()
        while (rowsIterator.hasNext()) {
          val row = rowsIterator.next()
          val key = new String(row.get(0).key)
          results += key
        }

        scanner.nextRows().addCallback(this)
      }
    } catch {
      case e: Throwable =>
        promise.failure(e)
        Try { scanner.close() } // close scanner & ignore exceptions
    }
  }
}

Following is how you would use the above class

val promise = Promise[Seq[String]]()
try {
  val hbaseClient = new HBaseClient("zookeeperHost", "/zookeeper/hbase/path")
  val scanner: Scanner = hbaseClient.newScanner("my_table")

  val rrh = new RecursiveResultHandler(
    scanner = scanner,
    promise = promise
  )
  scanner.nextRows().addCallback(rrh)
} catch {
  case e: Throwable =>
    promise.failure(e)
}

val fut = promise.future

// use the future and profit !