How to (almost) continuously poll a queue when you can only run a task at discrete intervals.

In a previous article, I described how you set up Azure Web Jobs to run only a single instance of a background process. However, the shortest time interval you can currently configure is to run a scheduled task every minute. If you want to use this trick to set up a Polling Consumer, it may seem limiting that you'll have to wait up to a minute before the scheduled task can pull new messages off the queue.

The problem

The problem is this: a Web Job is a command-line executable you can schedule. The most frequent schedule you can set up is once per minute. Thus, once per minute, the executable can start and query a queue in order to see if there are new messages since it last looked.

If there are new messages, it can pull these messages off the queue and handle them one by one, until the queue is empty.

If there are no new messages, the executable exits. It will then be up to a minute before it will run again. This means that if a message arrives just after the executable exits, it will sit in the queue up to a minute before being handled.

At least, that's the naive implementation.

A more sophisticated approach

Instead of exiting immediately, what if the executable was to wait for a small period, and then check again? This means that you'd be able to increase the polling frequency to run much faster than once per minute.

If the executable also keeps track of when it started, it can gracefully exit slightly before the minute is up, enabling the task scheduler to start a new process soon after.

Example: a recursive F# implementation

You can implement the above strategy in any language. Here's an F# version. The example code below is the main 'loop' of the program, but a few things have been put into place before that. Most of these are configuration values pulled from the configuration system:

  • timeout is a TimeSpan value that specifies for how long the executable should run before exiting. This value comes from the configuration system. Since the minimum schedule frequency for Azure Web Jobs is 1 minute, it makes sense to set this value to 1 minute.
  • stopBefore is essentially DateTimeOffset.Now + timeout.
  • estimatedDuration is a TimeSpan containing your (conservative) estimate of how long time it takes to handle a single message. It's only used if there are no messages to be handled in the queue, as the algorithm then has no statistics about the average execution time for each message. This value comes from the configuration system. In a recent system, I just arbitrarily set it to 2 seconds.
  • toleranceFactor is a decimal used as a multiplier in order to produce a margin for when the executable should exit, so that it can exit before stopBefore, instead of idling for too long. This value comes from the configuration system. You'll have to experiment a bit with this value. When I originally deployed the code below, I had it set to 2, but it seems like the Azure team changed how Web Jobs are initialized, so currently I have it set to 5.
  • idleTime is a TimeSpan that controls how long the executable should sit idly waiting, if there are no messages in the queue. This value comes from the configuration system. In a recent system, I set it to 5 seconds, which means that you may experience up to 5 seconds delay from a message arrives on an empty queue, until it's picked up and handled.
  • dequeue is the function that actually pulls a message off the queue. It has the signature unit -> (unit -> unit) option. That looks pretty weird, but it means that it's a function that takes no input arguments. If there's a message in the queue, it returns a handler function, which is used to handle the message; otherwise, it returns None.
Obviously, there's quite a bit of code behind that dequeue function (it's actually the heart of the system), but exactly what it does, and how it does it, isn't particularly important in this context. If you want to see an example of how to implement a background process in F#, see my Pluralsight course on Functional Architecture with F#.

let rec handleUntilTimedOut (durations : TimeSpan list) =
    let avgDuration =
        match durations with
        | [] -> estimatedDuration
        | _ ->
            (durations |> List.sumBy (fun x -> x.Ticks)) /
                (int64 durations.Length)
            |> TimeSpan.FromTicks
    let margin =
        (decimal avgDuration.Ticks) * toleranceFactor
        |> int64
        |> TimeSpan.FromTicks
 
    if DateTimeOffset.Now + margin < stopBefore then
        match dequeue() with
        | Some handle ->
            let before = DateTimeOffset.Now
            handle()
            let after = DateTimeOffset.Now
            let duration = after - before
            let newDurations = duration :: durations
            handleUntilTimedOut newDurations
        | _ ->
            if DateTimeOffset.Now + idleTime < stopBefore then
                Async.Sleep (int idleTime.TotalMilliseconds)
                |> Async.RunSynchronously
                handleUntilTimedOut durations
 
handleUntilTimedOut []

