MassTransit on Microsoft Azure

In my first post on this blog, I showed you how to get a simple Publish/Subscribe example working very quickly using MassTransit and RabbitMQ. I’d always planned to show you how to move that example to the cloud, with Microsoft Azure being a compelling competitor in this space, especially for the .NET developer (though not exclusively for .NET). Let’s take a look at some of the cool things we can take advantage of in the cloud, including auto-scaling our subscribers to match the load of messages being published.

Setting Up Your Environment

Choosing the Right Messaging Platform

As discussed in my first post, the first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others. I really like RabbitMQ for this role for a variety of reasons, but since we’ve already looked at RabbitMQ in a number of previous posts, I thought it would make sense to take a look at Azure Service Bus. Since we’re going to be deploying to Azure, it makes a lot of sense to consider this as an option.

Thankfully, there is a MassTransit transport for Azure Service Bus (find the source on GitHub here). While we could have setup a RabbitMQ server or two using Azure Virtual Machines (either Windows or Linux), it is easier to get going on Azure by using the Azure Service Bus. Also, as we’ll find out later, when you use the Azure Service Bus, you can tie your cloud service scaling to the size of your message queue.

Setting Up Azure Service Bus

If you don’t have a Microsoft Azure account yet, head over to to azure.microsoft.com and get signed up for a free trial. (Also, MSDN subscribers have free Azure services as part of their subscription. If you have an MSDN subscription, you only need to activate your Azure account.)

Next, in the Azure Portal, click “Service Bus” button in the left-hand list of areas. Then, click the “Create a New Namespace” button:

image

In the dialog, pick a name for your new namespace (in this example, I will use the name “Loosely,” but you will have to choose a unique name). Choose a region and be sure to leave Type as “Messaging:”

image

When you click the OK button, Azure will create and activate your new Azure Service Bus namespace. There’s nothing else to setup at this time as MassTransit will automatically create the queues and topics that it will require for your publish/subscribe code.

Install the Azure SDK

In order to be able to create cloud services easily in Visual Studio, you should install the Azure SDK. This can be downloaded from the Azure Downloads page. Download the VS 2013 or 2012 install under the .NET heading, depending on your version of Visual Studio.

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubAzureExample

Creating a Contract

I like to use the concept of a “contract” for my messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don’t need to know anything about the implementation of this interface on either side. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubAzureExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

When you’re writing a new project from scratch, you go through many permutations and refactor as you go. Initially, this example had the service bus setup code duplicated in both the publisher and subscriber projects. This is fine, particularly if you really aren’t in a position to share much code between the two sides (except the contracts of course). However, in my case, I preferred to use a common class which I’ll call “AzureBusInitializer” to set up my instance to MassTransit and get it configured.

So, add another class library to the MtPubSubAzureExample solution and name it “Configuration”. Before creating our class, it’s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.AzureServiceBus package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

Now, create a new class called “AzureBusInitializer.”

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using MassTransit.Transports.AzureServiceBus;
using System;
using System.Configuration;

namespace Configuration
{
  public class AzureBusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(sbc =>
      {
        var azureNameSpace = GetConfigValue("azure-namespace", "YourNamespace");
        var queueUri = "azure-sb://" + azureNameSpace + "/MtPubSubAzureExample_" + queueName;

        sbc.ReceiveFrom(queueUri);
        SetupAzureServiceBus(sbc, azureNameSpace);

        moreInitialization(sbc);
      });

      return bus;
    }

    private static void SetupAzureServiceBus(ServiceBusConfigurator sbc, string azureNameSpace)
    {
      sbc.UseAzureServiceBus(a => a.ConfigureNamespace(azureNameSpace, h =>
      {
        h.SetKeyName(GetConfigValue("azure-keyname", "RootManageSharedAccessKey"));
        h.SetKey(GetConfigValue("azure-key", ""));
      }));
      sbc.UseAzureServiceBusRouting();
    }

    private static string GetConfigValue(string key, string defaultValue)
    {
      string value = ConfigurationManager.AppSettings[key];
      return string.IsNullOrEmpty(value) ? defaultValue : value;
    }

  }
}

