With Reactive Extensions, and a bit of composition, you can publish and subscribe to events in a structurally safe way.

Previously, in my series about Dependency Injection and events, you learned how to connect a publisher and a subscriber via a third party (often the Composition Root).

The problem with that approach is that while it's loosely coupled, it's too easy to forget to connect the publisher and the subscriber. It's also possible to forget to unsubscribe. In neither case can the compiler help you.

However, the advantage of using Reactive Extensions over .NET events is that IObserver<T> is composable. That turns out to be quite an important distinction!

The problem with IObservable<T> #

While I consider IObserver<T> to be an extremely versatile interface, I consider IObservable<T> to be of limited usefulness. Consider its definition:

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

The idea is that the publisher (the Observable) receives a subscriber (the Observer) via Method Injection. When the method completes, the subscriber is considered subscribed to the publisher's events, until the subscriber disposes of the returned subscription reference.

There's a couple of problems with this design:

  • It's too easy to forget to invoke the Subscribe method. This is not a problem if you're writing a system in which publishers dynamically subscribe to event streams, but it's problematic if your system relies on certain publishers and subscribers to be connected.
  • It implies mutation in the publisher, because the publisher must somehow keep a list of all its subscribers.
  • It breaks Command Query Separation (CQS).
  • Since it implies mutation, it's not thread-safe by default.
Fortunately, it's possible to work with IObserver<T> while completely ignoring IObservable<T>.

From Method Injection to Constructor Injection #

As you learned in the last couple of articles, the subscriber should not require any dependency in order to react to events. Yet, if Method Injection via IObservable<T> isn't a good approach either, then what's left?

Good old Constructor Injection.

The important realization is that it's not the subscriber (NeedyClass, in previous examples) that requires a dependency - it's the publisher!

Imagine that until now, you've had a publisher implementing IObservable<T>. In keeping with the running example throughout this series, this was the publisher that originally implemented IDependency. Thus, it's still called RealDependency. For simplicity's sake, assume that its implementation is as simple as this:

public class RealDependency : IObservable<Unit>
{
    private readonly Subject<Unit> subject;
 
    public RealDependency()
    {
        this.subject = new Subject<Unit>();
    }
 
    public void MakeItHappen()
    {
        this.subject.OnNext(Unit.Default);
    }
 
    public IDisposable Subscribe(IObserver<Unit> observer)
    {
        return this.subject.Subscribe(observer);
    }
}

What if, instead of implementing IObservable<Unit>, this class would use Constructor Injection to request an IObserver<Unit>? Then it would look like this:

public class RealDependency
{
    private readonly IObserver<Unit> observer;
 
    public RealDependency(IObserver<Unit> observer)
    {
        this.observer = observer;
    }
 
    public void MakeItHappen()
    {
        this.observer.OnNext(Unit.Default);
    }
}

That's much simpler, and you just got rid of an entire type (IObservable<Unit>)! Even better, you've also eliminated all use of IDisposable. Oh, and it also conforms to CQS, and is thread-safe.

Connection #

The names of the concrete classes are completely off by now, but you can connect publisher (RealDependency) with its subscriber (NeedyClass) from a third party (Composition Root):

var subscriber = new NeedyClass();
var publisher = new RealDependency(subscriber);

Not only is this easy, the statically typed structure of both classes helps you do the right thing: the compiler will issue an error if you don't supply a subscriber to the publisher.

But wait, you say: now the publisher is forced to have a single observer. Isn't the whole idea about publish/subscribe that you can have an arbitrary number of subscribers for a given publisher? Yes, it is, and that's still possible.

Composition #

More than a single subscriber is easy if you introduce a Composite:

public class CompositeObserver<T> : IObserver<T>
{
    private readonly IEnumerable<IObserver<T>> observers;
 
    public CompositeObserver(IEnumerable<IObserver<T>> observers)
    {
        this.observers = observers;
    }
 
    public void OnCompleted()
    {
        foreach (var observer in this.observers)
            observer.OnCompleted();
    }
 