As you can see, the first thing the handleUntilTimedOut function does is to attempt to calculate the average duration of handling a message. Calculating the average is easy enough if you have at least one observation, but if you have no observations at all, you can't calculate the average. This is the reason the algorithm needs the estimatedDuration to get started.

Based on the the average (or estimated) duration, the algorithm next calculates a safety margin. This margin is intended to give the executable a chance to exit in time: if it can see that it's getting too close to the timeout, it'll exit instead of attempting to handle another message, since handling another message may take so much time that it may push it over the timeout limit.

If it decides that, based on the margin, it still have time to handle a message, it'll attempt to dequeue a message.

If there's a message, it'll measure the time it takes to handle the message, and then append the measured duration to the list of already observed durations, and recursively call itself.

If there's no message in the queue, the algorithm will idle for the configured period, and then call itself recursively.

Finally, the handleUntilTimedOut [] expression kicks off the polling cycle with an empty list of observed durations.

Observations

When appropriately configured, a typical Azure Web Job log sequence looks like this:

  • 1 minute ago (55 seconds running time)
  • 2 minutes ago (55 seconds running time)
  • 3 minutes ago (56 seconds running time)
  • 4 minutes ago (55 seconds running time)
  • 5 minutes ago (56 seconds running time)
  • 6 minutes ago (58 seconds running time)
  • 7 minutes ago (55 seconds running time)
  • 8 minutes ago (55 seconds running time)
  • 9 minutes ago (58 seconds running time)
  • 10 minutes ago (56 seconds running time)
Notice that all Web Jobs complete within 1 minute, leaving time for the next scheduled job to start. In all of those 55ish seconds, the job can continuously pull messages of the queue, if any are present.

Summary

If you have a situation where you need to run a scheduled job (as opposed to a continuously running service or daemon), but you want it to behave like it was a continuously running service, you can make it wait until just before a new scheduled job is about to start, and then exit. This is an approach you can use anywhere you find yourself in that situation - not only on Azure.

Update 2015 August 11: See my article about Type Driven Development for an example of how to approach this problem in a well-designed, systematic, iterative fashion.


Comments

Indeed, this technique really shines when the task runs at discrete intervals and the actual job takes less time than each interval.

For example, if a job takes 20 seconds to complete, then inside a single task of 1 minute it's possible to run 2 jobs, using the technique described in this article. – This is very nice!

FWIW, this talk and this post seem to be related with the technique shown in this article.

OTOH, in the context of Windows Services, or similar, it's possible to run a task at even smaller intervals than 1 minute.

In this case, the trick is to measure how much time it takes to run the actual job. – Then, schedule the task to run in any period less than the measured time.


As an example, if a job takes 1 seconds to complete, schedule the task to run every 0.5 seconds on a background thread while blocking the foreground thread:

open System.Threading
  open System.Threading.Tasks

  module Scheduler =
      let run job (period : TimeSpan) =
          // Start
          let cancellationTokenSource =
              new CancellationTokenSource()

          (new Task(fun () ->
              let token = cancellationTokenSource.Token
              while not token.IsCancellationRequested do
                  job()
                  token.WaitHandle.WaitOne period |> ignore)).Start()

          // Loop
          Console.WriteLine
              "Scheduler running. Type \"quit\" or \"exit\" to stop."
          let stop =
              let line = Console.ReadLine().ToUpperInvariant()
              line = "QUIT" || line = "EXIT"
          while not stop do ()

          // Stop
          cancellationTokenSource.Cancel()

  module Main =
      let job () =
          Console.Write "[."
          Async.Sleep 1000 |> Async.RunSynchronously
          Console.WriteLine ".] - OK"

      Scheduler.run job (TimeSpan.FromSeconds 0.5)
  
2015-05-18 05:57 UTC


Wish to comment?

You can add a comment to this post by sending me a pull request. Alternatively, you can discuss this post on Twitter or Google Plus, or somewhere else with a permalink. Ping me with the link, and I may add it as a comment.

Published

Thursday, 25 September 2014 06:33:00 UTC

Tags



"Our team wholeheartedly endorses Mark. His expert service provides tremendous value."
Hire me!