TL;DR: This is a series of blog posts x-raying the platform we used to run the Elastic Community Conference 2022. It covers the choice of platform, technologies and persistence options, as well as monitoring and APM capabilities.
In this series we will cover:
- Part 1: Application structure
- Part 2: Using the new Elasticsearch Java Client
- Part 3: Upgrading on k8s without downtime
Rollouts without downtime
Luckily, the days of quarterly updates with weekend-long downtimes have been over for a decade. You now have to think about how to update an application easily, without users noticing. If you are running on k8s, you might be lucky and this is only a single command away once you have updated your Docker images:
kubectl rollout restart deployment/my-neat-app
The basic idea is to start up a new instance of the application, wait until it is ready, move the traffic over to that instance and then stop the old instance. This can be done with a single instance or with ten instances, by rolling over one after another.
There are a couple of caveats. The biggest ones are the data stores in use and state.
If you are using a data store, you have to make sure that the old and new versions of your app can cope with potential changes like a mapping update, a type change or a reindex operation. So whenever you modify your data definition, this needs to be properly tested, and that rollout might need to be decoupled from rolling out your new feature. In the case of the ElasticCC app there was nothing to take care of, as there were no mapping changes to our Elasticsearch data.
State, however, is trickier. Every Java Servlet based application has the concept of a session, which is needed to identify a logged-in user on the server side. By default, the mapping is done via a cookie that contains the session ID, while the session state itself lives on the server. When you shut down that JVM instance, the state is gone, because it is usually held in memory.
One possible solution is to move the state to the client side. You could set a cookie containing your state and either sign or encrypt that cookie, so that it cannot be modified on the client side. This way all your state is on the client side. Storing the user ID in there and looking up the user with every request would work. I created such a cookie store in the javalin-cookie-session-store GitHub project.
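Here is a minimal sketch of the signing idea using only the JDK. It assumes a cookie value layout of payload.signature, where both parts are Base64URL encoded and the signature is an HMAC-SHA256 over the encoded payload. The class name SignedCookie and the key handling are hypothetical. This is not the code of the javalin-cookie-session-store project. It also only signs the value, so the payload stays readable by the client. It does not encrypt it.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import java.util.Optional;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: signs a cookie payload with HMAC-SHA256 so the client
// can read it but cannot modify it without the signature check failing.
final class SignedCookie {
    private static final Base64.Encoder ENC = Base64.getUrlEncoder().withoutPadding();
    private static final Base64.Decoder DEC = Base64.getUrlDecoder();
    private final SecretKeySpec key;

    SignedCookie(byte[] secret) {
        this.key = new SecretKeySpec(secret, "HmacSHA256");
    }

    // Produces "<base64url(payload)>.<base64url(hmac)>"
    String sign(String payload) {
        String encoded = ENC.encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        return encoded + "." + ENC.encodeToString(hmac(encoded));
    }

    // Returns the payload only if the signature matches, empty otherwise
    Optional<String> verify(String cookieValue) {
        int dot = cookieValue.indexOf('.');
        if (dot < 0) {
            return Optional.empty();
        }
        String encoded = cookieValue.substring(0, dot);
        byte[] signature;
        try {
            signature = DEC.decode(cookieValue.substring(dot + 1));
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // signature part is not valid Base64
        }
        // constant-time comparison to avoid timing side channels
        if (!MessageDigest.isEqual(hmac(encoded), signature)) {
            return Optional.empty();
        }
        return Optional.of(new String(DEC.decode(encoded), StandardCharsets.UTF_8));
    }

    private byte[] hmac(String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(key);
            return mac.doFinal(data.getBytes(StandardCharsets.US_ASCII));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 not available", e);
        }
    }
}
```

Every instance of the app needs the same secret. Otherwise a freshly rolled-out pod could not verify cookies issued by the pod it replaced.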
The other alternative is to implement a custom session store, storing sessions in a data store instead of memory. Luckily Jetty has an established interface for that.
Jetty and session stores
When I ran a side project on Digital Ocean App Platform, I wanted to make sure not to lose any sessions. So I decided to give the JDBCSessionDataStore a try, which ships with Jetty and is thus also available in Javalin. This worked great until I found out that the sessions were not properly expired, and I ended up with millions of them after a month. We’ll get to that problem later… 😀
So there is the possibility to store sessions in a data store. If you have read the other parts of this series, you will notice that the only data store we plan to use is Elasticsearch.
Elasticsearch Session Datastore
Finally, it’s time to come up with some wisdom and whip out Maslow’s famous phrase:
If all you have is a hammer, everything looks like a nail.
So my choice was to either introduce a SQL database or a Redis component to my app, or to just use Elasticsearch.
I wanted to explore the idea of an Elasticsearch session data store, which was also another opportunity to test the new Elasticsearch Java Client.
Jetty already has a base class for custom data store implementations named NoSqlSessionDataStore, which extends AbstractSessionDataStore.
Some of the methods that need to be implemented - summarized from a few interfaces:
void store(String id, SessionData data) throws Exception;
SessionData load(String id) throws Exception;
boolean delete(String id) throws Exception;
boolean exists(String id) throws Exception;
Set<String> getExpired(Set<String> candidates);
Judging by the method names alone, this pretty much resembles a CRUD use case operating on single documents. The only exception is the getExpired() method, which can optionally check for expired sessions so that they get cleaned up.
ElasticsearchSessionDataStore setup
First, let’s model a session with all its properties. From a user perspective, the most important part is session_data, which contains the serialized session.
import com.fasterxml.jackson.annotation.JsonProperty;
public record ElasticsearchSession(
@JsonProperty("id") String id,
@JsonProperty("context_path") String contextPath,
@JsonProperty("vhost") String vhost,
@JsonProperty("last_node_updated") String lastNodeUpdated,
@JsonProperty("epoch_created") Long epochCreated,
@JsonProperty("epoch_accessed") Long epochAccessed,
@JsonProperty("epoch_last_accessed") Long epochLastAccessed,
@JsonProperty("epoch_cookie_set") Long epochCookieSet,
@JsonProperty("epoch_expiry") Long epochExpiry,
@JsonProperty("max_inactive_ms") Long maxInactiveMs,
@JsonProperty("session_data") String sessionData) {
}
The basic idea here is to serialize the session data and then encode it as a Base64 string, so that it can easily be stored in Elasticsearch. This is one of the reasons why you probably would not do this on a high-traffic website. It eats CPU, and Base64 needs roughly a third (about 33%) more space than the raw bytes, overhead that could be avoided. For our modest use case this was good enough.
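Here is how the space overhead adds up. Base64 turns every group of 3 input bytes into 4 output characters, padding the last group, so the encoded string ends up at roughly 4/3 of the raw size. The hypothetical helper below compares the length that java.util.Base64 actually produces with that formula.

```java
import java.util.Base64;

// Hypothetical helper illustrating the size overhead of Base64 encoding
final class Base64Overhead {
    // Length of the padded Base64 string produced for a given number of raw bytes
    static int encodedLength(int rawBytes) {
        return Base64.getEncoder().encodeToString(new byte[rawBytes]).length();
    }

    // Same value computed arithmetically: 4 characters per started 3-byte group
    static int expectedLength(int rawBytes) {
        return 4 * ((rawBytes + 2) / 3);
    }
}
```

For example, a 3,072-byte serialized session becomes a 4,096-character string.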
Next up is the AbstractSessionDataStore implementation, showing all the required methods without their bodies:
@ManagedObject
public class ElasticsearchSessionDataStore extends NoSqlSessionDataStore {
private static final Logger logger =
LogManager.getLogger(ElasticsearchSessionDataStore.class);
public static final String INDEX = "elasticcc-jetty-sessions";
private final ElasticsearchClient client;
public ElasticsearchSessionDataStore(ElasticsearchClient client) {
this.client = client;
}
@Override
protected void doStart() throws Exception {
// mapping and index creation
}
@Override
public void doStore(String id, SessionData data, long lastSaveTime) throws Exception {
// serialization
}
@Override
public SessionData doLoad(String id) throws Exception {
// deserialization
}
@Override
public Set<String> doGetExpired(Set<String> candidates) {
//expiration check
}
@Override
@ManagedAttribute(value = "does store serialize sessions", readonly = true)
public boolean isPassivating() {
return true;
}
@Override
public boolean exists(String id) throws Exception {
// existence check
}
@Override
public boolean delete(String id) {
// deletion
}
}
As you can see, the number of methods needed is rather low. The complete implementation ended up at around 300 lines.
Index creation & mapping
The constructor of the data store takes an Elasticsearch client. We saw how that one is created in a previous post of this series.
On every startup I want to create the index if needed and make sure that the mapping is correct. Usually this is not needed in production, but it is when running tests. The first step is to create an enum for all fields:
private enum Fields {
Id,
ContextPath,
Vhost,
LastNodeUpdated,
SessionData,
MaxInactiveMs,
EpochCreated,
EpochAccessed,
EpochLastAccessed,
EpochCookieSet,
EpochExpiry;
private final String fieldName;
Fields() {
this.fieldName = new PropertyNamingStrategies.SnakeCaseStrategy().translate(this.name());
}
}
The fieldName will contain the snake case variant, i.e. ContextPath will become context_path, which is used as the JSON field name.
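Here is a simplified, hypothetical stand-in for SnakeCaseStrategy that shows which JSON field names the enum produces without pulling in Jackson. It only inserts an underscore before each upper-case letter and lower-cases everything. It therefore differs from Jackson for runs of capitals such as acronyms, but for the field names in this enum it should produce the same names.

```java
// Simplified, hypothetical stand-in for Jackson's SnakeCaseStrategy:
// inserts '_' before every upper-case letter except the first one and
// lower-cases the result. Runs of capitals (e.g. "URL") are NOT handled like Jackson does.
final class SnakeCase {
    static String translate(String name) {
        StringBuilder sb = new StringBuilder(name.length() + 8);
        for (int i = 0; i < name.length(); i++) {
            char c = name.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append('_');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }
}
```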
@Override
protected void doStart() throws Exception {
super.doStart();
final BooleanResponse exists = client.indices().exists(b -> b.index(INDEX));
if (exists.value() == false) {
final CreateIndexResponse createResponse =
client.indices().create(b -> b.index(INDEX));
if (createResponse.acknowledged() == false) {
throw new RuntimeException("Session index creation was not acknowledged");
}
}
...
After index creation, the mapping should be created. For an existing index, no error should be returned if the mapping already exists. Note the use of the fieldName property, which ensures that the snake case version is used.
Map<String, Property> properties = new HashMap<>();
Property keywordProperty = Property.of(pb -> pb.keyword(builder -> builder));
properties.put(Fields.Id.fieldName, keywordProperty);
properties.put(Fields.ContextPath.fieldName, keywordProperty);
properties.put(Fields.Vhost.fieldName, keywordProperty);
properties.put(Fields.LastNodeUpdated.fieldName, keywordProperty);
Property dateProperty =
Property.of(pb -> pb.date(builder -> builder.format("epoch_millis")));
properties.put(Fields.EpochCreated.fieldName, dateProperty);
properties.put(Fields.EpochAccessed.fieldName, dateProperty);
properties.put(Fields.EpochLastAccessed.fieldName, dateProperty);
properties.put(Fields.EpochCookieSet.fieldName, dateProperty);
properties.put(Fields.EpochExpiry.fieldName, dateProperty);
properties.put(Fields.SessionData.fieldName,
Property.of(pb -> pb.binary(builder -> builder)));
properties.put(Fields.MaxInactiveMs.fieldName,
Property.of(pb -> pb.long_(nb -> nb)));
client.indices().putMapping(b -> b.index(INDEX).properties(properties));
After setting the mapping, we also want to make sure that the mapping is actually correct. After retrieving the mapping, you can try to access the concrete type of each property. If the mapping type is different, an exception is thrown. This way, you can ensure the mapping is correct.
// check for correct mapping and bail if it does not work
final GetMappingResponse mapping = client.indices().getMapping(b -> b.index(INDEX));
final Map<String, Property> mappedProperties = mapping.get(INDEX).mappings().properties();
// this ensures the right types are set: each accessor throws if the property has a different type
mappedProperties.get(Fields.Id.fieldName).keyword();
mappedProperties.get(Fields.ContextPath.fieldName).keyword();
mappedProperties.get(Fields.Vhost.fieldName).keyword();
mappedProperties.get(Fields.LastNodeUpdated.fieldName).keyword();
mappedProperties.get(Fields.SessionData.fieldName).binary();
mappedProperties.get(Fields.MaxInactiveMs.fieldName).long_();
mappedProperties.get(Fields.EpochCreated.fieldName).date();
mappedProperties.get(Fields.EpochAccessed.fieldName).date();
mappedProperties.get(Fields.EpochLastAccessed.fieldName).date();
mappedProperties.get(Fields.EpochCookieSet.fieldName).date();
mappedProperties.get(Fields.EpochExpiry.fieldName).date();
So now, as part of starting the session store, index creation and the mapping check are properly handled. Time to figure out how a session is serialized.
Serializing & storing a session
The serialization is pretty much the same for all data stores. You need to figure out a way to represent the session object in a format suitable for the data store.
@Override
public void doStore(String id, SessionData data, long lastSaveTime) throws Exception {
logger.debug("Storing session id [{}]", id);
final String base64Session;
try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos)) {
SessionData.serializeAttributes(data, oos);
oos.flush();
base64Session = Base64.getEncoder().encodeToString(bos.toByteArray());
}
ElasticsearchSession session = new ElasticsearchSession(id,
_context.getCanonicalContextPath(), _context.getVhost(),
data.getLastNode(), data.getCreated(), data.getAccessed(),
data.getLastAccessed(), data.getCookieSet(), data.getExpiry(),
data.getMaxInactiveMs(),
base64Session);
client.index(b -> b.index(INDEX).id(id).document(session));
}
The core of this code snippet is serializing the session attributes via SessionData.serializeAttributes() into an object output stream backed by a byte array. That byte array is then Base64 encoded, so it can be stored as a string.
Note: This seemingly simple implementation comes at a price. If your classes do not implement java.io.Serializable, serialization into an ObjectOutputStream will not work. The object will not be part of the output stream, as if it had never been added to the HttpSession.
Our User entity needed a slight change to properly support serialization:
public class User implements Serializable {
@Serial
private static final long serialVersionUID = 2563834867543039733L;
}
With this change, serialization of the sessions works as expected.
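You can reproduce this failure mode with the JDK alone. ObjectOutputStream refuses objects whose class does not implement java.io.Serializable and throws a NotSerializableException. It throws the same exception if a Serializable class holds a non-serializable, non-transient field. The two user classes below are hypothetical stand-ins for a session attribute before and after the change. canSerialize() is a made-up helper that simply tries to write the object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serial;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical attribute class WITHOUT Serializable
final class PlainUser {
    final String name;
    PlainUser(String name) { this.name = name; }
}

// Hypothetical attribute class WITH Serializable, like the changed User entity
final class SerializableUser implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    final String name;
    SerializableUser(String name) { this.name = name; }
}

final class SerializationCheck {
    // Returns true if the object can be written to an ObjectOutputStream
    static boolean canSerialize(Object attribute) {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(attribute);
            return true;
        } catch (NotSerializableException e) {
            return false; // the class, or one of its fields, is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```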
Loading a session
Loading the session simply inverts the storing workflow. It retrieves the document from Elasticsearch, Base64-decodes the session data and deserializes that input stream back into objects.
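Without Jetty and Elasticsearch, the store and load paths form a plain JDK round trip: objects become bytes, then a Base64 string, and the reverse on the way back. The sketch below uses a Map<String, Object> as a hypothetical stand-in for the session attributes. Jetty's SessionData.serializeAttributes() and deserializeAttributes() write their own stream layout, so this output is not interchangeable with what the data store actually stores.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the attribute (de)serialization of the session store
final class AttributeCodec {
    // attributes -> Java serialization -> Base64 string (what ends up in session_data)
    static String encode(Map<String, Object> attributes) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new HashMap<>(attributes));
            oos.flush();
            return Base64.getEncoder().encodeToString(bos.toByteArray());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Base64 string -> bytes -> deserialized attribute map
    @SuppressWarnings("unchecked")
    static Map<String, Object> decode(String base64) {
        byte[] bytes = Base64.getDecoder().decode(base64);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Map<String, Object>) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of a session attribute not found", e);
        }
    }
}
```

Only apply Java deserialization to data you control, such as documents in your own index. Never feed client-supplied input like cookie values into an ObjectInputStream.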
@Override
public SessionData doLoad(String id) throws Exception {
GetResponse<ElasticsearchSession> response;
try {
response = client.get(b -> b.index(INDEX).id(id), ElasticsearchSession.class);
if (response.found() == false) {
return null;
}
final ElasticsearchSession session = response.source();
SessionData sessionData = newSessionData(id, session.epochCreated(),
session.epochAccessed(), session.epochLastAccessed(),
session.maxInactiveMs());
sessionData.setContextPath(session.contextPath());
sessionData.setVhost(session.vhost());
sessionData.setLastNode(session.lastNodeUpdated());
sessionData.setCookieSet(session.epochCookieSet());
sessionData.setExpiry(session.epochExpiry());
final byte[] sessionBytes = Base64.getDecoder().decode(session.sessionData());
try (ByteArrayInputStream bis = new ByteArrayInputStream(sessionBytes);
DataInputStream dis = new DataInputStream(bis);
ClassLoadingObjectInputStream ois =
new ClassLoadingObjectInputStream(dis)) {
SessionData.deserializeAttributes(sessionData, ois);
}
return sessionData;
} catch (Exception e) {
logger.error("Error retrieving session id [" + id + "]", e);
return null;
}
}
Again, there is no custom code for serializing or deserializing JSON, which makes this really nice to read.
Delete a session
Sessions can be deleted in two ways: when a user explicitly logs out (by clicking the log out button) or when a session expires. This means the session store needs a way to delete a session. In our implementation this is rather short, as we can just delegate to Elasticsearch.
@Override
public boolean delete(String id) {
logger.debug("Deleting session id [{}]", id);
try {
final DeleteResponse response = client.delete(b -> b.index(INDEX).id(id));
return response.result() != Result.NotFound;
} catch (Exception e) {
return false;
}
}
Existence check
Checking for the existence of a session looks like this. Jetty uses this check to avoid creating a session with an already existing session ID.
@Override
public boolean exists(String id) throws Exception {
final boolean exists = client.exists(b -> b.index(INDEX).id(id)).value();
logger.debug("Checking for existence of session id [{}]: [{}]", id, exists);
return exists;
}
Retrieve expired sessions
The final part of the session store deals with expiration. When a user has logged in but stopped using the app, the session ages and is probably never used again, yet it still takes up space. This is where the expiration mechanism comes in.
The method to implement is:
@Override
public Set<String> doGetExpired(Set<String> candidates) {
}
The argument is a set of candidates for expiry that need to be checked. However, most session stores do some additional work and also search for expired sessions on top of the supplied candidates.
So the first step is to check the supplied candidates:
Set<String> result = new HashSet<>(candidates.size());
final long now = System.currentTimeMillis();
if (candidates.isEmpty() == false) {
List<String> ids = new ArrayList<>(100);
// bulk for every 100 documents
for (String candidate : candidates) {
ids.add(candidate);
if (ids.size() >= 100) {
result.addAll(retrieveExpiredIds(ids, now));
ids.clear();
}
}
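// send the remaining (fewer than 100) candidates, if there are any
if (ids.isEmpty() == false) {
result.addAll(retrieveExpiredIds(ids, now));
}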
}
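The batching loop can also be written as a small generic helper. It splits the candidates into chunks of at most 100 ids, so each chunk becomes one Multi GET request and an empty candidate set results in no request at all. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical helper: splits items into consecutive batches of at most batchSize
final class Batches {
    static <T> List<List<T>> partition(Collection<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // last, partially filled batch
        }
        return batches;
    }
}
```

With this helper, the loop inside doGetExpired() becomes an iteration over Batches.partition(candidates, 100), calling retrieveExpiredIds(batch, now) for each batch.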
For every batch of 100 documents, retrieveExpiredIds is called, which looks like this:
private List<String> retrieveExpiredIds(List<String> ids, long expirationInMillis) {
try {
final MgetResponse<ElasticsearchSession> response =
client.mget(b -> b.index(INDEX).ids(ids), ElasticsearchSession.class);
final List<MultiGetResponseItem<ElasticsearchSession>> docs = response.docs();
return docs.stream()
// successful responses only
.filter(MultiGetResponseItem::isResult)
.map(MultiGetResponseItem::result)
// if the hit is not found it should be expired as well
.filter(hit -> hit.found() == false ||
(hit.source().epochExpiry() > 0 &&
hit.source().epochExpiry() <= expirationInMillis))
.map(GetResult::id)
.toList();
} catch (Exception e) {
logger.error("error mget'ing sessions", e);
return Collections.emptyList();
}
}
This issues a Multi GET request and keeps only the items that returned a proper result. It then maps each item to that result and checks whether the expiration time has been exceeded. Sessions that could not be found at all are treated as expired as well.
So this collects the ids that really need to be expired. On top of that comes a second query, which searches for any expired sessions that are not part of this list.
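Here is the filter condition on its own, without any Elasticsearch types, since it is the core of the expiry check. A missing document counts as expired. An expiry time of 0 or less means the session never expires, which is how Jetty's SessionData treats it, so the condition requires a positive value. The record and method names are hypothetical. The null check on epochExpiry is an addition, since the field is a boxed Long in the ElasticsearchSession record.

```java
// Hypothetical, Elasticsearch-free version of the mget filter:
// a session is expired if its document is gone, or if it has a positive
// expiry time that lies at or before the reference time.
final class ExpiryCheck {
    record StoredSession(String id, Long epochExpiry) {}

    static boolean isExpired(StoredSession session, long nowMillis) {
        if (session == null) {
            return true; // document not found: treat as expired so it gets cleaned up
        }
        Long expiry = session.epochExpiry();
        // an expiry of 0 or less means "never expires"
        return expiry != null && expiry > 0 && expiry <= nowMillis;
    }
}
```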
// upperBound: the expiry cut-off in epoch millis, defined outside this excerpt
try {
final SearchResponse<ElasticsearchSession> response =
client.search(b -> b.index(INDEX)
.query(q -> q.range(rq -> rq
.field("epoch_expiry")
.lt(JsonData.of(upperBound))
.gt(JsonData.of(0)))
)
.size(10_000), ElasticsearchSession.class);
response.hits().hits().stream().map(Hit::id).forEach(result::add);
} catch (IOException e) {
logger.info("error searching for expired sessions", e);
}
The above approach has a few limitations, such as being limited to 10,000 documents per call. If you had a ton of sessions this would probably not be enough, but for our use case it was, especially with the session persistence optimization described below.
With this, the basic implementation is finished, and we can take a quick look at testing.
Testing
Testing is done via Testcontainers. The tests start an Elasticsearch instance, store sessions, retrieve them and make sure the serialized entities can be read back.
One test starts two Javalin instances, sets a session attribute in one instance and makes sure it can be retrieved in the second instance. Strictly speaking, this still does not resemble a real-life integration test. Both Javalin applications reside in the same JVM and could theoretically share resources, although I could not find any evidence that they actually do:
Javalin app1 = Javalin.create(cfg ->
cfg.sessionHandler(() -> createSessionHandler(client)));
app1.get("/", ctx -> {});
app1.get("/set", ctx -> {
ctx.sessionAttribute("user", "value");
});
Javalin app2 = Javalin.create(cfg ->
cfg.sessionHandler(() -> createSessionHandler(client)));
app2.get("/", ctx -> {
final String key = ctx.sessionAttribute("user");
ctx.result(key);
});
app1.start(0);
app2.start(0);
After starting both apps and binding them to random ports, one request is sent to the first app, and then a second request is sent to the second app.
// named httpClient so it does not clash with the Elasticsearch client passed to createSessionHandler
final HttpClient httpClient = HttpClient.newHttpClient();
// set session attribute
HttpResponse<String> response = httpClient.send(HttpRequest.newBuilder()
.GET()
.uri(URI.create("http://localhost:" + app1.port() + "/set"))
.build(), ofString());
assertThat(response.statusCode()).isEqualTo(200);
// extract session cookie
final String sessionCookie = response.headers().firstValue("set-cookie").get();
// second request retrieves session attribute
response = httpClient.send(HttpRequest.newBuilder()
.GET()
.header("Cookie", sessionCookie)
.uri(URI.create("http://localhost:" + app2.port()))
.build(), ofString());
assertThat(response.statusCode()).isEqualTo(200);
assertThat(response.body()).isEqualTo("value");
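The Set-Cookie header returned by Jetty contains attributes such as Path and HttpOnly next to the session ID. A Cookie request header, in contrast, is only supposed to carry name=value pairs. If you want to send just the session cookie, java.net.HttpCookie.parse() in the JDK can extract it. The helper name and the header values in the test are made up.

```java
import java.net.HttpCookie;
import java.util.List;

// Hypothetical test helper: turns a Set-Cookie header value into a Cookie header value
final class CookieHeaders {
    static String toCookieHeader(String setCookieHeader) {
        List<HttpCookie> cookies = HttpCookie.parse(setCookieHeader);
        HttpCookie cookie = cookies.get(0);
        // only name=value goes back to the server, attributes like Path are dropped
        return cookie.getName() + "=" + cookie.getValue();
    }
}
```

In the test above, you would then pass the extracted value to .header("Cookie", ...) instead of the raw header.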
Note: This is an incomplete test. If you want to make sure this works for your custom application, store the same entities in the session that your application does. If you use a POJO, it must implement the Serializable interface in order to get persisted. Keep that in mind when writing your tests.
Session persistence
With the initial implementation of my session handler I ran into an unexpected problem. Shortly after deploying the initial version and letting it run in production, I checked the index containing the sessions, and it showed 250k entries. The website had not even been promoted yet, so how could that be? The session expiry time had not been reached yet, so these must have been valid sessions.
It turns out that, for any kind of website today, most visitors are not humans. This was all traffic from security scanners and a very few indexing bots. I knew there were a lot of scanners out there, but I was blown away by the number of requests for an application that had not been promoted at all.
What happened here from a Java perspective? Every request creates a session. That is probably not true for every web application, as it is possible to create a session only when someone adds an attribute to it. It turned out that an underlying dependency, pac4j in this case, was always adding data to the session because of CSRF handling, so the session always got persisted.
From an application perspective, however, storing all the sessions was not necessary. A session only needs to be saved if the user is logged in. The solution was to add a persist_session boolean to the session once the user has logged in, and to check for it in the session store implementation.
app.after("/login", ctx -> {
ctx.sessionAttribute("persist_session", true);
});
So only when the login was successful, we add the session attribute. The session store got a modification like this:
@Override
public void doStore(String id, SessionData data,
long lastSaveTime) throws Exception {
Boolean persistSession = (Boolean) data.getAttribute("persist_session");
if (persistSession == null || persistSession == false) {
logger.debug("Not persisting session id [{}], no user information", id);
return;
}
// serialize session
// store session
client.index(b -> b.index(INDEX).id(id).document(session));
}
This practically eliminated the issue of storing a lot of sessions, reducing them to the few people who actually logged into the platform. However, this only works if the data stored in the session by other parts of your application does not need to be shared between instances, and that's something you need to test.
Integration in Javalin
The final step is the integration in Javalin. The code needed for custom session handlers looks like this:
this.app = Javalin.create(cfg -> {
cfg.sessionHandler(() -> createSessionHandler(elasticsearchClient));
});
public static SessionHandler createSessionHandler(ElasticsearchClient client) {
SessionHandler sessionHandler = new SessionHandler();
sessionHandler.setMaxInactiveInterval(86_400 * 7); // 7 days, value in seconds
sessionHandler.setHttpOnly(true);
sessionHandler.setSameSite(HttpCookie.SameSite.STRICT);
sessionHandler.setSecureRequestOnly(true);
SessionCache sessionCache = new NullSessionCache(sessionHandler);
sessionCache.setSaveOnCreate(true);
sessionCache.setFlushOnResponseCommit(true);
sessionCache.setSessionDataStore(new ElasticsearchSessionDataStore(client));
sessionHandler.setSessionCache(sessionCache);
return sessionHandler;
}
The setup contains one tiny detail, and that is the use of the NullSessionCache. This session cache does not do any local caching and instead persists sessions into the session data store immediately. The default implementation uses a hash map for caching. That can lead to problems if you have two instances of your app running and your load balancer uses round-robin distribution. This would work if the load balancer used sticky sessions (session affinity), but the question of when the session is persisted remains. So for simplicity I went with the non-caching cache here, so I knew how things would work out.
What left me puzzled about the above code is the circular dependency:
SessionHandler sessionHandler = new SessionHandler();
SessionCache sessionCache = new NullSessionCache(sessionHandler);
sessionHandler.setSessionCache(sessionCache);
But if that is what makes things work, so be it… I have not dug deeper into why this is the case, but might do so in the future.
Keeping dependencies in sync
The last integration part is some dependency hygiene. Whenever the Javalin dependency gets updated, I do not want to care about the underlying Jetty version. I do, however, want to make sure that the jetty-nosql dependency always matches the Jetty version used by Javalin. I came up with this snippet:
dependencies {
def javalinVersion = '4.3.0'
components.all(JettyBomAlignmentRule)
implementation "io.javalin:javalin:${javalinVersion}"
implementation 'org.eclipse.jetty:jetty-nosql'
}
// make sure that we do not have to care for the jetty-nosql version dependency
abstract class JettyBomAlignmentRule implements ComponentMetadataRule {
void execute(ComponentMetadataContext ctx) {
ctx.details.with {
if (id.group.startsWith("org.eclipse.jetty")) {
belongsTo("org.eclipse.jetty:jetty-bom:${id.version}", false)
}
}
}
}
Readiness & liveness probes
If you want to know more about probes, check out the k8s documentation. It still makes sense to briefly touch on them here, as they are part of the whole rollout process.
livenessProbe:
failureThreshold: 3
periodSeconds: 30
httpGet:
path: /my-health-endpoint
port: 8080
readinessProbe:
failureThreshold: 15
initialDelaySeconds: 10
periodSeconds: 5
httpGet:
path: /my-health-endpoint
port: 8080
Depending on your application, the definitions of liveness and readiness may differ or depend on other services (e.g. the availability of the underlying data store). In this example both probes use the same endpoint. Within the Javalin app, readiness can be described as the moment when the application is listening on the configured port, and the same applies to liveness. I also did not consider the need for startup probes in this example.
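In the Javalin app, the probe target can be a trivial route that returns 200 once the server is listening. To keep the example runnable without Javalin, the sketch below uses the JDK's built-in com.sun.net.httpserver.HttpServer as a stand-in. The /my-health-endpoint path matches the probe configuration above. The class name, the "OK" body and the probe() helper are assumptions for illustration. probe() mimics the Kubernetes rule that any status from 200 to 399 counts as success.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the health route both probes call: it answers
// 200 as soon as the server is listening on its port.
final class HealthServer {
    static HttpServer start(int port) {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
            server.createContext("/my-health-endpoint", exchange -> {
                byte[] body = "OK".getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                try (OutputStream out = exchange.getResponseBody()) {
                    out.write(body);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mimics an httpGet probe: status 200..399 is success, connection errors are failures
    static boolean probe(int port) {
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:" + port + "/my-health-endpoint")).GET().build();
        try {
            int status = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding())
                    .statusCode();
            return status >= 200 && status < 400;
        } catch (IOException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In Javalin, the route itself would be a one-liner along the lines of app.get("/my-health-endpoint", ctx -> ctx.result("OK")).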
Now when running
kubectl rollout restart deployment/my-neat-app
kubectl get pods -w
you can watch your pods being replaced one by one. In between, check whether your application is still reachable and works as expected while logged in.
Summary
That was the guide to downtime-free rollouts for the ElasticCC app. Documentation for the Jetty NoSQL part is rather sparse. I will need to take a look at how later versions of Jetty support this, as Javalin currently uses a rather old version (major version 9).
The other workaround, storing state on the client side and looking up the user for every incoming request, might be a good way to keep complexity on the server side low. You need to ensure, though, that all the needed data is serialized or stored.
Either solution will take some additional work to ensure your application is ready for rollouts without downtime, but I consider this an absolutely crucial thing to implement.
Happy session handling!
Final remarks
You can follow or ping me on Twitter or GitHub, or reach me via email (just to tell me you read this whole thing :-).
If there is anything to correct, drop me a note, and I am happy to do so and append to this post!
Same applies for questions. If you have a question, go ahead and ask!
If you want me to speak about this, drop me an email!
Resources
- Javalin
- Javalin Jetty Session Handling - Persisting, caching and clustering
- Testcontainers
- Jetty NoSqlSessionDataStore
- K8s - Configure Liveness, Readiness and Startup Probes