I was recently working on a project that had a need for some sort of centralized messaging system. After doing some research into the matter, I decided to move forward with the Event Aggregator from the Prism team. Prism’s Event Aggregator worked fine, but there were a few things about it that I found cumbersome. While preparing for a presentation I gave recently, called “WPF with MVVM: From the Trenches,” I began to think about what it would take to create my own event aggregator that would smooth over some of the bumps.
First up, let's talk about the name, “Event Aggregator.” During the course of our project, we used the event aggregator to publish data (or messages) in addition to events. During a discussion about the event aggregator, Jason Bock pointed out that it is really more of a “Message Bus” than an “Event Aggregator.” I agree. So, for the remainder of this post, I will refer to it as a “Message Bus.”
The next bump that I wanted to smooth over was the fact that, with Prism, we had to create both an Event class which derived from CompositePresentationEvent
A Starting Point
So, let's take a look at the method signatures that I had in mind. A Message Bus should provide functionality to Subscribe to a message, Unsubscribe from a message, and Publish a message.
    // Subscribe to messages of type SearchMessage, providing
    // HandleSearch as the callback method.
    messageBus.Subscribe<SearchMessage>(HandleSearch);

    // Publish a message of type SearchMessage.
    var search = new SearchMessage(keywords);
    messageBus.Publish<SearchMessage>(search);

    // OR publish a message without explicitly defining
    // the type.
    messageBus.Publish(search);

    // Unsubscribe from messages of type SearchMessage.
    messageBus.Unsubscribe<SearchMessage>(HandleSearch);
This looks straightforward enough. Whatever implementation I come up with, it should leverage generics to make sure all interested parties are notified of messages they care about. The method signature I would like to see for the callbacks would be something like this:
    public void HandleSearch(SearchMessage message)
    {
        // Do something with the message.
    }
Since I am a big fan of dependency injection and inversion of control, I wanted to define an interface, called IMessageBus, to be used instead of a concrete implementation. Keeping in mind how I wanted to use this bad boy, here is the interface I came up with:
    public interface IMessageBus
    {
        void Subscribe<TMessage>(Action<TMessage> handler);
        void Unsubscribe<TMessage>(Action<TMessage> handler);
        void Publish<TMessage>(TMessage message);
        void Publish(Object message);
    }
Let me break this down a bit… TMessage is the Type of the message to be sent. Action
So far, so good. We now have a contract in place for our message bus. Next up is implementation.
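One detail of this contract is worth seeing in action: when the two Publish overloads sit side by side, the C# compiler decides which one a call binds to based on the static type of the argument. The sketch below is only an illustration. RecordingBus is a hypothetical class that mirrors the two Publish signatures and notes which one ran, and the SearchMessage shape (a constructor taking keywords, plus a Keywords property) is my assumption.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the message used throughout the post.
public class SearchMessage
{
    public SearchMessage(string keywords) { Keywords = keywords; }
    public string Keywords { get; private set; }
}

// Hypothetical class: mirrors the two Publish signatures of IMessageBus
// and records which overload each call was bound to.
public class RecordingBus
{
    public readonly List<string> Calls = new List<string>();

    public void Publish<TMessage>(TMessage message)
    {
        Calls.Add("generic:" + typeof(TMessage).Name);
    }

    public void Publish(Object message)
    {
        Calls.Add("object:" + message.GetType().Name);
    }
}

public static class OverloadDemo
{
    public static List<string> Run()
    {
        var bus = new RecordingBus();
        var search = new SearchMessage("wpf");

        bus.Publish<SearchMessage>(search); // explicit type argument
        bus.Publish(search);                // TMessage is inferred as SearchMessage
        bus.Publish((Object)search);        // static type is Object

        return bus.Calls;
    }
}
```

In this sketch the first two calls land on the generic overload and only the third reaches the Object overload. In other words, the reflection-based Publish is what you get when all the caller holds is an Object reference.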
Making It Happen
First, we need a data structure that will have some sort of key/value association. The key will be the Type of TMessage and the value will be the collection of Action
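    using System;
    using System.Collections.Generic;

    public sealed class MessageBus : IMessageBus
    {
        // Key: the message type. Value: the handlers subscribed to that type,
        // stored untyped and cast back to Action<TMessage> when publishing.
        private Dictionary<Type, List<Object>> _Subscribers =
            new Dictionary<Type, List<Object>>();
    }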
With a data structure in place to keep track of subscribers, it is time to implement the Subscribe method. When a new subscriber is added, we first need to find out whether there is already a subscriber list for that message type. If there is, we use it. Otherwise, we create a new one.
    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            handlers.Add(handler);
        }
        else
        {
            var handlers = new List<Object>();
            handlers.Add(handler);
            _Subscribers[typeof(TMessage)] = handlers;
        }
    }
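The same "use the list if it exists, otherwise create it" logic can also be written with a single dictionary lookup. This is a sketch rather than a replacement: SubscriberRegistry and CountFor are hypothetical names I made up for the example, and I am assuming the subscriber lists are stored as List<Object> keyed by message type.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the bus, reduced to the Subscribe bookkeeping.
public sealed class SubscriberRegistry
{
    // Assumption: handlers are stored untyped, keyed by message type.
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            // First subscriber for this message type: create its list.
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    // Inspection helper for the example only; not part of IMessageBus.
    public int CountFor<TMessage>()
    {
        List<Object> handlers;
        return _Subscribers.TryGetValue(typeof(TMessage), out handlers)
            ? handlers.Count
            : 0;
    }
}
```

TryGetValue does the existence check and the fetch in one call, so both branches end up sharing the same Add.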
Next up is Unsubscribe. When a subscriber makes a call to Unsubscribe, we need to make sure they are actually a subscriber in the first place and remove them if they are. Also, if they are the last subscriber, we can go ahead and remove the subscriber list for that message type.
    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            handlers.Remove(handler);

            if (handlers.Count == 0)
            {
                _Subscribers.Remove(typeof(TMessage));
            }
        }
    }
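Unsubscribe leans on how delegates compare: Remove finds a handler by equality, and two delegates are equal when they point at the same method on the same target. The sketch below illustrates what that means for callers. SearchListener and DelegateEqualityDemo are hypothetical names, and the list stands in for one subscriber list inside the bus.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscriber with a named handler method.
public class SearchListener
{
    public void HandleSearch(string keywords) { }
}

public static class DelegateEqualityDemo
{
    public static bool[] Run()
    {
        var listener = new SearchListener();
        var handlers = new List<Object>();

        // Two separate delegate objects for the same method on the same
        // instance compare equal, so Remove finds the one that was added.
        handlers.Add(new Action<string>(listener.HandleSearch));
        bool removedMethodGroup =
            handlers.Remove(new Action<string>(listener.HandleSearch));

        // Two separately written lambdas are distinct delegates in practice,
        // even with identical bodies, so this Remove normally finds nothing.
        Action<string> first = s => { };
        Action<string> second = s => { };
        handlers.Add(first);
        bool removedOtherLambda = handlers.Remove(second);

        // Removing through the same variable that was added always works.
        bool removedSameLambda = handlers.Remove(first);

        return new[] { removedMethodGroup, removedOtherLambda, removedSameLambda };
    }
}
```

The practical upshot: passing a method such as HandleSearch to both Subscribe and Unsubscribe works as expected, while a lambda has to be kept in a variable if you ever want to unsubscribe it.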
Almost there. Now we just need to be able to Publish. As you probably noticed above, I thought it would be nice to be able both to state the message type explicitly and to let it be determined dynamically. In either case, we need to get the list of subscribers for the type of message being published and invoke each of their callback methods with the message as a parameter. First up is the explicitly typed flavor:
    public void Publish<TMessage>(TMessage message)
    {
        if (_Subscribers.ContainsKey(typeof(TMessage)))
        {
            var handlers = _Subscribers[typeof(TMessage)];
            foreach (Action<TMessage> handler in handlers)
            {
                handler.Invoke(message);
            }
        }
    }
Pretty straightforward here too. If there are subscribers for that message type, iterate over them (casting them in the process) and invoke them.
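One thing to watch with this loop: if a handler subscribes or unsubscribes while Publish is iterating, the underlying List is modified mid-enumeration, which would normally end in an InvalidOperationException. A possible way around that is to iterate over a copy. The sketch below assumes the List<Object> storage described earlier; SnapshotBus and SnapshotDemo are hypothetical names, and this is a variation on the code in this post, not a change to it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variant of the bus, trimmed to what this sketch needs.
public sealed class SnapshotBus
{
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    public void Unsubscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers.Remove(handler);
            if (handlers.Count == 0)
            {
                _Subscribers.Remove(typeof(TMessage));
            }
        }
    }

    public void Publish<TMessage>(TMessage message)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            return;
        }

        // Iterate over a copy so that a handler can call Subscribe or
        // Unsubscribe without invalidating the enumerator.
        foreach (Action<TMessage> handler in handlers.ToArray())
        {
            handler(message);
        }
    }
}

public static class SnapshotDemo
{
    // A one-shot handler that unsubscribes itself the first time it runs.
    public static int Run()
    {
        var bus = new SnapshotBus();
        var calls = 0;
        Action<string> once = null;
        once = s => { calls++; bus.Unsubscribe(once); };

        bus.Subscribe(once);
        bus.Publish("first");
        bus.Publish("second"); // nobody is subscribed any more

        return calls;
    }
}
```

The copy costs an allocation per Publish, so whether it is worth it depends on how often handlers change their subscriptions from inside a callback.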
The dynamic version of Publish is a little more involved, only because we have to use reflection in place of generics due to the dynamic aspect:
    public void Publish(Object message)
    {
        var messageType = message.GetType();
        if (_Subscribers.ContainsKey(messageType))
        {
            var handlers = _Subscribers[messageType];
            foreach (var handler in handlers)
            {
                var actionType = handler.GetType();
                var invoke = actionType.GetMethod("Invoke", new Type[] { messageType });
                invoke.Invoke(handler, new Object[] { message });
            }
        }
    }
This one is essentially doing the same thing as the other Publish method. If there are subscribers for that message type, iterate over them (finding their Invoke method in the process) and invoke them. Don’t be confused by the two different Invokes that you see… The first Invoke is the name of the method on the Action type and the second Invoke is what reflection uses to execute a method.
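If looking up Invoke by name feels a bit indirect, the framework offers a shortcut: every Action<T> derives from Delegate, and Delegate.DynamicInvoke performs a late-bound call for us. The following is a sketch under the same storage assumption as before (List<Object> keyed by message type). DynamicPublishSketch is a hypothetical name, and the null check is my addition.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variant showing only the late-bound Publish.
public sealed class DynamicPublishSketch
{
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    public void Publish(Object message)
    {
        // Added for the sketch: GetType() on null would otherwise throw
        // a NullReferenceException.
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        List<Object> handlers;
        if (!_Subscribers.TryGetValue(message.GetType(), out handlers))
        {
            return;
        }

        foreach (var handler in handlers)
        {
            // Each stored handler is an Action<T>, which is a Delegate.
            // The explicit array avoids surprises with params expansion.
            ((Delegate)handler).DynamicInvoke(new Object[] { message });
        }
    }
}
```

Either way, the lookup uses the exact runtime type of the message, so a handler subscribed to a base type would not be called for a derived message.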
There we have it! A simple message bus that is easy to use. But how about testing with it? Since testing was one of the bumps I wanted to smooth over, how did we do? Let’s find out!
Testing Our Shiny New Message Bus
I use RhinoMocks, so here is how I would verify that some code is publishing the correct message:
    SearchMessage searchMessage = null;
    var messageBus = MockRepository.GenerateStub<IMessageBus>();
    messageBus.Stub(bus => bus.Publish(Arg<SearchMessage>.Is.Anything))
        .WhenCalled(inv => searchMessage = inv.Arguments[0] as SearchMessage);
I generate a stub for IMessageBus which will set my local searchMessage variable to the actual SearchMessage that is published. I could then use searchMessage to verify that the published message has the correct data.
As an alternative, if all I wanted to do was verify that Publish was called on my Message Bus, I could do the following:
    SearchMessage searchMessage = null;
    var messageBus = MockRepository.GenerateMock<IMessageBus>();
    messageBus.Expect(bus => bus.Publish(Arg<SearchMessage>.Is.Anything));
The key changes here are that I am using a Mock instead of a Stub and I am setting up an expectation. That way I can verify that Publish was called without caring about the details:
    messageBus.VerifyAllExpectations();
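Because IMessageBus is a small interface, the same two checks can also be approximated without a mocking library by writing a fake by hand. This is a rough sketch only: FakeMessageBus and SearchViewModel are hypothetical names, and the SearchMessage shape (a constructor taking keywords, plus a Keywords property) is my assumption.

```csharp
using System;
using System.Collections.Generic;

public interface IMessageBus
{
    void Subscribe<TMessage>(Action<TMessage> handler);
    void Unsubscribe<TMessage>(Action<TMessage> handler);
    void Publish<TMessage>(TMessage message);
    void Publish(Object message);
}

// Assumed shape of the message.
public class SearchMessage
{
    public SearchMessage(string keywords) { Keywords = keywords; }
    public string Keywords { get; private set; }
}

// Hypothetical class under test: publishes a SearchMessage when asked.
public class SearchViewModel
{
    private readonly IMessageBus _MessageBus;

    public SearchViewModel(IMessageBus messageBus)
    {
        _MessageBus = messageBus;
    }

    public void Search(string keywords)
    {
        _MessageBus.Publish(new SearchMessage(keywords));
    }
}

// Hand-written fake: remembers everything that was published.
public sealed class FakeMessageBus : IMessageBus
{
    public readonly List<Object> Published = new List<Object>();

    public void Subscribe<TMessage>(Action<TMessage> handler) { }
    public void Unsubscribe<TMessage>(Action<TMessage> handler) { }
    public void Publish<TMessage>(TMessage message) { Published.Add(message); }
    public void Publish(Object message) { Published.Add(message); }
}
```

A test would then construct SearchViewModel with a FakeMessageBus, call Search, and inspect Published, either just counting entries or casting the first one to SearchMessage to check its data.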
Testing for messages being published is now much cleaner with our new MessageBus than it would be with Prism’s Event Aggregator. But what about testing the handling of these messages that get published? Well, the best way that I could come up with for that is the same way I did it with Prism’s Event Aggregator, which is to use the concrete implementation of the MessageBus and actually subscribe and publish with it in the unit test. Oh well, at least we were able to clean up some of the testing aspect!
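To make that last approach concrete, here is roughly what I mean by using the real bus in a test. It is a sketch with assumptions: SearchResultsViewModel and HandlerTest are hypothetical names, the SearchMessage shape is assumed, and MessageBus is a trimmed copy with just enough to subscribe and publish so that the example stands on its own.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the message.
public class SearchMessage
{
    public SearchMessage(string keywords) { Keywords = keywords; }
    public string Keywords { get; private set; }
}

// Trimmed copy of the bus: only Subscribe and the generic Publish.
public sealed class MessageBus
{
    private readonly Dictionary<Type, List<Object>> _Subscribers =
        new Dictionary<Type, List<Object>>();

    public void Subscribe<TMessage>(Action<TMessage> handler)
    {
        List<Object> handlers;
        if (!_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            handlers = new List<Object>();
            _Subscribers[typeof(TMessage)] = handlers;
        }
        handlers.Add(handler);
    }

    public void Publish<TMessage>(TMessage message)
    {
        List<Object> handlers;
        if (_Subscribers.TryGetValue(typeof(TMessage), out handlers))
        {
            foreach (Action<TMessage> handler in handlers)
            {
                handler(message);
            }
        }
    }
}

// Hypothetical subscriber under test. In the real design the constructor
// would take IMessageBus; the concrete type keeps the sketch short.
public class SearchResultsViewModel
{
    public string LastKeywords { get; private set; }

    public SearchResultsViewModel(MessageBus messageBus)
    {
        messageBus.Subscribe<SearchMessage>(HandleSearch);
    }

    public void HandleSearch(SearchMessage message)
    {
        LastKeywords = message.Keywords;
    }
}

public static class HandlerTest
{
    // Arrange a real bus, act by publishing, then return what the handler saw.
    public static string Run()
    {
        var messageBus = new MessageBus();
        var viewModel = new SearchResultsViewModel(messageBus);

        messageBus.Publish(new SearchMessage("wpf mvvm"));

        return viewModel.LastKeywords;
    }
}
```

In a test framework, the last step would be an assertion that LastKeywords equals the keywords that were published.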
Conclusion
So, here we are at the end. My goal here was to show that it isn’t all that difficult to implement your own message bus system or, at the very least, show the general concept behind how it works. I hope that this has been useful. If you want to play with the code that we have written, as well as the tests that I wrote to beat on it a little, you can download the source here: