I am working on a project that involves Elasticsearch, Scala and Akka,
and, to my surprise, I have run into some problems that are not easily solved
by a simple Google search, so I decided to put some notes here.
The project is about collecting logs and doing some analytics on the data.
There are several background services (implemented as Akka actors) that
read data from and write data to an Elasticsearch database.
Choosing the client library
There are several client libraries for Scala that do a great job of integrating
the Elasticsearch API into the Scala language, but after taking a look I decided
not to use any of them and simply use the native Java client.
Before you blame me, please hear my justification. I was mainly interested in
learning the basics of the Elasticsearch API and its core design principles, so I
decided to use the library written in Java. The Scala libraries are really cool,
but they add another layer on top of it, which makes it more
difficult to understand what is going on and to solve problems.
Connecting
There are several ways to obtain an Elasticsearch client:
Creating a local embedded node
Creating a node that is part of a cluster and holds data
Creating a node that joins a cluster but doesn’t hold data
Creating a transport client that just sends requests to, and receives responses from,
other nodes in the cluster.
I just needed to connect to an existing cluster without contributing data,
so I decided to use the third option. At the beginning I was confused about where to
create the connection (from the actors or somewhere else?), how many
connections I could have, and whether the client was thread safe. I had previous experience with
the Python client, but this was different: the Python library is more like
using a transport where you can create several client connections (and the
library maintains a pool of connections internally). In this case, as node
creation is a very expensive operation and the native Java client is thread
safe, it is better to create one instance at application start and pass it
to the actors that require access to the database (I am still not 100% sure this
is the best pattern, but so far it works for me).
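As a rough illustration of the "one client, created once" idea, here is a minimal sketch. It assumes the `NodeBuilder` API of Elasticsearch 1.x/2.x (on 2.x a `path.home` setting is also required). The cluster name `logs-cluster` and the object name `ClientHolder` are placeholders, not names from this project.

```scala
import org.elasticsearch.client.Client
import org.elasticsearch.node.Node
import org.elasticsearch.node.NodeBuilder.nodeBuilder

// Hypothetical holder object: builds the node once, at application start.
object ClientHolder {
  // client(true) asks for a node that joins the cluster but holds no data.
  // "logs-cluster" is a placeholder for the real cluster name.
  val node: Node = nodeBuilder()
    .clusterName("logs-cluster")
    .client(true)
    .node()

  // The client is thread safe, so this single instance can be shared
  // by every actor that needs to talk to Elasticsearch.
  val client: Client = node.client()

  // Release the node's resources when the JVM stops.
  sys.addShutdownHook(node.close())
}
```

The shared `client` would then be handed to each actor through its constructor (for example via `Props`), rather than each actor building its own node.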
Futures vs ActionListener
The native Java client does a good job of not blocking during the execution of
operations, but the first thing I missed was integration with Scala
futures. After browsing a bit I found a good solution that wraps the execution in a Scala future. After minor modifications, this is what I use:
And this is an example of use:
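The general idea can be sketched as follows: a `Promise` is completed from the listener callbacks, and its `Future` is handed back to the caller. This is only an illustration, not necessarily the exact wrapper mentioned above. The `Listener` trait is a stand-in for the client's `ActionListener` (which has `onResponse` and `onFailure` callbacks), defined here so the sketch runs without Elasticsearch on the classpath. `toFuture` and `fakeSearch` are hypothetical names.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Stand-in for the callback interface of the Java client (assumption:
// same two callbacks as ActionListener, simplified for this sketch).
trait Listener[T] {
  def onResponse(response: T): Unit
  def onFailure(e: Throwable): Unit
}

object FutureWrapper {
  // Runs `execute` with a listener that completes a Promise,
  // and returns the Future side of that Promise.
  def toFuture[T](execute: Listener[T] => Unit): Future[T] = {
    val promise = Promise[T]()
    execute(new Listener[T] {
      def onResponse(response: T): Unit = promise.success(response)
      def onFailure(e: Throwable): Unit = promise.failure(e)
    })
    promise.future
  }
}

object FutureWrapperExample {
  // Hypothetical operation that reports its result through a listener.
  def fakeSearch(query: String)(listener: Listener[String]): Unit =
    if (query.nonEmpty) listener.onResponse(s"hits for $query")
    else listener.onFailure(new IllegalArgumentException("empty query"))

  def main(args: Array[String]): Unit = {
    val result: Future[String] =
      FutureWrapper.toFuture[String](listener => fakeSearch("error")(listener))
    // Blocking is only for the demo; real code would map/flatMap the Future.
    println(Await.result(result, 1.second))
  }
}
```

With the real client, the function passed to `toFuture` would call something like `client.prepareSearch(...).execute(listener)`, with `Listener` replaced by `ActionListener`.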
Actor example
Let’s see an example of how to do a search without blocking an actor.
The following code has an actor that does an aggregation by timestamp when it
receives a StartQuery message and returns a QueryResult with the list of
intervals found in the Elasticsearch query result.
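The non-blocking part of such an actor can be sketched with Akka's `pipe` pattern. This is a minimal sketch, not the original actor: the fields of `StartQuery` and `QueryResult`, the `Interval` type and the `search` function are all assumptions. `search` stands in for the code that runs the aggregation through the native client and returns a Scala `Future`.

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

// Message shapes are assumptions; the text only names StartQuery and QueryResult.
case class StartQuery(from: Long, to: Long)
case class Interval(start: Long, count: Long)
case class QueryResult(intervals: Seq[Interval])

// `search` is a hypothetical stand-in for the Elasticsearch aggregation call.
class QueryActor(search: StartQuery => Future[Seq[Interval]]) extends Actor {
  import context.dispatcher // ExecutionContext used by map and pipeTo

  def receive: Receive = {
    case query: StartQuery =>
      // sender() is evaluated here, while the message is being handled,
      // so the reply still reaches the right actor when the Future completes.
      // On failure, pipeTo sends an akka.actor.Status.Failure instead.
      search(query).map(QueryResult(_)).pipeTo(sender())
  }
}
```

The actor returns from `receive` immediately. The reply is sent later from the future's callback, so the actor's thread is never blocked waiting for Elasticsearch.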
Testing
Integration tests that involve databases are usually not as simple as
unit tests, but the fact that we can create an embedded instance of Elasticsearch
makes them very convenient and easy to write.
You can encapsulate all the code required to create an embedded node with the
following code (adapted from this
post and this post):
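For orientation, an embedded node helper might look roughly like the sketch below. It is not the code from the posts mentioned above. It assumes the `NodeBuilder` API of Elasticsearch 1.x/2.x, and the class name `EmbeddedNode` and the chosen settings are illustrative.

```scala
import java.io.File
import java.nio.file.Files
import org.elasticsearch.client.Client
import org.elasticsearch.node.NodeBuilder.nodeBuilder

// Hypothetical helper: starts a local node that stores its data in a temp directory.
class EmbeddedNode {
  private val homeDir: File = Files.createTempDirectory("es-embedded-").toFile

  private val builder = nodeBuilder().local(true) // local(true): in-JVM discovery only
  builder.settings()
    .put("path.home", homeDir.getAbsolutePath)    // required on 2.x, harmless on 1.x
    .put("path.data", new File(homeDir, "data").getAbsolutePath)
    .put("http.enabled", false)                   // tests use the Java client, not HTTP

  private val node = builder.node()               // builds and starts the node

  def client: Client = node.client()

  // Stops the node and removes the temporary files.
  def shutdown(): Unit = {
    node.close()
    deleteRecursively(homeDir)
  }

  private def deleteRecursively(file: File): Unit = {
    Option(file.listFiles).foreach(_.foreach(deleteRecursively))
    file.delete()
  }
}
```

A test suite would create one `EmbeddedNode` before the tests run, pass its `client` to the actors under test, and call `shutdown()` afterwards.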
Now, we are ready to see a test example using TestKit
and ScalaTest.
The first test is an example of how to test an actor feature by directly accessing
the underlying actor instance. When TestActorRef is used in this way, any message
sent to the actor is processed in the same thread. In the second example the
actor is created from its props with the actor system, and the messages
are processed in another thread. For further details on how to test Akka actors
you can read the Akka documentation.
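The difference between the two styles can be sketched with plain TestKit classes. This sketch leaves out ScalaTest to stay short, and it uses a toy `Counter` actor (hypothetical, not part of the project) instead of the query actor.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestActorRef, TestKit, TestProbe}

// Toy actor used only to illustrate the two testing styles.
class Counter extends Actor {
  var count = 0
  def receive: Receive = {
    case "inc" => count += 1
    case "get" => sender() ! count
  }
}

object TestingStylesSketch {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("testing-sketch")
    try {
      // Style 1: TestActorRef processes the message on the calling thread,
      // so the actor's internal state can be inspected right after the send.
      val syncRef = TestActorRef[Counter](Props(new Counter))
      syncRef ! "inc"
      assert(syncRef.underlyingActor.count == 1)

      // Style 2: a regular actor runs on another thread, so the test
      // only observes it through the messages it sends back.
      val probe = TestProbe()
      val asyncRef = system.actorOf(Props(new Counter))
      asyncRef.tell("inc", probe.ref)
      asyncRef.tell("get", probe.ref)
      probe.expectMsg(1)
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }
}
```

The first style suits checks on internal state. The second is closer to how the actor behaves in production, at the cost of having to wait for replies.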
Summary
We have seen very briefly how to use the Elasticsearch native Java client from
Scala using Futures, how to launch an embedded server for testing and how to
write an integration test for an Akka actor that uses an Elasticsearch database.