We’re creating a static method called “CreateBus,” which both our publisher and subscriber can use to set up an instance of a bus, using the Log4NetLogger, and connecting to our Azure Service Bus namespace. Because there may be additional custom setup that the publisher or subscriber may want to do, we allow passing in a lambda expression to perform the additional setup.

I’ve highlighted a couple lines that will require App.config settings in our publisher and subscriber apps. Or, you can hard-code the values here. You should know your namespace name (as you just created it). To find your access key, you can click on the “Connection Information” button in the lower toolbar in the Azure Portal:

image

The key will be the “SharedAccessKey” parameter in the connection string. For example:

Endpoint=sb://loosely.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=key-goes-here

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for a number of messages they’d like to publish and then publish that many SomethingHappened messages. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts (and add one to Configuration too, while you’re at it).

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for the number of messages they want to publish, and publish that many messages onto the bus. Real quick first, however, add a NuGet reference to the MassTransit package.

using Configuration;
using Contracts;
using System;
using System.Threading.Tasks;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = AzureBusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter number of messages to generate (quit to exit): ");
        text = Console.ReadLine();

        int numMessages = 0;
        if (int.TryParse(text, out numMessages) && numMessages > 0)
        {
          Parallel.For(0, numMessages, i =>
          {
            var message = new SomethingHappenedMessage() { What = "message " + i.ToString(), When = DateTime.Now };
            bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
          });
        }
        else if(text != "quit")
        {
          Console.WriteLine("\"" + text + "\" is not a number.");
        }
      }

      bus.Dispose();
    }
  }
}

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. Just put a catch for the string “quit” so we can exit the publisher when we’d like. Be sure to update the app.config for TestPublisher to include your Azure key and namespace:

  <appSettings>
    <add key="azure-namespace" value="loosely" />
    <add key="azure-key" value="YourKeyHere" />
  </appSettings>

If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like…. However, nobody is listening yet!

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add project references to Contracts and Configuration and then add the MassTransit NuGet package.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using Contracts;
using MassTransit;
using System;
using System.Threading;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.WriteLine("TXT: " + message.Message.What +
                        "  SENT: " + message.Message.When.ToString() +
                        "  PROCESSED: " + DateTime.Now.ToString() + 
                        " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using Configuration;
using MassTransit;
using System;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = AzureBusInitializer.CreateBus("TestSubscriber", sbc =>
      {
        sbc.SetConcurrentConsumerLimit(64);
        sbc.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }
  }
}

Now right-click on the MtPubSubAzureExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run. Again, be sure to update the app.config for TestSubscriber to include your Azure key and namespace like you did for TestPublisher.

Publish some messages from the publisher window. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish a few more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in Azure Service Bus?

Go back into the Azure Portal, select your namespace, and click on the the Topics tab. You’ll see a new topic called contracts..somethinghappened. In Azure Service Bus, queues can be subscribers to a topic. You’ll see the topic has a “1” in Subscriptions Count:

image

Click on the “contracts..somethinghappened” topic and then click on the “Subscriptions” tab. You will see that there is a single queue subscribed to the topic named “MtPubSubAzureExample_TestSubscriberXXXXXX:”

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up a topic subscription for us!

Now, click on the “Queues” tab and you will see that the queue “mtpubsubazureexample_testsubscriber” has 3 messages (the “Queue Length” column):

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually published the message. In this case, there was a 7-minute lag (the 7 minutes we were poking around in Azure before starting up the subscriber again).

Run the Subscriber In the Cloud

Now that we have a simple publish/subscribe example working, let’s have our subscriber actually run in the cloud. There’s a number of ways we could do this, but probably the most “cloudy” way to do it would be to create an Azure Cloud Service. These are extremely easy to create in Visual Studio 2013 (assuming you’ve installed the Azure SDK as instructed at the beginning of this post).

Add a new project to your MtPubSubAzureExample solution and choose “Azure Cloud Service” from the list of project templates. Name the new project “TestCloudSubscriber”. A cloud setup dialog will appear next. Select “Worker Role” only as we will be creating a background service that processes messages using MassTransit. Rename the “WorkerRole1” to “TestCloudSubscriberWorker.”

image

