TL;DR: This is a series of blog posts x-raying the platform we used to run the Elastic Community Conference 2022, covering the choice of platform and technologies, the persistence options chosen, as well as monitoring and APM capabilities.
In this series we will cover:
- Part 1: Application structure
- Part 2: Using the new Elasticsearch Java Client
- Part 3: Upgrading on k8s without downtime
Why a new client?
The existing High Level Rest Client (HLRC) has been around for a few years and did its job. However, it came with a few drawbacks:
- Pulling in Elasticsearch and all of its dependencies
- Using a lot of Elasticsearch-internal and even Lucene classes
- Not being strongly typed for all request/response parts
The new client aligns with the already existing clients by using the same underlying foundations, along with code-generated classes to make sure that new endpoints are exposed as soon as they are defined within the Elasticsearch source.
The new client still depends on the LLRC - the Low Level Rest Client, which makes use of the Apache HttpClient.
Just to reiterate: everything within the client is now strongly typed and available via IDE completion, which means no juggling of byte arrays or BytesReference objects like in the old client.
History lesson: TransportClient, HLRC, LLRC, and the new client
Since Elasticsearch 5.6 an Elasticsearch High Level REST Client has been released. This client was created to get rid of the TransportClient, which used the Elasticsearch binary protocol and ports for communication. This step allowed a clear separation between client and server communication based on ports.
The REST client was already split between a Low Level REST Client (LLRC) and a High Level Rest Client (HLRC). The LLRC was responsible for sending the actual HTTP requests, configuring TLS and HTTP Basic Authentication, and selecting the right nodes within a cluster to send requests to. It also contained a sniffer to constantly discover new nodes or remove unreachable nodes from the list of available nodes to send requests to.
The HLRC had a different goal: providing classes and objects for each request and response that can be made to Elasticsearch. Every class for every endpoint was created manually, and everything had to be kept in sync manually when new fields or parameters were added, resulting in high maintenance - see the points above.
The clients team, providing clients for other languages like Ruby, Python, PHP, and JavaScript, already utilized an existing request spec, so the next step was to create a Java client using the same infrastructure: code generation from the spec definition of how Elasticsearch requests look. This is a much harder task for statically typed languages like .NET or Java, but it was finally tackled. This new client is the Elasticsearch Java Client. It uses the LLRC as well, so it does not have to deal with those low level tasks and can fully focus on the user experience. There are plans, however, to make the LLRC part pluggable, in order to use the HTTP client built into Java since version 11 in the future - currently the Apache HttpClient is used.
Now on to the part affecting the present instead of the history.
Compatibility lesson: All ze Java clients
Now this is where things get a little more complex, especially in the light of upgrades. Let’s do a quick summary here to prepare you for future upgrades. Even though this is not much of a problem for the ElasticCC platform, I think it’s worth having a detour here.
- The HLRC has been removed from Elasticsearch 8.0
- The LLRC is still available as before
- The new Elasticsearch Java Client is available for the latest 7.x releases as well as for 8.x.
In case you are doing a migration, you can use the same LLRC instance for both the old HLRC and the new Elasticsearch Java Client - see the sketch after the list below.
My personal upgrade strategy would be the following
- Upgrade to the latest 7.x release as with every upgrade
- Work on deprecations as with every upgrade
- Start using the new client in parallel with the existing one
- Replace the HLRC client step by step in your application with the new one
- Remove the HLRC dependency
- Upgrade to 8.x in the correct order (see here for on-prem, Elasticsearch before the client)
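For the parallel-use step, both clients can share a single low level REST client. The following is a minimal sketch based on the documented compatibility setup - note that the RestHighLevelClientBuilder used here only exists in recent 7.x releases of the HLRC:
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;
// one shared low level REST client for both clients
RestClient httpClient = RestClient.builder(HttpHost.create("http://localhost:9200")).build();
// the old HLRC, reusing the shared low level client
RestHighLevelClient hlrc = new RestHighLevelClientBuilder(httpClient).build();
// the new Elasticsearch Java Client on top of the very same low level client
ElasticsearchTransport transport = new RestClientTransport(httpClient, new JacksonJsonpMapper());
ElasticsearchClient esClient = new ElasticsearchClient(transport);
This way both clients go through the same connection pool and authentication setup while you migrate call sites one by one.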
OK, this was quite the detour, but I consider it valuable information for anyone reading this article. Let’s go back to our fancy little app.
Use case within the app
As all we need to do is basically index data and get it back, the use case is the simplest one possible - which makes it a great use case for getting started with the client in a small project. Also, no migration work needed to be done.
Instantiating the client
In the first part we already covered how to parse our application.conf file. The good part is that you can have good defaults, which can be overwritten by environment variables. In this example we go with localhost as the default, but you can configure host, user, and password via environment variables.
elasticsearch {
host = "http://localhost:9200"
host = ${?ELASTICSEARCH_HOST}
user = ${?ELASTICSEARCH_USER}
password = ${?ELASTICSEARCH_PASSWORD}
}
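In case you have not read the first part: this is roughly how that config section is read with the Typesafe Config library - a sketch, the exact wiring lives in the app's configuration class:
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
// loads application.conf from the classpath; the ${?VAR} substitutions pick up environment variables
Config config = ConfigFactory.load();
Config elasticsearch = config.getConfig("elasticsearch");
String host = elasticsearch.getString("host"); // falls back to http://localhost:9200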
The next part was the initialization of the client. While you can use JSON-B or Jackson as the underlying library, I usually go with Jackson, as I am most comfortable with its setup. First up is the configuration of the object mapper:
public final static ObjectMapper mapper = new ObjectMapper()
.registerModule(new JavaTimeModule())
.setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
The custom object mapper uses the Java time module to properly serialize the OffsetDateTime instances used in the entities. I prefer snake case for variable names, as well as serializing dates as strings over UNIX timestamps. The serialization inclusion ensures only non-null values are serialized to JSON.
This object mapper can now be put to use in the Elasticsearch client instantiation:
final RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearch.getString("host")));
if (elasticsearch.hasPath("user") && elasticsearch.hasPath("password")) {
final String user = elasticsearch.getString("user");
final String password = elasticsearch.getString("password");
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(user, password));
builder.setHttpClientConfigCallback(
b -> b.setDefaultCredentialsProvider(credentialsProvider));
}
RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(restClient,
new JacksonJsonpMapper(mapper));
return new ElasticsearchClient(transport);
Note that the returned instance here is a client that only offers synchronous and blocking methods. If you need an async client, you can go with the ElasticsearchAsyncClient.
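A quick, hedged sketch of the async variant - it is created from the same transport and returns a CompletableFuture instead of blocking (the info() call is just an example endpoint):
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
// non blocking: the CompletableFuture completes when the response arrives
asyncClient.info().whenComplete((info, throwable) -> {
    if (throwable != null) {
        throwable.printStackTrace();
    } else {
        System.out.println("Running against " + info.version().number());
    }
});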
We can now use the same ElasticsearchClient instance in all our services:
this.elasticsearchClient = configuration.elasticsearch().toElasticsearchClient();
UserService userService = new UserService(elasticsearchClient);
RatingService ratingService = new RatingService(elasticsearchClient);
QuizService quizService = new QuizService(elasticsearchClient, sessionService);
Time to take a look at indices.
Entities & Indices
Let’s start with a small class and how it is being indexed.
public record Rating(@JsonProperty("session_id") String sessionId,
String username,
Integer rating,
List<Feedback> feedbacks,
@JsonProperty("created_at") OffsetDateTime createdAt) {
}
public enum Feedback {
I_LEARNED_SOMETHING,
TOO_WEAK,
LACKS_DEMO,
TOO_FAST,
GREAT_DEMO,
TOO_COMPLICATED,
VERY_INTERESTING,
UNCLEAR,
BEST_TALK_OF_MY_LIVE,
COULD_BE_MORE_EXPLORED,
FUN,
UNDERSTOOD_NOTHING;
private final String displayName;
Feedback() {
this.displayName = name().subSequence(0, 1) +
name().replaceAll("_", " ").toLowerCase(Locale.ROOT).substring(1);
}
public String displayName() {
return displayName;
}
}
The Rating record (enjoying the sweet compactness) contains a rating for a session. Each user should have a single rating object per session stored in Elasticsearch. A rating can have a rating int from 1 to 5, as well as a list of feedbacks. That feedback functionality was added because condensing a talk into a single integer is much harder than giving some more actionable feedback to the speakers.
The resulting JSON that will be stored looks like this:
{
"session_id" : "303844",
"created_at" : "2022-02-11T14:37:52.302442991Z",
"username" : "the-awesome-name-of-the-user",
"rating" : 4,
"feedbacks" : [
"I_LEARNED_SOMETHING",
"GREAT_DEMO",
"BEST_TALK_OF_MY_LIVE"
]
}
No special JSON serialization logic or anything, so we can proceed right to the code storing this. There is a dedicated RatingService class that looks like this:
public class RatingService {
public static final String INDEX = "ratings";
private final ElasticsearchClient client;
public RatingService(ElasticsearchClient client) {
this.client = client;
}
public void save(Rating rating) throws IOException {
client.index(b -> b
.index(INDEX)
.id(rating.sessionId() + "-" + rating.username())
.document(rating));
}
}
The save() method calls client.index(), which requires an index, a string based ID, and the document for indexing. As we wanted to make sure that every user can only save one rating per session, we can simply use the sessionId and the username as the unique ID for this document. A new rating overwrites an old one - which also means you have to make sure to load the feedback if you just store the rating, and to load the rating if you are storing new feedbacks, as those are two different HTTP endpoints.
The corresponding method for retrieving a rating looks like this:
public Rating findById(String sessionId, String username) throws IOException {
final GetResponse<Rating> response =
client.get(b -> b.index(INDEX).id(sessionId + "-" + username), Rating.class);
return response.source();
}
As the ID of the document is static, there is not even a need to execute a search - we can simply retrieve the document by ID.
There is one caveat with this setup, and that is the use of the Feedback enum in the rating. Once you have written the enum values into Elasticsearch, you cannot remove those already stored values from the Feedback enum without breaking the parsing. There are some workarounds, like special parsing that defaults to an UNKNOWN enum value. JDBI allows using ordinals instead of strings, but that mapping needs to be stable as well, and IMO it is more about saving space by using numbers and does not really apply to Elasticsearch.
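A sketch of that UNKNOWN workaround with Jackson - this assumes you add such a constant to the enum (shown stripped down here, without the displayName logic); both the annotation and the deserialization feature are part of Jackson 2.x:
import com.fasterxml.jackson.annotation.JsonEnumDefaultValue;
import com.fasterxml.jackson.databind.DeserializationFeature;
public enum Feedback {
    I_LEARNED_SOMETHING,
    // ... all the other values ...
    @JsonEnumDefaultValue
    UNKNOWN // fallback for stored values that no longer exist in the enum
}
// enable the fallback on the shared object mapper
mapper.configure(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE, true);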
There is one method running an aggregation to calculate the average rating for each session:
public Map<String, Double> retrieveAverageRatingForAllSessions() throws IOException {
final SearchResponse<Rating> response = client.search(b -> b
.index(INDEX)
.size(0)
.aggregations("by_session", ab -> ab
.terms(tb -> tb
.field("session_id.keyword")
.size(10_000))
.aggregations("avg", subAgg -> subAgg
.avg(avg -> avg.field("rating")))
),
Rating.class);
return response.aggregations().get("by_session").sterms().buckets().array()
.stream()
.collect(Collectors.toMap(StringTermsBucket::key,
b -> b.aggregations().get("avg").avg().value()));
}
The above Java code will run the following request:
{
"size": 0,
"aggs": {
"by_session": {
"terms": {
"field": "session_id.keyword",
"size": 10000
},
"aggs": {
"avg": {
"avg": {
"field": "rating"
}
}
}
}
}
}
For each session the average of the rating is calculated, resulting in a response like this:
{
"aggregations" : {
"by_session" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "390906",
"doc_count" : 54,
"avg" : {
"value" : 4.666666666666667
}
},
...
After retrieving the response, the final part is to convert the buckets from the aggregation response to a map of session ids and their respective ratings. For the first bucket we would have a mapping with a key of 390906 and a value of 4.666666666666667.
return response.aggregations().get("by_session")
.sterms().buckets().array().stream()
.collect(Collectors.toMap(
StringTermsBucket::key,
b -> b.aggregations().get("avg").avg().value())
);
The snippet above shows one of the strengths of the strongly typed client: there is not a single cast, because calling sterms() on the aggregate returns a string terms aggregation from then on. The same applies to b.aggregations().get("avg").avg() returning an average aggregation, where it is easy to extract the value.
Running scripts
In order to add or remove sessions in the user’s custom schedule, a script modifies the user’s agenda. The user JSON looks like this:
{
"username" : "the-users-username",
"name" : "Firstname Lastname",
"registered" : true,
"registered_at" : "2022-02-11T02:58:37.187179656Z",
"last_login_at" : "2022-02-11T02:58:15.768292346Z",
"receive_updates" : false,
"agenda" : [
"308067",
"309600",
"308409",
"309562",
"308707",
"308455"
],
"languages" : [
"English"
],
"region" : "EMEA"
}
The agenda contains the session ids of the sessions the user has added to her personal agenda. In order to add or remove sessions, we need to modify that array - and that can be done with a script. Let’s cover the removal case below:
public void removeFromAgenda(User user, String sessionId) throws IOException {
String script = """
if (ctx._source.agenda == null) { ctx.op = 'none' } else if (!ctx._source.agenda.removeIf(id -> id.equals(params.sessionId))) { ctx.op = 'none'; }
""";
client.update(b -> b.index(INDEX).id(user.username())
.script(s -> s
.inline(sb -> sb
.source(script)
.lang("painless")
.params(Map.of("sessionId", JsonData.of(sessionId)))
)
)
.retryOnConflict(3),
User.class);
}
Reformatting the Painless script makes it look like this (thanks to text blocks it is already quite readable within Java):
if (ctx._source.agenda == null) {
ctx.op = 'none'
} else if (!ctx._source.agenda.removeIf(id -> id.equals(params.sessionId))) {
ctx.op = 'none';
}
As we are trying to remove a session, we can make this a noop if the agenda field does not even exist. We can also make this a noop if the session id was not part of the agenda array. Otherwise the update will happen as expected, by removing that id from the agenda array.
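The add case is not shown in this post; a hedged counterpart could look like this (hypothetical addToAgenda method, following the same update-by-script pattern and reusing the same client and INDEX fields):
public void addToAgenda(User user, String sessionId) throws IOException {
    // create the agenda if missing, skip the update if the session is already in it
    String script = """
        if (ctx._source.agenda == null) {
          ctx._source.agenda = [params.sessionId]
        } else if (ctx._source.agenda.contains(params.sessionId)) {
          ctx.op = 'none'
        } else {
          ctx._source.agenda.add(params.sessionId)
        }
        """;
    client.update(b -> b.index(INDEX).id(user.username())
            .script(s -> s
                .inline(sb -> sb
                    .source(script)
                    .lang("painless")
                    .params(Map.of("sessionId", JsonData.of(sessionId)))))
            .retryOnConflict(3),
        User.class);
}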
As scripts are just text in your application code, you have zero guarantees that they actually work as expected, so we have to test this. Without a real integration test against Elasticsearch this is pretty much impossible to verify - you may have guessed it already when following this blog: it’s Testcontainers time!
Testing
Let’s start with an abstract base class that starts the container and sets up the clients for all Elasticsearch tests:
@Tag("slow")
@Execution(ExecutionMode.SAME_THREAD)
public abstract class AbstractElasticsearchContainerTest {
public static final ElasticsearchContainer container =
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3")
.withEnv("ES_JAVA_OPTS", "-Xms1024m -Xmx1024m");
public static RestClient restClient;
public static ElasticsearchClient client;
public abstract String indexToDelete();
@BeforeAll
public static void setup() {
container.start();
restClient = createRestClient();
client = createElasticsearchClient(restClient);
}
@AfterAll
public static void shutdown() throws IOException {
restClient.close();
}
@AfterEach
public void deleteIndex() throws IOException, InterruptedException {
String index = indexToDelete();
if (index != null) {
URI uri = URI.create("http://localhost:" +
container.getMappedPort(9200) + "/" + index);
HttpRequest request = HttpRequest.newBuilder().DELETE().uri(uri).build();
HttpClient.newHttpClient()
.send(request, HttpResponse.BodyHandlers.discarding());
}
}
public static RestClient createRestClient() {
final int httpPort = container.getMappedPort(9200);
final HttpHost host = HttpHost.create("localhost:" + httpPort);
return RestClient.builder(host).build();
}
private static ElasticsearchClient createElasticsearchClient(RestClient client) {
ElasticsearchTransport transport = new RestClientTransport(client,
new JacksonJsonpMapper(Configuration.mapper));
return new ElasticsearchClient(transport);
}
}
The above is my generic test class for any Elasticsearch test. The container should only be started and stopped once for all tests, so this is handled in static methods annotated with @BeforeAll and @AfterAll; the same applies to the ElasticsearchClient instance being used. Each test class usually writes into a single index, so that index should be cleaned up after each test - however, you could also specify a string like index-a,index-b to delete more than one index.
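As a side note, the index cleanup could also be done with the typed client instead of the JDK HTTP client - a sketch, assuming a single index name (the ignoreUnavailable flag avoids an error if the index was never created):
// delete the test index via the typed client
client.indices().delete(d -> d
    .index(indexToDelete())
    .ignoreUnavailable(true));
The comma-separated multi-index trick from above would require splitting the string into a list first.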
A concrete test class with a simple test looks like this:
public class RatingServiceTests extends AbstractElasticsearchContainerTest {
private final RatingService ratingService = new RatingService(client);
@Override
public String indexToDelete() {
return RatingService.INDEX;
}
@Test
public void testFindById() throws IOException {
Rating rating = new Rating("my-session-id", "userId", 1,
Collections.emptyList(), OffsetDateTime.now(ZoneOffset.UTC));
ratingService.save(rating);
final Rating foundRating = ratingService.findById("my-session-id", "userId");
assertThat(rating).isEqualTo(foundRating);
}
}
Summary
I hope this gave a first impression of using the client. We’re not done with its usage within the ElasticCC app yet: next up we will cover what we did to upgrade the app on k8s without downtime, which also includes storing sessions in Elasticsearch using this client.
Things to take away: the client heavily uses builders passed as lambda arguments, for example when specifying parts of a request. While this might be confusing at first, it tries to resemble the JSON structure a little, making it readable via proper formatting while still being typed.
SearchResponse<QuizAnswer> response = client.search(b -> b.index(ANSWER_INDEX)
.query(q -> q.bool(bq -> bq
.filter(fb -> fb
.term(t -> t
.field("user_id.keyword")
.value(FieldValue.of(email))
))
.filter(fb -> fb
.terms(t -> t
.field("question_id.keyword")
.terms(TermsQueryField.of(tqb -> tqb.value(fieldValues))
)))
)), QuizAnswer.class);
Feel free to create GitHub issues for anything you encounter with the new client in the elasticsearch-java GitHub repository, or ask questions on the discuss forum. As usual, try to be on the latest version when using the client, to make sure you are not hitting issues that have already been fixed.
Update: I had two people ping me with a specific question about migration. If you are managing your mappings via JSON already, then it probably makes sense not to convert them to Java code, as the JSON is also reusable within the Kibana Dev Tools. Even though you cannot use the new Elasticsearch Client to send a byte array to an endpoint, you can just resort to the LLRC like this:
HttpHost host = HttpHost.create("localhost:9200");
RestClient restClient = RestClient.builder(host).build();
ElasticsearchTransport transport = new RestClientTransport(restClient,
new JacksonJsonpMapper(Configuration.mapper));
ElasticsearchClient client = new ElasticsearchClient(transport);
/*
// alternative way to access rest client by casting the transport
RestClientTransport restClientTransport = (RestClientTransport) client._transport();
RestClient restClient = restClientTransport.restClient();
*/
Request request = new Request("PUT", "/my-index/_mapping");
byte[] bytes = Files.readAllBytes(Paths.get("/path/to/mapping.json"));
request.setEntity(new NByteArrayEntity(bytes, ContentType.APPLICATION_JSON));
Response response = restClient.performRequest(request);
int statusCode = response.getStatusLine().getStatusCode();
Resources
- Docs: Java Low Level Rest Client
- Elasticsearch Java Client: GitHub Repo
- Elasticsearch request/response definitions in TypeScript (used by the generator): elasticsearch-specification
- Elasticsearch Client Example Code Repo
- ElasticCC talk about the new Java Client, most recent talk by Sylvain, the maintainer of the client
Final remarks
If you made it down here, wooow! Thanks for sticking with me. You can follow or ping me on Twitter or GitHub, or reach me via email (just to tell me you read this whole thing :-).
If there is anything to correct, drop me a note, and I am happy to fix it and append to this post!
The same applies for questions. If you have a question, go ahead and ask!
If you want me to speak about this, drop me an email!