Friday, July 24, 2009

How MassTransit Publish and Subscribe works

clip_image002

This is a follow-on from my last post, A First Look at MassTransit. Here’s my take on how publish and subscribe works. It’s based on a very brief scan of the MT code, so there could well be misunderstandings and missing details.

The core component of MassTransit is the ServiceBus. It’s the primary API that services use to subscribe to and publish messages. The ServiceBus has an inbound and an outbound pipeline. When Publish is called, the message is sent down the outbound pipeline until a component that cares about that message type dispatches it to an endpoint. Similarly, when a message is received, it is passed down the inbound pipeline, giving each component a chance to process it.

Understanding how the inbound and outbound pipelines are populated is the key to understanding how MassTransit works. It’s instructive to get a printout of your pipelines by inspecting them with the PipelineViewer. I’ve created a little class to help with this:

 

using System.IO;
using MassTransit.Pipeline;
using MassTransit.Pipeline.Inspectors;

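namespace MassTransit.Play.Helpers
{
    // IPipelineWriter is not shown in the post; this minimal definition is assumed.
    public interface IPipelineWriter
    {
        void Write();
    }

    // Dumps the bus's inbound and outbound pipelines to a TextWriter.
    public class PipelineWriter : IPipelineWriter
    {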
        private readonly TextWriter writer;
        private readonly IServiceBus bus;

        public PipelineWriter(IServiceBus bus, TextWriter writer)
        {
            this.bus = bus;
            this.writer = writer;
        }

        public void Write()
        {
            writer.WriteLine("InboundPipeline:\r\n");
            WritePipeline(bus.InboundPipeline);
            writer.WriteLine("OutboundPipeline:\r\n");
            WritePipeline(bus.OutboundPipeline);
        }

        private void WritePipeline(IPipelineSink<object> pipeline)
        {
            var inspector = new PipelineViewer();
            pipeline.Inspect(inspector);
            writer.WriteLine(inspector.Text);
        }
    }
}

Let’s look at the sequence of events when RuntimeServices.exe, a subscriber service and a publishing service start up.

When RuntimeServices starts up, the SubscriptionService creates a list of ‘SubscriptionClients’. Initially this is empty.

clip_image003

When our subscriber comes on line, it sends an AddSubscriptionClient message to the subscription service. The subscription service then adds our subscriber to its list of subscription clients.

clip_image004

Next our publisher comes on line. It also sends an AddSubscriptionClient message to the subscription service. It too gets added to the subscription clients list.

clip_image005

When the subscriber subscribes to a particular message type, ServiceBus sends an AddSubscription message to the SubscriptionService, which in turn scans its list of subscription clients and sends the AddSubscription message to each one.

SubscriptionService also adds the subscription to its list of subscriptions so that when any other services come on line it can update them with the list.

The publisher receives the AddSubscription message that was broadcast to all the subscription clients and adds the subscriber endpoint to its outbound pipeline. Note that the subscriber also receives its own AddSubscription message back and adds itself to its outbound pipeline (not shown in the diagram).

clip_image006

The subscriber also adds a component to its inbound pipeline to listen for messages of the subscribed type. I haven’t shown this in the diagram either.

When the publisher publishes a message, it sends the message down its outbound pipeline until it is intercepted by the subscriber’s endpoint and dispatched to the subscriber’s queue. The subscription service is not involved at this point.

clip_image007
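The sequence above can be sketched as a toy in-memory model. This is only an illustration of my reading of the flow, not MassTransit code: ToyBus, ToySubscriptionService and OnSubscriptionAdded are hypothetical names, direct method calls stand in for messages sent over queues, and the "pipelines" are reduced to dictionaries keyed by message type.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for the subscription service (hypothetical, not a MassTransit type).
public class ToySubscriptionService
{
    private readonly List<ToyBus> clients = new List<ToyBus>();
    private readonly List<KeyValuePair<Type, ToyBus>> subscriptions =
        new List<KeyValuePair<Type, ToyBus>>();

    // Models the AddSubscriptionClient message.
    public void AddClient(ToyBus client)
    {
        clients.Add(client);
        // Bring a newcomer up to date with the subscriptions already known.
        foreach (var s in subscriptions) client.OnSubscriptionAdded(s.Key, s.Value);
    }

    // Models the AddSubscription message: remember it, then broadcast to every client.
    public void AddSubscription(Type messageType, ToyBus subscriber)
    {
        subscriptions.Add(new KeyValuePair<Type, ToyBus>(messageType, subscriber));
        foreach (var client in clients) client.OnSubscriptionAdded(messageType, subscriber);
    }
}

// Toy stand-in for a service bus with an inbound and an outbound "pipeline".
public class ToyBus
{
    private readonly ToySubscriptionService service;
    // Outbound: message type -> subscriber endpoints interested in it.
    private readonly Dictionary<Type, List<ToyBus>> outbound = new Dictionary<Type, List<ToyBus>>();
    // Inbound: message type -> local handlers.
    private readonly Dictionary<Type, List<Action<object>>> inbound =
        new Dictionary<Type, List<Action<object>>>();

    public ToyBus(string address, ToySubscriptionService service)
    {
        Address = address;
        this.service = service;
        service.AddClient(this);
    }

    public string Address { get; private set; }

    public void Subscribe<T>(Action<T> handler)
    {
        List<Action<object>> handlers;
        if (!inbound.TryGetValue(typeof(T), out handlers))
            inbound[typeof(T)] = handlers = new List<Action<object>>();
        handlers.Add(m => handler((T)m));
        service.AddSubscription(typeof(T), this);
    }

    // Called when the subscription service broadcasts a subscription.
    public void OnSubscriptionAdded(Type messageType, ToyBus subscriber)
    {
        List<ToyBus> endpoints;
        if (!outbound.TryGetValue(messageType, out endpoints))
            outbound[messageType] = endpoints = new List<ToyBus>();
        if (!endpoints.Contains(subscriber)) endpoints.Add(subscriber);
    }

    // Publishing never touches the subscription service.
    public void Publish<T>(T message)
    {
        List<ToyBus> endpoints;
        if (!outbound.TryGetValue(typeof(T), out endpoints)) return; // nobody subscribed
        foreach (var endpoint in endpoints) endpoint.Receive(message);
    }

    private void Receive(object message)
    {
        List<Action<object>> handlers;
        if (!inbound.TryGetValue(message.GetType(), out handlers)) return;
        foreach (var handler in handlers) handler(message);
    }
}

public static class Program
{
    public static void Main()
    {
        var service = new ToySubscriptionService();
        var subscriber = new ToyBus("mt_mike_subscriber", service);
        var publisher = new ToyBus("mt_mike_publisher", service);

        subscriber.Subscribe<string>(m => Console.WriteLine(subscriber.Address + " got: " + m));

        publisher.Publish("hello"); // prints "mt_mike_subscriber got: hello"
        publisher.Publish(42);      // no subscriber for int, so nothing happens
    }
}
```

Note that the toy subscriber also ends up in its own outbound dictionary, which mirrors the observation that the subscriber receives its own AddSubscription message back.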

I hope this is useful if you’re trying to get to grips with MassTransit. Thanks to Dru for clarifying some points for me.

Wednesday, July 22, 2009

A First Look at MassTransit

Get the code for this post here:

http://static.mikehadlow.com/MassTransit.Play.zip

I’ve recently been trying out MassTransit as a possible replacement for our current JBOWS architecture. MassTransit is a “lean service bus implementation for building loosely coupled applications using the .NET framework.” It’s a simple service bus based around the idea of asynchronous publish and subscribe. It’s written by Dru Sellers and Chris Patterson, both good guys who have been very quick to respond on both Twitter and the MassTransit Google group.

To start with I wanted to try out the simplest thing possible, a single publisher and a single subscriber. I wanted to be able to publish a message and have the subscriber pick it up.

The first thing to do is get the latest source from the MassTransit Google Code repository and build it.

The core MassTransit infrastructure is provided by a service called MassTransit.RuntimeServices.exe. This is a Windows service built on Top Shelf (be careful what you click on at work when Googling for this :). I plan to blog about Top Shelf in the future, but in short it’s a very nice fluent API for building Windows services. One of the nicest things about it is that you can run the service as a console app during development but easily install it as a Windows service in production.

Before running RuntimeServices you have to provide it with a SQL database. I wanted to use my local SQL Server instance, so I opened up the MassTransit.RuntimeServices.exe.config file, commented out the SQL CE NHibernate configuration and uncommented the SQL Server configuration. I also changed the connection string to point to a test database I’d created. I then ran the SetupSQLServer.sql script (under the PreBuiltServices\MassTransit.RuntimeServices folder) against my database to create the required tables.

So let’s start up RuntimeServices by double clicking the MassTransit.RuntimeServices.exe in the bin directory.

clip_image002

A whole load of debug messages are spat out. Also we can see that some new private MSMQs have been automatically created:

clip_image004

We can also launch MassTransit.SystemView.exe (also in the bin folder) which gives us a nice GUI view of our services:

clip_image006

I think it shows a list of subscriber queues on the left. If you expand the nodes you can see the types that are subscribed to. I guess the reason that the mt_subscriptions and mt_health_control queues are not shown is that they don’t have any subscriptions associated with them.

Now let’s create the simplest possible subscriber and publisher. First I’ll create a message structure. I want my message class to be shared by my publisher and subscriber, so I’ll create it in its own assembly and then reference that assembly in the publisher and subscriber projects. My message is very simple, just a regular POCO:

namespace MassTransit.Play.Messages
{
    public class NewCustomerMessage
    {
        public string Name { get; set; }
    }
}

Now for the publisher. MassTransit uses the Castle Windsor IoC container by default, along with log4net, so we need to add the following references:

clip_image008

The MassTransit API is configured as a Windsor facility. I’m a big fan of Windsor, so this all makes sense to me. Here’s the Windsor config file:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <facilities>
    <facility id="masstransit">
      <bus id="main" endpoint="msmq://localhost/mt_mike_publisher">
        <subscriptionService endpoint="msmq://localhost/mt_subscriptions" />
        <managementService heartbeatInterval="3" />
      </bus>
      <transports>
        <transport>MassTransit.Transports.Msmq.MsmqEndpoint, MassTransit.Transports.Msmq</transport>
      </transports>
    </facility>
  </facilities>
</configuration>

As you can see, we reference the ‘masstransit’ facility and configure it with two main nodes, bus and transports. Transports is pretty straightforward: we simply specify the MsmqEndpoint. The bus node specifies an id and an endpoint. As far as I understand it, if your service only publishes, then the queue is never used, but MassTransit throws if you don’t specify it. I’m probably missing something here; any clarification will be warmly received :)

Continuing with the configuration: under bus are two child nodes, subscriptionService and managementService. The subscriptionService endpoint specifies the location of the subscription queue, which RuntimeServices uses to keep track of subscriptions. This should be the location of the queue created when RuntimeServices starts up for the first time; on my machine it was mt_subscriptions. I’m unsure what the managementService specifies exactly, but I think it’s the subsystem that allows RuntimeServices to monitor the health of the service. I’m assuming that the heartbeatInterval is the number of seconds between each notification.

Next, let’s code our publisher. I’m going to create a simple console application. In production I would host the service with Top Shelf, but right now I want to do the simplest thing possible, so I’m going to keep any other infrastructure out of the equation for the time being. Here’s the publisher code:

using System;
using MassTransit.Play.Messages;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Publisher
{
    public class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Publisher");

            MsmqEndpointConfigurator.Defaults(config =>
            {
                config.CreateMissingQueues = true;
            });

            var container = new DefaultMassTransitContainer("windsor.xml");
            var bus = container.Resolve<IServiceBus>();

            string name;
            // Console.ReadLine returns null at end of input, so treat that as quit too.
            while ((name = GetName()) != null && name != "q")
            {
                var message = new NewCustomerMessage {Name = name};
                bus.Publish(message);
                
                Console.WriteLine("Published NewCustomerMessage with name {0}", message.Name);
            }

            Console.WriteLine("Stopping Publisher");
            container.Release(bus);
            container.Dispose();
        }

        private static string GetName()
        {
            Console.WriteLine("Enter a name to publish (q to quit)");
            return Console.ReadLine();
        }
    }
}

The first statement instructs the MassTransit MsmqEndpointConfigurator to create any missing queues so that we don’t have to manually create the mt_mike_publisher queue. The pattern used here is very common in the MassTransit code: a static method takes an Action<TConfig>, where TConfig is some configuration class.

The next line creates the DefaultMassTransitContainer. This is a WindsorContainer with the MassTransitFacility registered and all the components needed for MassTransit to run. For us the most important service is the IServiceBus which encapsulates most of the client API. The next line gets the bus from the container.

We then set up a loop getting input from the user, creating a NewCustomerMessage and calling bus.Publish(message). It really is as simple as that.
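The configuration pattern mentioned above, a static method that takes an Action<TConfig>, is easy to reproduce in your own code. Here’s a minimal sketch of the idea. QueueOptions and QueueConfigurator are names I’ve made up for illustration; I’m not claiming this is how MsmqEndpointConfigurator is implemented internally.

```csharp
using System;

// Hypothetical configuration class; MassTransit's real one may look different.
public class QueueOptions
{
    public bool CreateMissingQueues { get; set; }
}

public static class QueueConfigurator
{
    private static readonly QueueOptions defaults = new QueueOptions();

    // The caller passes a lambda that mutates the shared defaults.
    public static void Defaults(Action<QueueOptions> configure)
    {
        if (configure == null) throw new ArgumentNullException("configure");
        configure(defaults);
    }

    // Read access for code that needs the configured values later.
    public static QueueOptions Current
    {
        get { return defaults; }
    }
}

public static class Program
{
    public static void Main()
    {
        QueueConfigurator.Defaults(config =>
        {
            config.CreateMissingQueues = true;
        });

        Console.WriteLine(QueueConfigurator.Current.CreateMissingQueues); // prints True
    }
}
```

The appeal of this shape is that the library owns the configuration object and the caller only describes the changes, so new options can be added without breaking existing call sites.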

Let’s look at the subscriber next. The references and windsor.xml config are almost identical to the publisher’s. The only difference is that the bus endpoint should point to a different MSMQ queue: mt_mike_subscriber in my case.

In order to subscribe to a message type we first have to create a consumer. The consumer ‘consumes’ the message when it arrives at the bus.

using System;
using MassTransit.Internal;
using MassTransit.Play.Messages;

namespace MassTransit.Play.Subscriber.Consumers
{
    public class NewCustomerMessageConsumer : Consumes<NewCustomerMessage>.All, IBusService
    {
        private IServiceBus bus;
        private UnsubscribeAction unsubscribeAction;

        public void Consume(NewCustomerMessage message)
        {
            Console.WriteLine("Received a NewCustomerMessage with Name : '{0}'", message.Name);
        }

        public void Dispose()
        {
            bus.Dispose();
        }

        public void Start(IServiceBus bus)
        {
            this.bus = bus;
            unsubscribeAction = bus.Subscribe(this);
        }

        public void Stop()
        {
            // Guard against Stop being called before Start.
            if (unsubscribeAction != null) unsubscribeAction();
        }
    }
}

You create a consumer by implementing the Consumes<TMessage>.All interface and, as Ayende says, it’s a very clever, fluent way of specifying both what needs to be consumed and how it should be consumed. The ‘All’ interface has a single method that needs to be implemented, Consume, and we simply write to the console that the message has arrived. Our consumer also implements IBusService, which gives us places to start and stop the service and do the actual subscription.
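If you’re wondering how a type name like Consumes<TMessage>.All is even possible, it’s just an interface nested inside a generic static class. The sketch below shows the trick in isolation. It’s a simplified stand-in that I’ve written for illustration, not MassTransit’s actual declaration; the Selected interface and the Dispatch helper in particular are assumptions of mine, added to show how the nesting lets you express different ways of consuming the same message type.

```csharp
using System;

// Simplified stand-in for the idea; not MassTransit's actual declaration.
public static class Consumes<TMessage> where TMessage : class
{
    // "Consumes<Foo>.All" reads as: consume every Foo that arrives.
    public interface All
    {
        void Consume(TMessage message);
    }

    // Hypothetical variant: the consumer can turn a message down before consuming it.
    public interface Selected : All
    {
        bool Accept(TMessage message);
    }
}

public class NewCustomerMessage
{
    public string Name { get; set; }
}

public class PrintingConsumer : Consumes<NewCustomerMessage>.All
{
    public void Consume(NewCustomerMessage message)
    {
        Console.WriteLine("Received: " + message.Name);
    }
}

public static class Program
{
    // Hypothetical dispatcher: honours the Accept filter when the consumer has one.
    public static void Dispatch<T>(Consumes<T>.All consumer, T message) where T : class
    {
        var selective = consumer as Consumes<T>.Selected;
        if (selective != null && !selective.Accept(message)) return;
        consumer.Consume(message);
    }

    public static void Main()
    {
        Dispatch<NewCustomerMessage>(
            new PrintingConsumer(),
            new NewCustomerMessage { Name = "Mike" }); // prints "Received: Mike"
    }
}
```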

Here’s the Main method of the subscriber console application:

using System;
using Castle.MicroKernel.Registration;
using MassTransit.Play.Subscriber.Consumers;
using MassTransit.Transports.Msmq;
using MassTransit.WindsorIntegration;

namespace MassTransit.Play.Subscriber
{
    class Program
    {
        static void Main()
        {
            Console.WriteLine("Starting Subscriber, hit return to quit");

            MsmqEndpointConfigurator.Defaults(config =>
                {
                    config.CreateMissingQueues = true;
                });

            var container = new DefaultMassTransitContainer("windsor.xml")
                .Register(
                    Component.For<NewCustomerMessageConsumer>().LifeStyle.Transient
                );

            var bus = container.Resolve<IServiceBus>();
            var consumer = container.Resolve<NewCustomerMessageConsumer>();
            consumer.Start(bus);

            Console.ReadLine();
            Console.WriteLine("Stopping Subscriber");
            consumer.Stop();
            container.Dispose();
        }
    }
}

Once again we specify that we want MassTransit to create our queues automatically and create a DefaultMassTransitContainer. The only addition we have to make for our subscriber is to register our consumer so that the bus can resolve it from the container.

Next we simply grab the bus and our consumer from the container and call Start on the consumer, passing it the bus. A nice little bit of double dispatch :)

Now we can start up our Publisher and Subscriber and send messages between them.

clip_image010

clip_image012

Wow, it works! I got a lot of childish pleasure from starting up multiple instances of my publisher and subscriber on multiple machines and watching the messages go back and forth. But then I’m a simple soul.

Looking at the MSMQ snap-in, we can see that the MSMQ queues have been automatically created for mt_mike_publisher and mt_mike_subscriber.

clip_image014

MassTransit System View also shows that the NewCustomerMessage is subscribed to on mt_mike_subscriber. It also shows the current status of our services. You can see that I have been turning them on and off through the morning.

clip_image016

Overall I’m impressed with MassTransit. Like most new open source projects the documentation is non-existent, but I was able to get started by looking at the Starbucks sample and reading Rhys C’s excellent blog posts. Kudos to Chris Patterson (AKA PhatBoyG) and Dru Sellers for putting such a cool project together.