This will add the TestCloudSubscriber and TestCloudSubscriberWorker projects and drop you into the WorkerRole.cs file in TestCloudSubscriberWorker. As always, be sure to update the app.config for TestCloudSubscriberWorker to include your Azure key and namespace. Add a project reference to the Configuration and Contracts projects and add the MassTransit NuGet package to the new worker project.

Copy the SomethingHappenedConsumer.cs file from TestSubscriber to TestCloudSubscriberWorker. Edit the namespace of SomethingHappenedConsumer.cs to be “TestCloudSubscriberWorker” and change the Console.WriteLine to a call to Trace.TraceInformation:

using Contracts;
using MassTransit;
using System;
using System.Diagnostics;
using System.Threading;

namespace TestCloudSubscriberWorker
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Trace.TraceInformation("TXT: " + message.Message.What +
                             "  SENT: " + message.Message.When.ToString() +
                             "  PROCESSED: " + DateTime.Now.ToString() + 
                             " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

Now, edit the top of WorkerRole.cs to add-in a couple usings and a private field for our service bus:

using Configuration;
using MassTransit;
using Microsoft.WindowsAzure.ServiceRuntime;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace TestCloudSubscriberWorker
{
  public class WorkerRole : RoleEntryPoint
  {
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
    private IServiceBus _bus;

Then, edit the OnStart() method to initialize the bus and our subscriptions:

public override bool OnStart()
{
  // Set the maximum number of concurrent connections
  ServicePointManager.DefaultConnectionLimit = 12;

  // For information on handling configuration changes
  // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

  bool result = base.OnStart();

  if(result)
  {
    _bus = AzureBusInitializer.CreateBus("TestCloudSubscriber", sbc =>
    {
      sbc.SetConcurrentConsumerLimit(64);
      sbc.Subscribe(subs =>
      {
        subs.Consumer<SomethingHappenedConsumer>().Permanent();
      });
    });
  }
  
  Trace.TraceInformation("TestCloudSubscriberWorker has been started");

  return result;
}

And edit the OnStop() method to dispose of the bus:

public override void OnStop()
{
  Trace.TraceInformation("TestCloudSubscriberWorker is stopping");

  this.cancellationTokenSource.Cancel();
  this.runCompleteEvent.WaitOne();

  if(_bus != null)
    _bus.Dispose();
  
  base.OnStop();

  Trace.TraceInformation("TestCloudSubscriberWorker has stopped");
}

Right-click on the MtPubSubAzureExample solution and choose “Set StartUp Projects…” Choose “Multiple startup projects” and set the Action to Start for TestPublisher and TestCloudSubscriber (the TestCloudSubscriber will startup our TestCloudSubscriberWorker). Run the solution and you should see your familiar TestPublisher console window. Where is the TestCloudSubscriber service? Look in your tray for the Azure emulator icon:

image

Right-click and choose “Show Compute Emulator UI” and then click on the “TestCloudSubscriberWorker” in the tree view. This should bring up the console for the cloud service which should be displaying the trace messages. You should see “Working” every 1 second, since that’s what the Worker Role code template does:

image

If we publish some messages from the TestPublisher, we should see them get handled in the emulator console:

image

Publish the Cloud Service to Azure

Now we want to actually run the TestCloudSubscriber, well, in the cloud! Right-click on the TestCloudSubscriber project and select “Publish.” This will bring up the “Publish Azure Application” dialog. After signing in, another dialog should popup titled “Create Cloud Service and Storage Account.” Give it any name you like and choose the region (you probably want the same region where you created your Azure Service Bus):

image

Then, on the Settings tab of the Publish wizard, simply click the Publish button. This will publish your cloud service to Azure, which you’ll be able to see under Cloud Services in the Azure Portal:

image

Click on your service and then click on the “Instances” tab:

image

As you can see, our first instance of our service is running. Back in Visual Studio, you can right-click on the TestPublisher project and set it as the sole startup project. Run it and publish a few messages to the bus. Our cloud service should process the messages, but how can we be sure? The quick and dirty way is to check the Queue Length of the mtpubsubazureexample_testcloudsubscriber queue. As long as this is a decreasing number (or a zero), then our cloud service is processing the messages. If you want more than that and would like to actually see the trace messages we are writing out from our cloud service, read on.

Viewing Full Trace Information

Expand the “Roles” folder under the TestCloudSubscriber project in Visual Studio. Right-click on the TestCloudSubscriberWorker role and select “Properties.” On the Configuration tab, find the Diagnostics section and change “Errors only” to “All information.” After doing so, re-publish your service to Azure and wait for it to restart.

Once it is up and running, publish a few more messages using TestPublisher. Now, in Visual Studio’s Server Explorer, expand the Azure node, Storage, your storage account (e.g. testcloudsubscriber), Tables, and then double-click the “WADLogsTable” table. Here you can view all of the tracing messages from your cloud service. You should see your messages in the “Message” column:

image

You probably don’t want to run your service with the “All information” option set all the time, but we do get a warm fuzzy feeling knowing everything is being processed the way we expect!

Scaling Out Your Subscribers

Azure has a great and stupid simple way to automatically scale out your cloud service based on the number of messages in your queue. In the Azure Portal, simply navigate to your Cloud Service and click the “Scale” tab. Next to “Scale by Metric,” choose “Queue”:

image

Under “Instance Range,” choose your upper limit of instances (e.g. 5):

image

Select the Namespace and Queue Name you want to “watch” and tie your service scaling to:

image

Finally, set “Target per Machine” to how many messages you want a single instance to be able to handle at any given time. For example, if you have a single instance running, and the number of messages in the queue exceeds this number, then Azure will spin up another instance based on the above rules.

image

Thanks Azure! In order to test this out, I actually stopped the cloud service so it wasn’t processing messages and then published 400,000 messages. I restarted the cloud service and it ran with one instance for a while, but pretty soon, it started launching new instances, based on my configuration (which I had dialed back pretty far to make sure the scaling would get triggered). It worked great, and that backlog of messages was worked down to zero in no time.

image

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit and Azure. Most of the topics dealt with in my previous articles dealing with MassTransit and RabbitMQ can also be applied to running MassTransit in Azure and with Azure Service Bus (with the exception of RabbitMQ specific topics). I hope to see you back here soon and, as always, please let me know if you have any questions or suggestions for future topics!

Until then…

  • Brett Westover

    Awesome post David. Thanks for showing us the “cloudy” way!

  • Ben Gale

    Hey, Just running your sample code. This bit:

    var queueUri = “azure-sb://owner:bjOAWQJalkmd9LKas0lsdklkdw4mAHwKZUJ1jKwTLdc=@” + azureNameSpace + “/MtPubSubAzureExample_” + queueName;

    I’ve left unchanged and it works. What is the owner and what looks like a shared key? Is that yours, what do I set that too?

    • dprothero

      Good catch! It’s actually completely unnecessary (which is why it still works for you). It was leftover from my experimenting. I’ve pulled it out and everything still works.

      var queueUri = “azure-sb://” + azureNameSpace + “/MtPubSubAzureExample_” + queueName;

      • Ben Gale

        Brilliant, that explains my confusion! Thanks for the lightning fast reply.

  • David McClelland

    I’m glad you built this example – have you tried with the local version of Microsoft’s Service Bus (the extraordinarily-named Windows Server Service Bus!) to see if this works the same?

    • dprothero

      To the best of my knowledge, the Azure Service Bus transport for MassTransit does has not been successfully tested to work with Service Bus for Windows Server. I believe it would be a simple matter for a developer to modify the transport code to make it work, but to date, this work has not yet been done.

  • Andy Charalambous

    Great post, really helped me out in setting up my own MT/w Azure SB application. One thought though – I tried to lock things down a little – it feels wrong to be using such a root level SAS key, but things wouldn’t work at this point. Any suggestions/best practices for when running in production? Presumably “Manage” permissions are mandatory the first time this is run, in order for the topic, subscription and new queue to be created? Can permissions be changed to using Listen and Send afterward? I tried but had no success.

    Thanks!

  • Rishabh Ajmera

    Hi David,

    Really nice article.

    I am trying to figure out a way to use MassTransit v3 with Azure service bus and just could not find a way to get it to work.
    I read online in several articles that it is possible.
    I saw you have an article on using RabbitMQ with Masstransit v3.
    If you have code sample for Masstransitv3 with Azure, can you please share a link to it. It will be of great help.