dimanche 23 août 2015

Integration test with ScalaTest and Future

I'm trying to build my application around asynchronous method so I use Future with Scala. For more information, it's elastic4s lib in order to query an ElasticSearch server.

I begin with this pattern, so I 1st created this method:

    def insert(person: Person) = {
      client.execute {
        index into "index" / "type" source person
      } onComplete {
        case Success(s) => {
          logger.info("{} has been inserted successfully", interaction.toString)
          something_to_do()
         }
        case Failure(t) => logger.warn("An error has occured: {}", t.getMessage)
     }
   }

If I understand well, this method is not testable. When I read this post, I have to create a method which returns a Future, and I have to use ScalaFutures (for ScalaTest) to test my async method.

So I execute this pattern, and I've created two method like this:

def insert(person: Person): Future[IndexResponse] = {
    client.execute {
        index into "index" / "type" source person
    }
}

def insertToEs(person: Person): Unit = {
    insert(person) onComplete {
        case Success(s) => {
            logger.info("{} has been inserted successfully", person.toString)
            something_to_do()
        }
        case Failure(t) => logger.warn("An error has occured: {}", t.getMessage)
    }
}

So now it's easy to test my first method, but how do I test the insertToEs method?

More generally, I would like to create integration tests: kafka (embedded) to elasticSearch (embedded).

I have this code which calls previous insertToEs method:

def receive: Unit = {
    ...
    val iterator = stream.iterator()
    while(iterator.hasNext) {
        insertToEs(iterator.next.message)
    }
}

I've created a test on receive method, but it looks like the integration test ends before the methods have been executed.

@RunWith(classOf[JUnitRunner])
class ToEsSpec extends FlatSpec with Matchers with BeforeAndAfterAll with ScalaFutures {

  override def beforeAll = {
      run_kafka_server
      run_elasticsearch_server
  }

  override def afterAll = {
      shutdown
  }

  it should "receive message from kafka and insert to es" {

      send_to_kafka(json)

      receive

      assert if inserted in es
  }
}

Could you please advice me ? Thanks.

Aucun commentaire:

Enregistrer un commentaire