I was recently working on a project that had a need for some sort of centralized messaging system. After doing some research into the matter, I decided to move forward with the Event Aggregator from the Prism team. Prism’s Event Aggregator worked fine, but there were a few things about it that I found cumbersome. While preparing for a presentation I gave recently, called “WPF with MVVM: From the Trenches,” I began to think about what it would take to create my own event aggregator that would smooth over some of the bumps.
First up, let's talk about the name, “Event Aggregator.” During the course of our project, we used the event aggregator to publish data (or messages) in addition to events. During a discussion about the event aggregator, Jason Bock pointed out that it is really more of a “Message Bus” than an “Event Aggregator.” I agree. So, for the remainder of this post, I will refer to it as a “Message Bus.”
The next bump that I wanted to smooth over was the fact that, with Prism, we had to create both an Event class which derived from CompositePresentationEvent
A Starting Point
So, let's take a look at the method signatures that I had in mind. A Message Bus should provide functionality to Subscribe to a message, Unsubscribe from a message, and Publish a message.
    // Subscribe to messages of type SearchMessage, providing
    // HandleSearch as the callback method.
    messageBus.Subscribe<SearchMessage>(HandleSearch);

    // Publish a message of type SearchMessage.
    var search = new SearchMessage(keywords);
    messageBus.Publish<SearchMessage>(search);

    // OR publish a message without explicitly defining
    // the type.
    messageBus.Publish(search);

    // Unsubscribe from messages of type SearchMessage.
    messageBus.Unsubscribe<SearchMessage>(HandleSearch);
This looks straightforward enough. Whatever implementation I come up with, it should leverage generics to make sure all interested parties are notified of the messages they care about. The method signature I would like to see for the callbacks would be something like this:
    public void HandleSearch(SearchMessage message)
    {
        // Do something with the message.
    }
Since I am a big fan of dependency injection and inversion of control, I wanted to define an interface, called IMessageBus, to be used instead of a concrete implementation. Keeping in mind how I wanted to use this bad boy, here is the interface I came up with:
    public interface IMessageBus
    {
        void Subscribe<TMessage>(Action<TMessage> handler);
        void Unsubscribe<TMessage>(Action<TMessage> handler);
        void Publish<TMessage>(TMessage message);
        void Publish(Object message);
    }
Let me break this down a bit… TMessage is the Type of the message to be sent. Action
So far, so good. We now have a contract in place for our message bus. Next up is implementation.
Making It Happen
First, we need a data structure that will have some sort of key/value association. The key will be the Type of TMessage and the value will be the collection of Action
    public sealed class MessageBus : IMessageBus
    {
        private Dictionary<Type, List<Object>> _Subscribers =
            new Dictionary<Type, List<Object>>();
    }
Now that we have a data structure in place to keep track of subscribers, time to implement the Subscribe method. Now, when a new subscriber is added, we need to first find out if there is already a subscriber list for that message type. If there is, use it. Otherwise, create a new one.
    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            handlers.Add(handler);
        }
        else
        {
            var handlers = new List<Object>();
            handlers.Add(handler);
            _Subscribers[typeof(TMessage)] = handlers;
        }
    }
Next up is Unsubscribe. When a subscriber makes a call to Unsubscribe, we need to make sure they are actually a subscriber in the first place and remove them if they are. Also, if they are the last subscriber, we can go ahead and remove the subscriber list for that message type.
    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            handlers.Remove(handler);

            if (handlers.Count == 0)
            {
                _Subscribers.Remove(typeof(TMessage));
            }
        }
    }
Almost there. Now we just need to be able to Publish. As you probably noticed above, I thought it would be nice to be able both to explicitly define what type of message to publish and to let the message type be determined dynamically. In either case, we need to get the list of subscribers for the type of message being published and invoke each of their callback methods with the message as a parameter. First up is the explicitly defined flavor:
    public void Publish<TMessage>(TMessage message)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            foreach (Action<TMessage> handler in handlers)
            {
                handler.Invoke(message);
            }
        }
    }
Pretty straightforward here too. If there are subscribers for that message type, iterate over them (casting them in the process) and invoke them.
The dynamic version of Publish is a little more involved, only because we have to use reflection in place of generics due to the dynamic aspect:
    public void Publish(Object message)
    {
        var messageType = message.GetType();
        if (_Subscribers.ContainsKey(messageType))
        {
            var handlers = _Subscribers[messageType];
            foreach (var handler in handlers)
            {
                var actionType = handler.GetType();
                var invoke = actionType.GetMethod("Invoke", new Type[] { messageType });
                invoke.Invoke(handler, new Object[] { message });
            }
        }
    }
This one is essentially doing the same thing as the other Publish method. If there are subscribers for that message type, iterate over them (finding their Invoke method in the process) and invoke them. Don’t be confused by the two different Invokes that you see… The first Invoke is the name of the method on the Action type and the second Invoke is what reflection uses to execute a method.
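As an aside, every stored handler is a delegate, so the same dynamic dispatch could probably also be written with Delegate.DynamicInvoke instead of looking up Invoke through reflection. The sketch below is only an illustration of that idea: the class name DynamicDispatchSketch and the PublishTo helper are made up for this example, and it assumes each entry in the list is an Action whose parameter type matches the runtime type of the message.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of the post's MessageBus.
public static class DynamicDispatchSketch
{
    // Invokes every handler in the list with the given message and
    // returns how many handlers were called.
    // Assumption: each entry is an Action<T> where T is the message's runtime type.
    public static int PublishTo(List<Object> handlers, Object message)
    {
        var invoked = 0;
        foreach (var handler in handlers)
        {
            // Every Action<T> derives from Delegate, so this cast succeeds
            // for anything that was registered through Subscribe.
            var callback = (Delegate)handler;
            callback.DynamicInvoke(message);
            invoked++;
        }
        return invoked;
    }
}
```

Like the reflection version, DynamicInvoke is late-bound, so it is slower than the generic Publish and reports handler exceptions wrapped in a TargetInvocationException.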
There we have it! A simple message bus that is easy to use. But how about testing with it? Since testing was one of the bumps I wanted to smooth over, how did we do? Let’s find out!
Testing Our Shiny New Message Bus
I use RhinoMocks, so here is how I would verify that some code is publishing the correct message:
    SearchMessage searchMessage = null;
    var messageBus = MockRepository.GenerateStub<IMessageBus>();
    messageBus.Stub(bus => bus.Publish(Arg<SearchMessage>.Is.Anything))
        .WhenCalled(inv => searchMessage = inv.Arguments[0] as SearchMessage);
I generate a stub for IMessageBus which will set my local searchMessage variable to the actual SearchMessage that is published. I then could use searchMessage to verify that the message that is published has the correct data.
As an alternative, if all I wanted to do was verify that Publish was called on my Message Bus, I could do the following:
    SearchMessage searchMessage = null;
    var messageBus = MockRepository.GenerateMock<IMessageBus>();
    messageBus.Expect(bus => bus.Publish(Arg<SearchMessage>.Is.Anything));
The key changes here are that I am using a Mock instead of a Stub and I am setting up an expectation. That way I can verify that Publish was called without caring about the details:
    messageBus.VerifyAllExpectations();
Testing for messages being published is now much cleaner with our new MessageBus than it would be with Prism’s Event Aggregator. But what about testing the handling of these messages that get published? Well, the best way that I could come up with for that is the same way I did it with Prism’s Event Aggregator, which is to use the concrete implementation of the MessageBus and actually subscribe and publish with it in the unit test. Oh well, at least we were able to clean up some of the testing aspect!
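To make that last approach concrete, here is a rough sketch of testing a handler by subscribing and publishing through a real bus. Everything outside the bus itself is an assumption made for the example: the Keywords property on SearchMessage, the SearchViewModel class, and the HandlerTestSketch helper are hypothetical, and the MessageBus shown is a condensed copy so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type; the post does not show SearchMessage's members.
public sealed class SearchMessage
{
    public SearchMessage(string keywords) { Keywords = keywords; }
    public string Keywords { get; private set; }
}

// Condensed from the post's MessageBus so the sketch is self-contained.
public sealed class MessageBus
{
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    public void Publish<TMessage>(TMessage message)
    {
        List<Object> handlers;
        if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            foreach (Action<TMessage> handler in handlers)
            {
                handler.Invoke(message);
            }
        }
    }
}

// Hypothetical class under test: it subscribes in its constructor.
public sealed class SearchViewModel
{
    public SearchViewModel(MessageBus messageBus)
    {
        messageBus.Subscribe<SearchMessage>(HandleSearch);
    }

    public string LastKeywords { get; private set; }

    public void HandleSearch(SearchMessage message)
    {
        LastKeywords = message.Keywords;
    }
}

public static class HandlerTestSketch
{
    // Arrange a real bus, act by publishing, and return what the handler saw.
    public static string PublishAndObserve(string keywords)
    {
        var messageBus = new MessageBus();
        var viewModel = new SearchViewModel(messageBus);

        messageBus.Publish(new SearchMessage(keywords));

        return viewModel.LastKeywords;
    }
}
```

In a real test, the return value of PublishAndObserve would be compared against the published keywords using whatever assertion library the project already uses.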
Conclusion
So, here we are at the end. My goal here was to show that it isn’t all that difficult to implement your own message bus system or, at the very least, to show the general concept behind how it works. I hope that this has been useful. If you want to play with the code that we have written, as well as the tests that I wrote to beat on it a little, you can download the source here:
Nice job on this and the MVVM sample application. Thank you so much for sharing. I’ve been trying to apply MVVM to a silverlight project I’m working on and your example gives me some great ideas.
I agree with Jason, nice job on the article and the sample. Really loved it and have been implementing it.
Doesn’t the MessageBus need to implement the Singleton pattern so that every class that subscribes or publishes can be assured of using the same MessageBus?
@InterestedParty Yep. You definitely need all the messages published through the same instance. Whether you make it a singleton or have your inversion of control container keep it as a singleton (as I like to do), the important thing is that the same instance is used everywhere.
..seems the diamond brackets disappear in the blog formatting. ? Though great article to show this in a simple way so my novice brain could finally grasp it. 🙂
Brent, is this thread safe? What if something subscribes while the foreach is executed within the publish?
Thanks,
Shaun
Shaun, this implementation is a simple one and is not thread-safe. If that is a requirement, you could easily make it thread-safe by using locks.
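A minimal sketch of what the lock-based approach might look like follows. It is an assumption about one reasonable way to do it, not the author's code: LockingMessageBus and its _Sync field are names invented for this example. Publish copies the handler list while holding the lock and invokes the handlers after releasing it, so a Subscribe call on another thread cannot modify the list in the middle of the foreach.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical thread-safe variant of the post's MessageBus.
public sealed class LockingMessageBus
{
    private readonly object _Sync = new object();
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        lock (_Sync)
        {
            List<Object> handlers;
            if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
            {
                handlers = new List<Object>();
                _Subscribers[typeof(TMessage)] = handlers;
            }
            handlers.Add(handler);
        }
    }

    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        lock (_Sync)
        {
            List<Object> handlers;
            if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
            {
                handlers.Remove(handler);
                if (handlers.Count == 0)
                {
                    _Subscribers.Remove(typeof(TMessage));
                }
            }
        }
    }

    public void Publish<TMessage>(TMessage message)
    {
        Object[] snapshot;
        lock (_Sync)
        {
            List<Object> handlers;
            if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
            {
                return;
            }
            // Copy under the lock so the list cannot change while it is read.
            snapshot = handlers.ToArray();
        }

        // Invoke outside the lock so a slow handler does not block
        // other threads that want to subscribe or unsubscribe.
        foreach (Action<TMessage> handler in snapshot)
        {
            handler.Invoke(message);
        }
    }
}
```

One consequence of the snapshot is that a handler removed on another thread during a publish may still receive that one message.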
Hi, nice implementation.
Found a minor issue:
Unsubscribing inside a handler leads to an exception (the collection is changed during iteration).
Solutions:
1. Add a SubscribeOnce method (which is useful anyway).
2. In the Publish method, iterate over a copy of the collection.
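Both suggestions can be sketched together, since a SubscribeOnce wrapper unsubscribes itself from inside the handler and therefore relies on Publish iterating over a copy. The names SnapshotMessageBus and MessageBusExtensions are hypothetical, and the interface below repeats only the three members the sketch needs.

```csharp
using System;
using System.Collections.Generic;

// The three members this sketch needs from the post's IMessageBus.
public interface IMessageBus
{
    void Subscribe<TMessage>(Action<TMessage> handler);
    void Unsubscribe<TMessage>(Action<TMessage> handler);
    void Publish<TMessage>(TMessage message);
}

// Hypothetical bus whose Publish iterates over a copy of the handler list.
public sealed class SnapshotMessageBus : IMessageBus
{
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers.Remove(handler);
            if (handlers.Count == 0)
            {
                _Subscribers.Remove(typeof(TMessage));
            }
        }
    }

    public void Publish<TMessage>(TMessage message)
    {
        List<Object> handlers;
        if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            // ToArray copies the list, so a handler may unsubscribe
            // (or subscribe) without invalidating this enumeration.
            foreach (Action<TMessage> handler in handlers.ToArray())
            {
                handler.Invoke(message);
            }
        }
    }
}

public static class MessageBusExtensions
{
    // Subscribes a handler that removes itself before handling its first message.
    public static void SubscribeOnce<TMessage>(this IMessageBus bus, Action<TMessage> handler)
    {
        Action<TMessage> wrapper = null;
        wrapper = message =>
        {
            bus.Unsubscribe(wrapper);
            handler(message);
        };
        bus.Subscribe(wrapper);
    }
}
```

With the original Publish, which enumerates the live list, the Unsubscribe call inside the wrapper would raise the InvalidOperationException described above.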
This straightforward approach has one important caveat. When you register a handler into the message bus, you’re keeping a reference to the subscriber. This means that if the subscriber fails to unsubscribe when it’s done receiving messages, the instance will not be garbage collected until the message bus is.
If your bus is a singleton that exists for the duration of the application, this might cause your subscribers not to get collected in a timely fashion. This is only tolerable if the number of subscribers is small and bounded.
Even if you are diligent to unsubscribe whenever your objects are disposed, you’ve defeated the point of automatic memory management.
A solution that doesn’t have this caveat will need to use WeakReferences to avoid holding on to the subscribers.
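One way the WeakReference idea might be sketched is to store the handler's target weakly and keep only the MethodInfo, never the delegate itself. WeakHandler is a hypothetical helper written for this illustration, not code from any of the libraries mentioned in this thread.

```csharp
using System;
using System.Reflection;

// Hypothetical helper: remembers a handler without keeping its target alive.
public sealed class WeakHandler<TMessage>
{
    private readonly WeakReference _Target;  // null when the handler is a static method
    private readonly MethodInfo _Method;

    public WeakHandler(Action<TMessage> handler)
    {
        // Keep only the method and a weak reference to the target.
        // The delegate itself is deliberately not stored.
        _Method = handler.Method;
        _Target = handler.Target == null ? null : new WeakReference(handler.Target);
    }

    // True while the subscriber (if any) has not been collected.
    public bool IsAlive
    {
        get { return _Target == null || _Target.IsAlive; }
    }

    // Invokes the handler if its target is still alive; returns false otherwise.
    public bool TryInvoke(TMessage message)
    {
        Object target = null;
        if (_Target != null)
        {
            // Take a strong reference only for the duration of the call.
            target = _Target.Target;
            if (target == null)
            {
                return false;
            }
        }
        _Method.Invoke(target, new Object[] { message });
        return true;
    }
}
```

A bus built on this would store WeakHandler instances and prune the ones whose TryInvoke returns false. One caveat is that a lambda which captures local variables has a compiler-generated closure object as its target, and that object can be collected while the real subscriber is still alive, so this technique fits instance methods better than capturing lambdas.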
@Ilia, you are absolutely right. I have addressed that with the message bus implementation for MVVM Fabric: https://github.com/brentedwards/MvvmFabric/tree/master/MvvmFabric/Messaging
Hmm, I’m not entirely sure you have. Even though you’re wrapping action.Target with a WeakReference, you’re also keeping a strong reference to the action itself in the ActionReference.Target property. This defeats the point, as you’ll always have that reference keeping it alive and uncollectable.