    public void OnError(Exception error)
    {
        foreach (var observer in this.observers)
            observer.OnError(error);
    }
 
    public void OnNext(T value)
    {
        foreach (var observer in this.observers)
            observer.OnNext(value);
    }
}

While it looks like a bit of work, this class is so reusable that I wonder why it's not included in the Rx library itself... It enables you to subscribe any number of subscribers to the publisher, e.g. two:

var sub1 = new NeedyClass();
var sub2 = new AnotherObserver();
var publisher = 
    new RealDependency(
        new CompositeObserver<Unit>(
            new IObserver<Unit>[] { sub1, sub2 }));

I'll leave it as an exercise to the reader to figure out how to implement the scenario with no subscribers :)

Conclusion #

Sticking to IObserver<T> and simply injecting it into the publishers is much simpler than any other alternative I've described so far. Nonetheless, keep in mind that the reason this simplification works so well is because it assumes that you know all subscribers when you compose your object graph.

There's a reason the IObservable<T> interface exists, and that's to support scenarios where publishers and subscribers come and go during the lifetime of an application. The simplification described here doesn't handle that scenario, but if you don't need that flexibility, you can greatly simplify your eventing infrastructure by disposing of IObservable<T> ;)


Comments

This also happens to be a very clean solution for avoiding leaking a partially constructed instance in multi-threaded scenarios (see, for example, the "Initialization safety risks" section at http://www.ibm.com/developerworks/java/library/j-jtp07265/index.html, which applies equally to C#). The only alternative to passing fully constructed listeners to the publisher is to use post-construction method invocation (à la IStartable) for registration, which is much less elegant.
2014-01-08 16:00 UTC
Tony Johnson #
Isn't there a big downside here that you lose the ability to use all of the rx extension methods since they all apply to to IObservable instead of IObserver? Would you end up re-implementing them from an IObserver viewpoint?
2015-11-06 14:00 UTC

Tony, thank you for writing. That's a good observation, and you're right: if you need to make substantial transformation, filtering, aggregation, and so on, on your event streams, then it would be valuable to be able to leverage Reactive Extensions. Not being able to do that would be a loss.

Using IObserver, as I suggest here, does come with that disadvantage, so as always, it's important to weigh the advantages and disadvantages against each other. If you're doing lots of event stream processing, then it would most likely be best to go with idiomatic Reactive Extensions (and not the solution proposed in this article). If, on the other hand, you mostly need to make sure that some Command is executed once and only once (or at least once, depending on your delivery guarantees), then my proposed solution may be more appropriate.

At its heart, the choice is between pub/sub systems on one side, and point-to-point systems on the other side. If it's important that exactly one destination system receives the messages, it's a point-to-point channel in action. If, on the other hand, zero to any arbitrary number of subscribers are welcome to consume the messages, it's a pub/sub system.

2015-11-07 12:14 UTC
SpencerJB #

I have briefly experimenting with this idea over the last couple of days.

In my experiemnt I found that rather than implement the IObserver interface on my subscribers it was easier to use Observer.Create and pass in an the Action I wanted to call to that.

This has left me wondering wether I could dispense with the whole IObserver interfacea and simply pass the Action that I wanted to call into the publisher.

2016-06-09 12:58 UTC

Spencer, thank you for writing. Indeed, delegates are equivalent to single-method interfaces. Once you realise that, functional programming begins to look more and more attractive.

Actions, with their void return type, don't seem particularly functional, however, but you can build up an entire architecture on that concept.

Personally, in C#, I prefer to stick with interfaces for dependency injection, since I find it more idiomatic, but other people have different opinions on that.

2016-06-09 16:35 UTC


Wish to comment?

You can add a comment to this post by sending me a pull request. Alternatively, you can discuss this post on Twitter or somewhere else with a permalink. Ping me with the link, and I may respond.

Published

Wednesday, 11 September 2013 09:35:00 UTC

Tags



"Our team wholeheartedly endorses Mark. His expert service provides tremendous value."
Hire me!
Published: Wednesday, 11 September 2013 09:35:00 UTC