Better Programming

How to Write a Timestamp-Based Task Scheduler

Write a distributed, horizontally scalable task scheduler to ingest tasks and run at a predefined time

Adesh Srivastava
6 min read · Mar 23, 2020

We sometimes need a service in our architecture that can ingest tasks and guarantee that each one runs at its requested timestamp. A task can be anything: sending an email or text message to a customer, calling a third-party API, scheduling another task, or something as simple as printing a message to the terminal.

What a task scheduler should do:

  • Ingest tasks with a predefined timestamp
  • Ensure each task is triggered on the given timestamp
  • Scale with a large number of ingested tasks
  • Avoid single points of failure (SPOFs)
  • Avoid bottlenecks

Wait! Do we really need it?

Let’s see if Linux cron can solve this problem. All you need to do is have your script.sh ready and add this line to your crontab (crontab -e):

* * * * * ~/path/script.sh >> ~/path/script.log

The five fields (* * * * *) set the minute, hour, day of month, month, and day of week on which the script runs, and therefore how often it repeats. With every field set to *, the script runs once every minute. So far, everything works well.

But there is a catch. What if this machine goes down, or there are too many tasks to scale for a single piece of hardware?

Crontabs work very well with executable scripts, but that’s about it. They do not work across a distributed environment, and they offer no retry policies. The machine running cron is a SPOF for the entire system. Suppose we need to send thousands of messages to customers in a morning slot and the hardware goes down, or a connection gets throttled. Relying on a single machine is a bad idea, so this problem needs a better answer.

So how do we solve this problem?

We will design our own in-house scheduler. It will be a horizontally scalable, independent microservice that will be able to ingest tasks at scale, hold them for the time being, and execute these tasks at the given timestamp. Let’s get started with the first step.

1. Ingest the Tasks Into the System

A task can be of multiple types, each designed to do a different kind of work. For the sake of simplicity, let’s say the task struct is just a message along with the timestamp at which the message is to be used. The user submits a request such as:

curl --location --request POST '{url}/tasks/submit' \
--data-raw '{
"message":"This is a demo task",
"timestamp":"2020-03-10T00:42:30"
}'

Figure: Ingesting tasks into the system

We must return a unique TaskId to the user. The idea is to use UUIDs rather than auto-incrementing IDs, because any node can generate a UUID without coordinating with the others. Once the request is properly validated, we honor it by sending back a UUID as part of the response:

{
  "status": {
    "status": "SUCCEDED",
    "count": 1
  },
  "data": [
    {
      "id": "21021f23-8df7-4bbf-a169-1227d3e49c63",
      "message": "drredecff",
      "timestamp": "2020-03-10T00:42:30.000+0000",
      "version": 0,
      "status": "SUBMITTED"
    }
  ]
}
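
The submit step can be sketched in plain Java as follows. This is only an illustration under assumptions: the class names Task and TaskStore are hypothetical, the in-memory map stands in for the persistent store, and the COMPLETED status is an assumed addition (the article only names SUBMITTED and IN_PROGRESS).

```java
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical task model; field names mirror the JSON response above. */
final class Task {
    // COMPLETED is an assumed extra state, not taken from the article.
    enum Status { SUBMITTED, IN_PROGRESS, COMPLETED }

    final UUID id;
    final String message;
    final LocalDateTime timestamp;
    volatile Status status;

    Task(UUID id, String message, LocalDateTime timestamp) {
        this.id = id;
        this.message = message;
        this.timestamp = timestamp;
        this.status = Status.SUBMITTED; // every new task starts as SUBMITTED
    }
}

/** Stand-in for the persistent store; a real service would write to a database. */
final class TaskStore {
    private final Map<UUID, Task> tasks = new ConcurrentHashMap<>();

    /** Validates the request, assigns a random UUID, and stores the task. */
    Task submit(String message, String isoTimestamp) {
        if (message == null || message.isEmpty()) {
            throw new IllegalArgumentException("message must not be empty");
        }
        // Throws DateTimeParseException if the timestamp is malformed.
        LocalDateTime when = LocalDateTime.parse(isoTimestamp);
        Task task = new Task(UUID.randomUUID(), message, when);
        tasks.put(task.id, task);
        return task;
    }

    Task find(UUID id) {
        return tasks.get(id);
    }
}
```

Because the ID comes from UUID.randomUUID(), several nodes could accept submissions at once without sharing a counter.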

The requested tasks are now stored in persistent data storage in the SUBMITTED state. How do we poll them once their time comes?

2. Poll the Tasks at the Given Timestamp

This is the trickiest part of the exercise. A poller (a set of threads) runs repeatedly at a configurable interval and fetches all the tasks whose timestamp is in the past relative to the current time. Every node spawns multiple poller threads, and each thread is responsible for fetching a predefined batch of tasks before it reruns after the interval. We need to keep batch size, thread count, and interval configurable. Let’s look at a part of our application.properties file:

thread.count = 4
thread.interval = 20000
batch.size = 10000

This means that we start four threads, each of which reruns every 20,000 ms and fetches a batch of 10,000 tasks. Here is one quick implementation of how we spawn these threads when the service starts:
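
A minimal sketch of such a startup routine, assuming plain java.util.concurrent rather than any particular framework. PollerStarter and its parameter names are hypothetical; threadCount and intervalMs correspond to thread.count and thread.interval above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Starts threadCount pollers; each reruns intervalMs after its previous run ends. */
final class PollerStarter {
    private final ScheduledExecutorService pool;

    PollerStarter(int threadCount, long intervalMs, Runnable pollOnce) {
        pool = Executors.newScheduledThreadPool(threadCount);
        for (int i = 0; i < threadCount; i++) {
            // Fixed delay (not fixed rate), so a slow batch never overlaps its own next run.
            // Note: if pollOnce throws, that poller's later runs are cancelled,
            // so pollOnce should catch and log its own failures.
            pool.scheduleWithFixedDelay(pollOnce, 0, intervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /** Interrupts running pollers and cancels future runs. */
    void stop() {
        pool.shutdownNow();
    }
}
```

With the values from the properties file, this would be created as new PollerStarter(4, 20000, processor), where processor is whatever Runnable performs one polling pass.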

A TaskProcessor would look like:
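
As a rough sketch only, one polling pass could be structured like this. The TaskDao interface, its claimDueTasks method, and the constructor parameters are assumptions made for illustration; tasks are represented by their IDs to keep the example short, and the Spring annotation is shown as a comment so the code runs without the framework.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical DAO: returns due SUBMITTED task IDs and marks them IN_PROGRESS. */
interface TaskDao {
    List<String> claimDueTasks(Instant before, long offset, int batchSize);
}

/** One polling pass: reserve a batch offset, claim due tasks, hand them on. */
final class TaskProcessor implements Runnable {
    private final TaskDao dao;
    private final LongSupplier nextOffset;    // supplies the start pointer of the next batch
    private final Consumer<String> publisher; // e.g. a wrapper around a queue client
    private final int batchSize;

    TaskProcessor(TaskDao dao, LongSupplier nextOffset,
                  Consumer<String> publisher, int batchSize) {
        this.dao = dao;
        this.nextOffset = nextOffset;
        this.publisher = publisher;
        this.batchSize = batchSize;
    }

    // In a Spring service this method would carry @Transactional, so that the
    // fetch and the IN_PROGRESS update commit or roll back together.
    int process() {
        long offset = nextOffset.getAsLong();
        List<String> claimed = dao.claimDueTasks(Instant.now(), offset, batchSize);
        claimed.forEach(publisher);
        return claimed.size();
    }

    @Override
    public void run() {
        process();
    }
}
```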

The process() method should be marked @Transactional, because multiple threads (the count is configurable in the .properties file) poll the DB at the same time from a single node.

We keep the updates atomic to avoid dirty reads, that is, two threads picking up the same task. Once a task is fetched by a thread, it is marked as IN_PROGRESS in the same transaction.

As we want several threads to pick up tasks from the DB, we need to split the tasks into batches. A Cursor class can serve that purpose for a single node:

Cursor => Query [Start Pointer, Batch Size]

We will use a Cursor class (a struct with a compare-and-swap, or CAS, implementation) to advance the start pointer of the query for each thread, so that other threads can fetch other batches in parallel.
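
A cursor of this kind could be sketched with AtomicLong, whose compareAndSet method provides the CAS step. The method names nextOffset and reset are illustrative, not taken from the original code.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hands out non-overlapping [start pointer, batch size] windows to poller threads. */
final class Cursor {
    private final AtomicLong start;
    private final int batchSize;

    Cursor(long initialStart, int batchSize) {
        this.start = new AtomicLong(initialStart);
        this.batchSize = batchSize;
    }

    /** Reserves the next batch and returns its start pointer. */
    long nextOffset() {
        while (true) {
            long current = start.get();
            long next = current + batchSize;
            // CAS: succeeds only if no other thread moved the pointer in between.
            if (start.compareAndSet(current, next)) {
                return current;
            }
            // Another thread won the race; reread and retry.
        }
    }

    int batchSize() {
        return batchSize;
    }

    /** Moves the pointer, for example when a new polling cycle starts. */
    void reset(long newStart) {
        start.set(newStart);
    }
}
```

With a batch size of 20, successive calls return 0, 20, 40, and so on, which matches the (0,20) and (40,20) limits in the logs below.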

When we do this polling, logs look somewhat like this:

2020-03-22 21:35:10.401  INFO 1378 --- [pool-3-thread-4] c.taskschedular.processor.TaskProcessor  : Thread pool-3-thread-4started/resumed ...
2020-03-22 21:35:10.401 INFO 1378 --- [pool-3-thread-1] c.taskschedular.processor.TaskProcessor : Thread pool-3-thread-1started/resumed ...
2020-03-22 21:35:10.401 INFO 1378 --- [pool-3-thread-3] c.taskschedular.processor.TaskProcessor : Thread pool-3-thread-3started/resumed ...
2020-03-22 21:35:10.402 INFO 1378 --- [pool-3-thread-1] com.taskschedular.dao.impl.TaskDaoImpl : Querying Tasks from Limit : (0,20) Before Sun Mar 22 21:35:10 IST 2020
2020-03-22 21:35:10.402 INFO 1378 --- [pool-3-thread-4] com.taskschedular.dao.impl.TaskDaoImpl : Querying Tasks from Limit : (40,20) Before Sun Mar 22 21:35:10 IST 2020

Once we scale the number of threads on a single node, we will be able to scale the polling of tasks, too. There are caveats, though, as the number of CPU cores and the VM configuration come into the picture.

To share the work with other nodes in the distributed system, we can keep the Start Pointer in a common place (such as a Redis key) and update it from time to time (every REDIS_INTERVAL). We can also make it configurable whether the cursor takes its initial value from Redis. A snapshot of the CursorConfig class might look like this:
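
One way such a configuration could look is sketched below. Everything here is an assumption made for illustration: the SharedPointer interface is a hypothetical abstraction over the Redis key, InMemorySharedPointer replaces Redis so the example runs on its own, and the field names are invented.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical abstraction over the shared key; a Redis client would implement it. */
interface SharedPointer {
    long read();
    /** Moves the shared pointer forward; it never moves backwards. */
    void advanceTo(long candidate);
}

/** In-memory stand-in for Redis, for local runs and tests only. */
final class InMemorySharedPointer implements SharedPointer {
    private final AtomicLong value = new AtomicLong();

    @Override
    public long read() {
        return value.get();
    }

    @Override
    public void advanceTo(long candidate) {
        value.accumulateAndGet(candidate, Math::max); // atomic "set to max"
    }
}

/** Settings that decide how a node's cursor is initialised and synchronised. */
final class CursorConfig {
    final int batchSize;
    final boolean initFromShared; // take the initial start pointer from the shared key?
    final long syncIntervalMs;    // plays the role of REDIS_INTERVAL

    CursorConfig(int batchSize, boolean initFromShared, long syncIntervalMs) {
        this.batchSize = batchSize;
        this.initFromShared = initFromShared;
        this.syncIntervalMs = syncIntervalMs;
    }

    /** Start pointer a node should begin with under this configuration. */
    long initialStart(SharedPointer shared) {
        return initFromShared ? shared.read() : 0L;
    }
}
```

A node would call initialStart once at startup and then publish its local pointer with advanceTo every syncIntervalMs milliseconds.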

Once we have the tasks in memory, how do we process them at scale? There can be too many tasks in one time frame and too few at other times of the day.

3. Execute the Tasks

The simplest way to solve this problem and to scale in parallel is to fall back on the pub-sub model. Once we have fetched the tasks and ensured that none is a duplicate, we can put them on a message queue (let’s use RabbitMQ), which ensures that:

  • Once the message has been put into the exchange by the publisher thread, the binding queue shall receive the message.
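  • Once the queue has the message, it delivers it as soon as possible to one of the concurrent consumers. (Each message goes to a single consumer, which should send a proper ACK once it has handled the message. An unacknowledged message can be redelivered, so handlers should tolerate a repeat.)

Figure: Polling/Processing Tasks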

We use this model so that we can scale the consumers as we like. It also lets us separate fetching from executing. On the consumer side, once we have received the message, we are free to execute it:
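
The consumer side can be illustrated without a broker. In the sketch below, an in-process BlockingQueue stands in for the RabbitMQ queue, so it shows only the shape of concurrent consumption, not the RabbitMQ client API. TaskConsumers and its parameters are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Runs consumerCount workers that take messages off a queue and execute them. */
final class TaskConsumers {
    private final ExecutorService workers;

    TaskConsumers(int consumerCount, BlockingQueue<String> queue, Consumer<String> handler) {
        workers = Executors.newFixedThreadPool(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            workers.execute(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        String message = queue.take(); // blocks until a task arrives
                        // With a real broker, the ACK would be sent after this succeeds.
                        handler.accept(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // exit quietly on shutdown
                }
            });
        }
    }

    /** Interrupts the workers; messages still in the queue stay there. */
    void stop() {
        workers.shutdownNow();
    }
}
```

Each message is taken by exactly one worker, and raising consumerCount is the in-process analogue of adding more consumers to the queue.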

This approach can accommodate reruns (repeatable tasks): we just need to call the submit API again for the task that has just been executed. Ingestion is fairly scalable, as each task is independent of the others. (This leaves us the option of using non-relational databases, but that’s for some other time.) Also, the poller does not run on a single machine, which removes it as a SPOF.

The transactions managed through the ORM (Hibernate) provide the atomicity needed to avoid dirty reads (duplicate messages being pushed to the queue). We can also fine-tune the poller’s rerun interval and the batch size, because all it takes is a change to the application.properties file. Finally, we can scale task execution by configuring the number of consumers.
