HBase scanning in Scala using asynchbase
16 Sep 2014
I was looking for a way to do non-blocking recursive scans on HBase in Scala using the excellent asynchbase Java library . I wanted to create Scala Futures from the Deferred that asynchbase returns as Futures are much easier to work with in Scala. Following is what I came up with, its not perfect and lacks a lot of error handling. It is inspired by the follwing gist
To keep the example simple I am going to assume that we are only interested in returning the keys of our hbase table.
import com.stumbleupon.async.Callback
import org.hbase.async._
import org.slf4j.LoggerFactory
import scala.concurrent. { Promise , Await , ExecutionContext , Future }
import java.util
import scala.collection.mutable
import scala.util.Try
class RecursiveResultHandler (
scanner : Scanner ,
promise : Promise [ Seq [ String ]]) extends Callback [ Object , util.ArrayList [ util.ArrayList [ KeyValue ]]] {
val logger = LoggerFactory . getLogger ( getClass )
val startTime = System . currentTimeMillis ()
// Initialize the collection to hold our results
val results = mutable . ArrayBuffer [ String ]()
var numRows : Int = 0
def call ( rawRows : util.ArrayList [ util.ArrayList [ KeyValue ]]) = {
try {
// Once result of nextRows is null, we have reached the end of scan
if ( rawRows == null ) {
promise . success ( results . toSeq )
val timeTaken = System . currentTimeMillis - startTime
logger . info ( s "Num Rows = $numRows, Total time = $timeTaken ms" )
Try { scanner . close () } // close scanner & ignore exceptions
} else {
numRows += rawRows . size
val rowsIterator = rawRows . iterator ()
while ( rowsIterator . hasNext ()) {
val row = rowsIterator . next ()
val key = new String ( row . get ( 0 ). key )
results += key
}
scanner . nextRows (). addCallback ( this )
}
} catch {
case e : Throwable =>
promise . failure ( e )
Try { scanner . close () } // close scanner & ignore exceptions
}
}
}
Following is how you would use the above class
val promise = Promise [ Seq [ String ]]()
try {
val hbaseClient = new HBaseClient ( "zookeeperHost" , "/zookeeper/hbase/path" )
val scanner : Scanner = hbaseClient . newScanner ( "my_table" )
val rrh = new RecursiveResultHandler (
scanner = scanner ,
promise = promise
)
scanner . nextRows (). addCallback ( rrh )
} catch {
case e : Throwable =>
promise . failure ( e )
}
val fut = promise . future
// use the future and profit !