Roll Your Own Simple Message Bus / Event Aggregator

I was recently working on a project that had a need for some sort of centralized messaging system.  After doing some research into the matter, I decided to move forward with the Event Aggregator from the Prism team.  Prism’s Event Aggregator worked fine, but there were a few things about it that I found cumbersome.  While preparing for a presentation I gave recently, called “WPF with MVVM: From the Trenches,” I began to think about what it would take to create my own event aggregator that would smooth over some of the bumps.

First up, let’s talk about the name, “Event Aggregator.”  During the course of our project, we used the event aggregator to publish data (or messages) in addition to events.  During a discussion about the event aggregator, Jason Bock pointed out that it is really more of a “Message Bus” than an “Event Aggregator.”  I agree.  So, for the remainder of this post, I will refer to it as a “Message Bus.”

The next bump that I wanted to smooth over was the fact that, with Prism, we had to create both an Event class which derived from CompositePresentationEvent<TPayload> and an EventArgs class for every message we wanted to publish.  With Prism, I wrote an extension method, called GetEventViaArgs(), which would get the CompositePresentationEvent based on the type of the args.  That eliminated the need for an explicit Event class, but it also made me question the need for an Event at all.  What if the message type was all that we needed for subscribing and publishing?  Eliminating the Event would simplify testing as well, because we wouldn’t have to mock out both getting the event and subscribing/publishing the message.  Testing just so happens to be another bump that I wanted to smooth over.

A Starting Point

So, let’s take a look at the method signatures that I had in mind.  A Message Bus should provide functionality to Subscribe to a message, Unsubscribe from a message and Publish a message.

This looks straightforward enough.  Whatever implementation I come up with, it should leverage generics to make sure all interested parties are notified of messages they care about.  The method signature I would like to see for the callbacks would be something like this:
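As a rough illustration of that callback shape, here is a minimal sketch.  The Query property, the SearchHandler class and the OnSearch method are hypothetical names used only for this example; the point is simply that any method returning void and taking the message as its single parameter can be treated as an Action<TMessage>.

```csharp
using System;

// Hypothetical message type: a plain class carrying the data to convey.
public class SearchMessage
{
    public string Query { get; set; }
}

// Hypothetical subscriber with a callback of the desired shape.
public class SearchHandler
{
    public string LastQuery { get; private set; }

    // Void return value, one parameter of the message type.
    public void OnSearch(SearchMessage message)
    {
        LastQuery = message.Query;
    }
}

public static class CallbackDemo
{
    public static string Run()
    {
        var handler = new SearchHandler();

        // A method with that shape converts to Action<SearchMessage>.
        Action<SearchMessage> callback = handler.OnSearch;
        callback(new SearchMessage { Query = "prism" });

        return handler.LastQuery;
    }
}
```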

Since I am a big fan of dependency injection and inversion of control, I wanted to define an interface, called IMessageBus, to be used instead of a concrete implementation.  Keeping in mind how I wanted to use this bad boy, here is the interface I came up with:
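A minimal sketch of an interface matching this description might look like the following.  The member names come from the prose (Subscribe, Unsubscribe, and two flavors of Publish); the parameter names and the exact overload shapes are assumptions.

```csharp
using System;

// Sketch of the contract; details beyond the member names are assumed.
public interface IMessageBus
{
    // Register a callback for messages of type TMessage.
    void Subscribe<TMessage>(Action<TMessage> handler);

    // Remove a previously registered callback.
    void Unsubscribe<TMessage>(Action<TMessage> handler);

    // Publish with the message type known at compile time.
    void Publish<TMessage>(TMessage message);

    // Publish with the message type determined at run time.
    void Publish(object message);
}
```

With these two overloads, a call such as Publish(new SearchMessage()) binds to the generic method through type inference, while an argument typed as Object binds to the non-generic one.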

Let me break this down a bit…  TMessage is the Type of the message to be sent.  Action<TMessage> is a way for me to take, as a parameter, a method which has a void return value and takes a single parameter of type TMessage.  Also note that TMessage is just a class that contains whatever data we need to convey the message.

So far, so good.  We now have a contract in place for our message bus.  Next up is implementation.

Making It Happen

First, we need a data structure that will have some sort of key/value association.  The key will be the Type of TMessage and the value will be the collection of Action<TMessage> callbacks which will be called when a message of type TMessage is published.  Since I plan to use generics at the method level rather than the class level, the collection can’t be typed as Action<TMessage>, so I decided to store each callback as Object, then cast it before using it.  So, here is our MessageBus class so far:
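One way to sketch that storage is shown below.  The choice of Dictionary<Type, List<object>>, the field name and the MessageTypeCount property are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

public class MessageBus
{
    // Key: the message type.  Value: the callbacks registered for that type,
    // stored as Object and cast back to Action<TMessage> when used.
    private readonly Dictionary<Type, List<object>> _subscribers =
        new Dictionary<Type, List<object>>();

    // Exposed only so this sketch can be inspected; not part of the design.
    public int MessageTypeCount
    {
        get { return _subscribers.Count; }
    }
}
```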

Now that we have a data structure in place to keep track of subscribers, time to implement the Subscribe method.  Now, when a new subscriber is added, we need to first find out if there is already a subscriber list for that message type.  If there is, use it.  Otherwise, create a new one.
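A minimal sketch of that logic follows, assuming the Dictionary<Type, List<object>> storage described above.  The CountFor helper is hypothetical and exists only so the result can be checked.

```csharp
using System;
using System.Collections.Generic;

public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _subscribers =
        new Dictionary<Type, List<object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<object> handlers;

        // Reuse the list for this message type if one exists...
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            // ...otherwise create it and register it under the type.
            handlers = new List<object>();
            _subscribers.Add(typeof(TMessage), handlers);
        }

        handlers.Add(handler);
    }

    // Helper for this sketch only: how many callbacks a type currently has.
    public int CountFor<TMessage>()
    {
        List<object> handlers;
        return _subscribers.TryGetValue(typeof(TMessage), out handlers)
            ? handlers.Count
            : 0;
    }
}
```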

Next up is Unsubscribe.  When a subscriber makes a call to Unsubscribe, we need to make sure they are actually a subscriber in the first place and remove them if they are.  Also, if they are the last subscriber, we can go ahead and remove the subscriber list for that message type.
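Those steps could be sketched as follows, again assuming the Dictionary<Type, List<object>> storage.  The CountFor and HasListFor helpers are hypothetical and only there to make the behavior observable.

```csharp
using System;
using System.Collections.Generic;

public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _subscribers =
        new Dictionary<Type, List<object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<object> handlers;
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<object>();
            _subscribers.Add(typeof(TMessage), handlers);
        }
        handlers.Add(handler);
    }

    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        List<object> handlers;

        // Nobody has subscribed to this message type: nothing to do.
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
            return;

        // Remove compares with Equals; delegates are equal when they
        // refer to the same method on the same target.
        handlers.Remove(handler);

        // Last subscriber gone: drop the list for this message type.
        if (handlers.Count == 0)
            _subscribers.Remove(typeof(TMessage));
    }

    // Helpers for this sketch only.
    public int CountFor<TMessage>()
    {
        List<object> handlers;
        return _subscribers.TryGetValue(typeof(TMessage), out handlers)
            ? handlers.Count
            : 0;
    }

    public bool HasListFor<TMessage>()
    {
        return _subscribers.ContainsKey(typeof(TMessage));
    }
}
```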

Almost there.  Now we just need to be able to Publish.  As you probably noticed above, I thought it would be nice to both explicitly define what type of message to publish and let the message type be determined dynamically.  For either case, we need to get the list of subscribers for the type of message being published and invoke each of their callback methods with the message as a parameter.  First up is the explicitly defined flavor:
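A minimal sketch of the generic flavor is below, assuming the Dictionary<Type, List<object>> storage described earlier.  Subscribe is repeated only so the example stands on its own.

```csharp
using System;
using System.Collections.Generic;

public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _subscribers =
        new Dictionary<Type, List<object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<object> handlers;
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<object>();
            _subscribers.Add(typeof(TMessage), handlers);
        }
        handlers.Add(handler);
    }

    public void Publish<TMessage>(TMessage message)
    {
        List<object> handlers;

        // No subscribers for this message type: nothing to deliver.
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
            return;

        // Cast each stored Object back to its callback type and call it.
        foreach (object handler in handlers)
            ((Action<TMessage>)handler)(message);
    }
}
```

Note that this looks up subscribers by typeof(TMessage), the compile-time type, so a message published through a base-type variable would not reach subscribers of the derived type.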

Pretty straightforward here too.  If there are subscribers for that message type, iterate over them (casting them in the process) and invoke them.

The dynamic version of Publish is a little more involved, only because we have to use reflection in place of generics due to the dynamic aspect:
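A sketch of the reflection-based flavor might look like this, assuming the same Dictionary<Type, List<object>> storage.  It uses the run-time type of the message as the key and looks up the delegate’s Invoke method through System.Reflection.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public class MessageBus
{
    private readonly Dictionary<Type, List<object>> _subscribers =
        new Dictionary<Type, List<object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<object> handlers;
        if (!_subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<object>();
            _subscribers.Add(typeof(TMessage), handlers);
        }
        handlers.Add(handler);
    }

    public void Publish(object message)
    {
        // The key is the run-time type of the message.
        Type messageType = message.GetType();

        List<object> handlers;
        if (!_subscribers.TryGetValue(messageType, out handlers))
            return;

        foreach (object handler in handlers)
        {
            // The method named "Invoke" that every delegate type exposes.
            MethodInfo invoke = handler.GetType().GetMethod("Invoke");

            // Reflection's own Invoke, which executes that method.
            invoke.Invoke(handler, new object[] { message });
        }
    }
}
```

One difference from the generic flavor: an exception thrown by a callback surfaces wrapped in a TargetInvocationException, because the call goes through reflection.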

This one is essentially doing the same thing as the other Publish method.  If there are subscribers for that message type, iterate over them (finding their Invoke method in the process) and invoke them.  Don’t be confused by the two different Invokes that you see…  The first Invoke is the name of the method on the Action<TMessage> type and the second Invoke is what reflection uses to execute a method.

There we have it!  A simple message bus that is easy to use.  But how about testing with it?  Since testing was one of the bumps I wanted to smooth over, how did we do?  Let’s find out!

Testing Our Shiny New Message Bus

I use RhinoMocks, so here is how I would verify that some code is publishing the correct message:

I generate a stub for IMessageBus which will set my local searchMessage variable to the actual SearchMessage that is published.  I then could use searchMessage to verify that the message that is published has the correct data.
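The same idea can be sketched without a mocking library.  The example below swaps in a hand-written fake for the RhinoMocks stub, so it is not the RhinoMocks syntax itself; SearchViewModel, CapturingMessageBus and the Query property are hypothetical names, and the IMessageBus members are assumed from the description earlier in the post.

```csharp
using System;

public interface IMessageBus
{
    void Subscribe<TMessage>(Action<TMessage> handler);
    void Unsubscribe<TMessage>(Action<TMessage> handler);
    void Publish<TMessage>(TMessage message);
    void Publish(object message);
}

public class SearchMessage
{
    public string Query { get; set; }
}

// Hypothetical class under test: publishes a SearchMessage when asked to search.
public class SearchViewModel
{
    private readonly IMessageBus _messageBus;

    public SearchViewModel(IMessageBus messageBus)
    {
        _messageBus = messageBus;
    }

    public void Search(string query)
    {
        _messageBus.Publish(new SearchMessage { Query = query });
    }
}

// Hand-written stand-in for the stub: remembers the last published message.
public class CapturingMessageBus : IMessageBus
{
    public object LastMessage { get; private set; }

    public void Subscribe<TMessage>(Action<TMessage> handler) { }
    public void Unsubscribe<TMessage>(Action<TMessage> handler) { }
    public void Publish<TMessage>(TMessage message) { LastMessage = message; }
    public void Publish(object message) { LastMessage = message; }
}
```

A test would construct SearchViewModel with a CapturingMessageBus, call Search, then cast LastMessage to SearchMessage and assert on its data.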

As an alternative, if all I wanted to do was verify that Publish was called on my Message Bus, I could do the following:

The key changes here are that I am using a Mock instead of a Stub and I am setting up an expectation.  That way I can verify that Publish was called without caring about the details:
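The expectation-style check can also be approximated with a hand-written fake.  This is a sketch of the concept rather than RhinoMocks syntax; RecordingMessageBus, VerifyPublished and SearchViewModel are hypothetical names, and the IMessageBus members are assumed from the description earlier in the post.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageBus
{
    void Subscribe<TMessage>(Action<TMessage> handler);
    void Unsubscribe<TMessage>(Action<TMessage> handler);
    void Publish<TMessage>(TMessage message);
    void Publish(object message);
}

public class SearchMessage { }

// Hypothetical class under test.
public class SearchViewModel
{
    private readonly IMessageBus _messageBus;

    public SearchViewModel(IMessageBus messageBus)
    {
        _messageBus = messageBus;
    }

    public void Search()
    {
        _messageBus.Publish(new SearchMessage());
    }
}

// Records which message types were published, ignoring their contents.
public class RecordingMessageBus : IMessageBus
{
    private readonly List<Type> _publishedTypes = new List<Type>();

    public void Subscribe<TMessage>(Action<TMessage> handler) { }
    public void Unsubscribe<TMessage>(Action<TMessage> handler) { }

    public void Publish<TMessage>(TMessage message)
    {
        _publishedTypes.Add(typeof(TMessage));
    }

    public void Publish(object message)
    {
        _publishedTypes.Add(message.GetType());
    }

    // Throws if no message of the given type was ever published.
    public void VerifyPublished<TMessage>()
    {
        if (!_publishedTypes.Contains(typeof(TMessage)))
            throw new InvalidOperationException(
                "Expected a " + typeof(TMessage).Name + " to be published.");
    }
}
```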

Testing for messages being published is now much cleaner with our new MessageBus than it would be with Prism’s Event Aggregator.  But what about testing the handling of these messages that get published?  Well, the best way that I could come up with for that is the same way I did it with Prism’s Event Aggregator, which is to use the concrete implementation of the MessageBus and actually subscribe and publish with it in the unit test.  Oh well, at least we were able to clean up some of the testing aspect!

Conclusion

So, here we are at the end.  My goal here was to show that it isn’t all that difficult to implement your own message bus system or, at the very least, show the general concept behind how it works.  I hope that this has been useful.  If you want to play with the code that we have written, as well as the tests that I wrote to beat on it a little, you can download the source here:


12 thoughts on “Roll Your Own Simple Message Bus / Event Aggregator”

  1. Nice job on this and the MVVM sample application. Thank you so much for sharing. I’ve been trying to apply MVVM to a silverlight project I’m working on and your example gives me some great ideas.

  2. Doesn’t the MessageBus need to implement the Singleton pattern so that every class that subscribes or publishes can be assured of using the same MessageBus?

  3. @InterestedParty Yep. You definitely need all the messages published through the same instance. Whether you make it a singleton or have your inversion of control container keep it as a singleton (as I like to do), the important thing is that the same instance is used everywhere.

  4. ..seems the diamond brackets disappear in the blog formatting. ? Though great article to show this in a simple way so my novice brain could finally grasp it. 🙂

  5. Brent, is this thread safe? What if something subscribes while the foreach is executed within the publish?

    Thanks,
    Shaun

  6. Shaun, this implementation is a simple one and is not thread-safe. If that is a requirement, you could easily make it thread-safe by using locks.

  7. Hi, nice implementation.
    Found a minor issue:
    Unsubscribing in a handler leads to an exception (changing the collection during iteration).
    Solutions:
    1. Adding a SubscribeOnce method (which is useful anyway)
    2. In the Publish method, iterating over a copied collection.

  8. This straightforward approach has one important caveat. When you register a handler into the message bus, you’re keeping a reference to the subscriber. This means that if the subscriber fails to unsubscribe when it’s done receiving messages, the instance will not be garbage collected until the message bus is.

    If your bus is a singleton that exists for the duration of the application, this might cause your subscribers not to get collected in a timely fashion. This is only tolerable if the number of subscribers is small and bounded.

    Even if you are diligent to unsubscribe whenever your objects are disposed, you’ve defeated the point of automatic memory management.

    A solution that doesn’t have this caveat will need to use WeakReferences to avoid holding on to the subscribers.

  9. Hmm, I’m not entirely sure you have. Even though you’re wrapping action.Target with a WeakReference, you’re also keeping a strong reference to the action itself in the ActionReference.Target property. This defeats the point, as you’ll always have that reference keeping it alive and uncollectable.
