Alexander Reelsen

Backend developer, productivity fan, likes the JVM, full text search, distributed databases & systems

ElasticCC platform - Part 3 - Upgrading on k8s without downtime
Mar 15, 2022
17 minutes read

TL;DR: This is a series of blog posts x-raying the platform we used to run the Elastic Community Conference 2022, covering our choice of platform and technologies, the persistence options, and the monitoring and APM capabilities.


Rollouts without downtime

Luckily, the days of quarterly updates with weekend-long downtimes have been over for a decade, so you have to think about the ability to easily update an application - without the user noticing. If you are running on k8s, you might be lucky, and this is only a single command away once you have updated your Docker images.

kubectl rollout restart deployment/my-neat-app

The basic idea is to start up a new instance of the application, wait until it is ready, then move the traffic over to that instance and stop the already running instance. This can be done with a single instance or with ten instances, by rolling them over one after another.
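
In Kubernetes this behavior is controlled via the deployment's update strategy. A minimal sketch of such a deployment - names and image are placeholders, not the actual ElasticCC manifests:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-neat-app
spec:
  replicas: 2
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1        # start at most one additional pod during the rollout
      maxUnavailable: 0  # never stop a pod before its replacement is ready
  selector:
    matchLabels:
      app: my-neat-app
  template:
    metadata:
      labels:
        app: my-neat-app
    spec:
      containers:
        - name: app
          image: my-registry/my-neat-app:latest

With maxUnavailable set to 0, Kubernetes only terminates an old pod after the new one reports readiness - which is where the probes at the end of this post come into play.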

There are a couple of caveats; the biggest ones are data stores in use and state.

If you are using a data store, you have to make sure that the old and new version of your app are able to cope with potential changes like a mapping update, a type change or a reindex operation. So, whenever you modify your data definition, this needs to be properly tested - and that rollout might need to be decoupled from rolling out your new feature. In the case of the ElasticCC app this was nothing to take care of, as there were no mapping changes to our Elasticsearch data.

The second part is more tricky, however. Every Java Servlet based application has the concept of a session, which is needed to identify a logged-in user on the server side. By default the mapping is done via a cookie that contains the session ID. This means the state is on the server side. When you shut down that JVM instance, the state is gone, because it is usually held in memory.

One possible solution for this is to move the state to the client side: set a cookie containing your state and either sign or encrypt it, so that it cannot be modified on the client side. Storing the user ID in there and looking it up with every request would work. I created such a cookie store in the javalin-cookie-session-store GitHub project.
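
To illustrate the signing idea, here is a minimal sketch using an HMAC so that any modification on the client side becomes detectable - the class and its API are made up for this post and are not taken from the linked project:

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Base64;

public class CookieSigner {

    private final SecretKeySpec key;

    public CookieSigner(byte[] secret) {
        this.key = new SecretKeySpec(secret, "HmacSHA256");
    }

    // append an HMAC of the value, so the client cannot tamper with it unnoticed
    public String sign(String value) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(key);
        byte[] signature = mac.doFinal(value.getBytes(StandardCharsets.UTF_8));
        return value + "." + Base64.getUrlEncoder().withoutPadding().encodeToString(signature);
    }

    // return the original value if the signature matches, null otherwise
    public String verify(String signedValue) throws Exception {
        int idx = signedValue.lastIndexOf('.');
        if (idx == -1) {
            return null;
        }
        String value = signedValue.substring(0, idx);
        // constant time comparison to prevent timing attacks
        boolean valid = MessageDigest.isEqual(
            sign(value).getBytes(StandardCharsets.UTF_8),
            signedValue.getBytes(StandardCharsets.UTF_8));
        return valid ? value : null;
    }
}

Signing alone only prevents modification; if the cookie contents should also be hidden from the user, you would encrypt on top of signing.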

The other alternative is to implement a custom session store, storing sessions in a data store instead of memory. Luckily Jetty has an established interface for that.

Jetty and session stores

When I ran a side project on the Digital Ocean App Platform, I wanted to make sure not to lose any sessions, so I decided to give the JDBCSessionDataStore a try, which comes with Jetty and thus is also available in Javalin. This worked great, until I figured out that sessions were not properly expired and I ended up with millions of them after a month. We’ll get to that problem later… 😀

So, that said, there is the possibility to store sessions in a data store. If you have read the other parts of this series, you will notice that the only data store we plan to use is Elasticsearch.

Elasticsearch Session Datastore

Finally it’s time to come up with some wisdom and whip out the

If all you have is a hammer, everything looks like a nail

phrase from Maslow. So my choice was to either introduce a SQL database or some Redis component to my app, or to just use Elasticsearch.

I wanted to explore the idea of an Elasticsearch Session Data Store - which was also another opportunity to test the new Elasticsearch Java Client.

Jetty already has a base class for custom data store implementations named NoSqlSessionDataStore that extends AbstractSessionDataStore.

Some of the methods that need to be implemented - summarized from a few interfaces:

void store(String id, SessionData data) throws Exception;

SessionData load(String id) throws Exception;

boolean delete(String id) throws Exception;

boolean exists(String id) throws Exception;

Set<String> getExpired(Set<String> candidates);

Just judging by the names of those methods, this pretty much resembles a CRUD use case operating on single documents. The only exception is the getExpired() method, which checks for expired sessions so they can be cleaned up.

ElasticsearchSessionDataStore setup

First, let’s model a session with all its properties. From a user perspective, the most important part is session_data, which contains the serialized session.

public record ElasticsearchSession(
    @JsonProperty("id")                     String id,
    @JsonProperty("context_path")           String contextPath,
    @JsonProperty("vhost")                  String vhost,
    @JsonProperty("last_node_updated")      String lastNodeUpdated,
    @JsonProperty("epoch_created")          Long epochCreated,
    @JsonProperty("epoch_accessed")         Long epochAccessed,
    @JsonProperty("epoch_last_accessed")    Long epochLastAccessed,
    @JsonProperty("epoch_cookie_set")       Long epochCookieSet,
    @JsonProperty("epoch_expiry")           Long epochExpiry,
    @JsonProperty("max_inactive_ms")        Long maxInactiveMs,
    @JsonProperty("session_data")           String sessionData) {
}

The basic idea here is to serialize the session data and then encode it as a Base64 string, so that it can easily be stored in Elasticsearch. This is one of the reasons why you probably would not do this on a high-traffic website, as it eats CPU and requires roughly a third more space than the raw bytes, which could be avoided. For our modest use case this was good enough.

Next up is the AbstractSessionDataStore implementation, showing all the required methods without implementations:

@ManagedObject
public class ElasticsearchSessionDataStore extends NoSqlSessionDataStore {

    private static final Logger logger =
        LogManager.getLogger(ElasticsearchSessionDataStore.class);

    public static final String INDEX = "elasticcc-jetty-sessions";

    private final ElasticsearchClient client;

    public ElasticsearchSessionDataStore(ElasticsearchClient client) {
        this.client = client;
    }

    @Override
    protected void doStart() throws Exception {
      // mapping and index creation
    }

    @Override
    public void doStore(String id, SessionData data, long lastSaveTime) {
      // serialization
    }

    @Override
    public SessionData doLoad(String id) throws Exception {
      // deserialization
    }

    @Override
    public Set<String> doGetExpired(Set<String> candidates) {
      // expiration check
    }

    @Override
    @ManagedAttribute(value = "does store serialize sessions", readonly = true)
    public boolean isPassivating() {
        return true;
    }

    @Override
    public boolean exists(String id) throws Exception {
      // existence check
    }

    @Override
    public boolean delete(String id) {
      // deletion
    }
}

As you can see, the number of methods needed is rather low. The total implementation ended up at around 300 lines.

Index creation & mapping

The constructor of the data store takes an Elasticsearch client; we saw in a previous post how that one is created.

On every startup I want to make sure that the mapping is correct, and create the index if needed - usually this is not needed in production, but it is when running tests. The first step is to create an enum for all fields:

private enum Fields {
    Id,
    ContextPath,
    Vhost,
    LastNodeUpdated,
    SessionData,
    MaxInactiveMs,
    EpochCreated,
    EpochAccessed,
    EpochLastAccessed,
    EpochCookieSet,
    EpochExpiry;

    private final String fieldName;

    Fields() {
        this.fieldName = new PropertyNamingStrategies.SnakeCaseStrategy().translate(this.name());
    }
}

The fieldName will contain the snake case variant, i.e. ContextPath will become context_path, which is used as the JSON field name.

@Override
protected void doStart() throws Exception {
  super.doStart();
  final BooleanResponse exists = client.indices().exists(b -> b.index(INDEX));
  if (exists.value() == false) {
    final CreateIndexResponse createResponse =
        client.indices().create(b -> b.index(INDEX));

    if (createResponse.acknowledged() == false) {
      throw new RuntimeException("Session index creation was not acknowledged");
    }
  }

  ...

After index creation, the mapping should be created. For an existing index no error is returned if the mapping already exists. Note the use of the fieldName property to make sure the snake case version is used.

Map<String, Property> properties = new HashMap<>();
Property keywordProperty = Property.of(pb -> pb.keyword(builder -> builder));
properties.put(Fields.Id.fieldName, keywordProperty);
properties.put(Fields.ContextPath.fieldName, keywordProperty);
properties.put(Fields.Vhost.fieldName, keywordProperty);
properties.put(Fields.LastNodeUpdated.fieldName, keywordProperty);

Property dateProperty = 
    Property.of(pb -> pb.date(builder -> builder.format("epoch_millis")));
properties.put(Fields.EpochCreated.fieldName, dateProperty);
properties.put(Fields.EpochAccessed.fieldName, dateProperty);
properties.put(Fields.EpochLastAccessed.fieldName, dateProperty);
properties.put(Fields.EpochCookieSet.fieldName, dateProperty);
properties.put(Fields.EpochExpiry.fieldName, dateProperty);

properties.put(Fields.SessionData.fieldName,
    Property.of(pb -> pb.binary(builder -> builder)));
properties.put(Fields.MaxInactiveMs.fieldName,
    Property.of(pb -> pb.long_(nb -> nb)));

client.indices().putMapping(b -> b.index(INDEX).properties(properties));

After setting the mapping, we also want to make sure that it is actually correct. After retrieving the mapping, you can try to access each field’s concrete type; if the mapping type is different, an exception is thrown. This way you can ensure the mapping is correct.

// check for correct mapping and bail if it does not work
final GetMappingResponse mapping = client.indices().getMapping(b -> b.index(INDEX));
final Map<String, Property> properties = mapping.get(INDEX).mappings().properties();
// this ensures the right types are set
properties.get(Fields.Id.fieldName).keyword();
properties.get(Fields.ContextPath.fieldName).keyword();
properties.get(Fields.Vhost.fieldName).keyword();
properties.get(Fields.LastNodeUpdated.fieldName).keyword();
properties.get(Fields.SessionData.fieldName).binary();
properties.get(Fields.MaxInactiveMs.fieldName).long_();
properties.get(Fields.EpochCreated.fieldName).date();
properties.get(Fields.EpochAccessed.fieldName).date();
properties.get(Fields.EpochLastAccessed.fieldName).date();
properties.get(Fields.EpochCookieSet.fieldName).date();
properties.get(Fields.EpochExpiry.fieldName).date();

So now, as part of starting the session store, index creation and the mapping check are properly handled. Time to figure out how a session is serialized.

Serializing & storing a session

The serialization is pretty much the same for all data stores. You need to figure out a way to represent the session object in a format suitable for the data store.

@Override
public void doStore(String id, SessionData data, long lastSaveTime) throws Exception {
  logger.debug("Storing session id [{}]", id);
  final String base64Session;
  try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
       ObjectOutputStream oos = new ObjectOutputStream(bos)) {

      SessionData.serializeAttributes(data, oos);
      oos.flush();
      base64Session = Base64.getEncoder().encodeToString(bos.toByteArray());
  }

  ElasticsearchSession session = new ElasticsearchSession(id, 
      _context.getCanonicalContextPath(), _context.getVhost(),
      data.getLastNode(), data.getCreated(), data.getAccessed(),
      data.getLastAccessed(), data.getCookieSet(), data.getExpiry(),
      data.getMaxInactiveMs(),
      base64Session);

  client.index(b -> b.index(INDEX).id(id).document(session));
}

The core of this code snippet is to serialize the session attributes via SessionData.serializeAttributes() into an object output stream backed by a byte array, which then gets Base64 encoded so it can be stored as a string.

Note: This seemingly simple implementation comes at a price. If your classes do not implement java.io.Serializable, serialization into an ObjectOutputStream will not work, and those objects will not be part of the output stream - as if they had never been added to the HttpSession.

Our User entity needed a slight change to properly support serialization:

public class User implements Serializable {

    @Serial
    private static final long serialVersionUID = 2563834867543039733L;

}

With this change, serialization of the sessions works as expected.

Loading a session

Loading a session just inverts the workflow of storing: retrieving the document from Elasticsearch, Base64 decoding it and deserializing the resulting input stream into an object.

@Override
public SessionData doLoad(String id) throws Exception {
  GetResponse<ElasticsearchSession> response;
  try {
    response = client.get(b -> b.index(INDEX).id(id), ElasticsearchSession.class);
    if (response.found() == false) {
      return null;
    }
    final ElasticsearchSession session = response.source();

    SessionData sessionData = newSessionData(id, session.epochCreated(), 
            session.epochAccessed(), session.epochLastAccessed(),
            session.maxInactiveMs());
    sessionData.setContextPath(session.contextPath());
    sessionData.setVhost(session.vhost());
    sessionData.setLastNode(session.lastNodeUpdated());
    sessionData.setCookieSet(session.epochCookieSet());
    sessionData.setExpiry(session.epochExpiry());

    final byte[] sessionBytes = Base64.getDecoder().decode(session.sessionData());
    try (ByteArrayInputStream bis = new ByteArrayInputStream(sessionBytes);
         DataInputStream dis = new DataInputStream(bis);
         ClassLoadingObjectInputStream ois = 
             new ClassLoadingObjectInputStream(dis)) {

      SessionData.deserializeAttributes(sessionData, ois);
    }

    return sessionData;
  } catch (Exception e) {
    logger.error("Error retrieving session id [" + id + "]", e);
    return null;
  }
}

Again there is no custom code for serializing/deserializing JSON, which makes this really nice to read.

Delete a session

Sessions can be deleted in two ways: when a user explicitly logs out (clicking the log out button) or when a session expires. This means the session store needs a way to delete a session, which is rather short in our implementation, as we can just delegate to Elasticsearch.

@Override
public boolean delete(String id) {
  logger.debug("Deleting of session id [{}]", id);
  try {
    final DeleteResponse response = client.delete(b -> b.index(INDEX).id(id));
    return response.result() != Result.NotFound;
  } catch (Exception e) {
    return false;
  }
}

Existence check

Checking for the existence of a session looks like this. Jetty uses this to avoid creating a session with an existing session id.

@Override
public boolean exists(String id) throws Exception {
  final boolean exists = client.exists(b -> b.index(INDEX).id(id)).value();
  logger.debug("Checking for existence of session id [{}]: [{}]", id, exists);
  return exists;
}

Retrieve expired sessions

The final part of the session store deals with expiration. Once a user has logged in but stopped using the app, the session ages and is probably never used again, but it still takes up space. This is where the expiration mechanism comes in.

The method to implement looks like this:

@Override
public Set<String> doGetExpired(Set<String> candidates) {
}

The argument is a set of possible expiration candidates that need to be checked. However, most session stores do some additional work and also search for expired sessions on top of the supplied candidates.

So the first step is to check the supplied candidates:

Set<String> result = new HashSet<>(candidates.size());

final long now = System.currentTimeMillis();
if (candidates.isEmpty() == false) {
    List<String> ids = new ArrayList<>(100);

    // bulk for every 100 documents
    for (String candidate : candidates) {
        ids.add(candidate);
        if (ids.size() >= 100) {
            result.addAll(retrieveExpiredIds(ids, now));
            ids.clear();
        }
    }
    result.addAll(retrieveExpiredIds(ids, now));
}

Every 100 documents, retrieveExpiredIds is called, which looks like this:

private List<String> retrieveExpiredIds(List<String> ids, long expirationInMillis) {
  try {
    final MgetResponse<ElasticsearchSession> response =
        client.mget(b -> b.index(INDEX).ids(ids), ElasticsearchSession.class);

    final List<MultiGetResponseItem<ElasticsearchSession>> docs = response.docs();
    return docs.stream()
        // successful responses only
        .filter(MultiGetResponseItem::isResult)
        .map(MultiGetResponseItem::result)
        // if the hit is not found it should be expired as well
        .filter(hit -> hit.found() == false || 
             hit.source().epochExpiry() > 0 && 
             hit.source().epochExpiry() <= expirationInMillis)
        .map(GetResult::id)
        .toList();
  } catch (Exception e) {
    logger.error("error mget'ing sessions", e);
    return Collections.emptyList();
  }
}

This issues a Multi GET request, then checks whether the response contains a proper result, maps down to that result and checks whether the expiration time was exceeded.

So this collects the ids that really need to be expired. On top of that comes a second query that searches for any sessions that are expired but not part of this list.

try {
  final SearchResponse<ElasticsearchSession> response =
      client.search(b -> b.index(INDEX)
          .query(q -> q.range(rq -> rq
            .field("epoch_expiry")
            .lt(JsonData.of(upperBound))
            .gt(JsonData.of(0)))
          )
          .size(10_000), ElasticsearchSession.class);
  response.hits().hits().stream().map(Hit::id).forEach(result::add);
} catch (IOException e) {
    logger.info("error search for expired sessions", e);
}

The above approach has a few limitations, like being limited to 10k documents per call - if you had a ton of sessions this would probably not be enough, but for our use case it was, especially with the optimization mentioned below.
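
If you ever had far more than 10k expired sessions, one conceivable workaround - just a sketch, not what ran in production, and note that it bypasses the getExpired() contract, so Jetty would not get to do its own cleanup bookkeeping - would be to let Elasticsearch delete the expired documents directly:

// hypothetical cleanup beyond the 10k search window: instead of collecting ids,
// let Elasticsearch remove all expired session documents in a single request
try {
  final long now = System.currentTimeMillis();
  client.deleteByQuery(b -> b.index(INDEX)
      .query(q -> q.range(rq -> rq
          .field("epoch_expiry")
          .lt(JsonData.of(now))
          .gt(JsonData.of(0)))));
} catch (IOException e) {
  logger.error("error deleting expired sessions", e);
}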

With this, the basic implementation is finished and we can take a quick look regarding testing.

Testing

Testing is done via Testcontainers: starting an Elasticsearch instance, storing sessions, loading them back and making sure the serialized entities could be retrieved.

One test was to start two Javalin instances, set a session attribute in one instance and make sure it could be retrieved in the second instance. Strictly speaking this still does not resemble a real-life integration test, as both Javalin applications reside in the same JVM and could theoretically share resources - although I could not find evidence that they actually do:

Javalin app1 = Javalin.create(cfg -> 
    cfg.sessionHandler(() -> createSessionHandler(client)));
app1.get("/", ctx -> {});
app1.get("/set", ctx -> {
    ctx.sessionAttribute("user", "value");
});

Javalin app2 = Javalin.create(cfg -> 
    cfg.sessionHandler(() -> createSessionHandler(client)));
app2.get("/", ctx -> {
    final String key = ctx.sessionAttribute("user");
    ctx.result(key);
});

app1.start(0);
app2.start(0);

After starting both apps and binding them to random ports, one request gets sent to the first app, and then a second request gets sent to the second app.

final HttpClient client = HttpClient.newHttpClient();
// set session attribute
response = client.send(HttpRequest.newBuilder()
        .GET()
        .uri(URI.create("http://localhost:" + app1.port() + "/set"))
        .build(), ofString());
assertThat(response.statusCode()).isEqualTo(200);

// extract session cookie
final String sessionCookie = response.headers().firstValue("set-cookie").get();

// second request retrieves session attribute
response = client.send(HttpRequest.newBuilder()
        .GET()
        .header("Cookie", sessionCookie)
        .uri(URI.create("http://localhost:" + app2.port()))
        .build(), ofString());
assertThat(response.statusCode()).isEqualTo(200);
assertThat(response.body()).isEqualTo("value");

Note: This is an incomplete test. If you want to make sure this works for your custom application, store the same entities in your tests that your application stores in the session. If you use a POJO, it must implement the Serializable interface in order to get persisted. Keep that in mind when writing your tests.
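
To make that explicit, a variant of the above test could store a POJO instead of a plain string - the round trip only succeeds because User implements Serializable (the constructor and getter are assumed here for illustration):

// hypothetical test variant storing a POJO in the session
app1.get("/set-user", ctx -> {
    ctx.sessionAttribute("user", new User("alex"));
});

app2.get("/user", ctx -> {
    final User user = ctx.sessionAttribute("user");
    // this would fail if the attribute was silently dropped during serialization
    ctx.result(user.getName());
});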

Session persistence

With the initial implementation of my session handler I ran into an unexpected problem. Shortly after deploying the initial version and letting it run in production, I checked the index containing the sessions and it showed 250k entries. The website wasn’t even promoted yet, so how could that be? The session expiry time hadn’t been reached yet, so these must have been valid sessions.

It turns out that most visitors to any kind of website today are not humans. This was all traffic from security scanners and very few indexing bots. I knew there were a lot of scanners out there, but I was blown away by the number of requests for an application that had not been promoted at all.

What happened here from a Java perspective? Every request created a session. Now, that is probably not true for every web application, as it is possible to only create a session if someone adds an attribute to it. It turns out an underlying dependency was always adding data to the session, so it always got persisted - pac4j in this case, because of its CSRF handling.

From an application perspective, however, storing all those sessions was not necessary. There is only a need to save a session if the user is logged in. The solution was to add a persist_session boolean to the session once logged in, and to check whether it was set in the session store implementation.

app.after("/login", ctx -> {
  ctx.sessionAttribute("persist_session", true);
});

So the session attribute is only added when the login was successful. The session store got a modification like this:

@Override
public void doStore(String id, SessionData data, 
                    long lastSaveTime) throws Exception {
  Boolean persistSession = (Boolean) data.getAttribute("persist_session");
  if (persistSession == null || persistSession == false) {
    logger.debug("Not persisting session id [{}], no user information", id);
    return;
  }

  // serialize session 

  // store session
  client.index(b -> b.index(INDEX).id(id).document(session));
}

This practically removed the issue of storing a lot of sessions, down to the few people who actually logged into the platform. This only works, however, if the data stored in the session by other parts of your application does not need to be shared - and that’s something you need to test.

Integration in Javalin

The final step is the integration in Javalin. The code needed for custom session handlers looks like this:

this.app = Javalin.create(cfg -> {
  cfg.sessionHandler(() -> createSessionHandler(elasticsearchClient));
});


public static SessionHandler createSessionHandler(ElasticsearchClient client) {
  SessionHandler sessionHandler = new SessionHandler();
  sessionHandler.setMaxInactiveInterval(86_400 * 7);
  sessionHandler.setHttpOnly(true);
  sessionHandler.setSameSite(HttpCookie.SameSite.STRICT);
  sessionHandler.setSecureRequestOnly(true);

  SessionCache sessionCache = new NullSessionCache(sessionHandler);
  sessionCache.setSaveOnCreate(true);
  sessionCache.setFlushOnResponseCommit(true);
  sessionCache.setSessionDataStore(new ElasticsearchSessionDataStore(client));

  sessionHandler.setSessionCache(sessionCache);
  return sessionHandler;
}

The setup contains one tiny detail, and that is the use of the NullSessionCache. This session cache does not do any local caching but persists immediately into the session data store. The default implementation uses a hash map for caching, which can lead to problems if you have two instances of your app running and your load balancer is doing round-robin distribution. Caching would work if the load balancer used session affinity (sticky sessions), but the question remains when the session gets persisted. So for simplicity I went with the non-caching cache here, so I knew how things would work out.

What left me puzzled about the above code is the circular dependency. There is

SessionHandler sessionHandler = new SessionHandler();
SessionCache sessionCache = new NullSessionCache(sessionHandler);
sessionHandler.setSessionCache(sessionCache);

but if that makes things work, so be it… I have not dug deeper into why this is the case, but might do so in the future.

Keeping dependencies in sync

The last integration part is some hygiene with regard to dependencies. Whenever the Javalin dependency gets updated, I would like to not have to care about the underlying Jetty version, but still make sure that the jetty-nosql dependency is always in sync with the Jetty dependency from Javalin. I came up with this snippet:

dependencies {
    def javalinVersion = '4.3.0'
    components.all(JettyBomAlignmentRule)

    implementation "io.javalin:javalin:${javalinVersion}"
    implementation 'org.eclipse.jetty:jetty-nosql'
}


// make sure that we do not have to care for the jetty-nosql version dependency
abstract class JettyBomAlignmentRule implements ComponentMetadataRule {
  void execute(ComponentMetadataContext ctx) {
    ctx.details.with {
      if (id.group.startsWith("org.eclipse.jetty")) {
          belongsTo("org.eclipse.jetty:jetty-bom:${id.version}", false)
      }
    }
  }
}

Readiness & liveness probes

If you want to know more about probes, check out the k8s documentation, but it probably makes sense to briefly touch on them here, as they are part of the whole rollout process.

livenessProbe:
  failureThreshold: 3
  periodSeconds: 30
  httpGet:
    path: /my-health-endpoint
    port: 8080
readinessProbe:
  failureThreshold: 15
  initialDelaySeconds: 10
  periodSeconds: 5
  httpGet:
    path: /my-health-endpoint
    port: 8080

Depending on your application, the definition of liveness and readiness may be different or may also depend on other services (i.e. the availability of the underlying data store). In this example both use the same endpoint, and within the Javalin app readiness can be described as the moment when the application is listening on the configured port. The same applies to liveness. I also did not consider the need for startup probes in this example.
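
The endpoint itself can be minimal. A sketch in Javalin - the path matches the probe configuration above, and the Elasticsearch ping is an assumption, not the exact production check:

app.get("/my-health-endpoint", ctx -> {
    // optionally include dependencies in the check; if Elasticsearch is not
    // reachable, this pod reports unhealthy and k8s stops routing traffic to it
    boolean esReachable = elasticsearchClient.ping().value();
    ctx.status(esReachable ? 200 : 503);
});

Be careful when tying the liveness probe to external dependencies though: if the data store is down, k8s would keep restarting otherwise healthy pods.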

Now when running

kubectl rollout restart deployment/my-neat-app
kubectl get pods -w

you can watch your pods being replaced one by one and check in between whether your application is still reachable and works as expected while you are logged in.

Summary

That was the guide to downtime-free rollouts for the ElasticCC app. Documentation for the jetty-nosql part is rather sparse, and I will need to take a look at how later versions of Jetty support this, as Javalin currently ships a rather old Jetty version (major version 9).

The other workaround - storing state on the client side and doing a lookup for every incoming request of a user - might be a good idea to keep complexity on the server side low, but you need to ensure that all the needed data is serialized or stored.

Either solution will take some additional work to ensure your application is ready for rollouts without downtime, but I consider this an absolutely crucial thing to implement.

Happy session handling!

Final remarks

You can follow or ping me on Twitter or GitHub, or reach me via email (just to tell me you read this whole thing :-).

If there is anything to correct, drop me a note, and I am happy to do so and append to this post!

Same applies for questions. If you have a question, go ahead and ask!

If you want me to speak about this, drop me an email!
