MassTransit 3 Update: A Simple Publish/Subscribe Example

With the announcement of MassTransit 3, we learned that there were many changes to the API that were coming. The changes are all welcome ones, making MassTransit 3 simpler to work with and now completely asynchronous. What follows is this blog’s inaugural post, but updated to work with the current pre-release version of MassTransit 3.

Whenever there is an MT3-specific update, I will call it out like this.

When I first sat down to learn MassTransit, I found it difficult to get even a simple example working: one process that published a message onto the bus and another process that subscribed to messages of the same type. Hopefully, this primer will get you on the bus quicker.

Setting Up Your Environment

The first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others, but I find that RabbitMQ is really the way to go. That’s especially true when using the publish/subscribe pattern. The reason for this is that RabbitMQ has a complete routing framework built-in and MassTransit will leverage this when persisting your subscriptions. When creating a cluster of RabbitMQ servers for availability, this routing information is replicated to all the nodes.

MT3 has dropped support for MSMQ (read about that) but has officially adopted Azure Service Bus as an in-the-box supported transport.

In this article, you’re going to run RabbitMQ on your local Windows development box. Both our publisher and subscriber will connect to the same RabbitMQ instance. In a future post, I’ll detail how to set up multiple RabbitMQ instances in a cluster.

Installing RabbitMQ

RabbitMQ requires the Erlang runtime, so that’s the first thing you need to download and install. Head over to Erlang.org’s download page and get the latest binary release for Windows (it’s likely you’ll want the 64-bit version). It’s a simple setup wizard, so you’ll have Erlang installed on your machine in short order.

Next, download the latest version of RabbitMQ for Windows. Again, it’s an easy setup wizard that you can quickly fly through. Just accept the defaults.

Enabling the RabbitMQ Web Management Interface

One RabbitMQ feature that I found extremely useful (but which isn’t enabled by default) is the web-based management interface. With this, you can see the exchanges and queues that are set up by MassTransit in RabbitMQ. To enable this, find the “RabbitMQ Command Prompt (sbin dir)” item that the RabbitMQ installer added to your Start menu and launch it. From the command line, run the following command:

> rabbitmq-plugins enable rabbitmq_management

It will confirm that the plugin and its dependencies have been enabled and instruct you to restart RabbitMQ. When installed on Windows, RabbitMQ runs as a Windows service. You can use the Services MMC snap-in to restart it or just run the following command:

> net stop RabbitMQ
...
> net start RabbitMQ

Now go to http://localhost:15672/ to open the management console. The default login credentials are guest/guest (you can change the credentials from the Admin tab).

There’s not much to see yet, but we’ll set the stage. Go to the Exchanges tab. You’ll see the following default RabbitMQ exchanges:

image

An exchange is something you can send messages to. It cannot hold messages. It’s merely a set of routing instructions that tell RabbitMQ where to deliver the message. We’ll come back here in a little while.

Now click on the Queues tab. Nothing here yet. Unlike exchanges, queues hold messages, and they are where applications pick messages up.

So, to use a real world analogy, an Exchange is like the local Post Office, and a Queue is like your mailbox. The only thing that an Exchange can do that most traditional Post Offices don’t do is actually make multiple copies of a message to be delivered to multiple mailboxes.
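To make the Post Office analogy concrete, here is a toy in-memory model. It is only a sketch: ToyExchange and ToyQueue are made-up names, not RabbitMQ or MassTransit types, and real exchanges support more routing rules than "copy to every bound queue."

```csharp
using System;
using System.Collections.Generic;

namespace ExchangeSketch
{
  // Toy stand-in for a queue: it actually holds messages.
  class ToyQueue
  {
    public string Name { get; private set; }
    public Queue<string> Messages { get; private set; }

    public ToyQueue(string name)
    {
      Name = name;
      Messages = new Queue<string>();
    }
  }

  // Toy stand-in for an exchange: it holds no messages,
  // only bindings that say where copies should be delivered.
  class ToyExchange
  {
    private readonly List<ToyQueue> _bindings = new List<ToyQueue>();

    public void Bind(ToyQueue queue)
    {
      _bindings.Add(queue);
    }

    // Delivers one copy per bound queue and returns how many copies were made.
    public int Publish(string message)
    {
      foreach (var queue in _bindings)
        queue.Messages.Enqueue(message);
      return _bindings.Count;
    }
  }

  class Program
  {
    static void Main()
    {
      var exchange = new ToyExchange();

      // No bindings yet: the message has nowhere to go and is dropped.
      Console.WriteLine(exchange.Publish("first"));   // 0

      var a = new ToyQueue("mailbox-a");
      var b = new ToyQueue("mailbox-b");
      exchange.Bind(a);
      exchange.Bind(b);

      // Two bindings: each mailbox gets its own copy.
      Console.WriteLine(exchange.Publish("second"));  // 2
      Console.WriteLine(a.Messages.Count + " " + b.Messages.Count); // 1 1
    }
  }
}
```

The first publish is worth noticing: with no bindings the message is simply lost, because the exchange itself cannot store anything.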

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubExample

To view the code for MT3, select the “mt3” branch in this GitHub repository.

Creating a Contract

I like to use the concept of a “contract” for the messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. Neither side needs to know anything about how the other implements the interface. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

The original article used a separate Configuration class to handle the common configuration tasks for both the publisher and subscriber. MT3’s configuration code is much simpler and, for clarity, I’ve moved the appropriate code into the publisher and subscriber.

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for some text and then publishes that text as part of a SomethingHappened message. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts project.

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for text, and publish that text onto the bus. First, however, head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.RabbitMq package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

To get the pre-release version of MassTransit 3 you need to add the -Pre flag to the Install-Package commands in the Package Manager Console:

Install-Package -Pre MassTransit.RabbitMq
Install-Package -Pre MassTransit.Log4Net

With both packages installed, here is the complete Program.cs for the publisher:

using System;
using Contracts;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x => 
        x.Host(new Uri("rabbitmq://localhost/"), h => { }));
      var busHandle = bus.Start();
      var text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage()
        {
          What = text, When = DateTime.Now
        };
        bus.Publish<SomethingHappened>(message);
      }

      busHandle.Stop().Wait();
    }
  }
}

In MT3, most operations are asynchronous, including the Publish() method. This example doesn’t await the Task that Publish() returns. That keeps the loop simple, but it also means a failed publish would go unnoticed here.
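
If you do want to know whether an asynchronous call failed, you have to observe the Task it returns. The sketch below shows the difference using a hypothetical PublishAsync stand-in that always fails. It is not the MassTransit API, just plain Task code under that assumption.

```csharp
using System;
using System.Threading.Tasks;

namespace PublishSketch
{
  class Program
  {
    // Hypothetical stand-in for an asynchronous publish that always fails.
    // The text argument is unused; it only mirrors the shape of a publish call.
    public static async Task PublishAsync(string text)
    {
      await Task.Delay(10);
      throw new InvalidOperationException("broker unavailable");
    }

    // Observes the Task, so the failure becomes visible to the caller.
    public static bool TryPublish(string text)
    {
      try
      {
        PublishAsync(text).Wait();
        return true;
      }
      catch (AggregateException)
      {
        // Wait() wraps the original exception in an AggregateException.
        return false;
      }
    }

    static void Main()
    {
      // Fire and forget: the Task is discarded, so its failure is never seen here.
      PublishAsync("hello");

      // Observing the Task surfaces the failure.
      Console.WriteLine(TryPublish("hello")); // False
    }
  }
}
```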

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. The loop checks for the string “quit” so we can exit the publisher when we’d like.

MT3 now requires an explicit call to start the bus, returning a handle that you can use to later stop the bus. Notice the Wait() method chained to the Stop() method. This is because the Stop() method is asynchronous. If you were calling Stop() inside an async method, you could await it. However, since this is a simple Console app, we are just blocking the thread until the shutdown is complete.
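
Here is a small sketch of the two options, blocking versus awaiting. StopAsync is a hypothetical stand-in for an asynchronous shutdown call like Stop(), so nothing here depends on MassTransit itself.

```csharp
using System;
using System.Threading.Tasks;

namespace StopSketch
{
  class Program
  {
    // Hypothetical stand-in for an asynchronous shutdown.
    public static async Task<string> StopAsync()
    {
      await Task.Delay(100); // pretend to flush and disconnect
      return "stopped";
    }

    // Inside an async method, await the Task instead of blocking.
    public static async Task<string> ShutDownFromAsyncCode()
    {
      return await StopAsync();
    }

    static void Main()
    {
      // Main is not async here, so block until shutdown completes.
      var task = StopAsync();
      task.Wait();
      Console.WriteLine(task.Result); // stopped

      // The async version still has to be blocked on at the top of a console app.
      Console.WriteLine(ShutDownFromAsyncCode().Result); // stopped
    }
  }
}
```

Blocking like this is fine in a console app. In UI or classic ASP.NET code, blocking on a Task can deadlock, so prefer await there.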

If you make TestPublisher the startup project of the solution and run it, you can publish messages all you like. However, nobody is listening yet!

What’s Going on in RabbitMQ So Far?

If you go back into the RabbitMQ web interface and jump over to the Exchanges tab, you’ll see we have a couple new arrivals.

image

MT3 doesn’t require creating a queue if you aren’t hosting an endpoint. So, for our publisher, the MtPubSubExample_TestPublisher queue will not be created as it was in the original post.

Contracts:SomethingHappened is a new exchange created for the SomethingHappened message type. When we published this message, MassTransit automatically created this exchange. Click on it and scroll down to the Bindings section, and you’ll see there are no bindings yet:

image

That’s because nobody has subscribed to SomethingHappened messages yet. They go to the exchange and then die because there’s no queue to route them to.

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add a project reference to Contracts (the separate Configuration project from the original article is no longer needed) and then add the MassTransit.RabbitMq and MassTransit.Log4Net NuGet packages.

Don’t forget the -Pre switch to get the pre-release version of MT3.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using System;
using System.Threading.Tasks;
using Contracts;
using MassTransit;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : IConsumer<SomethingHappened>
  {
    public Task Consume(ConsumeContext<SomethingHappened> context)
    {
      Console.Write("TXT: " + context.Message.What);
      Console.Write("  SENT: " + context.Message.When);
      Console.Write("  PROCESSED: " + DateTime.Now);
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
      return Task.FromResult(0);
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

The Consume method is meant to be asynchronous, so it returns a Task. In this example, we aren’t making any other asynchronous calls, so we just use the Task.FromResult() helper method to return a Task with a zero result. If you were doing something asynchronous in the Consume method you could use async/await:

public async Task Consume(ConsumeContext<SomethingHappened> context)
{
  await SomeAsynchronousMethod(context.Message);
}

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using System;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x =>
      {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });

        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
          e.Consumer<SomethingHappenedConsumer>());
      });
      var busHandle = bus.Start();
      Console.ReadKey();
      busHandle.Stop().Wait();
    }
  }
}

Notice we are now passing a queue name to the new ReceiveEndpoint MT3 method. Make sure not to share this queue name with other applications.

Now right-click on the MtPubSubExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run.

Type some messages into the publisher. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish a few more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in RabbitMQ Now?

Go back into the RabbitMQ web interface and go back to the Exchanges tab. You’ll see a new exchange called MtPubSubExample_TestSubscriber, but first click on the Contracts:SomethingHappened exchange and scroll down to the Bindings section. You’ll see we now have a binding.

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up this binding for us. Click on the MtPubSubExample_TestSubscriber here, and you’ll see you’re taken to the setup page for an exchange called MtPubSubExample_TestSubscriber. Scroll down to Bindings, and you’ll see we’re bound to a queue named the same as the exchange (in the binding diagrams, exchanges show up as rectangles with rounded corners, whereas queues have straight corners).

image

The web interface is great in how it shows the predecessor in addition to the successor in the path. Click the MtPubSubExample_TestSubscriber queue here, and you’ll be taken to the queue setup page for that queue. If you haven’t fired up the TestSubscriber app since we published those last three messages, you should see that there are three messages in the queue:

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 6-minute lag (the 6 minutes we were poking around in RabbitMQ before starting up the subscriber again).
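
If you wanted the subscriber to report that lag itself, the arithmetic is just a DateTime subtraction. This is a minimal sketch with made-up sample timestamps. The Lag helper is hypothetical and not part of the sample repository.

```csharp
using System;

namespace LagSketch
{
  class Program
  {
    // Time between when a message was stamped and when it was handled.
    public static TimeSpan Lag(DateTime sent, DateTime processed)
    {
      return processed - sent;
    }

    static void Main()
    {
      // Made-up sample values standing in for message.When and DateTime.Now.
      var sent = new DateTime(2015, 1, 1, 12, 0, 0);
      var processed = new DateTime(2015, 1, 1, 12, 6, 0);

      Console.WriteLine(Lag(sent, processed).TotalMinutes); // 6
    }
  }
}
```

The sample uses DateTime.Now, which is local time. That is fine on a single machine, but if the publisher and subscriber run in different time zones, DateTime.UtcNow on both sides would give a more meaningful difference.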

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit 3. I will work on updating more posts like this one.

Until then…

  • Michael Rose

    Hi David,
    Great article. I followed the previous one as well for the previous Mass Transit version. I’m struggling slightly with one part of this – when I stop the consumer my messages are automatically moved to a queuename_error queue whenever they are published. Is there any way to prevent “undelivered” messages being moved to the error queue and simply being queued in the main queue until the subscriber/consumer starts up again?

  • Nick Barrett

    Hi David,
    Great post. Thanks for taking the time to update your original post to v3. I used this post as my first attempt at MassTransit. It worked really well. I hope you are able to find time to update some of your other posts to v3 as well.

    Thanks again =)

  • Jovan Racic

    The messages received after the subscriber restarts are not in the correct order? It’s probably a multi-threading issue, because the queue is FIFO. Could you give me some explanation about this? Thanks.

  • Ananth Eswar

    How do I create a temporary queue with MassTransit 3.0? I know how to do it with 2.0, but the same approach is not working for 3.0. Thanks for the help!

  • miguel

    Hi, thanks for the post. I am looking for the ServiceBusConfigurator in MassTransit 3. Thanks very much.

    your masstransit 2 version –> public static IServiceBus CreateBus(string queueName, Action moreInitialization)

    • miguel

      updated to mass transit 3—> public static IBusControl CreateBus(Action registrationAction = null)

      publisher —> Bus = BusInitializer.CreateBus((x, host) => { });
      consumer—>BusInitializer.CreateBus((cfg, host) =>
      {
      cfg.ReceiveEndpoint(host, “StringQueUeName”,e =>
      {
      e.Consumer();
      });

      });

  • miguel

    Hi dprothero,
    I need your help, I am studying from your real world example. I updated this version to mass transit 3.
    loosely coupled example not run with separate solution.
    I separated the console project and run both (solution1– console and solution2–web) but the console not found new messages.

    can you guide to me.

    thanks
    Miguel