Song recommendations with F# agents by Mark Seemann
MailboxProcessors as small Recawr Sandwiches.
This article is part of a series named Alternative ways to design with functional programming. As the title implies, over a multitude of articles, I present various alternatives for applying functional programming to a particular problem. When I present the Impureim Sandwich design pattern, the most common reaction is: What if you need to make additional, impure reads in the middle of an algorithm?
This article series looks at alternatives when this (in my experience rare) requirement seems inescapable. A previous article outlined a general alternative: Use some kind of pipes-and-filters architecture to turn an undisciplined Transaction Script into a composition of 'filters', where each filter is a self-contained Recawr Sandwich.
Depending on the specific technology choices you make, you may encounter various terminology related to this kind of architecture. Filters may also be called actors, message handlers, or, as is the case in this article, agents. For a consistent pattern language, see Enterprise Integration Patterns.
The code shown here is taken from the fsharp-agents branch of the example code Git repository.
Async instead of Task
The F# base library comes with a class called MailboxProcessor, often called an agent. It's an in-memory message handler that can run in the background of other tasks, pulling messages off an internal queue one at a time.
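For readers who haven't used MailboxProcessor before, here is a minimal sketch of an agent, unrelated to the song recommendations example. The CounterMessage type and the counter value are made-up names for illustration. The Get case uses the request-reply facility that is discussed later in the article.

```fsharp
// Hypothetical message type, not part of the example code base.
type CounterMessage =
    | Increment of int
    | Get of AsyncReplyChannel<int>

let counter =
    MailboxProcessor.Start (fun inbox ->
        // The loop parameter carries the agent's state from message to message.
        let rec loop total = async {
            let! message = inbox.Receive ()
            match message with
            | Increment n -> return! loop (total + n)
            | Get replyChannel ->
                replyChannel.Reply total
                return! loop total }
        loop 0)

counter.Post (Increment 2)
counter.Post (Increment 3)
// The agent pulls messages off its queue one at a time,
// so both increments are handled before the Get request.
let total = counter.PostAndReply Get
```

Post returns immediately, while PostAndReply blocks until the agent has replied.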
It's been around for such a long time that its API is based on Async<T>, which precedes the now-ubiquitous Task<TResult>. While conversions exist, I thought it'd make the example code simpler if I first redefined the SongService interface to return Async-based results.
type SongService =
    abstract GetTopListenersAsync : songId : int -> Async<IReadOnlyCollection<User>>
    abstract GetTopScrobblesAsync : userName : string -> Async<IReadOnlyCollection<Scrobble>>
Keep in mind that, despite the lack of the idiomatic I prefix, this is an interface, not an abstract class. (It would have had to have the [<AbstractClass>] attribute to be an abstract class.)
This is a minor change that only affected a few lines of code where I had to change from task expressions to async expressions.
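The conversions between the two types can be sketched as follows, using Async.AwaitTask and Async.StartAsTask from the F# core library. The getAnswerTask function is a hypothetical stand-in for any Task-based API; it isn't part of the example code.

```fsharp
open System.Threading.Tasks

// Hypothetical Task-based operation, standing in for an existing API.
let getAnswerTask () : Task<int> = Task.FromResult 42

// Task<'T> -> Async<'T>
let getAnswerAsync () : Async<int> = getAnswerTask () |> Async.AwaitTask

// Async<'T> -> Task<'T>
let roundTrip : Task<int> = getAnswerAsync () |> Async.StartAsTask

let answer = getAnswerAsync () |> Async.RunSynchronously
```

Redefining the interface avoids sprinkling such conversions throughout the agents.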
Gather own scrobbles
As was also the case in the previous article, we may as well start at the beginning of the algorithm. Given a user name, we'd like to find that user's top scrobbles. Once we've found them, we'd like to post them to an agent. Since F# agents are message-based, we must define an appropriate message type.
type private ScrobbleMessage = Scrobble of Scrobble | EndOfStream
If you recall the previous article, with Reactive Extensions for .NET, you can use the OnCompleted method to signal the end of a stream. Such a method isn't available for an F# agent, because an agent is a consumer of messages, rather than a stream of values. For that reason, a ScrobbleMessage may either be a Scrobble or an EndOfStream.
With the message definition in place, you can define the first step of the algorithm like this:
// string -> SongService -> MailboxProcessor<ScrobbleMessage> -> Async<unit>
let private gatherOwnScrobbles userName (songService : SongService) (channel : MailboxProcessor<_>) = async {
    // Impure
    let! scrobbles = songService.GetTopScrobblesAsync userName

    // Pure
    let scrobblesSnapshot =
        scrobbles
        |> Seq.sortByDescending _.ScrobbleCount
        |> Seq.truncate 100
        |> Seq.map Scrobble

    // Impure
    Seq.iter channel.Post scrobblesSnapshot
    channel.Post EndOfStream }
The gatherOwnScrobbles action isn't itself an agent. Rather, it takes one as input, to which it posts messages. Notice that the operation returns an Async<unit> value. In other words, it doesn't really return anything, but rather posts messages to the injected channel.
Like in the previous article, once all scrobbles are posted, gatherOwnScrobbles indicates the end of the stream by posting a final EndOfStream message. This still works in this implementation, since F# agents (as far as I've been able to ascertain) handle messages in order. If you're using a distributed messaging framework based on a message bus, and possibly handlers running on multiple machines, you can't always assume this to be the case. As I wrote in Song recommendations with pipes and filters, you'll need to extrapolate from both this and the previous article in such cases. This is where a pattern language as presented in Enterprise Integration Patterns may come in handy. Perhaps you need a Message Sequence in this case.
Be that as it may, the gatherOwnScrobbles action is a small Recawr Sandwich with clearly delineated steps.
The natural next step is to implement a MailboxProcessor that can receive those scrobble messages.
Gather other listeners
To handle the messages posted by gatherOwnScrobbles, we'll create an agent. This one, however, isn't going to complete the algorithm. Rather, it's going to publish even more messages that yet another agent may deal with. For that, we need another message type:
type private UserMessage = User of User | EndOfStream
We see what starts to look like a pattern: A 'payload' case, and a case to indicate that no more messages will be coming.
The following action creates an agent that handles scrobble messages and publishes user messages:
// SongService -> MailboxProcessor<UserMessage> -> MailboxProcessor<ScrobbleMessage>
let private gatherOtherListeners (songService : SongService) (channel : MailboxProcessor<_>) =
    MailboxProcessor.Start <| fun inbox ->
        let rec loop () = async {
            let! message = inbox.Receive ()
            match message with
            | Scrobble scrobble ->
                // Impure
                let! otherListeners = songService.GetTopListenersAsync scrobble.Song.Id

                // Pure
                let otherListenersSnapshot =
                    otherListeners
                    |> Seq.filter (fun u -> u.TotalScrobbleCount >= 10_000)
                    |> Seq.sortByDescending _.TotalScrobbleCount
                    |> Seq.truncate 20
                    |> Seq.map User

                // Impure
                Seq.iter channel.Post otherListenersSnapshot

                return! loop ()
            | ScrobbleMessage.EndOfStream -> channel.Post EndOfStream }
        loop ()
If you can look past some of the infrastructure required to initialize and implement the agent (MailboxProcessor), the main message handler is, once again, a small Recawr Sandwich. The other case in the match expression maps one EndOfStream case to another EndOfStream case. Notice that this case does not recursively call loop. This means that once the agent receives an EndOfStream message, it stops all further message processing.
You may have noticed that the loop starts with an 'unmarked' impure step to receive a message. Once a message arrives, it matches on the message. You may argue that there seems to be more than one impure step in the sandwich, but as I've previously outlined, sometimes a sandwich has more than three layers.
I could have compressed the code that receives and dispatches the message to a single line of code:
match! inbox.Receive () with
I felt, however, that for readers who aren't familiar with F# agents, it would help to instead make things more explicit by having a named message value in the code. It's an example of using an explicit variable for readability purposes.
A third agent, created by gatherOtherScrobbles, handles the messages published by gatherOtherListeners by publishing even more song messages. We'll get back to that message type in a moment, but the agent looks similar to the one shown above. You may consult the Git repository if you're curious about the details.
Collecting the recommendations
The final agent is a bit different, because it needs to do two things:
- Handle song messages
- Return the recommendations once they're ready
Because of that extra responsibility, the message type isn't a two-way discriminated union. Instead, it has a third case that we haven't yet seen.
type private SongMessage =
    | Song of Song
    | EndOfStream
    | Fetch of AsyncReplyChannel<Option<IReadOnlyCollection<Song>>>
The Song and EndOfStream cases are similar to what we've already seen. These are the two messages that the gatherOtherScrobbles agent publishes.
What does the third case, Fetch, do? It looks odd, with its AsyncReplyChannel payload. In a moment, you'll see how it's used, but essentially, this is how F# agents support the Request-Reply pattern. Let's see it all in action:
// unit -> MailboxProcessor<SongMessage>
let private collectRecommendations () =
    MailboxProcessor.Start <| fun input ->
        let rec loop recommendations isDone = async {
            let! message = input.Receive ()
            match message with
            | Song song -> return! loop (song :: recommendations) false
            | SongMessage.EndOfStream ->
                let recommendations =
                    recommendations
                    |> List.sortByDescending _.Rating
                    |> List.truncate 200
                return! loop recommendations true
            | Fetch replyChannel ->
                if isDone
                then replyChannel.Reply (Some recommendations)
                else replyChannel.Reply None
                return! loop recommendations isDone }
        loop [] false
The purpose of this agent is to collect all the songs that the previous agent recommended. Once it receives an EndOfStream message, it sorts the songs and keeps only the top 200.
Note that the recursive loop action takes two parameters, recommendations and isDone. The loop starts with an empty song list and the flag set to false. When a new Song arrives, the loop prepends the song onto the song list and recurses. Notice that in that case, the flag remains false.
Only when an EndOfStream message arrives does the agent calculate the final recommendations. Afterwards, it recursively calls loop with the flag raised (set to true). Notice, however, that the agent doesn't stop handling messages, like the other agents do when encountering an EndOfStream message.
At any time during execution, a Fetch message may arrive. This is a request to return the recommendations, if they're ready. In that case, the recommendations are wrapped in a Some case and returned. If the recommendations are not yet ready, None is returned instead.
This enables the overall, blocking method to poll for the recommendations until they are ready. You'll see how this works in a moment.
Polling for results
The MailboxProcessor class defines a PostAndAsyncReply method that does, indeed, fit the Fetch case of the above SongMessage type. This enables us to implement a polling mechanism like this:
let rec private poll (agent : MailboxProcessor<_>) = task {
    match! agent.PostAndAsyncReply Fetch with
    | Some result -> return result
    | None -> return! poll agent }
This recursive action uses PostAndAsyncReply to keep polling its agent until it receives a useful reply. Since this code is mostly meant for illustration purposes, I've allowed myself a few shortcuts.
First, this effectively implements a busy loop. Whenever it receives a None reply, it immediately recurses to try again. A more reasonable implementation may put a small delay there, but I think that finding the optimal delay time may be a matter of experimentation. After all, if you're concerned with performance, race your horses. Given that this is demo code, I don't have any real horses to race, so I'm not going to try. From observation, however, it doesn't seem as though the tests, at least, run any slower despite the tight loop.
Secondly, the poll loop keeps going until it receives a useful response. What if that never happens? A more robust implementation should implement some kind of timeout or ceiling that enables it to give up if it's been running for too long.
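A polling loop with both a delay and a ceiling could look like the following sketch. It is not taken from the example repository: ResultMessage, startAgent, and pollWithLimit are hypothetical names, the agent is a simplified stand-in for the collecting agent, and the delay and attempt counts are arbitrary. It also stays in Async rather than converting to a task.

```fsharp
// Hypothetical, simplified message type for illustration.
type ResultMessage =
    | Complete of string
    | Fetch of AsyncReplyChannel<string option>

// Stand-in agent: replies None until it has received a Complete message.
let startAgent () =
    MailboxProcessor.Start (fun inbox ->
        let rec loop result = async {
            let! message = inbox.Receive ()
            match message with
            | Complete value -> return! loop (Some value)
            | Fetch replyChannel ->
                replyChannel.Reply result
                return! loop result }
        loop None)

// Asks at most 'attempts' times, sleeping 'delayMs' milliseconds between tries.
// Returns None if the result never became available.
let rec pollWithLimit (delayMs : int) attempts (agent : MailboxProcessor<ResultMessage>) =
    async {
        if attempts <= 0 then return None
        else
            let! reply = agent.PostAndAsyncReply Fetch
            match reply with
            | Some result -> return Some result
            | None ->
                do! Async.Sleep delayMs
                return! pollWithLimit delayMs (attempts - 1) agent }

let readyAgent = startAgent ()
readyAgent.Post (Complete "done")
let found = pollWithLimit 10 5 readyAgent |> Async.RunSynchronously

let idleAgent = startAgent ()
let missing = pollWithLimit 10 3 idleAgent |> Async.RunSynchronously
```

The caller now has to deal with a None result, which is the price of being able to give up.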
Apart from all that, how does the poll action even type-check? On a MailboxProcessor<'Msg> object, the PostAndAsyncReply method has this type:
(AsyncReplyChannel<'Reply> -> 'Msg) -> Async<'Reply>
ignoring an optional timeout parameter.
The above Fetch case constructor fits the type of PostAndAsyncReply, since it has the type
AsyncReplyChannel<Option<IReadOnlyCollection<Song>>> -> SongMessage
This means that we can infer 'Reply to be Option<IReadOnlyCollection<Song>>. It also means that 'Msg must be SongMessage, and again we can infer that the agent parameter has the type MailboxProcessor<SongMessage>.
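As for the optional timeout parameter, MailboxProcessor also offers PostAndTryAsyncReply, which returns an option instead of waiting indefinitely. The following sketch shows it against an agent that never replies. PingMessage and silent are made-up names, and the 50-millisecond timeout is arbitrary.

```fsharp
// Hypothetical message type for illustration only.
type PingMessage = Ping of AsyncReplyChannel<string>

// An agent that swallows every message and never replies.
let silent =
    MailboxProcessor<PingMessage>.Start (fun inbox ->
        let rec loop () = async {
            let! _ = inbox.Receive ()
            return! loop () }
        loop ())

// Gives up after 50 milliseconds and yields None instead of waiting forever.
let reply =
    silent.PostAndTryAsyncReply (Ping, timeout = 50)
    |> Async.RunSynchronously
```

Just as with Fetch, the Ping case constructor serves as the function that builds the message from the reply channel.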
Composition
With all components ready, we can now compose them as a blocking method. Notice that, in the following, the GetRecommendationsAsync method hasn't changed type or observable behaviour. The change from Task to Async (described above) required some trivial changes to FakeSongService, but apart from that, I had to change no test code to make this refactoring.
As a first attempt, we may compose the agents using the idiomatic left-to-right pipeline operator, like this:
type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do! collect
                |> gatherOtherScrobbles songService
                |> gatherOtherListeners songService
                |> gatherOwnScrobbles userName songService
            return! poll collect }
First, the task expression starts all the agents, and then proceeds to poll the collect agent until it arrives at a result.
This passes all tests, but has at least two problems. One problem is that the composition seems backwards. It looks as though the process starts with collect, then proceeds to gatherOtherScrobbles, and so on. In reality, it's the other way around. You should really understand the composition as being defined 'bottom-up', or right-to-left, if we put it on a single line. We'll return to the other problem in a moment, but let's first see if we can do something about this one.
My first attempt to fix this problem was to try to use the reverse pipeline operator <|, but due to precedence rules, it didn't work without parentheses. And if we need parentheses anyway, there's no reason to use the reverse pipeline operator.
type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do! gatherOwnScrobbles userName songService (
                gatherOtherListeners songService (
                gatherOtherScrobbles songService (
                collect)))
            return! poll collect }
This composition uses a slightly unorthodox code formatting style. Since collect is really nested inside of gatherOtherScrobbles, it should really have been indented to the right of it. Likewise, gatherOtherScrobbles is nested inside of gatherOtherListeners, and so on. A more idiomatic formatting of the code might be something like this:
type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName =
        let collect = collectRecommendations ()
        task {
            do! gatherOwnScrobbles userName songService (
                    gatherOtherListeners songService (
                        gatherOtherScrobbles songService collect))
            return! poll collect }
This, however, blurs the semantics of the composition in favour of the mechanics of it. I don't consider it an improvement.
All of this, however, turns out to be moot because of the other problem. The MailboxProcessor class implements IDisposable, and to be good citizens, we ought to dispose of the objects once we're done with them. This is possible, but we're now back to the backwards order of composition.
type RecommendationsProvider (songService : SongService) =
    member _.GetRecommendationsAsync userName = task {
        use collect = collectRecommendations ()
        use otherScrobbles = gatherOtherScrobbles songService collect
        use otherListeners = gatherOtherListeners songService otherScrobbles
        do! gatherOwnScrobbles userName songService otherListeners
        return! poll collect }
This may not be an entirely unsolvable problem, but this is where I'm no longer interested in pursuing this line of inquiry much further. Instead of those 'factory actions' that create and return agents, you could refactor each agent into a separate object that, when disposed of, also disposes of any inner agents it may contain. If so, you could again compose these objects as shown above, and only dispose of the outer object.
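One way such a self-disposing object could look is sketched below. This is speculation rather than code from the repository: Stage, startIdleAgent, and tracker are hypothetical names, and the inner stage is represented by a plain IDisposable so that the cascade can be observed.

```fsharp
open System

// Hypothetical wrapper: owns an agent and, optionally, the downstream stage
// that the agent posts to. Disposing the wrapper disposes both.
type Stage<'Msg> (agent : MailboxProcessor<'Msg>, inner : IDisposable option) =
    member _.Agent = agent
    interface IDisposable with
        member _.Dispose () =
            (agent :> IDisposable).Dispose ()
            inner |> Option.iter (fun d -> d.Dispose ())

// A do-nothing agent, used only to have something to wrap.
let startIdleAgent () =
    MailboxProcessor<string>.Start (fun inbox ->
        let rec loop () = async {
            let! _ = inbox.Receive ()
            return! loop () }
        loop ())

// Records whether Dispose was called on the 'inner' object.
let innerDisposed = ref false
let tracker =
    { new IDisposable with
        member _.Dispose () = innerDisposed.Value <- true }

let run () =
    // Only the outer object is bound with 'use'.
    use outer = new Stage<string> (startIdleAgent (), Some tracker)
    outer.Agent.Post "hello"

run ()
```

In a real pipeline, the inner value would be the downstream Stage, upcast to IDisposable.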
Evaluation
These various attempts to make the agents compose nicely, in a way that also works as self-documenting code, reveal that F# agents aren't as composable as ReactiveX. To be fair, the MailboxProcessor class also predates Reactive Extensions, so we shouldn't blame it for not being as good as a more recent technology.
One major problem is that agents don't compose naturally, like IObservable<T> does. I briefly considered whether it'd be possible to make MailboxProcessor<'Msg> a monad, but armed with the knowledge of variance imparted by Thinking with Types, I quickly realized that the type is invariant. This is easily seen because one method, Post, is contravariant in 'Msg, whereas most other methods are covariant. I'm using deliberately vague language, since there's no reason to calculate the kind of variance for all methods when you've already found two incompatible members.
Another fly in the ointment is that the collectRecommendations action looks messy. As presented, it's not a pretty Impureim Sandwich. Most of the 'middle' message handling could be extracted to a pure function, were it not for the Fetch case. Calling replyChannel.Reply has a side effect, and while I know of refactorings that move side effects around, I'd need access to the replyChannel in order to impart that effect from somewhere else. This would still be possible if I returned an action to be invoked, but I don't see much point in that. In general, that request-reply API doesn't strike me as particularly functional.
Based on all that griping, you may be wondering whether this kind of architecture is worth the trouble. Keep in mind, though, that some of the issues I just outlined are the result of the particular MailboxProcessor API. It's not a given that if you use some other message-based framework, you'll run into the same issues.
You may also find the notion of posting messages to a chain of agents, only to poll one of them for the result, as carrying coals to Newcastle. Keep in mind, however, that the code presented here refactors a blocking method call that apparently takes about ten minutes to run to completion. It's possible that I read too much into the situation, but I'm guessing that the 'real' code base that was the inspiration for the example code, doesn't actually block for ten minutes in order to return a result. Rather, I still speculate, it's probably a background batch job that produces persisted views; e.g. as JSON files. If so, you'd really just want to trigger a new batch job and let it run to completion in the background. In such a scenario, I'd find an asynchronous, message-based architecture suitable for the job. In that case, you'd need no polling loop. Rather, you serve a persisted view whenever anyone asks for it, and once in a while, that persisted view has been updated by the background process.
Conclusion
Compared to the previous article, which used Reactive Extensions to compose self-contained Recawr Sandwiches, using F# agents is a move towards a more standard kind of message-based architecture. Hopefully, it does a good enough job of illustrating how you can refactor an impure action into a composition of individual sandwiches, even if some of the details here are particular to F# agents.
It's not necessarily always the best solution to the underlying problem being addressed in this article series, but it seems appropriate if the problem of large data sets is combined with long running time. If you can convert the overall problem to a fire-and-forget architecture, a message-based system may be suitable.
If, on the other hand, you need to maintain the blocking nature of the operation, you may need to reach for the big, universal hammer.
Next: Song recommendations with free monads.