Alexander Reelsen

Backend developer, productivity fan, likes distributed systems & the new serverless era

Understanding Elasticsearch Persistent Tasks
May 3, 2021
10 minutes read

TL;DR: In this post we’ll dive into the persistent tasks infrastructure of Elasticsearch, which allows you to create tasks that are visible across the cluster.

Persistent tasks are tasks that are stored in the Elasticsearch Cluster State and run somewhere on a node within the cluster. So, how to approach this? I usually try to solve the problem from the outside to the inside. Outside here still means inside of Elasticsearch of course. But as plugin developers can register their own tasks, this looks like a good place to start.

Before we dive into the gory details of persistent tasks, let’s take a step back and talk about what a task is within Elasticsearch. Every transport action and REST endpoint that gets executed is represented as a task.

To get a first handle on what tasks are, you can check out the docs on the Task management API. The most important part here is the ability to list the tasks running in the cluster.

Side note: These kinds of tasks are not to be confused with the pending cluster tasks, which represent cluster-level changes (within the cluster state) that have not been executed yet.

Let’s take a look at a single task from the response of listing the tasks:

"LJiKRO1oT0aNj-9-WcniRw:425945" : {
  "node" : "LJiKRO1oT0aNj-9-WcniRw",
  "id" : 425945,
  "type" : "transport",
  "action" : "cluster:monitor/tasks/lists",
  "description" : "",
  "start_time" : "2021-05-03T08:26:05.097Z",
  "start_time_in_millis" : 1620030365097,
  "running_time" : "1.5ms",
  "running_time_in_nanos" : 1566445,
  "cancellable" : false,
  "headers" : { }
}

The key is the task ID, which consists of the node ID (everything before the colon) followed by the numeric id of the task on that node. Because the task listing itself is still running at the moment you call the API, this task will always be part of the response. There will also be a node-specific task for the listing, looking like this:

"LJiKRO1oT0aNj-9-WcniRw:425949" : {
  "node" : "LJiKRO1oT0aNj-9-WcniRw",
  "id" : 425949,
  "type" : "direct",
  "action" : "cluster:monitor/tasks/lists[n]",
  "description" : "",
  "start_time" : "2021-05-03T08:26:05.097Z",
  "start_time_in_millis" : 1620030365097,
  "running_time" : "48.3micros",
  "running_time_in_nanos" : 48377,
  "cancellable" : false,
  "parent_task_id" : "LJiKRO1oT0aNj-9-WcniRw:425945",
  "headers" : { }
},

You can see the action name with a [n] suffix and a parent_task_id pointing to the first task. Another difference is the task type, which is direct in this case instead of transport.
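To make the ID format tangible, here is a minimal sketch that takes such a task ID apart. The TaskIdParts class and its parse method are hypothetical names made up for this post, not the class Elasticsearch uses internally, and the sketch assumes that the node ID never contains a colon.

```java
/** Hypothetical helper for illustration; this is not the Elasticsearch TaskId class. */
final class TaskIdParts {
    final String nodeId;
    final long number;

    private TaskIdParts(String nodeId, long number) {
        this.nodeId = nodeId;
        this.number = number;
    }

    /** Splits "nodeId:number" at the last colon and validates both halves. */
    static TaskIdParts parse(String taskId) {
        int colon = taskId.lastIndexOf(':');
        if (colon <= 0 || colon == taskId.length() - 1) {
            throw new IllegalArgumentException("malformed task id [" + taskId + "]");
        }
        String nodeId = taskId.substring(0, colon);
        // Long.parseLong rejects non-numeric input with a NumberFormatException
        long number = Long.parseLong(taskId.substring(colon + 1));
        return new TaskIdParts(nodeId, number);
    }
}
```

Calling TaskIdParts.parse("LJiKRO1oT0aNj-9-WcniRw:425945") yields the node ID LJiKRO1oT0aNj-9-WcniRw and the number 425945, which match the node and id fields of the response shown above.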

One of the advantages of registering everything as a task is that every piece of work can be identified. This in turn means you can cancel tasks or wait for their completion using the task management API. It also allows Elasticsearch to abort a search whose initiating client request has already been aborted, so that no further resources are spent on it.

You may have stumbled over the .tasks index already and may wonder what the link to it is. If you are curious, you can check out the TaskManager, which keeps track of currently running tasks (any task), and the TaskResultsService, which is able to store the result of a task execution in the .tasks index. Such a task could be a re-index operation that was triggered via HTTP but is supposed to run in the background by setting wait_for_completion to false.

There is however another task type, and that is persistent, like in this rollup job:

"LJiKRO1oT0aNj-9-WcniRw:13272" : {
  "node" : "LJiKRO1oT0aNj-9-WcniRw",
  "id" : 13272,
  "type" : "persistent",
  "action" : "xpack/rollup/job[c]",
  "status" : {
    "job_state" : "stopped",
    "upgraded_doc_id" : true
  },
  "description" : "rollup_test",
  "start_time" : "2021-04-30T11:44:20.748Z",
  "start_time_in_millis" : 1619783060748,
  "running_time" : "14.4h",
  "running_time_in_nanos" : 52139005932570,
  "cancellable" : true,
  "parent_task_id" : "cluster:2",
  "headers" : { }
}

So, what is a persistent task? It’s mentioned in the Elasticsearch Docs and the first sentence states:

Plugins can create a kind of tasks called persistent tasks.

And this gives us a great hint how to approach this problem, as this implies there is plugin support and thus an interface to implement.

That is the PersistentTaskPlugin interface, which is implemented by several plugins like CCR, IngestGeo, Machine Learning, Rollup and Transform. All of these require tasks to be run in the background.

Looking at the PersistentTaskPlugin code, we see this:

List<PersistentTasksExecutor<?>> getPersistentTasksExecutor( ... ) {
    return Collections.emptyList();
}

Leaving the arguments aside, we are expected to return a list of PersistentTasksExecutor instances. So let’s figure out what that is. Code comments to the rescue:

/**
 * An executor of tasks that can survive restart of
 * requesting or executing node.
 * These tasks are using cluster state rather than 
 * only transport service to send requests and responses.
 */
public abstract class 
    PersistentTasksExecutor<Params extends PersistentTaskParams> {

The org.elasticsearch.persistent package is interesting in general. But let’s keep going step by step.

The PersistentTasksExecutor has a couple of interesting methods.

/**
 * Returns the node id where the params has to be executed,
 *
 * The default implementation returns the least loaded data node
 */
public Assignment getAssignment(Params params, ClusterState clusterState) {

This method returns an assignment that states on which node the task should be executed. The assignment contains a node ID and an optional explanation. The default decision logic lives in selectLeastLoadedNode, which picks the data node with the fewest running tasks. The number of tasks here only counts tasks of exactly this type, not the total number of tasks on that node.
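The counting rule is easier to see in code. The following is a simplified sketch of the idea and not the Elasticsearch implementation: LeastLoadedNodeSketch, AssignedTask and select are hypothetical names, candidate nodes are plain strings, and ties are resolved by taking the first candidate, which is an assumption of this sketch.

```java
import java.util.List;
import java.util.Optional;

/** Simplified model of least-loaded node selection; all names are hypothetical. */
final class LeastLoadedNodeSketch {

    /** A persistent task name together with the node it currently runs on. */
    static final class AssignedTask {
        final String taskName;
        final String executorNode;

        AssignedTask(String taskName, String executorNode) {
            this.taskName = taskName;
            this.executorNode = executorNode;
        }
    }

    /** Picks the candidate running the fewest tasks named taskName; empty if there are no candidates. */
    static Optional<String> select(List<String> candidateNodes,
                                   List<AssignedTask> assigned,
                                   String taskName) {
        String best = null;
        long bestCount = Long.MAX_VALUE;
        for (String node : candidateNodes) {
            long count = 0;
            for (AssignedTask task : assigned) {
                // Only tasks of the same type count towards the load of a node
                if (task.taskName.equals(taskName) && node.equals(task.executorNode)) {
                    count++;
                }
            }
            if (count < bestCount) { // strict comparison: the first candidate wins ties
                best = node;
                bestCount = count;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

With two rollup jobs on one node and three tasks of another type on a second node, a new rollup job would go to the second node in this sketch, even though that node runs more tasks in total.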

Implementations differ in how they pick the node they get assigned to. The ShardFollowTasksExecutor also ensures that the node can connect to remote clusters. The TransformPersistentTasksExecutor even checks if the remote role is needed plus some internal transform state.

There are a few more methods to take a look at. Next the validate method:

/**
 * Checks the current cluster state for compatibility with the params
 *
 * Throws an exception if the supplied params cannot be executed on the
 * cluster in the current state.
 */
public void validate(Params params, ClusterState clusterState) {}

This is a noop by default. However, the ShardFollowTasksExecutor for Cross Cluster Replication uses this to ensure that the shard to follow is active.

Next up is the createTask() method, which returns an AllocatedPersistentTask. We’ll talk about the return type in a minute.

/**
 * Creates a AllocatedPersistentTask for communicating with task manager
 */
protected AllocatedPersistentTask createTask(long id, String type,
                    String action, TaskId parentTaskId,
                    PersistentTask<Params> taskInProgress, 
                    Map<String, String> headers) {

    return new AllocatedPersistentTask(id, type, action, 
                    getDescription(taskInProgress), parentTaskId, headers);
}

The task created here does the actual execution, including the ability to run initialization code before the task starts and to return a status, which is important for long-running tasks.

One interesting part here is that AllocatedPersistentTask extends CancellableTask, which in turn extends Task. Task is the class that represents every action running within Elasticsearch and is what allows you to see the currently running tasks. CancellableTask is used for long-running tasks that might need to be cancelled, like when a user aborts a search request, so being cancellable does not imply that the state of a task needs to be stored somewhere. For persistent tasks it makes sense to have them cancellable, because a node might need to do a clean shutdown. Keep that in mind when creating your own persistent tasks.
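What being cancellable means for your own long-running logic can be sketched without any Elasticsearch classes. The CancellableWorkSketch class below is purely hypothetical and only illustrates the cooperative pattern, where the work checks a flag between units of work and stops cleanly once the flag is set.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

/** Hypothetical stand-in for a cancellable task; not an Elasticsearch class. */
final class CancellableWorkSketch {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Requests cancellation; work that is already in a step finishes that step. */
    void cancel() {
        cancelled.set(true);
    }

    boolean isCancelled() {
        return cancelled.get();
    }

    /** Runs up to totalSteps steps and returns how many were actually executed. */
    int run(int totalSteps, IntConsumer step) {
        int done = 0;
        for (int i = 0; i < totalSteps; i++) {
            if (isCancelled()) {
                break; // stop between steps, e.g. because the node shuts down
            }
            step.accept(i);
            done++;
        }
        return done;
    }
}
```

The important design choice is that cancellation is only a request: the work has to check the flag often enough, otherwise a clean shutdown has to wait for it.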

The next method is an unspectacular one. It’s really just trying to return a human readable description, usually returning its ID.

/**
 * Returns task description that will be available via task manager
 */
protected String getDescription(PersistentTask<Params> taskInProgress) {
    return "id=" + taskInProgress.getId();
}

And finally, the logic to be executed lives in the nodeOperation method.

/**
 * This operation will be executed on the executor node.
 *
 * NOTE: The nodeOperation has to throw an exception, trigger
 * task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to
 * indicate that the persistent task has finished.
 */
protected abstract void nodeOperation(AllocatedPersistentTask task, 
                                      Params params, 
                                      @Nullable PersistentTaskState state);

This is where the magic happens. The GeoIpDownloader (to be used in future releases) downloads the geo IP databases in there. The TransformPersistentTasksExecutor does its transformations in there, and the RollupJobPersistentTasksExecutor uses it to schedule a rollup job.

Every task (including persistent tasks) is free to write its result anywhere; most of them return data to the requesting client. Async search results, for example, are not stored in the .tasks index but in the .async-search index, as those are not the result of a task but results that need to be delivered to a client. If you run a re-index in the background, on the other hand, you are interested in how fast the operation is progressing and when it is finished. This information gets stored in the .tasks index.

Persistent tasks have a bootstrap problem: where should they get their configuration data from? The cluster state is a good place for that, as it is available on every node. There is also no HTTP operation to store a generic persistent task, as each plugin is responsible for providing REST actions for a concrete implementation of a persistent task, like a rollup job.

So, how does a job get stored in the cluster state? Let’s look at a rollup job to show this:

GET _cluster/state?filter_path=**.persistent_tasks

returns

{
  "metadata" : {
    "persistent_tasks" : {
      "last_allocation_id" : 2,
      "tasks" : [
        {
          "id" : "test",
          "task" : {
            "xpack/rollup/job" : {
              "params" : {
                "config" : {
                  ...
                }
              }
            }
          },
          "allocation_id" : 2,
          "assignment" : {
            "executor_node" : "LJiKRO1oT0aNj-9-WcniRw",
            "explanation" : ""
          }
        }
      ]
    }
  }
}

The data in the cluster state shows the node this task gets executed on plus its whole configuration, so that it can be started automatically without human intervention.

How does one trigger a persistent task? This is where the PersistentTasksService comes into play, which needs to be called when a job is to be started, as well as the PersistentTasksClusterService, which stores persistent tasks in the cluster state.

Let’s see how this all fits together for a rollup job:

  1. A request comes in via REST to the RestPutRollupJobAction
  2. The request becomes a transport action, namely TransportPutRollupJobAction, which is a master node action
  3. The rollup index gets created
  4. The transport action calls another transport action named StartPersistentTaskAction
  5. This action calls PersistentTasksClusterService.createPersistentTask
  6. Finally, clusterService.submitStateUpdateTask is called in order to persist the task in the cluster state
  7. After this, the PeriodicRechecker ensures that tasks that are not assigned to a node will be assigned

If a node fails, a reassignment will happen. This is a fundamental feature of those tasks: they will be restarted elsewhere if nodes disconnect. As some persistent tasks may also need to switch nodes due to resource exhaustion (not enough memory), those checks happen periodically in the background as well.
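The bookkeeping behind steps 5 to 7 and the reassignment after a node failure can be modelled with a small toy class. Everything in this sketch is hypothetical and heavily simplified: PersistentTaskStateSketch is not an Elasticsearch class, a new task starts out unassigned, the first live node is always picked instead of running getAssignment, and bumping the allocation ID on every (re)assignment is an assumption of the sketch. The field names mirror the cluster state output shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the persistent task bookkeeping; all names are hypothetical. */
final class PersistentTaskStateSketch {

    /** One entry of the "tasks" array; executorNode == null means unassigned. */
    static final class Entry {
        final String id;
        final String taskName;
        long allocationId;
        String executorNode;

        Entry(String id, String taskName) {
            this.id = id;
            this.taskName = taskName;
        }
    }

    private final Map<String, Entry> tasks = new LinkedHashMap<>();
    private long lastAllocationId = 0;

    /** Steps 5 and 6: store the task; in this sketch it starts out unassigned. */
    Entry create(String id, String taskName) {
        if (tasks.containsKey(id)) {
            throw new IllegalStateException("task [" + id + "] already exists");
        }
        Entry entry = new Entry(id, taskName);
        tasks.put(id, entry);
        return entry;
    }

    /** Step 7 and node failure: (re)assign tasks without a live node, return their IDs. */
    List<String> recheck(List<String> liveNodes) {
        List<String> reassigned = new ArrayList<>();
        for (Entry entry : tasks.values()) {
            boolean hasLiveNode = entry.executorNode != null && liveNodes.contains(entry.executorNode);
            if (hasLiveNode) {
                continue;
            }
            if (liveNodes.isEmpty()) {
                entry.executorNode = null; // stays unassigned until a node shows up
                continue;
            }
            entry.executorNode = liveNodes.get(0); // placeholder for the real assignment logic
            entry.allocationId = ++lastAllocationId;
            reassigned.add(entry.id);
        }
        return reassigned;
    }
}
```

Calling recheck with the list of currently live nodes plays the role of the periodic check: a task whose node has disappeared from that list gets a new executor node and a new allocation ID, while all other tasks stay untouched.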

Now that we know how the job gets stored and triggered, let’s examine what happens during execution before calling it a day.

The class in question is RollupJobTask. It represents the task, and its runs are scheduled via a scheduler engine in the background, which is set up in RollupJobPersistentTasksExecutor.nodeOperation().

Once a job gets assigned to a node, the rollup job registers its cron schedule with the scheduler running in the background, and removes that schedule again if the node gets unassigned from executing that job (see the RollupJobTask.onCancelled method). The execution of the rollup job itself is triggered when the scheduler notifies the RollupJobTask.triggered() method.

And that is basically all the magic behind persistent tasks.

Summary

  • You cannot create persistent tasks yourself via an API; they are a building block for plugins
  • Pretty much everything is a task in Elasticsearch (a search, an index operation, a nodes stats request)
  • Some tasks are running in the background and synchronize their states into the .tasks system index, or into a specialized index like the .async-search one
  • Some tasks are not triggered from the outside (they can be created from the outside, but are triggered automatically, like a rollup job every 15 minutes). Those are persistent tasks.
  • Persistent tasks have their own logic for where to execute, e.g. on a node with certain roles like a data node, or on a node with enough memory available
  • Persistent tasks store the result of their computation in specialized indices and do not use the .tasks one
  • If you want to implement your own persistent tasks, this requires you to write a plugin and implement PersistentTaskPlugin

Happy tasking!

