vendredi 4 septembre 2015

Exception on Spark Job Test

Spark rookie here. I was trying to write some unit tests for my Spark job, but I keep getting an exception when I try to create a new StreamingContext in the test, on the line ssc = new StreamingContext(conf, batchDuration). The code looks like:

@RunWith(classOf[JUnitRunner])
class SparkJobTest extends FlatSpec with MockitoSugar with BeforeAndAfter {
  private val master = "local[2]"
  private val appName = "TestingAppName"
  private val batchDuration = Seconds(1)
  private val loggerMock = mock[Logger]
  private var ssc: StreamingContext = _
  private var sc: SparkContext = _

  before {
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)
    ssc = new StreamingContext(conf, batchDuration)
  }

  after {
    if (ssc != null) {
      ssc.stop()
    }
  }


  "This" should "always work" in {
    assert(1==1)
  }

Exception I got:

An exception or error caused a run to abort: Bad return type
Exception Details:
  Location:
    org/apache/spark/streaming/StreamingContext.fileStream(Ljava/lang/String;Lscala/Function1;ZLscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;)Lorg/apache/spark/streaming/dstream/InputDStream; @23: areturn
  Reason:
    Type 'org/apache/spark/streaming/dstream/FileInputDStream' (current frame, stack[0]) is not assignable to 'org/apache/spark/streaming/dstream/InputDStream' (from method signature)
  Current Frame:
    bci: @23
    flags: { }
    locals: { 'org/apache/spark/streaming/StreamingContext', 'java/lang/String', 'scala/Function1', integer, 'scala/reflect/ClassTag', 'scala/reflect/ClassTag', 'scala/reflect/ClassTag' }
    stack: { 'org/apache/spark/streaming/dstream/FileInputDStream' }
  Bytecode:
    0000000: bb01 9959 2a2b 2c1d b201 9eb6 01a6 1904
    0000010: 1905 1906 b701 a9b0 
java.lang.VerifyError: Bad return type
Exception Details:
  Location:
    org/apache/spark/streaming/StreamingContext.fileStream(Ljava/lang/String;Lscala/Function1;ZLscala/reflect/ClassTag;Lscala/reflect/ClassTag;Lscala/reflect/ClassTag;)Lorg/apache/spark/streaming/dstream/InputDStream; @23: areturn
  Reason:
    Type 'org/apache/spark/streaming/dstream/FileInputDStream' (current frame, stack[0]) is not assignable to 'org/apache/spark/streaming/dstream/InputDStream' (from method signature)
  Current Frame:
    bci: @23
    flags: { }
    locals: { 'org/apache/spark/streaming/StreamingContext', 'java/lang/String', 'scala/Function1', integer, 'scala/reflect/ClassTag', 'scala/reflect/ClassTag', 'scala/reflect/ClassTag' }
    stack: { 'org/apache/spark/streaming/dstream/FileInputDStream' }
  Bytecode:
    0000000: bb01 9959 2a2b 2c1d b201 9eb6 01a6 1904
    0000010: 1905 1906 b701 a9b0                    

    at com.appnexus.data.spark.job.SparkJobTest$$anonfun$1.apply$mcV$sp(SparkJobTest.scala:25)
    at com.appnexus.data.spark.job.SparkJobTest$$anonfun$1.apply(SparkJobTest.scala:21)
    at com.appnexus.data.spark.job.SparkJobTest$$anonfun$1.apply(SparkJobTest.scala:21)
    at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:195)
    at com.appnexus.data.spark.job.SparkJobTest.runTest(SparkJobTest.scala:13)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
    at com.appnexus.data.spark.job.SparkJobTest.org$scalatest$BeforeAndAfter$$super$run(SparkJobTest.scala:13)
    at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
    at com.appnexus.data.spark.job.SparkJobTest.run(SparkJobTest.scala:13)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


Process finished with exit code 0

I googled a lot; some sources said it is a version issue, so I tried Spark 1.2, 1.3, and 1.4, but had no luck. I'm running my tests on Mac OS X with Java 1.8.

Aucun commentaire:

Enregistrer un commentaire