Sunday, November 29, 2015

How to speed up Spark SQL unit tests?

I'm evaluating Spark SQL to implement a simple reporting module (a few simple aggregations over Avro data already stored on HDFS). I have no doubt that Spark SQL would be a good fit for both my functional and non-functional requirements.

However, on top of the production requirements, I want to make sure the module is testable. We follow a BDD approach with very focused scenarios, which means this module will need to run tens or hundreds of SQL queries over some very simple data (1..10 records).

To get a rough idea of the performance I can expect from Spark SQL in local mode, I quickly prototyped two tests:

  1. select count(*) from myTable
  2. select key, count(*) from myTable group by key

The first test takes 100 ms on average, but the second one takes 500 ms. Such performance is unacceptable, as it would make the test suite too slow.

For comparison, the same test runs in 10 ms using Crunch and its MemPipeline (1500 ms with MRPipeline in local mode), and in 1500 ms with Hive in embedded mode. Spark SQL is thus a bit faster than MapReduce in local mode, but still way too slow to build good test suites.

Is it possible to speed up Spark SQL in local mode?
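One knob that seems worth trying: every "group by" triggers a shuffle, and Spark SQL's default of 200 shuffle partitions is clearly oversized for 1..10 records. A minimal sketch, assuming a single partition is enough for tiny test data (the value of 1 is my guess, not a tuned setting):

  SQLContext sqlCtx = new SQLContext(ctx);
  // Assumption: with 1..10 records, the default 200 shuffle partitions
  // are pure scheduling overhead; one partition should be plenty.
  sqlCtx.setConf("spark.sql.shuffle.partitions", "1");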

Is there a better / faster way to test a Spark SQL module?
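One approach I'm considering is to amortize the context startup cost by sharing a single SparkContext / SQLContext across the whole suite instead of creating one per test. A sketch of what I have in mind with JUnit 4 (class and method names are mine):

  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.sql.SQLContext;
  import org.junit.AfterClass;
  import org.junit.BeforeClass;

  public abstract class SharedSparkSqlTest {
      protected static JavaSparkContext ctx;
      protected static SQLContext sqlCtx;

      @BeforeClass
      public static void startSpark() {
          // Started once per test class instead of once per test case.
          SparkConf conf = new SparkConf()
                  .setMaster("local[4]")
                  .setAppName("poc-sparksql-tests");
          ctx = new JavaSparkContext(conf);
          sqlCtx = new SQLContext(ctx);
      }

      @AfterClass
      public static void stopSpark() {
          ctx.stop();
      }
  }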

(I have not profiled the execution yet, but since a groupBy().countByKey() on an RDD takes 40 ms on average, I expect to find that the culprit is the query optimizer.)
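For reference, the RDD-level baseline is along these lines (a simplified sketch, not my exact code; keying on the first column is an assumption):

  // Rough RDD-level counterpart of "select a, count(*) ... group by a",
  // bypassing the SQL layer entirely. Needs java.util.Map and scala.Tuple2.
  Map<Object, Long> countsPerKey = df.javaRDD()
          .mapToPair(row -> new Tuple2<>(row.get(0), 1)) // key on column "a"
          .countByKey();                                 // occurrences per key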


My quick & dirty test code follows:

  // Requires: com.google.common.base.Stopwatch, java.util.concurrent.TimeUnit,
  // org.apache.spark.SparkConf, org.apache.spark.api.java.JavaSparkContext,
  // org.apache.spark.sql.DataFrame, org.apache.spark.sql.SQLContext
  final int ITERATIONS = 10; // arbitrary repetition count used for averaging

  SparkConf sparkConf = new SparkConf()
          .setMaster("local[4]")
          .setAppName("poc-sparksql");

  try (JavaSparkContext ctx = new JavaSparkContext(sparkConf)) {
      SQLContext sqlCtx = new SQLContext(ctx);

      // Test 1: plain count(*)
      for (int i = 0; i < ITERATIONS; i++) {
          Stopwatch testCaseSw = Stopwatch.createStarted();

          DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
          df.registerTempTable("myTable");
          DataFrame result = sqlCtx.sql("select count(*) from myTable");

          System.out.println("Results: " + result.collectAsList());
          System.out.println("Elapsed: " + testCaseSw.elapsed(TimeUnit.MILLISECONDS));
      }

      // Test 2: count(*) with a group by (triggers a shuffle)
      for (int i = 0; i < ITERATIONS; i++) {
          Stopwatch testCaseSw = Stopwatch.createStarted();

          DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
          df.registerTempTable("myTable");
          DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a");

          System.out.println("Results: " + result.collectAsList());
          System.out.println("Elapsed: " + testCaseSw.elapsed(TimeUnit.MILLISECONDS));
      }
  }
