Alexander Reelsen

Backend developer, productivity fan, likes the JVM, full text search, distributed databases & systems

ElasticCC platform - Part 2 - Using the new Elasticsearch Java Client
Mar 3, 2022
14 minutes read

TLDR; This is a series of blog posts x-raying the platform we used to run the Elastic Community Conference 2022 including choices of the platform, technologies and persistence options chosen as well as monitoring and APM capabilities.

Why a new client?

The existing High Level Rest Client (HLRC) has been around for a few years already and did its job. However it came with a few drawbacks, like

  • Pulling Elasticsearch and all of its dependencies
  • Using a lot of Elasticsearch internal classes and even Lucene classes
  • Not being strongly typed for all request/response parts

The new client aligns with the already existing clients by using the same underlying foundations, along with code-generated classes to make sure that new endpoints are exposed as soon as they are defined within the Elasticsearch source.

The new client still depends on the LLRC, the Low Level REST Client, which makes use of Apache HttpClient.

Just to reiterate: Everything within the client is now strongly typed and available via IDE completion, which means no more dealing with byte arrays or BytesReference objects like in the old client.

History lesson: TransportClient, HLRC, LLRC, and the new client

The Elasticsearch High Level REST Client has been available since Elasticsearch 5.6. This client was created to get rid of the TransportClient, which used the Elasticsearch binary protocol and ports for communication. This step allowed a clear separation between client and server communication based on ports.

The REST client was already split into a Low Level REST Client (LLRC) and a High Level REST Client (HLRC). The LLRC was responsible for sending the actual HTTP requests, configuring TLS and HTTP Basic Authentication, and selecting the right nodes to send requests to within a cluster. It also contained a sniffer to constantly discover new nodes or remove unreachable nodes from the list of available nodes to send requests to.

The HLRC had a different goal: providing classes and objects for each request and response that can be made to Elasticsearch. Every class for every endpoint was created manually, and everything had to be kept in sync manually whenever fields or parameters were added, resulting in high maintenance effort (see the drawbacks above).

The clients team, which provides clients for other languages like Ruby, Python, PHP and JavaScript, already utilized an existing request spec, so the next step was to create a Java client on the same infrastructure, generating code from the spec that defines what Elasticsearch requests look like. This is a much harder task for statically typed languages like .NET or Java, but it was finally tackled. This new client is the Elasticsearch Java Client. It uses the LLRC as well, so it does not have to deal with those low level tasks and can fully focus on the user experience. There are plans, however, to make the LLRC part pluggable in order to use the built-in HTTP client available since Java 11 in the future. Currently the Apache HttpClient is used.

Now on to the part affecting the present instead of the history.

Compatibility lesson: All ze java clients

Now this is where things get a little more complex, especially in the light of upgrades. Even though this is not much of a problem for the ElasticCC platform, I think it is worth a detour, so let’s do a quick summary to prepare you for future upgrades.

  • The HLRC has been removed from Elasticsearch 8.0
  • The LLRC is still available as before
  • The new Elasticsearch Java Client is available for latest 7.x releases as well as for 8.x.

In case you are doing a migration, you can use the same LLRC for the old HLRC as well as the new Elasticsearch Java Client.

My personal upgrade strategy would be the following

  1. Upgrade to the latest 7.x release as with every upgrade
  2. Work on deprecations as with every upgrade
  3. Start using the new client in parallel with the existing one
  4. Replace the HLRC client step by step in your application with the new one
  5. Remove the HLRC dependency
  6. Upgrade to 8.x in the correct order (see here for on-prem, Elasticsearch before the client)

OK, this was quite the sidestep, but I consider it valuable information for anyone reading this article. Let’s go back to our fancy little app.

Use case within the app

As all we need to do is basically index data and get it back, the use case is about as simple as it gets, which makes it a great fit for getting started with the client in a small project. Also, no migration work needed to be done.

Instantiating the client

In the first part we already covered how to parse our application.conf file. The good part is that you can have sensible defaults that can be overwritten by environment variables. In this example we go with localhost as the default host, but you can configure host, user and password via environment variables.

elasticsearch {
  host = "http://localhost:9200"
  host = ${?ELASTICSEARCH_HOST}
  user = ${?ELASTICSEARCH_USER}
  password = ${?ELASTICSEARCH_PASSWORD}
}
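The "default first, optional override second" idea of the configuration above can be mirrored in a few lines of plain Java. This is only a sketch of the behavior, assuming the `${?VAR}` lines act as optional substitutions that apply only when the variable is set; `EnvDefaults` and `resolve` are hypothetical names and not part of the app or of any config library.

```java
import java.util.Map;

/** Hypothetical helper: a default value that an environment variable may override. */
final class EnvDefaults {

    private EnvDefaults() {}

    /** Returns the variable's value if it is present in env, otherwise the fallback. */
    static String resolve(Map<String, String> env, String variable, String fallback) {
        return env.getOrDefault(variable, fallback);
    }

    public static void main(String[] args) {
        // System.getenv() exposes the process environment as a read-only map
        String host = resolve(System.getenv(), "ELASTICSEARCH_HOST", "http://localhost:9200");
        System.out.println("Elasticsearch host: " + host);
    }
}
```

Passing the environment in as a map keeps the lookup testable without touching the real process environment.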

The next part was the initialization of the client. While you can use either JSON-B or Jackson as the underlying JSON library, I usually go with Jackson, as I am most comfortable with its setup. First up is the configuration of the object mapper:

public final static ObjectMapper mapper = new ObjectMapper()
        .registerModule(new JavaTimeModule())
        .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)
        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
        .setSerializationInclusion(JsonInclude.Include.NON_NULL);

The custom object mapper uses the java time module to properly serialize the OffsetDateTime instances used in the entities. I prefer snake case of variable names as well as serializing dates as strings over UNIX timestamps. The serialization inclusion ensures only non null values are serialized to JSON.

This object mapper can now be put to use in the Elasticsearch client instantiation:

final RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearch.getString("host")));

if (elasticsearch.hasPath("user") && elasticsearch.hasPath("password")) {
  final String user = elasticsearch.getString("user");
  final String password = elasticsearch.getString("password");
  final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();

  credentialsProvider.setCredentials(AuthScope.ANY, 
      new UsernamePasswordCredentials(user, password));
  
  builder.setHttpClientConfigCallback(
      b -> b.setDefaultCredentialsProvider(credentialsProvider));
}

RestClient restClient = builder.build();
ElasticsearchTransport transport = new RestClientTransport(restClient,
    new JacksonJsonpMapper(mapper));
return new ElasticsearchClient(transport);

Note that the returned instance here is a client that only offers synchronous, blocking methods. If you need an async client, you can go with ElasticsearchAsyncClient.

We can now use the same instance of Elasticsearch client in all our services:

this.elasticsearchClient = configuration.elasticsearch().toElasticsearchClient();

UserService userService = new UserService(elasticsearchClient);
RatingService ratingService = new RatingService(elasticsearchClient);
QuizService quizService = new QuizService(elasticsearchClient, sessionService);

Time to take a look at indices.

Entities & Indices

Let’s start with a small class and how it is being indexed.

public record Rating(@JsonProperty("session_id") String sessionId,
                     String username,
                     Integer rating,
                     List<Feedback> feedbacks,
                     @JsonProperty("created_at") OffsetDateTime createdAt) {
}


public enum Feedback {

    I_LEARNED_SOMETHING,
    TOO_WEAK,
    LACKS_DEMO,
    TOO_FAST,
    GREAT_DEMO,
    TOO_COMPLICATED,
    VERY_INTERESTING,
    UNCLEAR,
    BEST_TALK_OF_MY_LIVE,
    COULD_BE_MORE_EXPLORED,
    FUN,
    UNDERSTOOD_NOTHING;

    private final String displayName;

    Feedback() {
        this.displayName = name().subSequence(0, 1) + 
            name().replaceAll("_", " ").toLowerCase(Locale.ROOT).substring(1);
    }

    public String displayName() {
        return displayName;
    }
}

The Rating record (enjoying the sweet compactness) contains a rating for a session. Each user should have a single rating object per session stored in Elasticsearch. A rating consists of an integer rating from 1 to 5 as well as a list of feedbacks. The feedback functionality was added because a talk is hard to condense into a single integer, and a list of feedbacks gives the speakers something more actionable.

The resulting JSON that will be stored looks like

{
  "session_id" : "303844",
  "created_at" : "2022-02-11T14:37:52.302442991Z",
  "username" : "the-awesome-name-of-the-user",
  "rating" : 4,
  "feedbacks" : [
    "I_LEARNED_SOMETHING",
    "GREAT_DEMO",
    "BEST_TALK_OF_MY_LIVE"
  ]
}

No special JSON serialization logic or anything, so we can proceed right to the code storing this. There is a dedicated RatingService class, that looks like this:

public class RatingService {

    public static final String INDEX = "ratings";

    private final ElasticsearchClient client;

    public RatingService(ElasticsearchClient client) {
        this.client = client;
    }

    public void save(Rating rating) throws IOException {
        client.index(b -> b
            .index(INDEX)
            .id(rating.sessionId() + "-" + rating.username())
            .document(rating));
    }
}

The save() method calls client.index(), which requires an index, a string-based ID and the document to index. As we wanted to make sure that every user can only save one rating per session, we can simply combine the sessionId and the username into the unique ID for this document. A new rating overwrites an old one. This also means you have to load the existing feedback if you only store a new rating, and load the existing rating if you are storing new feedbacks, as the rating and the feedback are submitted via two different HTTP endpoints.
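The load-then-merge step described above could look roughly like the following. This is a sketch in plain Java without any Elasticsearch calls; the trimmed-down `Rating` and `Feedback` types and the helper names `documentId`, `withRating` and `withFeedbacks` are assumptions made for illustration and are not taken from the app.

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;

final class RatingMergeSketch {

    private RatingMergeSketch() {}

    // Trimmed-down stand-ins for the entities shown earlier
    enum Feedback { I_LEARNED_SOMETHING, GREAT_DEMO, TOO_FAST }

    record Rating(String sessionId, String username, Integer rating,
                  List<Feedback> feedbacks, OffsetDateTime createdAt) {}

    /** Same ID scheme as in save(): one document per session and user. */
    static String documentId(String sessionId, String username) {
        return sessionId + "-" + username;
    }

    /** New star rating; feedback is carried over from the stored document, if there is one. */
    static Rating withRating(Rating existing, String sessionId, String username, int stars) {
        List<Feedback> feedbacks = existing == null || existing.feedbacks() == null
                ? List.of() : existing.feedbacks();
        return new Rating(sessionId, username, stars, feedbacks, OffsetDateTime.now(ZoneOffset.UTC));
    }

    /** New feedback list; the star rating is carried over from the stored document, if there is one. */
    static Rating withFeedbacks(Rating existing, String sessionId, String username,
                                List<Feedback> feedbacks) {
        Integer stars = existing == null ? null : existing.rating();
        return new Rating(sessionId, username, stars, List.copyOf(feedbacks),
                OffsetDateTime.now(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        Rating stored = new Rating("303844", "alice", 4, List.of(Feedback.GREAT_DEMO),
                OffsetDateTime.now(ZoneOffset.UTC));
        // Only the stars change; the stored feedback survives the overwrite
        System.out.println(withRating(stored, "303844", "alice", 5));
    }
}
```

In the service, `existing` would be whatever findById() returns for the same session and user, and the merged record is what gets passed to save().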

The corresponding method for retrieving a rating looks like this

public Rating findById(String sessionId, String username) throws IOException {
  final GetResponse<Rating> response =
      client.get(b -> b.index(INDEX).id(sessionId + "-" + username), Rating.class);
  return response.source();
}

As the ID of the document is static, there is not even a need to execute a search, but we can retrieve by id.

There is one caveat with this setup, and that is the use of the Feedback enum in the rating. Once you have written the enum values by name into Elasticsearch, you cannot remove those already stored values from the Feedback enum without breaking the parsing. There are some workarounds, like special parsing that defaults to an UNKNOWN enum value. JDBI allows using ordinals instead of strings, but that mapping needs to be stable as well; in my opinion it is more about saving space by using numbers and does not apply to Elasticsearch.
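The "default to UNKNOWN" workaround mentioned above can be sketched in plain Java. The `UNKNOWN` constant and the `parse` method are hypothetical additions for illustration and not part of the Feedback enum shown earlier. With Jackson, such a static factory could presumably be hooked in via `@JsonCreator`, which is left out here to keep the sketch dependency-free.

```java
import java.util.Locale;

final class LenientFeedbackSketch {

    private LenientFeedbackSketch() {}

    enum Feedback {
        I_LEARNED_SOMETHING,
        GREAT_DEMO,
        TOO_FAST,
        UNKNOWN; // hypothetical catch-all for values that no longer exist

        /** Maps a stored string to a constant; anything unrecognized becomes UNKNOWN. */
        static Feedback parse(String stored) {
            if (stored == null) {
                return UNKNOWN;
            }
            try {
                return valueOf(stored.trim().toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                // valueOf() throws when no constant with that name exists
                return UNKNOWN;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Feedback.parse("GREAT_DEMO"));
        System.out.println(Feedback.parse("A_VALUE_REMOVED_LONG_AGO"));
    }
}
```

The trade-off is that removed values are silently folded into one bucket, so the original string is lost once the document is written back.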

There is one method running an aggregation to calculate the average rating for each session:

public Map<String, Double> retrieveAverageRatingForAllSessions() throws IOException {
  final SearchResponse<Rating> response = client.search(b -> b
              .index(INDEX)
              .size(0)
              .aggregations("by_session", ab -> ab
                      .terms(tb -> tb
                              .field("session_id.keyword")
                              .size(10_000))
                      .aggregations("avg", subAgg -> subAgg
                              .avg(avg -> avg.field("rating")))
              ),
      Rating.class);

  return response.aggregations().get("by_session").sterms().buckets().array()
      .stream()
      .collect(Collectors.toMap(StringTermsBucket::key,
          b -> b.aggregations().get("avg").avg().value()));
}

The above Java code will run the following request

{
  "size": 0,
  "aggs": {
    "by_session": {
      "terms": {
        "field": "session_id.keyword",
        "size": 10000
      },
      "aggs": {
        "avg": {
          "avg": {
            "field": "rating"
          }
        }
      }
    }
  }
}

For each session the average of the rating is calculated, having a response like this:

{
  "aggregations" : {
    "by_session" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "390906",
          "doc_count" : 54,
          "avg" : {
            "value" : 4.666666666666667
          }
        },
  ...

After retrieving the response, the final part is to convert the buckets from the aggregation response to a map of session ids and their respective ratings. For the first bucket we would have a mapping with a key of 390906 and a value of 4.666666666666667.

return response.aggregations().get("by_session")
    .sterms().buckets().array().stream()
    .collect(Collectors.toMap(
        StringTermsBucket::key,
        b -> b.aggregations().get("avg").avg().value())
    );

The snippet above shows one of the strengths of the strongly typed client: there is not a single cast. Calling sterms() on the aggregation result yields a string terms aggregation from then on. The same applies to b.aggregations().get("avg").avg(), which returns an average aggregation from which it is easy to extract the value.

Running scripts

In order to add or remove sessions from the user’s custom schedule, a script modifies the agenda stored in the user document. The user JSON looks like this:

{
  "username" : "the-users-username",
  "name" : "Firstname Lastname",
  "registered" : true,
  "registered_at" : "2022-02-11T02:58:37.187179656Z",
  "last_login_at" : "2022-02-11T02:58:15.768292346Z",
  "receive_updates" : false,
  "agenda" : [
    "308067",
    "309600",
    "308409",
    "309562",
    "308707",
    "308455"
  ],
  "languages" : [
    "English"
  ],
  "region" : "EMEA"
}

The agenda contains the session ids of the sessions the user has added to her personal agenda. In order to add or remove sessions, we need to modify that array - that can be done with a script. Let’s cover the removal case below:

public void removeFromAgenda(User user, String sessionId) throws IOException {
    String script = """
        if (ctx._source.agenda == null) { ctx.op = 'none' } else if (!ctx._source.agenda.removeIf(id -> id.equals(params.sessionId))) { ctx.op = 'none'; }
    """;

    client.update(b -> b.index(INDEX).id(user.username())
        .script(s -> s
            .inline(sb -> sb
                .source(script)
                .lang("painless")
                .params(Map.of("sessionId", JsonData.of(sessionId)))
            )
        )
        .retryOnConflict(3),
        User.class);
}

Reformatting the painless script makes it look like this (thanks to text blocks this is already quite readable just within Java):

if (ctx._source.agenda == null) {
  ctx.op = 'none'
} else if (!ctx._source.agenda.removeIf(id -> id.equals(params.sessionId))) {
  ctx.op = 'none';
}

As we are trying to remove a session, we can make this a noop if the agenda field does not even exist. We can also make this a noop if the session id was not part of the agenda array. Otherwise the update will happen as expected, by removing that id from the agenda array.
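The decision logic of the script can also be read as plain Java. The following is only a mirror of the Painless code for illustration, not something the app runs; the `Op` enum is a hypothetical stand-in for `ctx.op`, assuming that an update which does not set `ctx.op` to 'none' writes the modified document.

```java
import java.util.ArrayList;
import java.util.List;

final class AgendaRemovalSketch {

    private AgendaRemovalSketch() {}

    /** Hypothetical stand-in for ctx.op: skip the write, or write the changed document. */
    enum Op { NONE, INDEX }

    static Op removeFromAgenda(List<String> agenda, String sessionId) {
        if (agenda == null) {
            return Op.NONE; // no agenda field at all, nothing to remove
        }
        // removeIf() returns true only if at least one element was removed
        boolean removed = agenda.removeIf(id -> id.equals(sessionId));
        return removed ? Op.INDEX : Op.NONE;
    }

    public static void main(String[] args) {
        List<String> agenda = new ArrayList<>(List.of("308067", "309600"));
        System.out.println(removeFromAgenda(agenda, "309600"));
        System.out.println(agenda);
    }
}
```

The important detail is the boolean returned by removeIf(): it is what allows the script to tell "removed something" apart from "the id was never on the agenda".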

With scripts embedded as text in your application code, you have zero guarantees that they actually work as expected, so we have to test this. Without a real integration test against Elasticsearch this is pretty much impossible to test. You may have guessed it already when following this blog: it’s Testcontainers time!

Testing

The tests build on an abstract Testcontainers base class:

@Tag("slow")
@Execution(ExecutionMode.SAME_THREAD)
public abstract class AbstractElasticsearchContainerTest {

  public static final ElasticsearchContainer container =
          new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3")
          .withEnv("ES_JAVA_OPTS", "-Xms1024m -Xmx1024m");
  public static RestClient restClient;
  public static ElasticsearchClient client;

  public abstract String indexToDelete();

  @BeforeAll
  public static void setup() {
    container.start();
    restClient = createRestClient();
    client = createElasticsearchClient(restClient);
  }

  @AfterAll
  public static void shutdown() throws IOException {
    restClient.close();
  }

  @AfterEach
  public void deleteIndex() throws IOException, InterruptedException {
    String index = indexToDelete();
    if (index != null) {
      URI uri = URI.create("http://localhost:" + 
          container.getMappedPort(9200) + "/" + index);
      HttpRequest request = HttpRequest.newBuilder().DELETE().uri(uri).build();

      HttpClient.newHttpClient()
          .send(request, HttpResponse.BodyHandlers.discarding());
    }
  }

  public static RestClient createRestClient() {
    final int httpPort = container.getMappedPort(9200);
    final HttpHost host = HttpHost.create("localhost:" + httpPort);
    return RestClient.builder(host).build();
  }

  private static ElasticsearchClient createElasticsearchClient(RestClient client) {
    ElasticsearchTransport transport = new RestClientTransport(client, 
        new JacksonJsonpMapper(Configuration.mapper));

    return new ElasticsearchClient(transport);
  }
}

The above is my generic base class for any Elasticsearch test. The container should only be started once for all tests, so it is held in a static field and started in @BeforeAll; the same applies to the ElasticsearchClient instance being used, whose REST client is closed again in @AfterAll. Each test class usually writes into a single index, and this index should be cleaned up after each test. However, you could just specify a string like index-a,index-b to delete more than one index.
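To illustrate the multi-index case, here is a small sketch of how the DELETE URI from deleteIndex() would look for several indices. `DeleteIndexUriSketch` and `deleteUri` are hypothetical names; the base class above simply concatenates whatever indexToDelete() returns.

```java
import java.net.URI;
import java.util.List;

final class DeleteIndexUriSketch {

    private DeleteIndexUriSketch() {}

    /** Builds the URI for deleting one or more indices, joined by commas. */
    static URI deleteUri(int mappedPort, List<String> indices) {
        return URI.create("http://localhost:" + mappedPort + "/" + String.join(",", indices));
    }

    public static void main(String[] args) {
        // 9200 stands in for whatever container.getMappedPort(9200) returns
        System.out.println(deleteUri(9200, List.of("index-a", "index-b")));
    }
}
```

A test class covering two indices would then return "index-a,index-b" from indexToDelete().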

A concrete test class with a simple test looks like this:

public class RatingServiceTests extends AbstractElasticsearchContainerTest {

  private final RatingService ratingService = new RatingService(client);

  @Override
  public String indexToDelete() {
    return RatingService.INDEX;
  }

  @Test
  public void testFindById() throws IOException {
    Rating rating = new Rating("my-session-id", "userId", 1,
        Collections.emptyList(), OffsetDateTime.now(ZoneOffset.UTC));
    ratingService.save(rating);

    final Rating foundRating = ratingService.findById("my-session-id", "userId");

    assertThat(rating).isEqualTo(foundRating);
  }
}

Summary

I hope this gave you a first impression of using the client. We are not yet done with its usage within the ElasticCC app: next up we will cover what we did to upgrade the app on k8s without downtime, which also includes storing sessions in Elasticsearch using this client.

Things to take away: The client heavily uses builders passed as lambda arguments, for example when specifying parts of a request. While this might be confusing at first, it tries to resemble the JSON structure a little, so that proper formatting keeps the request readable while it is still typed.

SearchResponse<QuizAnswer> response = client.search(b -> b.index(ANSWER_INDEX)
    .query(q -> q.bool(bq -> bq
      .filter(fb -> fb
        .term(t -> t
          .field("user_id.keyword")
          .value(FieldValue.of(email))
      ))
      .filter(fb -> fb
        .terms(t -> t
          .field("question_id.keyword")
          .terms(TermsQueryField.of(tqb -> tqb.value(fieldValues))
      )))
    )), QuizAnswer.class);
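If the lambda-builder style looks like magic, a stripped-down imitation in plain Java may help. None of the types below belong to the client; `TermQuery`, `SearchRequest` and their builders are hypothetical stand-ins that only show how a `Function` parameter receiving a fresh builder lets the calls nest like the JSON they produce.

```java
import java.util.function.Function;

final class LambdaBuilderSketch {

    private LambdaBuilderSketch() {}

    record TermQuery(String field, String value) {
        static final class Builder {
            private String field;
            private String value;

            Builder field(String field) { this.field = field; return this; }
            Builder value(String value) { this.value = value; return this; }
            TermQuery build() { return new TermQuery(field, value); }
        }
    }

    record SearchRequest(String index, TermQuery query) {
        static final class Builder {
            private String index;
            private TermQuery query;

            Builder index(String index) { this.index = index; return this; }

            // The nested builder is created here and handed to the caller's lambda
            Builder term(Function<TermQuery.Builder, TermQuery.Builder> fn) {
                this.query = fn.apply(new TermQuery.Builder()).build();
                return this;
            }

            SearchRequest build() { return new SearchRequest(index, query); }
        }

        /** Entry point: the caller never instantiates a builder itself. */
        static SearchRequest of(Function<Builder, Builder> fn) {
            return fn.apply(new Builder()).build();
        }
    }

    public static void main(String[] args) {
        SearchRequest request = SearchRequest.of(b -> b
                .index("answers")
                .term(t -> t
                        .field("user_id.keyword")
                        .value("someone@example.org")));
        System.out.println(request);
    }
}
```

The caller only ever writes lambdas, so the indentation of the Java code can follow the nesting of the request body, and the compiler still checks every field name and type.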

Feel free to create GitHub issues with anything you encounter for the new client in the elasticsearch-java GitHub repository or ask questions on the discuss forum. As usual, try to be on the latest version when using the client, to make sure the issue has not been fixed already.

Update: I had two people ping me with a specific question about migration. If you are managing your mappings via JSON already, then it probably makes sense not to convert them to Java code, as the JSON is also reusable within Kibana Dev Tools. Even though you cannot use the new Elasticsearch Client to send a byte array to an endpoint, you can just resort to the LLRC like this:

HttpHost host = HttpHost.create("localhost:9200");
RestClient restClient = RestClient.builder(host).build();

ElasticsearchTransport transport = new RestClientTransport(restClient,
      new JacksonJsonpMapper(Configuration.mapper));
ElasticsearchClient client = new ElasticsearchClient(transport);

/*
// alternative way to access rest client by casting the transport
RestClientTransport restClientTransport = (RestClientTransport) client._transport();
RestClient restClient = restClientTransport.restClient();
*/

Request request = new Request("PUT", "/my-index/_mapping");
byte[] bytes = Files.readAllBytes(Paths.get("/path/to/mapping.json"));
request.setEntity(new NByteArrayEntity(bytes, ContentType.APPLICATION_JSON));

Response response = restClient.performRequest(request);
int statusCode = response.getStatusLine().getStatusCode();

Final remarks

If you made it down here, wooow! Thanks for sticking with me. You can follow or ping me on twitter, GitHub or reach me via Email (just to tell me, you read this whole thing :-).

If there is anything to correct, drop me a note, and I am happy to do so and append to this post!

The same applies to questions. If you have a question, go ahead and ask!

If you want me to speak about this, drop me an email!

