Faking a continuously Polling Consumer with scheduled tasks by Mark Seemann
How to (almost) continuously poll a queue when you can only run a task at discrete intervals.
In a previous article, I described how you set up Azure Web Jobs to run only a single instance of a background process. However, the shortest time interval you can currently configure is to run a scheduled task every minute. If you want to use this trick to set up a Polling Consumer, it may seem limiting that you'll have to wait up to a minute before the scheduled task can pull new messages off the queue.
The problem #
The problem is this: a Web Job is a command-line executable you can schedule. The most frequent schedule you can set up is once per minute. Thus, once per minute, the executable can start and query a queue in order to see if there are new messages since it last looked.
If there are new messages, it can pull these messages off the queue and handle them one by one, until the queue is empty.
If there are no new messages, the executable exits. It will then be up to a minute before it will run again. This means that if a message arrives just after the executable exits, it will sit in the queue up to a minute before being handled.
At least, that's the naive implementation.
A more sophisticated approach #
Instead of exiting immediately, what if the executable were to wait for a small period, and then check again? This means that you'd be able to increase the polling frequency to run much faster than once per minute.
If the executable also keeps track of when it started, it can gracefully exit slightly before the minute is up, enabling the task scheduler to start a new process soon after.
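The core of this idea can be sketched as a simple loop; `pollOnce` and the parameter values below are hypothetical stand-ins, not the article's actual implementation:

```fsharp
open System
open System.Threading

// A minimal sketch: keep polling until just before the next scheduled
// run is expected to start, idling briefly between checks.
let runUntil (stopBefore : DateTimeOffset) (idleTime : TimeSpan) (pollOnce : unit -> unit) =
    while DateTimeOffset.Now + idleTime < stopBefore do
        pollOnce ()
        Thread.Sleep idleTime
```

With a one-minute schedule, `stopBefore` would be set to roughly 55 seconds after start, leaving a margin for the process to exit cleanly before the next scheduled run.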
Example: a recursive F# implementation #
You can implement the above strategy in any language. Here's an F# version. The example code below is the main 'loop' of the program, but a few things have been put into place before that. Most of these are configuration values pulled from the configuration system:
- `timeout` is a `TimeSpan` value that specifies for how long the executable should run before exiting. This value comes from the configuration system. Since the minimum schedule frequency for Azure Web Jobs is 1 minute, it makes sense to set this value to 1 minute.
- `stopBefore` is essentially `DateTimeOffset.Now + timeout`.
- `estimatedDuration` is a `TimeSpan` containing your (conservative) estimate of how long it takes to handle a single message. It's only used if there are no messages to be handled in the queue, as the algorithm then has no statistics about the average execution time for each message. This value comes from the configuration system. In a recent system, I just arbitrarily set it to 2 seconds.
- `toleranceFactor` is a decimal used as a multiplier in order to produce a margin for when the executable should exit, so that it can exit before `stopBefore`, instead of idling for too long. This value comes from the configuration system. You'll have to experiment a bit with this value. When I originally deployed the code below, I had it set to 2, but it seems like the Azure team changed how Web Jobs are initialized, so currently I have it set to 5.
- `idleTime` is a `TimeSpan` that controls how long the executable should sit idly waiting if there are no messages in the queue. This value comes from the configuration system. In a recent system, I set it to 5 seconds, which means that you may experience up to 5 seconds of delay from the time a message arrives on an empty queue until it's picked up and handled.
- `dequeue` is the function that actually pulls a message off the queue. It has the signature `unit -> (unit -> unit) option`. That looks pretty weird, but it means that it's a function that takes no input arguments. If there's a message in the queue, it returns a handler function, which is used to handle the message; otherwise, it returns `None`.
```fsharp
let rec handleUntilTimedOut (durations : TimeSpan list) =
    let avgDuration =
        match durations with
        | [] -> estimatedDuration
        | _ ->
            (durations |> List.sumBy (fun x -> x.Ticks)) /
                (int64 durations.Length)
            |> TimeSpan.FromTicks
    let margin =
        (decimal avgDuration.Ticks) * toleranceFactor
        |> int64
        |> TimeSpan.FromTicks

    if DateTimeOffset.Now + margin < stopBefore then
        match dequeue () with
        | Some handle ->
            let before = DateTimeOffset.Now
            handle ()
            let after = DateTimeOffset.Now
            let duration = after - before
            let newDurations = duration :: durations
            handleUntilTimedOut newDurations
        | _ ->
            if DateTimeOffset.Now + idleTime < stopBefore then
                Async.Sleep (int idleTime.TotalMilliseconds)
                |> Async.RunSynchronously
            handleUntilTimedOut durations

handleUntilTimedOut []
```
As you can see, the first thing the `handleUntilTimedOut` function does is to attempt to calculate the average duration of handling a message. Calculating the average is easy enough if you have at least one observation, but if you have no observations at all, you can't calculate the average. This is the reason the algorithm needs the `estimatedDuration` value to get started.
Based on the average (or estimated) duration, the algorithm next calculates a safety margin. This margin is intended to give the executable a chance to exit in time: if it can see that it's getting too close to the timeout, it'll exit instead of attempting to handle another message, since handling another message may take so much time that it may push it over the timeout limit.
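As a concrete example of the arithmetic, using the configuration values mentioned above (a 2-second estimated duration and a tolerance factor of 5) as assumptions:

```fsharp
open System

// With an average duration of 2 seconds and a tolerance factor of 5,
// the margin becomes 10 seconds: the process stops dequeuing once fewer
// than 10 seconds remain before stopBefore.
let avgDuration = TimeSpan.FromSeconds 2.
let toleranceFactor = 5m
let margin =
    (decimal avgDuration.Ticks) * toleranceFactor
    |> int64
    |> TimeSpan.FromTicks
// margin = 00:00:10
```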
If it decides that, based on the margin, it still has time to handle a message, it'll attempt to dequeue a message.
If there's a message, it'll measure the time it takes to handle the message, and then append the measured duration to the list of already observed durations, and recursively call itself.
If there's no message in the queue, the algorithm will idle for the configured period, and then call itself recursively.
Finally, the `handleUntilTimedOut []` expression kicks off the polling cycle with an empty list of observed durations.
Observations #
When appropriately configured, a typical Azure Web Job log sequence looks like this:
- 1 minute ago (55 seconds running time)
- 2 minutes ago (55 seconds running time)
- 3 minutes ago (56 seconds running time)
- 4 minutes ago (55 seconds running time)
- 5 minutes ago (56 seconds running time)
- 6 minutes ago (58 seconds running time)
- 7 minutes ago (55 seconds running time)
- 8 minutes ago (55 seconds running time)
- 9 minutes ago (58 seconds running time)
- 10 minutes ago (56 seconds running time)
Summary #
If you have a situation where you need to run a scheduled job (as opposed to a continuously running service or daemon), but you want it to behave as if it were a continuously running service, you can make it wait until just before a new scheduled job is about to start, and then exit. This is an approach you can use anywhere you find yourself in that situation - not only on Azure.
Update 2015 August 11: See my article about Type Driven Development for an example of how to approach this problem in a well-designed, systematic, iterative fashion.
Comments
Indeed, this technique really shines when the task runs at discrete intervals and the actual job takes less time than each interval.
For example, if a job takes 20 seconds to complete, then inside a single task of 1 minute it's possible to run 2 jobs, using the technique described in this article. – This is very nice!
FWIW, this talk and this post seem to be related with the technique shown in this article.
OTOH, in the context of Windows Services, or similar, it's possible to run a task at even smaller intervals than 1 minute.
In this case, the trick is to measure how much time it takes to run the actual job. – Then, schedule the task to run in any period less than the measured time.
As an example, if a job takes 1 second to complete, schedule the task to run every 0.5 seconds on a background thread while blocking the foreground thread:
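The commenter's original code isn't reproduced here, but the idea can be sketched with `System.Threading.Timer`; the job and the durations below are hypothetical:

```fsharp
open System
open System.Threading

// Count how many times the job has started (plain mutation; a real
// service would synchronize access from overlapping callbacks).
let mutable jobCount = 0

// A hypothetical job taking about 1 second.
let runJob () =
    jobCount <- jobCount + 1
    Thread.Sleep 1000

// Fire the job every 0.5 seconds on thread-pool threads, so a new
// invocation can start while an earlier one is still running.
let timer = new Timer((fun _ -> runJob ()), null, TimeSpan.Zero, TimeSpan.FromSeconds 0.5)

// Block the foreground thread for as long as the service should run;
// a real service would block until it receives a stop signal.
Thread.Sleep (TimeSpan.FromSeconds 2.)
timer.Dispose ()
```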