I have a Scala Spark job that reads from HBase like so:

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val uniqueAttrs = calculateFreqLocation(hBaseRDD)

I am trying to write a unit test for the function calculateFreqLocation:

    def calculateFreqLocation(inputRDD: RDD[(ImmutableBytesWritable, Result)]): Map[String, Map[(String, String, String), Long]] = {
      val valueType = classOf[Array[Attribute]]
      val family = "cf_attributes".getBytes()
      val qualifier = "attributes".getBytes()
      // Parse each row's JSON attribute array, flatten it, and keep only location attributes
      val rdd7 = inputRDD
        .map(kv => (getUUID(kv._1.get()).toString(),
          objectMapper.readValue(new String(kv._2.getValue(family, qualifier)), valueType)))
        .flatMap(flattenRow)
        .filter(t => location_attributes.contains(t._2))
      // Count identical tuples, then group the counts by UUID (first tuple element)
      val countByUUID = rdd7.countByValue().groupBy(_._1._1)
      // Within each UUID, group by attribute key (second tuple element)
      val countByUUIDandKey = countByUUID.map(kv => (kv._1, kv._2.groupBy(_._1._2)))
      // For each key, keep the entry with the highest count
      val uniqueAttrs = countByUUIDandKey.map(uuidmap => (uuidmap._1, uuidmap._2.map(keymap => keymap._2.maxBy(_._2))))
      uniqueAttrs
    }
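
For reference, the aggregation at the end of calculateFreqLocation can be reproduced on plain Scala collections, without Spark or HBase. This is only a sketch of my understanding: the object name FreqSketch and the sample triples are made up for illustration, and groupBy(identity) is a local stand-in for RDD.countByValue().

```scala
object FreqSketch {
  // Hypothetical sample data: (uuid, attribute name, attribute value)
  val triples: Seq[(String, String, String)] = Seq(
    ("u1", "Current_Location_Ip_Address", "10.0.0.1"),
    ("u1", "Current_Location_Ip_Address", "10.0.0.1"),
    ("u1", "Current_Location_Ip_Address", "10.0.0.2"),
    ("u2", "Current_Location_Ip_Address", "10.0.0.3")
  )

  // Local stand-in for countByValue(): occurrences of each distinct triple
  val counts: Map[(String, String, String), Long] =
    triples.groupBy(identity).map { case (t, occurrences) => (t, occurrences.size.toLong) }

  // Per UUID and per attribute name, keep the (triple -> count) entry with the highest count
  val mostFrequent: Map[String, Map[(String, String, String), Long]] =
    counts.groupBy(_._1._1).map { case (uuid, byUuid) =>
      (uuid, byUuid.groupBy(_._1._2).map { case (_, byKey) => byKey.maxBy(_._2) })
    }

  def main(args: Array[String]): Unit = mostFrequent.foreach(println)
}
```

With this sample data, the entry for "u1" keeps only the 10.0.0.1 triple with a count of 2, since it occurs more often than 10.0.0.2. The inner map is keyed by the full (uuid, name, value) triple, which is the shape the real function returns.
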
This counts the occurrences of each location attribute per UUID and keeps the most frequent value for each attribute key. My unit test tries to recreate the HTable data and then pass the RDD to the function to see if the output matches:

    class FrequentLocationTest extends SparkJobSpec {

      "Frequent Location calculation" should {

        def longToBytes(x: Long): Array[Byte] = {
          ByteBuffer.allocate(java.lang.Long.SIZE / java.lang.Byte.SIZE).putLong(x).array
        }

        val currTimestamp = System.currentTimeMillis / 1000
        val UUID_1 = UUID.fromString("123456aa-8f07-4190-8c40-c7e78b91a646")
        val family = "cf_attributes".getBytes()
        val column = "attributes".getBytes()
        val row = "[{'name':'Current_Location_Ip_Address', 'value':'123.456.123.248'}]"
        val resultRow = Array(new KeyValue(row.getBytes(), family, column, null))
        val key = "851971aa-8f07-4190-8c40-c7e78b91a646".getBytes() ++ longToBytes(currTimestamp)
        val input = Seq((key, row))
        val correctOutput = Map(
          ("851971aa-8f07-4190-8c40-c7e78b91a646" -> Map(("123456aa-8f07-4190-8c40-c7e78b91a646", "Current_Location_Ip_Address", "123.456.123.248") -> 1))
        )

        "case 1 : return with correct output (frequent location calculation)" in {
          val inputRDD = sc.makeRDD(input, 1)
          val hadoopRdd = new HadoopRDD(sc,
            sc.broadcast(new SerializableWritable(new Configuration()))
              .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
            null, classOf[InputFormat[ImmutableBytesWritable, Result]],
            classOf[ImmutableBytesWritable], classOf[Result], 1)
          val finalInputRdd = hadoopRdd.union(inputRDD.map(kv => (new ImmutableBytesWritable(kv._1), new Result(Array(new KeyValue(kv._2.getBytes(), family, column, null))))))
          val resultMap = FrequentLocation.calculateFreqLocation(finalInputRdd)
          resultMap == correctOutput
          //val customCorr = new FrequentLocation().calculateFreqLocation(inputRDD)
          //freqLocationMap must_== correctOutput
        }
      }
    }

What I get is org.apache.spark.SparkException: Task not serializable. As far as I understand, this is because of ImmutableBytesWritable and the other HBase classes, which Spark can't serialize between nodes.

In the code above I am already reaching into Spark's developer APIs (creating the HadoopRDD manually), but I don't have any way to actually populate it with data. I was initially creating this RDD manually and got the same error; then I switched to using map and building it from raw binary/text.

How can I test this? I need to pass this function an instance of a HadoopRDD with data in it, or an instance of RDD[(ImmutableBytesWritable, Result)]. Any help would be appreciated!
