Loosely Coupled Labs (https://looselycoupledlabs.com)
**ARCHIVED** A Blog Loosely Related to System Architecture by David Prothero **ARCHIVED**

MMS Employee Directory in ASP.NET MVC 5
https://looselycoupledlabs.com/2016/03/mms-employee-directory-in-asp-net-mvc-5/
Fri, 25 Mar 2016 19:45:41 +0000

In my last post, I mentioned I would have a new tutorial posted on the Twilio site very soon.

Well, the post is now live.

It’s a simple ASP.NET MVC 5 application that listens for SMS text messages and uses Entity Framework to find employees in the database that match the incoming queries. It responds with an MMS message with the photo and contact info for the employee.

After you’re done playing with that, please send me your ideas for a cool MassTransit / Twilio integration!

Meet Your Friendly, Neighborhood .NET Twilion
https://looselycoupledlabs.com/2016/03/meet-your-friendly-neighborhood-net-twilion/
Mon, 21 Mar 2016 17:17:56 +0000

I have a new role and I’m very excited about it. I am now a Developer Educator for Twilio. Twilio takes care of the messy telecom hardware and exposes a globally available cloud API that developers can interact with to build intelligent and complex communications systems. To get this role, one of the things I had to do was give a live demo of something (anything really). To illustrate how easy it was to work with the Twilio API, I used the SMS API to enable two-factor authentication in an ASP.NET MVC web app. The process of walking a bunch of developers (used to writing Python and JavaScript code on their MacBooks) through using Visual Studio was a lot of fun, and I must have impressed them because I got the job!

On my first day, I was introduced to a novel approach to technical and API documentation, which is being officially announced today. They’re called Tutorials, which sounds simple enough, but instead of putting the narrative first, they put the code first because, let’s face it, that’s what we developers are most interested in seeing anyway. Rather than try to describe it to you, I encourage you to go check it out. Here’s a great tutorial on how to automate an SMS-based workflow using C# and ASP.NET MVC. And here’s another one on how to use masked phone numbers (just like they do at Airbnb).

I am excited to be joining the team that puts these excellent tutorials together. I’m currently working on a company directory that you can query using SMS to get back a photo, full name, email, and phone number of an employee in your company. I created a directory for the fictitious company Marvel Universe, which you can give a spin by sending a text message to: +1-209-337-3517. (Hint: make your text message the name of your favorite Marvel comic hero.) When you do, you’ll get back the contact info as an MMS response.

 

That’s just a preview. I’ll write another blog post once that tutorial has been published. And, you know I’m going to find a way to use MassTransit with the Twilio APIs, so stay tuned…

MassTransit 3 Update: A Simple Publish/Subscribe Example
https://looselycoupledlabs.com/2015/07/masstransit-3-update-a-simple-publishsubscribe-example/
Sat, 04 Jul 2015 22:54:48 +0000

With the announcement of MassTransit 3, we learned that many API changes were coming. The changes are all welcome ones, making MassTransit 3 simpler to work with and now completely asynchronous. What follows is this blog’s inaugural post, but updated to work with the current pre-release version of MassTransit 3.

Whenever there is an MT3 specific update, I will call it out like this.

When I first sat down to learn how to use MassTransit, I found it difficult to get even a simple example working: one process that published a message onto the bus and another that subscribed to messages of the same type. Hopefully, this primer will get you on the bus quicker.

Setting Up Your Environment

The first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others, but I find that RabbitMQ is really the way to go. That’s especially true when using the publish/subscribe pattern. The reason for this is that RabbitMQ has a complete routing framework built-in and MassTransit will leverage this when persisting your subscriptions. When creating a cluster of RabbitMQ servers for availability, this routing information is replicated to all the nodes.

MT3 has dropped support for MSMQ (read about that) but has officially adopted Azure Service Bus as an in-the-box supported transport.

In this article, you’re going to run RabbitMQ on your local Windows development box. Both our publisher and subscriber will connect to the same RabbitMQ instance. In a future post, I’ll detail how to set up multiple RabbitMQ instances in a cluster.

Installing RabbitMQ

RabbitMQ requires the Erlang runtime, so that’s the first thing you need to download and install. Head over to Erlang.org’s download page and get the latest binary release for Windows (it’s likely you’ll want the 64-bit version). It’s a simple setup wizard, so you’ll have Erlang installed on your machine in short order.

Next, download the latest version of RabbitMQ for Windows. Again, it’s an easy setup wizard that you can quickly fly through. Just accept the defaults.

Enabling the RabbitMQ Web Management Interface

One RabbitMQ feature that I found extremely useful (but which isn’t enabled by default) is the web-based management interface. With this, you can see the exchanges and queues that are set up by MassTransit in RabbitMQ. To enable this, find the “RabbitMQ Command Prompt (sbin dir)” item that the RabbitMQ installer added to your Start menu and launch it. From the command line, run the following command:

> rabbitmq-plugins enable rabbitmq_management

It will confirm that the plugin and its dependencies have been enabled and instruct you to restart RabbitMQ. When installed on Windows, RabbitMQ runs as a Windows service. You can use the Services MMC snap-in to restart it or just run the following command:

> net stop RabbitMQ
...
> net start RabbitMQ

Now go to http://localhost:15672/ to open the management console. The default credentials to log in are guest/guest (you can change the credentials from the Admin tab).

There’s not much to see yet, but we’ll set the stage. Go to the Exchanges tab. You’ll see the following default RabbitMQ exchanges:

image

An exchange is something you can send messages to. It cannot hold messages. It’s merely a set of routing instructions that tell RabbitMQ where to deliver the message. We’ll come back here in a little while.

Now click on the Queues tab. Nothing here yet. Queues can actually hold messages and are where applications can actually pick up messages.

So, to use a real world analogy, an Exchange is like the local Post Office, and a Queue is like your mailbox. The only thing that an Exchange can do that most traditional Post Offices don’t do is actually make multiple copies of a message to be delivered to multiple mailboxes.
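To make the analogy concrete, here is a minimal in-memory sketch. It assumes a fanout-style exchange that copies every message to each bound queue; ToyQueue and ToyExchange are made-up names for illustration, not RabbitMQ or MassTransit types.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of the post office analogy.
// ToyQueue and ToyExchange are illustrative names, not RabbitMQ or MassTransit types.
class ToyQueue
{
  public readonly string Name;
  public readonly Queue<string> Messages = new Queue<string>();

  public ToyQueue(string name) { Name = name; }
}

class ToyExchange
{
  private readonly List<ToyQueue> bindings = new List<ToyQueue>();

  public void Bind(ToyQueue queue) { bindings.Add(queue); }

  // An exchange stores nothing: it copies the message into every bound queue.
  // Returns the number of copies delivered; zero means the message was dropped.
  public int Publish(string message)
  {
    foreach (var queue in bindings)
      queue.Messages.Enqueue(message);
    return bindings.Count;
  }
}

class ExchangeDemo
{
  static void Main()
  {
    var exchange = new ToyExchange();

    // No bindings yet, so nothing holds this message.
    Console.WriteLine("Copies delivered: " + exchange.Publish("first"));

    var mailboxA = new ToyQueue("A");
    var mailboxB = new ToyQueue("B");
    exchange.Bind(mailboxA);
    exchange.Bind(mailboxB);

    // Both mailboxes now receive their own copy.
    Console.WriteLine("Copies delivered: " + exchange.Publish("second"));
    Console.WriteLine("A holds " + mailboxA.Messages.Count + ", B holds " + mailboxB.Messages.Count);
  }
}
```

The point of the sketch is that only queues ever hold messages; a message published before any queue is bound simply has nowhere to go.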

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubExample

To view the code for MT3, select the “mt3” branch in this GitHub repository.

Creating a Contract

I like to use the concept of a “contract” for the messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don’t need to know anything about the implementation of this interface on either side. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set the What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

The original article used a separate Configuration class to handle the common configuration tasks for both the publisher and subscriber. MT3’s configuration code is much simpler and, for clarity, I’ve moved the appropriate code into the publisher and subscriber.

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for some text and then publishes that text as part of a SomethingHappened message. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts project.

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for text, and publish that text onto the bus. Real quick first, however, it’s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.RabbitMq package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

To get the pre-release version of MassTransit 3 you need to add the -Pre flag to the Install-Package commands in the Package Manager Console:

Install-Package -Pre MassTransit.RabbitMq
Install-Package -Pre MassTransit.Log4Net

using System;
using Contracts;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x => 
        x.Host(new Uri("rabbitmq://localhost/"), h => { }));
      var busHandle = bus.Start();
      var text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage()
        {
          What = text, When = DateTime.Now
        };
        bus.Publish<SomethingHappened>(message);
      }

      busHandle.Stop().Wait();
    }
  }
}

In MT3, most operations are asynchronous, including the Publish() method. However, there is no need to await the call to publish here.

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. Just put a catch for the string “quit” so we can exit the publisher when we’d like.

MT3 now requires an explicit call to start the bus, returning a handle that you can use to later stop the bus. Notice the Wait() method chained to the Stop() method. This is because the Stop() method is asynchronous. If you were calling Stop() inside an async method, you could await it. However, since this is a simple Console app, we are just blocking the thread until the shutdown is complete.
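The difference between blocking on Stop() and awaiting it can be sketched without MassTransit at all. In the example below, FakeBusHandle is a hypothetical stand-in for the handle returned by bus.Start(); only its Task-returning Stop() method mirrors the code above.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the handle returned by bus.Start(); not the MassTransit type.
class FakeBusHandle
{
  public bool Stopped { get; private set; }

  public async Task Stop()
  {
    await Task.Delay(50); // simulate asynchronous shutdown work
    Stopped = true;
  }
}

class StopDemo
{
  // Synchronous caller: block the current thread until shutdown completes.
  public static void StopBlocking(FakeBusHandle handle)
  {
    handle.Stop().Wait();
  }

  // Asynchronous caller: await the shutdown instead of blocking.
  public static async Task StopAwaiting(FakeBusHandle handle)
  {
    await handle.Stop();
  }

  static void Main()
  {
    var first = new FakeBusHandle();
    StopBlocking(first);
    Console.WriteLine("Blocking stop finished: " + first.Stopped);

    var second = new FakeBusHandle();
    // Main is synchronous here, so it still has to block on the awaiting version.
    StopAwaiting(second).Wait();
    Console.WriteLine("Awaited stop finished: " + second.Stopped);
  }
}
```

Blocking with Wait() is reasonable in a console app like this one; in code that already runs inside an async method, awaiting is usually the better fit.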

If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like…. However, nobody is listening yet!

What’s Going on in RabbitMQ So Far?

If you go back into the RabbitMQ web interface and jump over to the Exchanges tab, you’ll see we have a couple new arrivals.

image

MT3 doesn’t require creating a queue if you aren’t hosting an endpoint. So, for our publisher, the MtPubSubExample_TestPublisher queue will not be created as it was in the original post.

Contracts:SomethingHappened is a new exchange created for the SomethingHappened message type. When we published this message, MassTransit automatically created this exchange. Click on it and scroll down to the Bindings section, and you’ll see there are no bindings yet:

image

That’s because nobody has subscribed to SomethingHappened messages yet. They go to the exchange and then die because there’s no queue to route them to.

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add a project reference to Contracts and then add the MassTransit.RabbitMq and MassTransit.Log4Net NuGet packages.

Don’t forget the -Pre switch to get the pre-release version of MT3.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using System;
using System.Threading.Tasks;
using Contracts;
using MassTransit;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : IConsumer<SomethingHappened>
  {
    public Task Consume(ConsumeContext<SomethingHappened> context)
    {
      Console.Write("TXT: " + context.Message.What);
      Console.Write("  SENT: " + context.Message.When);
      Console.Write("  PROCESSED: " + DateTime.Now);
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
      return Task.FromResult(0);
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

The Consume method is meant to be asynchronous, so it returns a Task. In this example, we aren’t making any other asynchronous calls, so we just use the Task.FromResult() helper method to return a Task with a zero result. If you were doing something asynchronous in the Consume method you could use async/await:

public async Task Consume(ConsumeContext<SomethingHappened> context)
{
  await SomeAsynchronousMethod(context.Message);
}

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using System;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x =>
      {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });

        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
          e.Consumer<SomethingHappenedConsumer>());
      });
      var busHandle = bus.Start();
      Console.ReadKey();
      busHandle.Stop().Wait();
    }
  }
}

Notice we are now passing a queue name to the new ReceiveEndpoint MT3 method. Make sure not to share this queue name with other applications.
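Why the queue name matters can be sketched with a toy model: when two applications read from the same queue, each message goes to only one of them. The round-robin order below is an assumption made for illustration, and “OtherApp” is a made-up second application; a real broker decides the dispatch order itself.

```csharp
using System;
using System.Collections.Generic;

// Toy model: one queue, several applications reading from it.
// Illustrative only; the round-robin dispatch is an assumption, not broker behavior you can rely on.
class SharedQueueDemo
{
  public static Dictionary<string, List<string>> Distribute(IEnumerable<string> messages, string[] consumers)
  {
    var received = new Dictionary<string, List<string>>();
    foreach (var consumer in consumers)
      received[consumer] = new List<string>();

    var queue = new Queue<string>(messages);
    var next = 0;
    while (queue.Count > 0)
    {
      // Each message is removed from the queue by exactly one consumer.
      var consumer = consumers[next % consumers.Length];
      received[consumer].Add(queue.Dequeue());
      next++;
    }
    return received;
  }

  static void Main()
  {
    var result = Distribute(
      new[] { "one", "two", "three", "four" },
      new[] { "TestSubscriber", "OtherApp" });

    foreach (var pair in result)
      Console.WriteLine(pair.Key + " saw: " + string.Join(", ", pair.Value));
  }
}
```

In this model neither application sees every message, which is the situation a unique queue name per application avoids.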

Now right-click on the MtPubSubExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run.

Type some messages into the publisher. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish a few more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in RabbitMQ Now?

Go back into the RabbitMQ web interface and go back to the Exchanges tab. You’ll see a new exchange called MtPubSubExample_TestSubscriber, but first click on the Contracts:SomethingHappened exchange and scroll down to the Bindings section. You’ll see we now have a binding.

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up this binding for us. Click on the MtPubSubExample_TestSubscriber here, and you’ll see you’re taken to the setup page for an exchange called MtPubSubExample_TestSubscriber. Scroll down to Bindings, and you’ll see we’re bound to a queue named the same as the exchange (in the binding diagrams, exchanges show up as rectangles with rounded corners, whereas queues have straight corners).

image

The web interface is great in how it shows the predecessor in addition to the successor in the path. Click the MtPubSubExample_TestSubscriber queue here, and you’ll be taken to the queue setup page for that queue. If you haven’t fired up the TestSubscriber app since we published those last three messages, you should see that there are three messages in the queue:

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 6-minute lag (the 6 minutes we were poking around in RabbitMQ before starting up the subscriber again).

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit 3. I will work on updating more posts like this one.

Until then…

MassTransit versus NServiceBus: FIGHT!
https://looselycoupledlabs.com/2014/11/masstransit-versus-nservicebus-fight/
Sun, 16 Nov 2014 23:01:50 +0000

I often get asked: “which is better, MassTransit or NServiceBus?” It’s a perfectly reasonable question for an outsider looking to get started with message-based SOA on .NET to ask. However, as is usually the case with the “which is better” question for any technology, the answer is the ubiquitous (and exasperating) “it depends.”

Goals and Disclaimers

This post is not a deep-dive comparison of the two frameworks. Doing so would be extremely time consuming, and I’m not sure it would add much value, as the use cases for these frameworks are so broad. Instead, I hope to offer some general guidance on how to approach your own comparison research (with links to more details where appropriate).

The other goal for this post is to solicit feedback from others to improve the quality of advice offered here. You may want to check back frequently for updates and interesting conversation. Please keep things civil, however. (The “FIGHT!” in the post title is just my own attempt at journalistic sensationalism.)

First, the disclaimers… I’ve used MassTransit for quite some time and obviously if you look at the other posts on this blog, you can see there’s a definite MassTransit bias. But there’s a twist! I actually do some (paid) work for Particular Software (makers of NServiceBus) writing documentation for their products. Particular is not paying me to write this blog post, however. If this post feels biased to you in any way, I apologize. We’re all human. On the other hand, the level of exposure I have to both frameworks does put me in a good position to publish some thoughts on the differences. Proceed with caution!

MassTransit versus NServiceBus

Common Qualities

Both frameworks have these qualities in common (in broad strokes):

  • The projects were started and are actively maintained by wicked smart SOA experts. [MT | NSB]
  • The code base is high quality, unit tested, and open to community contributions. [MT | NSB]
  • There is a very responsive community. [MT | NSB]
  • They support the most common messaging patterns (request/response, publish/subscribe, sagas, etc.). [MT | NSB]
  • Both support your choice of IoC Container. [MT | NSB]
  • Exceptions during message processing are handled and messages re-queued to be retried. [MT | NSB]

The bottom line is these are both very solid frameworks.

So What’s Different?

It’s been mentioned in other places many times before, but here are the basic differences if you haven’t seen them (again in broad strokes):

  • While both are open source, only MassTransit is free to use in production. Production use of NServiceBus requires a commercial license. [MT | NSB]
  • MassTransit support is strictly community-based through its Google Group (where the founders are extremely active) whereas NServiceBus offers a commercial support option. [MT | NSB]
  • NServiceBus has an entire platform of other (commercial) tools supporting it.
  • NServiceBus’ documentation and the platform itself tend to hold your hand a little more and try to steer you down the path of good practices. (You can still shoot yourself in the foot, they just try to make it harder.)

Obviously, there are many other, more detailed, differences but these are the major factors most people are considering when getting started.

The Automotive Analogy

Let’s use an automotive analogy and say MassTransit is a base model Honda Accord. Very solid, reliable vehicle. Always highly recommended by the auto experts and consumer groups. It will get you from point A to point B in a predictable manner. There are even upgraded components you can get that are designed to work well with it to give you a better experience.

If MassTransit is a Honda Accord, then NServiceBus is a Lexus IS350. All the same attributes as the Honda, but with a lot more capabilities right in the package. It’s a lot more comfortable, has more horsepower, and can do some pretty impressive things. Could you make the Honda do all the things that a Lexus could do? Possibly, but you’re going to be working in the garage a long time to upgrade it (or paying a custom car shop a lot of money to do so). It would be a very custom, one-of-a-kind vehicle, whereas the Lexus was designed holistically to have all its features work together in a reliable and safe package.

A Concrete Example

Let me give you a concrete example of some of the additional features you would get “in the box” from NServiceBus versus MassTransit. Take the example of working with a queuing system like RabbitMQ or Azure Service Bus that does not support distributed transactions. If you have a business process that needs to be highly reliable and consistent, then you have to deal with this lack of distributed transaction support. Consider this simple example:

  1. You take the message off the queue.
  2. You do some database work.
  3. Before you can commit your database work, your server crashes.

In this example, you will lose data because the message will no longer be in the queue, but your database work was not persisted. This is why both MassTransit and NServiceBus don’t actually remove the message from the queue until your handler has completed successfully. So the scenario could go like this:

  1. You peek at the next message on the queue.
  2. You do your database work and it commits.
  3. You go to remove the message from the queue, but… oops… the server crashes.

Now the message is still in the queue and will be processed again. This could lead to duplicate data. “No problem,” you say, “just keep track of the messages that are processed in the database.”

  1. You peek at the next message on the queue.
  2. You do your database work.
  3. In the same db transaction, you record the message id so you know it’s been processed.
  4. Commit the database transaction.
  5. You go to remove the message from the queue, but… oops… the server crashes.

Now we’re okay because the logic in your handler can check to see if the message id has already been processed. We’re utilizing the transaction capability of your database to help us out here.
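The last sequence can be sketched as follows. This is a minimal illustration, assuming an in-memory FakeDatabase (a made-up class) stands in for a real database transaction; in a real system the duplicate check and both writes would need to happen inside the same transaction.

```csharp
using System;
using System.Collections.Generic;

// Illustrative sketch: FakeDatabase stands in for a real database transaction.
class FakeDatabase
{
  public readonly HashSet<Guid> ProcessedMessageIds = new HashSet<Guid>();
  public readonly List<string> Rows = new List<string>();

  // Pretend both writes commit atomically, as they would inside one db transaction.
  public void CommitWork(Guid messageId, string row)
  {
    Rows.Add(row);
    ProcessedMessageIds.Add(messageId);
  }
}

class IdempotentHandler
{
  private readonly FakeDatabase db;

  public IdempotentHandler(FakeDatabase db) { this.db = db; }

  // Returns true if work was done, false if the message was a duplicate.
  public bool Handle(Guid messageId, string payload)
  {
    if (db.ProcessedMessageIds.Contains(messageId))
      return false; // already processed before the crash, so skip the work

    db.CommitWork(messageId, payload);
    return true;
  }
}

class IdempotencyDemo
{
  static void Main()
  {
    var db = new FakeDatabase();
    var handler = new IdempotentHandler(db);
    var id = Guid.NewGuid();

    handler.Handle(id, "order created"); // first delivery commits, then the server "crashes"
    handler.Handle(id, "order created"); // redelivery after restart is ignored

    Console.WriteLine("Rows written: " + db.Rows.Count);
  }
}
```

Because the message id is recorded alongside the work, a redelivered message is recognized and skipped rather than written twice.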

Problem solved, right? Actually, not quite… what if you want to publish another message after finishing the database work? That won’t be involved in the same transaction and now you risk either publishing incorrect information to the service bus or not publishing it at all depending on what order you do your database commit and message publishing. Here’s a great talk by Udi Dahan about this very problem and what the full end-to-end solution for it looks like:


Used with permission. Please visit Particular’s site for more information on the Advanced Distributed Systems Design course that this video is from.

The point that I am trying to make here is that with MassTransit, you have to code for these scenarios. This isn’t always necessary given the business requirements. If the occasional lost message or inconsistency in the database is not a big deal, then you wouldn’t need to add that logic and MT or NSB would do the job for you just fine.

However, if you do need this, then NServiceBus offers it out of the box. It also offers deferred message retry and publishing out of the box. There are a number of scenarios that it handles for you where, with MassTransit, you have to either roll your own or wire up another component (some of which are built by the MassTransit guys).

Bottom Line

If your needs don’t require compromise-free reliability and consistency and you are on a tight budget, then definitely look at MassTransit. If you do require compromise-free reliability and consistency, then you need to weigh the pros and cons of building the additional code necessary on top of MassTransit versus paying for NServiceBus but getting all of the functionality you need in one package.

Now it’s your turn. Let me know where I dropped the ball, where I didn’t elaborate enough, or where you think I’ve got it just all wrong…

MassTransit on Microsoft Azure
https://looselycoupledlabs.com/2014/09/masstransit-on-microsoft-azure-2/
Tue, 16 Sep 2014 15:11:37 +0000

In my first post on this blog, I showed you how to get a simple Publish/Subscribe example working very quickly using MassTransit and RabbitMQ. I’d always planned to show you how to move that example to the cloud, with Microsoft Azure being a compelling competitor in this space, especially for the .NET developer (though not exclusively for .NET). Let’s take a look at some of the cool things we can take advantage of in the cloud, including auto-scaling our subscribers to match the load of messages being published.

Setting Up Your Environment

Choosing the Right Messaging Platform

As discussed in my first post, the first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others. I really like RabbitMQ for this role for a variety of reasons, but since we’ve already looked at RabbitMQ in a number of previous posts, I thought it would make sense to take a look at Azure Service Bus. Since we’re going to be deploying to Azure, it makes a lot of sense to consider this as an option.

Thankfully, there is a MassTransit transport for Azure Service Bus (find the source on GitHub here). While we could have set up a RabbitMQ server or two using Azure Virtual Machines (either Windows or Linux), it is easier to get going on Azure by using the Azure Service Bus. Also, as we’ll find out later, when you use the Azure Service Bus, you can tie your cloud service scaling to the size of your message queue.

Setting Up Azure Service Bus

If you don’t have a Microsoft Azure account yet, head over to azure.microsoft.com and get signed up for a free trial. (Also, MSDN subscribers have free Azure services as part of their subscription. If you have an MSDN subscription, you only need to activate your Azure account.)

Next, in the Azure Portal, click the “Service Bus” button in the left-hand list of areas. Then, click the “Create a New Namespace” button:

image

In the dialog, pick a name for your new namespace (in this example, I will use the name “Loosely,” but you will have to choose a unique name). Choose a region and be sure to leave Type as “Messaging”:

image

When you click the OK button, Azure will create and activate your new Azure Service Bus namespace. There’s nothing else to set up at this time, as MassTransit will automatically create the queues and topics that it will require for your publish/subscribe code.

Install the Azure SDK

In order to be able to create cloud services easily in Visual Studio, you should install the Azure SDK. This can be downloaded from the Azure Downloads page. Download the VS 2013 or 2012 install under the .NET heading, depending on your version of Visual Studio.

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubAzureExample

Creating a Contract

I like to use the concept of a “contract” for the messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don’t need to know anything about the implementation of this interface on either side. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubAzureExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set the What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

When you’re writing a new project from scratch, you go through many permutations and refactor as you go. Initially, this example had the service bus setup code duplicated in both the publisher and subscriber projects. This is fine, particularly if you really aren’t in a position to share much code between the two sides (except the contracts of course). However, in my case, I preferred to use a common class, which I’ll call “AzureBusInitializer,” to set up my instance of MassTransit and get it configured.

So, add another class library to the MtPubSubAzureExample solution and name it “Configuration”. Before creating our class, it’s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.AzureServiceBus package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

Now, create a new class called “AzureBusInitializer.”

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using MassTransit.Transports.AzureServiceBus;
using System;
using System.Configuration;

namespace Configuration
{
  public class AzureBusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(sbc =>
      {
        var azureNameSpace = GetConfigValue("azure-namespace", "YourNamespace");
        var queueUri = "azure-sb://" + azureNameSpace + "/MtPubSubAzureExample_" + queueName;

        sbc.ReceiveFrom(queueUri);
        SetupAzureServiceBus(sbc, azureNameSpace);

        moreInitialization(sbc);
      });

      return bus;
    }

    private static void SetupAzureServiceBus(ServiceBusConfigurator sbc, string azureNameSpace)
    {
      sbc.UseAzureServiceBus(a => a.ConfigureNamespace(azureNameSpace, h =>
      {
        h.SetKeyName(GetConfigValue("azure-keyname", "RootManageSharedAccessKey"));
        h.SetKey(GetConfigValue("azure-key", ""));
      }));
      sbc.UseAzureServiceBusRouting();
    }

    private static string GetConfigValue(string key, string defaultValue)
    {
      string value = ConfigurationManager.AppSettings[key];
      return string.IsNullOrEmpty(value) ? defaultValue : value;
    }

  }
}

We’re creating a static method called “CreateBus,” which both our publisher and subscriber can use to set up an instance of a bus, using the Log4NetLogger, and connecting to our Azure Service Bus namespace. Because there may be additional custom setup that the publisher or subscriber may want to do, we allow passing in a lambda expression to perform the additional setup.

The GetConfigValue calls for “azure-namespace” and “azure-key” read settings from App.config in our publisher and subscriber apps (“azure-keyname” falls back to RootManageSharedAccessKey if you leave it out). Or, you can hard-code the values here. You should know your namespace name (as you just created it). To find your access key, you can click on the “Connection Information” button in the lower toolbar in the Azure Portal:

image

The key will be the “SharedAccessKey” parameter in the connection string. For example:

Endpoint=sb://loosely.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=key-goes-here
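
If you would rather not pick the key out of the connection string by eye, here is a minimal sketch that splits it into its parts. The ConnectionStringParser class is a hypothetical helper for illustration only; it is not part of MassTransit or the Azure SDK.

```csharp
using System;
using System.Collections.Generic;

public static class ConnectionStringParser
{
  // Splits "Name=Value;Name=Value" pairs into a dictionary.
  // Only the first '=' in each pair is treated as the separator,
  // because base64-encoded keys often end in '=' padding.
  public static Dictionary<string, string> Parse(string connectionString)
  {
    var result = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
    var parts = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);

    foreach (var part in parts)
    {
      int idx = part.IndexOf('=');
      if (idx <= 0) continue; // skip anything that is not a Name=Value pair

      result[part.Substring(0, idx).Trim()] = part.Substring(idx + 1).Trim();
    }

    return result;
  }
}

public static class ConnectionStringDemo
{
  public static void Main()
  {
    var parts = ConnectionStringParser.Parse(
      "Endpoint=sb://loosely.servicebus.windows.net/;" +
      "SharedAccessKeyName=RootManageSharedAccessKey;" +
      "SharedAccessKey=key-goes-here");

    // These map onto the azure-keyname and azure-key settings read by GetConfigValue.
    Console.WriteLine("azure-keyname: " + parts["SharedAccessKeyName"]);
    Console.WriteLine("azure-key:     " + parts["SharedAccessKey"]);
  }
}
```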

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for the number of messages they’d like to publish and then publishes that many SomethingHappened messages. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts project (and add one to Configuration too, while you’re at it).

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for the number of messages they want to publish, and publish that many messages onto the bus. Real quick first, however, add a NuGet reference to the MassTransit package.

using Configuration;
using Contracts;
using System;
using System.Threading.Tasks;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = AzureBusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter number of messages to generate (quit to exit): ");
        text = Console.ReadLine();

        int numMessages = 0;
        if (int.TryParse(text, out numMessages) && numMessages > 0)
        {
          Parallel.For(0, numMessages, i =>
          {
            var message = new SomethingHappenedMessage() { What = "message " + i.ToString(), When = DateTime.Now };
            bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
          });
        }
        else if(text != "quit")
        {
          Console.WriteLine("\"" + text + "\" is not a number.");
        }
      }

      bus.Dispose();
    }
  }
}

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. Just put a catch for the string “quit” so we can exit the publisher when we’d like. Be sure to update the app.config for TestPublisher to include your Azure key and namespace:

  <appSettings>
    <add key="azure-namespace" value="loosely" />
    <add key="azure-key" value="YourKeyHere" />
  </appSettings>
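
AzureBusInitializer also looks for an “azure-keyname” setting and falls back to RootManageSharedAccessKey when it is missing. If your namespace uses a different shared access policy, a sketch of the fuller appSettings block might look like this (the values shown are placeholders):

```xml
  <appSettings>
    <add key="azure-namespace" value="loosely" />
    <!-- Optional: only needed if you are not using the default policy name -->
    <add key="azure-keyname" value="RootManageSharedAccessKey" />
    <add key="azure-key" value="YourKeyHere" />
  </appSettings>
```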

If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like…. However, nobody is listening yet!

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add project references to Contracts and Configuration and then add the MassTransit NuGet package.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using Contracts;
using MassTransit;
using System;
using System.Threading;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.WriteLine("TXT: " + message.Message.What +
                        "  SENT: " + message.Message.When.ToString() +
                        "  PROCESSED: " + DateTime.Now.ToString() + 
                        " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using Configuration;
using MassTransit;
using System;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = AzureBusInitializer.CreateBus("TestSubscriber", sbc =>
      {
        sbc.SetConcurrentConsumerLimit(64);
        sbc.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }
  }
}

Now right-click on the MtPubSubAzureExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run. Again, be sure to update the app.config for TestSubscriber to include your Azure key and namespace like you did for TestPublisher.

Publish some messages from the publisher window. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish a few more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in Azure Service Bus?

Go back into the Azure Portal, select your namespace, and click on the Topics tab. You’ll see a new topic called contracts..somethinghappened. In Azure Service Bus, queues can be subscribers to a topic. You’ll see the topic has a “1” in Subscriptions Count:

image

Click on the “contracts..somethinghappened” topic and then click on the “Subscriptions” tab. You will see that there is a single queue subscribed to the topic named “MtPubSubAzureExample_TestSubscriberXXXXXX:”

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up a topic subscription for us!

Now, click on the “Queues” tab and you will see that the queue “mtpubsubazureexample_testsubscriber” has 3 messages (the “Queue Length” column):

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 7-minute lag (the 7 minutes we were poking around in Azure before starting up the subscriber again).

Run the Subscriber In the Cloud

Now that we have a simple publish/subscribe example working, let’s have our subscriber actually run in the cloud. There are a number of ways we could do this, but probably the most “cloudy” way to do it would be to create an Azure Cloud Service. These are extremely easy to create in Visual Studio 2013 (assuming you’ve installed the Azure SDK as instructed at the beginning of this post).

Add a new project to your MtPubSubAzureExample solution and choose “Azure Cloud Service” from the list of project templates. Name the new project “TestCloudSubscriber”. A cloud setup dialog will appear next. Select “Worker Role” only as we will be creating a background service that processes messages using MassTransit. Rename the “WorkerRole1” to “TestCloudSubscriberWorker.”

image

This will add the TestCloudSubscriber and TestCloudSubscriberWorker projects and drop you into the WorkerRole.cs file in TestCloudSubscriberWorker. As always, be sure to update the app.config for TestCloudSubscriberWorker to include your Azure key and namespace. Add a project reference to the Configuration and Contracts projects and add the MassTransit NuGet package to the new worker project.

Copy the SomethingHappenedConsumer.cs file from TestSubscriber to TestCloudSubscriberWorker. Edit the namespace of SomethingHappenedConsumer.cs to be “TestCloudSubscriberWorker” and change the Console.WriteLine to a call to Trace.TraceInformation:

using Contracts;
using MassTransit;
using System;
using System.Diagnostics;
using System.Threading;

namespace TestCloudSubscriberWorker
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Trace.TraceInformation("TXT: " + message.Message.What +
                             "  SENT: " + message.Message.When.ToString() +
                             "  PROCESSED: " + DateTime.Now.ToString() + 
                             " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

Now, edit the top of WorkerRole.cs to add a couple of usings and a private field for our service bus:

using Configuration;
using MassTransit;
using Microsoft.WindowsAzure.ServiceRuntime;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace TestCloudSubscriberWorker
{
  public class WorkerRole : RoleEntryPoint
  {
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
    private IServiceBus _bus;

Then, edit the OnStart() method to initialize the bus and our subscriptions:

public override bool OnStart()
{
  // Set the maximum number of concurrent connections
  ServicePointManager.DefaultConnectionLimit = 12;

  // For information on handling configuration changes
  // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

  bool result = base.OnStart();

  if(result)
  {
    _bus = AzureBusInitializer.CreateBus("TestCloudSubscriber", sbc =>
    {
      sbc.SetConcurrentConsumerLimit(64);
      sbc.Subscribe(subs =>
      {
        subs.Consumer<SomethingHappenedConsumer>().Permanent();
      });
    });
  }
  
  Trace.TraceInformation("TestCloudSubscriberWorker has been started");

  return result;
}

And edit the OnStop() method to dispose of the bus:

public override void OnStop()
{
  Trace.TraceInformation("TestCloudSubscriberWorker is stopping");

  this.cancellationTokenSource.Cancel();
  this.runCompleteEvent.WaitOne();

  if(_bus != null)
    _bus.Dispose();
  
  base.OnStop();

  Trace.TraceInformation("TestCloudSubscriberWorker has stopped");
}

Right-click on the MtPubSubAzureExample solution and choose “Set StartUp Projects…” Choose “Multiple startup projects” and set the Action to Start for TestPublisher and TestCloudSubscriber (the TestCloudSubscriber will start up our TestCloudSubscriberWorker). Run the solution and you should see your familiar TestPublisher console window. Where is the TestCloudSubscriber service? Look in your tray for the Azure emulator icon:

image

Right-click and choose “Show Compute Emulator UI” and then click on the “TestCloudSubscriberWorker” in the tree view. This should bring up the console for the cloud service which should be displaying the trace messages. You should see “Working” every 1 second, since that’s what the Worker Role code template does:

image

If we publish some messages from the TestPublisher, we should see them get handled in the emulator console:

image

Publish the Cloud Service to Azure

Now we want to actually run the TestCloudSubscriber, well, in the cloud! Right-click on the TestCloudSubscriber project and select “Publish.” This will bring up the “Publish Azure Application” dialog. After signing in, another dialog should pop up titled “Create Cloud Service and Storage Account.” Give it any name you like and choose the region (you probably want the same region where you created your Azure Service Bus):

image

Then, on the Settings tab of the Publish wizard, simply click the Publish button. This will publish your cloud service to Azure, which you’ll be able to see under Cloud Services in the Azure Portal:

image

Click on your service and then click on the “Instances” tab:

image

As you can see, our first instance of our service is running. Back in Visual Studio, you can right-click on the TestPublisher project and set it as the sole startup project. Run it and publish a few messages to the bus. Our cloud service should process the messages, but how can we be sure? The quick and dirty way is to check the Queue Length of the mtpubsubazureexample_testcloudsubscriber queue. As long as this is a decreasing number (or a zero), then our cloud service is processing the messages. If you want more than that and would like to actually see the trace messages we are writing out from our cloud service, read on.

Viewing Full Trace Information

Expand the “Roles” folder under the TestCloudSubscriber project in Visual Studio. Right-click on the TestCloudSubscriberWorker role and select “Properties.” On the Configuration tab, find the Diagnostics section and change “Errors only” to “All information.” After doing so, re-publish your service to Azure and wait for it to restart.

Once it is up and running, publish a few more messages using TestPublisher. Now, in Visual Studio’s Server Explorer, expand the Azure node, Storage, your storage account (e.g. testcloudsubscriber), Tables, and then double-click the “WADLogsTable” table. Here you can view all of the tracing messages from your cloud service. You should see your messages in the “Message” column:

image

You probably don’t want to run your service with the “All information” option set all the time, but we do get a warm fuzzy feeling knowing everything is being processed the way we expect!

Scaling Out Your Subscribers

Azure has a great and stupid simple way to automatically scale out your cloud service based on the number of messages in your queue. In the Azure Portal, simply navigate to your Cloud Service and click the “Scale” tab. Next to “Scale by Metric,” choose “Queue”:

image

Under “Instance Range,” choose your upper limit of instances (e.g. 5):

image

Select the Namespace and Queue Name you want to “watch” and tie your service scaling to:

image

Finally, set “Target per Machine” to how many messages you want a single instance to be able to handle at any given time. For example, if you have a single instance running, and the number of messages in the queue exceeds this number, then Azure will spin up another instance based on the above rules.

image

Thanks Azure! In order to test this out, I actually stopped the cloud service so it wasn’t processing messages and then published 400,000 messages. I restarted the cloud service and it ran with one instance for a while, but pretty soon, it started launching new instances, based on my configuration (which I had dialed back pretty far to make sure the scaling would get triggered). It worked great, and that backlog of messages was worked down to zero in no time.

image

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit and Azure. Most of the topics covered in my previous articles on MassTransit and RabbitMQ also apply to running MassTransit in Azure with Azure Service Bus (with the exception of RabbitMQ-specific topics). I hope to see you back here soon and, as always, please let me know if you have any questions or suggestions for future topics!

Until then…

Monitoring RabbitMQ
https://looselycoupledlabs.com/2014/08/monitoring-rabbitmq/
Wed, 20 Aug 2014 03:52:58 +0000

We’ve talked a lot about using MassTransit with RabbitMQ in a variety of scenarios on this blog. We talked about error handling, which is something you need to know how to implement in a real world MassTransit + RabbitMQ deployment. Another important aspect of a production deployment is monitoring. Let’s take a look at how we can leverage the RabbitMQ Management HTTP API (installed with the RabbitMQ Management Plugin) to query our queues so we can monitor the health of our service bus.

Enabling the RabbitMQ Management Plugin

In A Simple MassTransit Publish/Subscribe Example, I detailed how to install RabbitMQ on Windows along with the RabbitMQ Management Plugin. You can refer to that post for the full details, but it’s important to know the RabbitMQ Management Plugin isn’t enabled out of the box. One simple command will enable it:

> rabbitmq-plugins enable rabbitmq_management

After a restart of the RabbitMQ service, visiting http://localhost:15672/ should bring you to the management GUI (default credentials are guest/guest). The HTTP API is available at http://localhost:15672/api/.

Simple REST API Call

Type the following URL into your browser to get a list of queues in your RabbitMQ instance:

http://localhost:15672/api/queues

You will need to supply the guest/guest credentials to access the API method. You should see the list of queues returned as JSON. If you want to get data for a single, specific queue, use the URL format /api/queues/vhost/queue-name. The default vhost in RabbitMQ is “/”, which URL-encoded is %2F, so the URL to retrieve info for the queue named MtPubSubExample_TestSubscriber would be http://localhost:15672/api/queues/%2F/MtPubSubExample_TestSubscriber and that would return the following JSON:

{
   "memory":21856,
   "messages":1,
   "messages_details":{
      "rate":0.0
   },
   "messages_ready":1,
   "messages_ready_details":{
      "rate":0.0
   },
   "messages_unacknowledged":0,
   "messages_unacknowledged_details":{
      "rate":0.0
   },
   "idle_since":"2014-08-16 9:30:50",
   "consumer_utilisation":"",
   "policy":"",
   "exclusive_consumer_tag":"",
   "consumers":0,
   "backing_queue_status":{
      "q1":0,
      "q2":0,
      "delta":[
         "delta",
         "undefined",
         0,
         "undefined"
      ],
      "q3":1,
      "q4":0,
      "len":1,
      "pending_acks":0,
      "target_ram_count":"infinity",
      "ram_msg_count":0,
      "ram_ack_count":0,
      "next_seq_id":180224,
      "persistent_count":1,
      "avg_ingress_rate":0.0,
      "avg_egress_rate":0.0,
      "avg_ack_ingress_rate":0.0,
      "avg_ack_egress_rate":0.0
   },
   "state":"running",
   "incoming":[

   ],
   "deliveries":[

   ],
   "consumer_details":[

   ],
   "name":"MtPubSubExample_TestSubscriber",
   "vhost":"/",
   "durable":true,
   "auto_delete":false,
   "arguments":{

   },
   "node":"rabbit@PROWIN1"
}

In this snippet of JSON you can see the queue has a single message in it. Now, let’s see what we can do with this new knowledge to keep tabs on our queues.
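
The %2F encoding of the vhost described above is easy to get wrong when typing URLs by hand. As a small sketch, Uri.EscapeDataString can do the encoding for you; the QueueUrl class and its Build method are hypothetical names used only for this illustration.

```csharp
using System;

public static class QueueUrl
{
  // Builds {baseUrl}/queues/{vhost}/{queue}, URL-encoding the vhost and queue
  // name so that the default vhost "/" becomes "%2F".
  public static string Build(string baseUrl, string vhost, string queueName)
  {
    return baseUrl.TrimEnd('/') + "/queues/" +
           Uri.EscapeDataString(vhost) + "/" +
           Uri.EscapeDataString(queueName);
  }
}

public static class QueueUrlDemo
{
  public static void Main()
  {
    // Prints http://localhost:15672/api/queues/%2F/MtPubSubExample_TestSubscriber
    Console.WriteLine(QueueUrl.Build(
      "http://localhost:15672/api", "/", "MtPubSubExample_TestSubscriber"));
  }
}
```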

What to Monitor

At a minimum, I would monitor the number of ready messages in your queues. What constitutes an alerting threshold obviously will depend on your application. If you expect to have fairly real-time processing of your messages, and have the necessary horsepower in your consumer(s) to keep up, then you might alert if the queue goes over 10 unprocessed messages. You just need to decide how sensitive you need it to be. I’ll show you how to get the data out.

Which queues should you monitor? Any queue you expect to receive messages likely warrants monitoring. This means, for example, in our simple publish/subscribe example, we would monitor the MtPubSubExample_TestSubscriber queue. We also will want to monitor the corresponding error queue (e.g. MtPubSubExample_TestSubscriber_error). In our customer portal example, we would monitor Loosely_CustomerPortal_Backend and Loosely_CustomerPortal_Backend_error.

How to Monitor

This is largely up to you and what monitoring tools you already have in place. Most monitoring tools such as Nagios, NewRelic, AlertSite, and System Center will have some notion of a custom counter or metric that you can have the system monitor and send alerts when they reach a certain threshold. If your selected monitoring tool has a RabbitMQ plugin, you’re in luck. Otherwise, you will need to write a small script to supply the data points to your monitoring system. Let’s take a look at how to do this in C#.

Checking Your RabbitMQ Queues in C#

The code for this sample is available on GitHub at https://github.com/dprothero/Loosely.RabbitMq.Monitor. Clone that repository or follow along here to build the project from scratch. I am using Visual Studio 2013.

First, create a new C# Console project. We can call it Loosely.RabbitMq.Monitor and name the solution something like Loosely.RabbitMq.Utilities. Install the NuGet package Json.NET to your project. That will help us read the JSON response from the web service.

Add a new class file to your project and name it QueueInfo.cs. Delete the QueueInfo class declaration (leaving just the namespace declaration). Copy the JSON code above to your clipboard and then position your cursor within the namespace declaration in QueueInfo.cs. From the Edit menu, select Paste Special, and then Paste JSON as Classes. This will create C# class declarations that will follow your expected JSON. Rename the newly pasted class named Rootobject to QueueInfo. You should end up with the classes below in QueueInfo.cs:

namespace Loosely.RabbitMq.Monitor
{
  public class QueueInfo
  {
    public int memory { get; set; }
    public int messages { get; set; }
    public Messages_Details messages_details { get; set; }
    public int messages_ready { get; set; }
    public Messages_Ready_Details messages_ready_details { get; set; }
    public int messages_unacknowledged { get; set; }
    public Messages_Unacknowledged_Details messages_unacknowledged_details { get; set; }
    public string idle_since { get; set; }
    public string consumer_utilisation { get; set; }
    public string policy { get; set; }
    public string exclusive_consumer_tag { get; set; }
    public int consumers { get; set; }
    public Backing_Queue_Status backing_queue_status { get; set; }
    public string state { get; set; }
    public object[] incoming { get; set; }
    public object[] deliveries { get; set; }
    public object[] consumer_details { get; set; }
    public string name { get; set; }
    public string vhost { get; set; }
    public bool durable { get; set; }
    public bool auto_delete { get; set; }
    public Arguments arguments { get; set; }
    public string node { get; set; }
  }

  public class Messages_Details
  {
    public float rate { get; set; }
  }

  public class Messages_Ready_Details
  {
    public float rate { get; set; }
  }

  public class Messages_Unacknowledged_Details
  {
    public float rate { get; set; }
  }

  public class Backing_Queue_Status
  {
    public int q1 { get; set; }
    public int q2 { get; set; }
    public object[] delta { get; set; }
    public int q3 { get; set; }
    public int q4 { get; set; }
    public int len { get; set; }
    public int pending_acks { get; set; }
    public string target_ram_count { get; set; }
    public int ram_msg_count { get; set; }
    public int ram_ack_count { get; set; }
    public int next_seq_id { get; set; }
    public int persistent_count { get; set; }
    public float avg_ingress_rate { get; set; }
    public float avg_egress_rate { get; set; }
    public float avg_ack_ingress_rate { get; set; }
    public float avg_ack_egress_rate { get; set; }
  }

  public class Arguments
  {
  }

}

If you wanted, you could clean up the property and class names to better match C# naming conventions. Just be sure to add the necessary JsonProperty attribute so JSON.NET can deserialize the JSON into the correct object properties. For example:

[JsonProperty("avg_ack_egress_rate")]
public float AvgAckEgressRate { get; set; }

Note that JSON.NET is case insensitive when deserializing, so you don’t need these attributes if you are only adjusting the case of the identifier (e.g. “name” to “Name”).

Next, we could just hard-code the following in Program.cs to get the message count for the MtPubSubExample_TestSubscriber queue:

using Newtonsoft.Json;
using System;
using System.Net;

namespace Loosely.RabbitMq.Monitor
{
  class Program
  {
    static void Main(string[] args)
    {
      var client = new WebClient();
      client.Credentials = new NetworkCredential("guest", "guest");
      var data = client.DownloadString("http://localhost:15672/api/queues/%2F/MtPubSubExample_TestSubscriber");
      var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);

      // Property names match the QueueInfo class as pasted from the JSON above.
      Console.WriteLine("Queue: " + queueInfo.name);
      Console.WriteLine("Queue Depth: " + queueInfo.messages_ready.ToString());

      Console.Write("\r\nPress any key to continue.");
      Console.ReadKey();
    }
  }
}

The code should be pretty straightforward. We make a request to the HTTP API, deserialize the JSON data into a plain old C# object (POCO), and then output the properties we are interested in. However, to make this really useful, we should refactor this code so it can be command line driven. Then, we can simply pass parameters on the command line to tell the program the instance of RabbitMQ to monitor (base URL for the API), the credentials to connect to the server, and the queue we want to interrogate.

For command line argument parsing in a console application, I like to use the NDesk.Options NuGet package. It makes supporting command line parameters (-x or --xxxx or /XXXX) super easy. Add the NDesk.Options package to your project. With that added, we can modify our Program.cs file to the following:

using NDesk.Options;
using Newtonsoft.Json;
using System;
using System.Net;

namespace Loosely.RabbitMq.Monitor
{
  class Program
  {
    static void Main(string[] args)
    {
      string baseUrl = "http://localhost:15672/api",
             userName = "guest",
             password = "guest",
             queueName = null,
             vhost = "/";

      var p = new OptionSet () {
        { "u|baseUrl=",  v => baseUrl = v },
        { "U|user=",     v => userName = v },
        { "p|password=", v => password = v },
        { "q|queue=",    v => queueName = v },
        { "v|vhost=",    v => vhost = v }
      };
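      p.Parse(args);

      // The queue name has no default, so stop here if it was not supplied.
      if (string.IsNullOrEmpty(queueName))
      {
        Console.Error.WriteLine("Usage: Loosely.RabbitMq.Monitor -q <queue> [-u <baseUrl>] [-U <user>] [-p <password>] [-v <vhost>]");
        return;
      }
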
      var client = new WebClient();
      if(userName != null)
        client.Credentials = new NetworkCredential(userName, password);

      var url = baseUrl + "/queues/" + Uri.EscapeDataString(vhost) + "/" + Uri.EscapeDataString(queueName);
      var data = client.DownloadString(url);
      var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);

      // Here is where we report the message count to our monitoring system.
      // Replace the console output/wait with your code.
      Console.WriteLine("Queue: " + queueInfo.name);
      Console.WriteLine("Queue Depth: " + queueInfo.messages_ready.ToString());

      Console.Write("\r\nPress any key to continue.");
      Console.ReadKey();
    }
  }
}

To be able to run from within Visual Studio, you can try out different command line arguments by going to the Debug project properties page:

image
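
The console output in Program.cs is only a placeholder for whatever your monitoring system expects. As one possibility, Nagios-style checks read the process exit code (0 = OK, 1 = WARNING, 2 = CRITICAL), so a sketch of the reporting step could look like the following. The QueueDepthCheck class and the threshold values are assumptions for illustration; in the real monitor the depth would come from the deserialized QueueInfo and the thresholds from additional command line options.

```csharp
using System;

public static class QueueDepthCheck
{
  // Maps a queue depth onto Nagios plugin exit codes:
  // 0 = OK, 1 = WARNING, 2 = CRITICAL.
  public static int Evaluate(int messagesReady, int warnOver, int criticalOver)
  {
    if (messagesReady > criticalOver) return 2;
    if (messagesReady > warnOver) return 1;
    return 0;
  }
}

public static class QueueDepthCheckDemo
{
  public static int Main()
  {
    // Hypothetical values standing in for the ready-message count from
    // QueueInfo and for thresholds passed on the command line.
    int messagesReady = 12;
    int exitCode = QueueDepthCheck.Evaluate(messagesReady, 10, 100);

    Console.WriteLine("Queue depth " + messagesReady + " => exit code " + exitCode);

    // Returning the code from Main makes it the process exit code.
    return exitCode;
  }
}
```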

Conclusion

As you can see, it is fairly easy to interrogate RabbitMQ to get information about the status of its queues (as well as any number of other objects). If you want to make extensive use of the HTTP API, I would suggest taking a look at Mike Hadlow’s EasyNetQ.Management.Client library, available on NuGet. This is a complete C# wrapper around the HTTP API, making it trivial to work with the API from C#.

I hope you found value in this blog post. Please don’t hesitate to contact me if you have any questions or suggestions for future blog posts. Until then…

Scaling Out Subscribers With MassTransit
https://looselycoupledlabs.com/2014/08/scaling-out-subscribers-with-masstransit/
Mon, 04 Aug 2014 00:41:33 +0000

So far on this blog, we’ve been looking at the publish/subscribe messaging pattern using MassTransit and RabbitMQ. Until now, we’ve dealt with a single publisher and a single subscriber. We looked at how we can have those two roles live on separate servers. Finally, we looked at how to handle errors in the subscriber.

What happens, now, when your subscriber can’t process messages as fast as the messages are being published on the bus? This is a special situation, to be sure, but it certainly is possible in high message volume environments. It can be compounded if the messages themselves are fairly expensive to process.

Open the Flood Gates

Let’s take our simple publish/subscribe example and tweak it so we can have it dump a large number of messages onto the bus extremely quickly. Originally, the example prompted for a string and published that string as a single SomethingHappened message. Instead, let’s prompt for a number of messages that should be put onto the bus.

using Configuration;
using Contracts;
using System;
using System.Threading.Tasks;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter number of messages to generate (quit to exit): ");
        text = Console.ReadLine();

        int numMessages = 0;
        if (int.TryParse(text, out numMessages) && numMessages > 0)
        {
          Parallel.For(0, numMessages, i =>
          {
            var message = new SomethingHappenedMessage() { What = "message " + i.ToString(), When = DateTime.Now };
            bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
          });
        }
        else if(text != "quit")
        {
          Console.WriteLine("\"" + text + "\" is not a number.");
        }
      }

      bus.Dispose();
    }
  }
}

We’re using the System.Threading.Tasks.Parallel.For method to be able to simultaneously publish multiple messages onto the bus.

Now in our subscriber, let’s have it simulate 250 milliseconds of processing time with a call to System.Threading.Thread.Sleep. Also, because MassTransit will run 4 threads per CPU for our consumer and the messages will be flying in, we’ll condense our Console output to a single WriteLine call instead of multiple calls to Write so as to avoid the output from multiple messages getting jumbled together.

using Contracts;
using MassTransit;
using System;
using System.Threading;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.WriteLine("TXT: " + message.Message.What +
                        "  SENT: " + message.Message.When.ToString() +
                        "  PROCESSED: " + DateTime.Now.ToString() + 
                        " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

Running the Test

The test code for this example can be found on github.

Make sure both TestPublisher and TestSubscriber are set up as startup projects and run the project in Visual Studio. Try publishing 1,000 messages:

image

As you probably saw, the prompt on the publisher returned well before the subscriber finished processing the messages. This means we were able to publish messages to the bus faster than we could process them. This could be a problem if you don’t expect any “lulls” in publishing that would allow the subscriber to catch up.

We can further illustrate the backlog by looking at the graph for the MtPubSubExample_TestSubscriber queue in the RabbitMQ management interface (found at http://localhost:15672/ – see this post for details). You have to have the interface up and be watching the graph while your publisher/subscriber test is actually running:

image

Here you can see that the publisher hit a peak of 200 messages per second, while the subscriber sustained a steady rate of about 40 messages per second.

With a spike and then nothing for a time, perhaps slow and steady wins the race for our subscriber. Try 10,000 messages and watch the RabbitMQ graphs:

image

This more dramatically illustrates the problem. The number of queued messages (top graph) is continuing to go up with no relief in sight. And the bottom graph shows we’re publishing messages at a rate of 259 per second, but we only process them at a rate of around 40 per second. Again, since the publish storm eventually passes, the subscriber does eventually catch up.
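
To put rough numbers on that backlog, here’s a back-of-the-envelope sketch. It assumes the publish and consume rates hold steady at the 259 and 40 messages per second read off the graph, which real traffic won’t do exactly. The class and method names are made up for illustration.

```csharp
using System;

static class BacklogEstimate
{
    // Seconds the publisher needs to push all messages at a constant rate.
    public static double PublishSeconds(int messages, double publishPerSec)
    {
        return messages / publishPerSec;
    }

    // Messages still queued at the moment the publisher finishes.
    public static double PeakBacklog(int messages, double publishPerSec, double consumePerSec)
    {
        double seconds = PublishSeconds(messages, publishPerSec);
        return Math.Max(0, messages - consumePerSec * seconds);
    }

    // Seconds until every message has been processed.
    public static double DrainSeconds(int messages, double publishPerSec, double consumePerSec)
    {
        // Whichever side is slower determines when the last message is done.
        return messages / Math.Min(publishPerSec, consumePerSec);
    }

    static void Main()
    {
        Console.WriteLine("Publish time: {0:F1} s", PublishSeconds(10000, 259));
        Console.WriteLine("Peak backlog: {0:F0} messages", PeakBacklog(10000, 259, 40));
        Console.WriteLine("All processed after: {0:F0} s", DrainSeconds(10000, 259, 40));
    }
}
```

Under those assumptions the publisher is done in under 40 seconds, while the subscriber needs about 250 seconds to work through all 10,000 messages.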

Let’s look at a couple ways we can increase the throughput of our subscriber.

Option 1: Increase the Prefetch Count

If you do the math on the rate of 40 messages per second that we observe, you will arrive at what appears to be 10 simultaneous threads processing messages (each message takes a quarter of a second). However, the default number of threads that MassTransit can use for consumers is actually the number of processors in your machine multiplied by 4. So, on my 8 core machine, that would be 32 threads. Why are we only observing 10?

The reason is the number of messages the RabbitMQ transport will “prefetch” from the queue. The default is 10, so we can only process 10 messages simultaneously. To increase this, include a “prefetch=X” parameter in the query string of the queue URL. For example:

x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName + "?prefetch=32");

Now that this is set to 32 to match the maximum thread count of 32, we should observe a processing rate of 128 messages per second (32 threads, each handling 4 messages per second).

Option 2: Increase the Thread Count

We can also tell MassTransit to allow more threads to be used to consume messages. You put a call to SetConcurrentConsumerLimit in your bus initialization code. Below we bump the thread count to 64 (doubling the number of threads):

using Configuration;
using MassTransit;
using System;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestSubscriber", x =>
      {
        x.SetConcurrentConsumerLimit(64);
        x.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }
  }
}

Don’t forget to also increase your prefetch setting (see option 1 above) to match. Now we’re processing 256 messages per second! That’s pretty close to our 259 per second we observed being published onto the bus.
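
The arithmetic behind the 40, 128, and 256 figures can be written down as a tiny model. This is only a sketch: it assumes every message takes exactly 250 ms and that nothing else (CPU, network, the broker) gets in the way. ThroughputEstimate is a name invented for this example.

```csharp
using System;

static class ThroughputEstimate
{
    // Upper bound on messages per second for one subscriber process,
    // assuming every message takes the same fixed time to handle.
    public static double MessagesPerSecond(int prefetch, int threadLimit, double secondsPerMessage)
    {
        // A message can only be worked on if it was prefetched AND a thread is free,
        // so the smaller of the two settings is the effective concurrency.
        int concurrent = Math.Min(prefetch, threadLimit);
        return concurrent / secondsPerMessage;
    }

    static void Main()
    {
        Console.WriteLine(MessagesPerSecond(10, 32, 0.25)); // default prefetch on 8 cores: 40
        Console.WriteLine(MessagesPerSecond(32, 32, 0.25)); // option 1: 128
        Console.WriteLine(MessagesPerSecond(64, 64, 0.25)); // option 2: 256
    }
}
```

The Math.Min is the reason raising only one of the two settings doesn’t help: 64 threads with a prefetch of 32 would still top out at 128 messages per second in this model.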

However, at some point, your machine is going to run out of processing power. Perhaps it already has. We’re just sleeping the thread here for 250ms, so the ceiling is pretty high on how many threads we could run, but if there was real processing happening, we might be maxing out the CPU on the machine. As any good architect knows, don’t scale up, scale out!

Option 3: Run More Subscribers

Try dumping another 10,000 messages onto the bus. While you’ve got one subscriber running, you can simply execute another instance of the TestSubscriber executable and it will start processing messages too, effectively doubling your processing rate!

Having multiple subscribers connected to the same RabbitMQ queue is what’s called the “competing consumer” pattern. RabbitMQ will make sure each consumer gets unique messages in essentially a round-robin fashion.
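
If the pattern is new to you, here’s an in-process analogy. It uses a BlockingCollection as a stand-in for the RabbitMQ queue and plain tasks as stand-ins for subscriber processes, so it is not how RabbitMQ dispatches messages (there’s no round-robin here). It only demonstrates the guarantee that matters: each message goes to exactly one consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class CompetingConsumersDemo
{
    // Runs `consumerCount` workers against one shared queue and returns,
    // for each message id, how many times it was handled.
    public static ConcurrentDictionary<int, int> Run(int messageCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>();
        var handled = new ConcurrentDictionary<int, int>();

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                // Each item taken from the shared queue goes to exactly one worker.
                foreach (var id in queue.GetConsumingEnumerable())
                    handled.AddOrUpdate(id, 1, (key, count) => count + 1);
            }))
            .ToArray();

        for (int i = 0; i < messageCount; i++) queue.Add(i);
        queue.CompleteAdding();   // lets the workers finish once the queue is empty
        Task.WaitAll(consumers);
        return handled;
    }

    static void Main()
    {
        var handled = Run(1000, 2);
        Console.WriteLine("Distinct messages handled: " + handled.Count);
        Console.WriteLine("Any handled twice: " + handled.Values.Any(c => c > 1));
    }
}
```

Two workers share the 1,000 messages between them and none is processed twice, which is the behavior we want from two TestSubscriber instances on one queue.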

Again, however, if we’re already maxing out the CPU on the subscriber machine, what we really need is to run another subscriber on another machine with its own available resources. If we use our current example code, though, we will get duplicate messages because each machine connects to localhost for its RabbitMQ instance. (Don’t forget to make sure all the RabbitMQ instances are in a cluster.) Since each instance will have its own queue, MassTransit will treat each queue as a unique consumer rather than as competing consumers, and each queue will get a copy of the same message routed to it. Recall the diagram from our clustering article, in which each machine has its own RabbitMQ instance.

Clearly, this is not what we want. In order to be competing consumers, the two subscriber processes must be connected to the same RabbitMQ instance and queue. What we need is an architecture more like the following:

cluster-2

Obviously, this makes the RabbitMQ server a single point of failure and a dependency for the two subscriber machines. If high availability is a requirement, then you would need to look into some type of virtual IP address based clustering (like keepalived on Linux or NLB on Windows). You will also need to implement highly available queues in RabbitMQ so that the queues are replicated across your multiple instances.

Implementing the Centralized RabbitMQ Server

Obviously, the first step is to install RabbitMQ on a new server. We’ll call this machine “submaster” (for subscription master). Instructions for installing RabbitMQ can be found in this blog post. Then, join the RabbitMQ instance on submaster to a cluster with the RabbitMQ instance on your publisher machine. Instructions for creating a RabbitMQ cluster can be found in this blog post. We should have these nodes in our RabbitMQ cluster:

image

In order to connect to a remote RabbitMQ instance, we need to do some security housekeeping. First, we need to modify the Windows Firewall on the submaster machine to allow in the default RabbitMQ port of 5672. Next, we need to create a new user in RabbitMQ that the subscriber can use to login. We’ll call it “testsubscriber” and give it a password of “test”. On the Admin tab of the RabbitMQ management interface, you can begin adding a new user:

image

Type in the username, password, administrator tag, and click Add user. Initially, the user won’t have any permissions:

image

Click on the testsubscriber user and then click “Set permission” as seen here:

image

Now we need to modify our Configuration.BusInitializer class to be able to connect to a specific machine name instead of hard-coding localhost as well as utilize our username and password. We’ll have it read these items from our App.config. Remember, our publisher can still use localhost (which doesn’t require username/password), but our subscriber needs to connect to the submaster machine with some credentials.

First, add a reference to System.Configuration to the Configuration project. Then modify the BusInitializer class to allow reading the machine name, username, and password from configuration:

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using System;
using System.Configuration;

namespace Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        var serverName = GetConfigValue("rabbitmq-server-name", "localhost");
        var userName = GetConfigValue("rabbitmq-username", "");
        var password = GetConfigValue("rabbitmq-password", "");
        var queueUri = "rabbitmq://" + serverName + "/MtPubSubExample_" + queueName + "?prefetch=64";

        if (userName != "")
        {
          x.UseRabbitMq(r =>
          {
            r.ConfigureHost(new Uri(queueUri), h =>
            {
              h.SetUsername(userName);
              h.SetPassword(password);
            });
          });
        }
        else
          x.UseRabbitMq();

        x.ReceiveFrom(queueUri);
        moreInitialization(x);
      });

      return bus;
    }

    private static string GetConfigValue(string key, string defaultValue)
    {
      string value = ConfigurationManager.AppSettings[key];
      return string.IsNullOrEmpty(value) ? defaultValue : value;
    }
  }
}
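
To see what the configuration values turn into, here’s a small sketch that builds the same style of queue address and picks it apart with System.Uri. QueueAddress.Build is a hypothetical helper written for this illustration; it is not part of MassTransit or of the example repository.

```csharp
using System;

static class QueueAddress
{
    // Hypothetical helper: builds an address in the same format BusInitializer uses.
    public static Uri Build(string serverName, string queueName, int prefetch)
    {
        return new Uri("rabbitmq://" + serverName + "/MtPubSubExample_" + queueName +
                       "?prefetch=" + prefetch);
    }

    static void Main()
    {
        // No app settings present: the server name falls back to localhost.
        Console.WriteLine(Build("localhost", "TestPublisher", 64));

        // With rabbitmq-server-name set to submaster.
        var remote = Build("submaster", "TestSubscriber", 64);
        Console.WriteLine(remote);
        Console.WriteLine(remote.Host + " " + remote.AbsolutePath + " " + remote.Query);
    }
}
```

The host portion is the only part that changes between the publisher and the remote subscriber; the queue name and prefetch setting travel in the path and query string.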

Since we’re only going to deviate from the default of localhost on our subscriber, open the TestSubscriber project and add the following lines into the App.config:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>

  <appSettings>
    <add key="rabbitmq-server-name" value="submaster" />
    <add key="rabbitmq-username" value="testsubscriber" />
    <add key="rabbitmq-password" value="test" />
  </appSettings>
</configuration>

Running the Test

On my dev machine, I fired up one instance of TestSubscriber with the above configuration. Then, on the publisher machine, I started up both TestPublisher and TestSubscriber. Here it is running after pushing 100,000 messages onto the bus:

Capture

Lots of blinking lights. The more interesting thing is to observe the messages processed per second in RabbitMQ:

image

Wrap Up

So now you can see how it would be possible to scale out your message processing. Perhaps in a future post, we’ll take a look at leveraging the cloud. It should be possible to monitor the number of messages in your queue and spin up new cloud workers to pick up the slack and then shut them down when the queue quiets back down. Until then…

Error Handling in MassTransit Consumers https://looselycoupledlabs.com/2014/07/error-handling-in-masstransit-consumers/ https://looselycoupledlabs.com/2014/07/error-handling-in-masstransit-consumers/#comments Sun, 27 Jul 2014 02:18:36 +0000 http://looselycoupledlabs.com/?p=115

Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then, we built a Windows Service, using the Topshelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation email. (I recommend you review the blog post if you aren’t familiar with it as we are going to build on that application here.)

But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, things can and will go wrong. Let’s look at how we can leverage the message queuing infrastructure to handle what may be transient errors as well as perhaps more permanent failures.

When It All Goes Horribly Wrong

So what will happen to our TicketOpened messages if there’s an error in the TicketOpenedConsumer? In our example, we’re only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. As you know, there are any number of things that could go wrong. Let’s start by looking at what the default MassTransit behavior is when an exception occurs in your consumer.

MassTransit Default Error Handling

To see what MassTransit does, let’s inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application in my previous blog post. The final code is in the same repository, but in the error-handling branch.

Here’s the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" + 
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" + 
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

We just check to see if the text of the message contains the word “poison” and, if it does, throw an exception. Now we can run the app, open a ticket, and type “poison” into the message field to get our consumer to throw the exception:

image

Take a look at the log file (C:\Logs\Loosely.CustomerPortal.Backend.log) and you’ll see these entries:

image

What’s going on here? By default, MassTransit retries any message that causes an exception to be thrown in its consumer exactly 4 more times. There’s no delay between retries (we’ll look at that later). Since our exception isn’t really transient, all 5 attempts fail. Next question… where’s the TicketOpened message now?

Go into the RabbitMQ management interface (see this post for instructions – should be at http://localhost:15672) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it:

image

Click on the error queue and scroll down to the “Get messages” section. Set Requeue to ‘No’ and click “Get Message(s).” This will remove the message from the queue and display it to us. You can see our poison message in JSON format:

image

Sidebar: Changing Default Retry Limit

If you want to change MassTransit’s default retry limit of 5 to something else, add the x.SetDefaultRetryLimit call shown below to the Loosely.CustomerPortal.Backend.TicketService class, within your bus initializer code.

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1);
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

That will set the retry limit to 1.
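
To make the effect of the retry limit concrete, here’s a small simulation of the behavior we observed: immediate attempts up to the limit, then off to the error queue. It is a model of what we saw in the log, not MassTransit’s actual implementation, and RetrySimulation is a name invented for this sketch.

```csharp
using System;

static class RetrySimulation
{
    // Calls `handler` until it succeeds or `retryLimit` attempts have failed.
    // Returns the number of attempts made; `movedToErrorQueue` reports the outcome.
    public static int Deliver(Action handler, int retryLimit, out bool movedToErrorQueue)
    {
        int attempts = 0;
        while (attempts < retryLimit)
        {
            attempts++;
            try
            {
                handler();
                movedToErrorQueue = false;
                return attempts;
            }
            catch (Exception)
            {
                // No delay here: the next attempt follows immediately.
            }
        }
        movedToErrorQueue = true; // stand-in for the *_error queue
        return attempts;
    }

    static void Main()
    {
        Action poison = () => { throw new Exception("Something bad has happened!"); };
        bool failed;

        int attempts = Deliver(poison, 5, out failed);
        Console.WriteLine(attempts + " attempts, moved to error queue: " + failed);

        attempts = Deliver(poison, 1, out failed);
        Console.WriteLine(attempts + " attempt, moved to error queue: " + failed);
    }
}
```

With a limit of 5 the poison message is tried five times before giving up; with a limit of 1 it goes to the error queue after a single failed attempt.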

Requeuing Error Messages

If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn’t running. Then, open up the “RabbitMQ Command Prompt (sbin dir)” item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:

> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management

After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into “Shovel Management.” Click “Add a new shovel” and name it something like “Temporary Error Move.” Set the Source Queue to “Loosely_CustomerPortal_Backend_error” and the Destination Queue to “Loosely_CustomerPortal_Backend.” Click “Add shovel.”

This starts a shovel that runs in the background and will move all messages in the error queue back to the primary queue:

image

Now go back to the Admin tab, Shovel Management, and click on your “Temporary Error Move” shovel. From there, click on the “Delete this shovel” button. If you don’t delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.

Obviously, when we start up our consumer again, it will try the message 5 times and fail again, and the message will land back in the error queue. What we have with our “poison” message is really a permanent failure.

Transient Versus Permanent Failures

With a permanent failure, we’re talking about a message that just can’t be processed – at least not with the code written the way it currently is. Perhaps there’s a message that invokes a code path that throws an exception due to a coding error. In this case, these messages would end up in the error queue and should probably stay there until the error is corrected.

Perhaps the error is such an edge case that we won’t fix it and so we’re ok with the occasional message going to the error queue (we should write something to periodically clean up the error queue). It just depends on your business requirements. If, however, the message is mission critical, then the likely scenario would be to fix the bug, redeploy the new code, move the error messages back into the primary queue, and then let them get processed.

Transient Failures

What about the examples of failures mentioned earlier? A down email or database server? A deadlock condition in the SQL Server? These could be considered transient failures – meaning, if we just were to retry later, the message could likely be processed just fine with no modifications to the message or the consumer code.

As we saw, MassTransit has a bit of a blunt method to try to account for transient failures… it tries the message 5 times. Perhaps in a deadlock situation, this would work great, but probably not in a network or server outage situation. You’d likely expect those to last a little longer. What would be ideal is if we could have the message retry after some timeout delay. Perhaps we could even escalate the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then perhaps fail.

NServiceBus, a commercial analog to MassTransit, has this retry delay ability built into it (called “second-level retries”). However, MassTransit does not. We will have to roll our own, but it won’t be difficult.

Roll Your Own Retry Delay Logic

Assuming this is a pattern you want to implement for a larger application with multiple message types, you will probably want to build the retry delay logic into a common helper class. However, for this example, let’s just build the logic into our TicketOpenedConsumer class.

Here’s the new TicketOpenedConsumer class with progressive retry delay logic:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
      {
        {0, 60}, {60, 300}, {300, -1}
      };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;
      
      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" + 
                          nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if(sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip", 
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
            "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));

    }
  }
}

So let’s take a look at a few lines of code in isolation and discuss what’s happening. First, we set up a dictionary to indicate what we’d like the progression of delays to be.

static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
  {
    {0, 60}, {60, 300}, {300, -1}
  };

The key is the last number of seconds delayed and the value is the next delay value to use. We start with 0 as you can see in the initialization code:

int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];

Then we check for a header on the message called “loosely.retry-delay-seconds.” Yes, I just made that up. Headers are meta-data you can attach to a message and can contain whatever string data you’d like. When we want to retry a message later, we’ll add a header with the number of seconds we just delayed so the next time through the code can know the next delay value to use if the message fails again.
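
Here’s the lookup on its own so you can watch the progression step through its values. One deliberate difference from the consumer code, and it’s my assumption rather than anything MassTransit requires: this sketch uses TryGetValue and treats an unexpected header value as “give up,” where the dictionary indexer would throw a KeyNotFoundException.

```csharp
using System;
using System.Collections.Generic;

static class DelayProgressionDemo
{
    static readonly Dictionary<int, int> DelayProgression = new Dictionary<int, int>
    {
        {0, 60}, {60, 300}, {300, -1}
    };

    // Returns the next delay in seconds, or -1 to stop retrying.
    public static int NextDelay(string headerValue)
    {
        int lastDelay;
        int.TryParse(headerValue, out lastDelay); // a missing or junk header leaves lastDelay at 0

        int next;
        return DelayProgression.TryGetValue(lastDelay, out next) ? next : -1;
    }

    static void Main()
    {
        Console.WriteLine(NextDelay(null));  // no header yet (first failure): 60
        Console.WriteLine(NextDelay("60"));  // 300
        Console.WriteLine(NextDelay("300")); // -1, give up
        Console.WriteLine(NextDelay("45"));  // not in the table: -1
    }
}
```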

Now we just have a method that can check for some magic strings in our message to see if it should trigger a contrived exception:

CheckForContrivedErrorConditions(envelope);

Within that function, we define three strings (in addition to the original “poison” string) for which we will scan.

“server-disaster”: Simulate the mail server being down for a very long time.
“server-down”: Simulate the mail server being down for less than 5 minutes.
“server-blip”: Simulate the mail server being down for less than 30 seconds.

Finally, we wrap all of the actual message processing in a try…catch block. If an exception occurs, we check the message to see if it’s a message we know to be a transient condition and if the next retry delay value is not negative one (-1). Negative one will be our trigger to tell us we need to give up on retrying.

Trace.WriteLine("Exception caught.");
if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
  sleepAndRepublish = true;
else throw;

If the condition is met, we set a flag to indicate we want to sleep (delay) for a bit and then republish the message so it will be retried later. If the condition is not met, we re-throw the exception and MassTransit will handle it per normal message processing rules (default being to retry 4 more times and then move to the error queue).

If we do want to sleep and republish, that code is simple:

if(sleepAndRepublish)
{
  Thread.Sleep(nextRetryDelay * 1000);
  envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
    x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
    x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
  });
}

We put the thread to sleep for the prescribed number of seconds (more on that later) and then, after the time has elapsed, we republish the message to the bus with a “loosely.retry-delay-seconds” header value of the amount of time we delayed before republishing the message. That will put the message back on the bus and our consumer will get called again with it. This time, the message will have the special header on it and we’ll know to move onto the next delay value (or stop retrying if that value is -1).

Did You Seriously Just Block the Consumer Thread?

Good catch. Yes, this can have performance implications. MassTransit has a setting called ConcurrentConsumerLimit, which defaults to the number of CPUs times 4 (so 16 on a 4-processor machine). We’re essentially “killing” one of these 16 (or however many) threads while we sleep, thus limiting the number of messages we can process while we’re waiting.

But is this really a problem? In this example, our service is only responsible for processing TicketOpened messages. Every TicketOpened message needs to trigger an email to be sent. If the email server is down, then none of the TicketOpened messages are going to be able to successfully be processed. In this case, it probably makes sense for the entire service to slow down and wait until the mail server is back online.

If the service were responsible for processing many different types of messages, then this would certainly be an issue. However, it raises the question of whether it makes sense for a single service to handle different types of messages. In some cases it might, particularly if they all need to be handled in much the same way. But in a lot of cases, it will make more sense to create separate services for your different message types.

What If the Service Crashes While Sleeping?

So we have our consumer sleeping on the consumer thread and the message is “in limbo” while we’re waiting. What happens to the message if the service crashes during the Thread.Sleep? If you send a “server-down” message to get the message to go through the retry logic, take a look at the queue in RabbitMQ:

image

It shows one message in the “Unacked” (unacknowledged) column. This means two things: 1) RabbitMQ won’t deliver the message to any other consumer threads or processes, and 2) RabbitMQ won’t remove the message until it is acknowledged. If the process hosting our consumer service dies before acknowledging the message, RabbitMQ will move the message back to the “Ready” state.

Caveats and Disclaimers

These bloggers, sheesh. Always cutting corners in the code for the “sake of brevity.” It’s difficult to balance a good, crisp article with well-crafted code. First, you don’t see any unit tests. Bad programmer. Next, with a good suite of tests watching your back, the code in this example could be refactored into shorter methods and perhaps a helper class for the retry delay progression. Finally, the call to Thread.Sleep should probably be refactored into a loop to wake up every couple of seconds to see if the service needs to stop.
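
For that last point, here’s roughly what I have in mind: sleep in short slices and check a “should we stop?” callback between them. This is a sketch and not code from the example repository. InterruptibleDelay and its parameters are invented names, and how your service signals a stop request is up to you.

```csharp
using System;
using System.Threading;

static class InterruptibleDelay
{
    // Sleeps for `total`, waking every `slice` to check `stopRequested`.
    // Returns true if the full delay elapsed, false if it was cut short.
    public static bool Sleep(TimeSpan total, TimeSpan slice, Func<bool> stopRequested)
    {
        if (slice <= TimeSpan.Zero) throw new ArgumentOutOfRangeException("slice");

        var remaining = total;
        while (remaining > TimeSpan.Zero)
        {
            if (stopRequested()) return false;
            var nap = remaining < slice ? remaining : slice;
            Thread.Sleep(nap);
            remaining -= nap;
        }
        return true;
    }

    static void Main()
    {
        using (var stop = new ManualResetEventSlim(false))
        using (new Timer(_ => stop.Set(), null, 300, Timeout.Infinite)) // "stop the service" after ~300 ms
        {
            bool completed = Sleep(TimeSpan.FromSeconds(60), TimeSpan.FromMilliseconds(100), () => stop.IsSet);
            Console.WriteLine("Delay completed: " + completed);
        }
    }
}
```

In the consumer, a false return would mean “don’t republish, the service is shutting down.” Throwing at that point leaves the message unacknowledged so it can be picked up again later.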

Other Options

Of course there are other ways to build delayed retry logic into your services. The method used in this post is just the simplest to illustrate, but you can take this further. For example, take a look at the MassTransit-Quartz project. This uses the open source Quartz.NET scheduler to enable delayed publishing of messages. It does, however, require an ADO.NET database to persist the scheduled jobs so you don’t lose your delayed messages. If you need scheduling and visibility into messages that were delayed, then this is your ticket.

Another pattern that could be implemented is that of moving the delayed messages to another RabbitMQ queue. Then you could write something that periodically polled that queue and moved the messages back into the primary queue after the desired delay.
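
The bookkeeping for that second pattern is small. Below is an in-memory sketch of the idea: park each message with a due time, and have a poller periodically take whatever has come due and put it back on the primary queue. A real version would keep the parked messages in RabbitMQ rather than in a List, and DelayQueue is a made-up class, not a MassTransit or RabbitMQ type. It also isn’t thread-safe as written.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for a "delay" queue.
class DelayQueue<T>
{
    private readonly List<KeyValuePair<DateTime, T>> _items = new List<KeyValuePair<DateTime, T>>();

    public int Count { get { return _items.Count; } }

    public void Add(T message, DateTime dueAt)
    {
        _items.Add(new KeyValuePair<DateTime, T>(dueAt, message));
    }

    // Removes and returns every message whose delay has expired as of `now`,
    // oldest due time first.
    public List<T> TakeDue(DateTime now)
    {
        var due = _items.Where(i => i.Key <= now).OrderBy(i => i.Key).ToList();
        _items.RemoveAll(i => i.Key <= now);
        return due.Select(i => i.Value).ToList();
    }
}

static class DelayQueueDemo
{
    static void Main()
    {
        var start = new DateTime(2014, 7, 27, 12, 0, 0);
        var parked = new DelayQueue<string>();
        parked.Add("ticket A", start.AddSeconds(60));
        parked.Add("ticket B", start.AddSeconds(300));

        // A poller would call TakeDue on a timer and republish whatever comes back.
        Console.WriteLine(string.Join(", ", parked.TakeDue(start.AddSeconds(90))));
        Console.WriteLine("Still parked: " + parked.Count);
    }
}
```

Ninety seconds in, only “ticket A” is due; “ticket B” stays parked until its five minutes are up.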

Next Stop…

Let’s take a look at how we can implement multiple consumers in a couple different use cases. In one case, we might want to spin up multiple, identical consumers on different machines to scale out message processing. In another case, we may want to have completely different consumers subscribing to the same message type but intending to do different things with the messages. After that, we’ll probably take a look at Sagas (chaining multiple messaging processing steps together) and then… who knows? Send me your thoughts and questions regarding anything you’d like to see here.

A Real-World MassTransit Customer Portal Example https://looselycoupledlabs.com/2014/07/a-real-world-masstransit-customer-portal-example/ https://looselycoupledlabs.com/2014/07/a-real-world-masstransit-customer-portal-example/#comments Sat, 19 Jul 2014 04:11:17 +0000 http://looselycoupledlabs.com/?p=98

Now that we’ve seen some simple examples of how to use MassTransit with the Publish/Subscribe pattern on multiple machines, let’s build something that resembles a more real-world app. In this article, we’ll build an ASP.NET MVC Customer Portal app where a customer can create a new support ticket. The ticket will be published onto the service bus. We’ll create a Windows Service to be the subscriber of these messages and it will handle the tickets, in this example, by sending a confirmation email to the customer.

You can get all the code from this blog post at https://github.com/dprothero/Loosely.CustomerPortal.

This is a big one, so roll up your sleeves…

The Web App

Let’s build a rudimentary front-end application that will be a stand-in for a true customer portal style web site. We’ll build an app that has a single menu option on its home page:

image

When the user clicks “Open a new support ticket,” they will get a very simple form asking for their email address and a description of their problem or question:

image

When the user clicks “Open Ticket,” they will see a confirmation message containing their ticket number:

image

So let’s dig into the code to build this web app.

New ASP.NET MVC Project

Open Visual Studio and choose File… New Project. Select the “ASP.NET Web Application” project template. Give it the name “Loosely.CustomerPortal.WebApp” and name the solution “Loosely.CustomerPortal.”

In the “New ASP.NET Project” dialog, select the “Empty” template and check the “MVC” box under the “Add folders and core references for” heading.

Contracts

Now we need a place to keep our “contracts” for our service bus. A contract is an interface that specifies the format of our message type. Add a new class library to the solution and name it “Loosely.Bus.Contracts.” Add a new file to the class library called TicketOpened and define the following interface:

namespace Loosely.Bus.Contracts
{
  public interface TicketOpened
  {
    string Id { get; }
    string CustomerEmail { get; set; }
    string Message { get; set; }
  }
}

This is the message type we will publish onto the service bus whenever a user of the web application wants to open a new support ticket.

Configuration

Now let’s add another class library named “Loosely.Bus.Configuration” where we’ll keep our common MassTransit configuration code. Add the MassTransit.Log4Net and MassTransit.RabbitMQ NuGet packages to this new class library.

We’ll put the common service bus initialization code into a class called BusInitializer:

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using System;

namespace Loosely.Bus.Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        x.UseRabbitMq();
        x.ReceiveFrom("rabbitmq://localhost/Loosely_" + queueName);
        moreInitialization(x);
      });

      return bus;
    }
  }
}

You may recall, in a previous post, when we did the same thing. In fact, this code is nearly identical. The only thing we’ve changed is the prefix for the queue name. In that earlier post I described what we’re doing here in detail. In summary, we’re setting up a new instance of a MassTransit service bus that will use RabbitMQ for its transport mechanism.

Put ASP.NET on the Bus

Returning to the Loosely.CustomerPortal.WebApp in our solution, right-click on References and add project references to the Loosely.Bus.Configuration and Loosely.Bus.Contracts projects. Also, add the MassTransit NuGet package to the project.

The best place I’ve found to create and configure MassTransit in an ASP.NET app is in the Global.asax’s Application_Start event handler. Open Global.asax.cs and make sure the code looks like this:

using Loosely.Bus.Configuration;
using MassTransit;
using System.Web.Mvc;
using System.Web.Routing;

namespace Loosely.CustomerPortal.WebApp
{
  public class MvcApplication : System.Web.HttpApplication
  {
    public static IServiceBus Bus {get; set;}

    protected void Application_Start()
    {
      AreaRegistration.RegisterAllAreas();
      RouteConfig.RegisterRoutes(RouteTable.Routes);

      Bus = BusInitializer.CreateBus("CustomerPortal_WebApp", x => { });
    }

    protected void Application_End()
    {
      Bus.Dispose();
    }
  }
}

We’re adding a public, static property to the MvcApplication class that we can use elsewhere in our application to get access to our service bus. In the Application_Start event handler, after the routing code added by Visual Studio, we can use our BusInitializer class to create a new bus and assign it to the static property. Later, when we want to use the bus, we’ll simply use the expression MvcApplication.Bus.

Don’t forget to call Dispose on the bus in the Application_End event.

Ticket Model

Still within the WebApp project, add a new model to the Models folder and call it “Ticket.” This will be the model we will bind our support ticket data entry form to. We’ll also have it implement the TicketOpened interface so we can publish it to our service bus.

using Loosely.Bus.Contracts;
using System;

namespace Loosely.CustomerPortal.WebApp.Models
{
  public class Ticket : TicketOpened
  {
    private string _id;
    
    public string Id { get { return _id; } }
    public string CustomerEmail { get; set; }
    public string Message { get; set; }

    public Ticket()
    {
      _id = Guid.NewGuid().ToString();
    }

    public void Save()
    {
      MvcApplication.Bus.Publish<TicketOpened>(this, x => { 
        x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
      });
    }
  }
}

Because we want to be able to tell the user what their ticket ID is right away, we need some method to generate a statistically unique, random ID. GUIDs work well for this in terms of being easy to implement for a developer, as it’s a single line of code. They aren’t a great user experience, of course, due to their length.
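If the length bothers you, one possible tweak (not something the sample project does) is to encode the GUID’s 16 bytes as URL-safe Base64, which brings it down from 36 characters to 22 without throwing away any of its uniqueness. The TicketIds class and Shorten method below are hypothetical names used only for this sketch.

```csharp
using System;

public static class TicketIds
{
    // Encodes the 16 bytes of a GUID as Base64, drops the "==" padding and
    // swaps the two characters that are awkward in URLs ('+' and '/').
    public static string Shorten(Guid id)
    {
        return Convert.ToBase64String(id.ToByteArray())
                      .TrimEnd('=')
                      .Replace('+', '-')
                      .Replace('/', '_');
    }

    public static void Main()
    {
        var id = Guid.NewGuid();
        Console.WriteLine(id.ToString()); // standard form, 36 characters with hyphens
        Console.WriteLine(Shorten(id));   // same 128 bits, 22 characters
    }
}
```

The trade-off is that the shorter form is case-sensitive, so it is friendlier in a URL or an email than it is read aloud over the phone.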

Timeout to Pontificate

The point to remember is that we need a way to generate an identifier that we know should be unique in whatever data storage repository we will be storing the tickets in without having to consult said data storage repository. Remember, the reason we’re using a service bus is to have this web application loosely coupled to whatever backend is used for our ticketing system.

When the rubber meets the road, however, you may be integrating with a ticketing system that wants to assign its own IDs. In that case, you will have to decide whether the requirement to display the ticket ID immediately to the user is worth a round-trip to the ticketing system to get it. There’s no one right answer. Building systems is a constant series of trade-offs.

Back to the Model…

We also have a Save method that we will call from our controller (coming soon). Instead of what you typically see in a Save method (saving to a database), we’re publishing the Ticket onto the bus. Since the ticket implements the TicketOpened interface from our Contracts assembly, other processes can subscribe to these TicketOpened messages and do something interesting with them.

The Controllers and Views

Now let’s build some UI. Add an empty controller called “HomeController” to the “Controllers” folder. Nothing much is needed in this controller – just return a view that will have our “menu” of options (a menu of one option, that is):

using System.Web.Mvc;

namespace Loosely.CustomerPortal.WebApp.Controllers
{
  public class HomeController : Controller
  {
    // GET: Home
    public ActionResult Index()
    {
      return View();
    }
  }
}

Create a view named Index under the Views/Home folder, leaving the “Use a layout page” option checked. The layout page will give us a basic page template so we don’t have to worry about formatting too much. The view can then simply present our single menu option:

@{
    ViewBag.Title = "Index";
}

<h2>Customer Portal</h2>

<a href="@Url.Content("~/Ticket/Open")">Open a new support ticket</a>

As you can see, the link to open a new support ticket is taking us to /Ticket/Open, which means we need a TicketController with an Open action. Add this controller to the Controllers folder (choose empty again). Below is the code for this controller:

using System.Web.Mvc;

namespace Loosely.CustomerPortal.WebApp.Controllers
{
  public class TicketController : Controller
  {
    [HttpGet]
    public ActionResult Open()
    {
      var ticket = new Models.Ticket();
      return View(ticket);
    }

    [HttpPost]
    public ActionResult Open(Models.Ticket ticket)
    {
      ticket.Save();
      return Redirect("~/Ticket/Opened/" + ticket.Id);
    }

    public ActionResult Opened(string id)
    {
      ViewBag.TicketId = id;
      return View();
    }
  }
}

A few interesting things are going on now. First, we’ve got the Open action that’s tagged with the HttpGet attribute. This action simply creates a new ticket model and binds it to the (soon to be created) view. This view will be the data entry form allowing the user to supply their email and message text.

The next method is also called Open but is tagged with the HttpPost attribute. This is because when the user submits the form we will still be posting to the /Ticket/Open URL (sorry if this is review for you MVC vets). The post action takes in a ticket model that should be populated with the data from the user’s form submission.

We take the ticket the user submits and call the Save method on it (which, as you’ll recall, is what will post the message to the service bus). Following the save, we redirect to the Opened action, passing the ticket ID.

Finally, we have the Opened action which passes the ticket ID into the view via the ViewBag so it can be displayed to the user.

Ticket Views

We need a couple views for the Ticket controller. Under the Views/Ticket folder, create an empty view named Open and be sure to select “Ticket (Loosely.CustomerPortal.WebApp.Models)” for the model class. This view will contain our data input form that will be bound to the Ticket model:

@model Loosely.CustomerPortal.WebApp.Models.Ticket

@{
    ViewBag.Title = "Open";
}

<h2>Open a Ticket</h2>

@using ( Html.BeginForm() )
{
  <fieldset>
    <legend>Ticket Info</legend>
    <div>@Html.LabelFor(model => model.CustomerEmail)</div>
    <div>@Html.TextBoxFor(model => model.CustomerEmail)</div>

    <div>@Html.LabelFor(model => model.Message)</div>
    <div>@Html.TextBoxFor(model => model.Message)</div>

    <input type="submit" value="Open Ticket" />

  </fieldset>
}

Add another view named Opened under the same Views/Ticket folder. This view will simply display the ticket ID:

@{
    ViewBag.Title = "Opened";
}

<h2>Ticket Opened</h2>

<p>
  Your ticket has been opened.
  Your ticket id is: <strong>@ViewBag.TicketId</strong>
</p>

<p>
  <a href="@Url.Content("~/")">Return Home</a>
</p>

Checkpoint – The Web App Works, Now What?

If you run the web app now, you’ll be able to create a new ticket, and you’ll even get a new ticket ID assigned each time! If you go into the RabbitMQ management interface (see this post for instructions), you will see that there’s an exchange named Loosely.Bus.Contracts:TicketOpened that isn’t connected to any other exchanges or queues. If you’ve been following my blog, you’ll know this is because we don’t have anyone listening for these types of messages yet.

Creating the Backend Service

It’s time to do something with these tickets that are being created by the web app. Let’s create a Windows service that can run in the background on any machine and subscribe to TicketOpened messages that are published to the service bus. We’ll use the open source project TopShelf for creating our Windows service. TopShelf is published by the same trio of geniuses that gave us MassTransit and it makes creating Windows services extremely simple.

Start by adding a new Console application to our solution and name it Loosely.CustomerPortal.Backend. Add project references to Loosely.Bus.Configuration and Loosely.Bus.Contracts, as well as a framework reference to System.Configuration. Finally, add NuGet packages MassTransit and TopShelf to the project.

Backend Configuration

First, let’s set up a little configuration so that TopShelf will log any messages to a log file we can create. We’ll also set up the configuration to log general Trace messages. To send email messages, we’ll use Gmail’s SMTP server, so we also need some place to store our Gmail credentials.

All this configuration can go in the App.config file in the new console application:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>
  <system.diagnostics>
    <sharedListeners>
      <add name="traceLogListener" type="System.Diagnostics.TextWriterTraceListener" 
           initializeData="C:\Logs\Loosely.CustomerPortal.Backend.log" />
    </sharedListeners>
    <sources>
      <source name="Default">
        <listeners>
          <add name="traceLogListener" />
          <remove name="Default" />
        </listeners>
      </source>
    </sources>
    <trace autoflush="true" indentsize="4">
      <listeners>
        <add name="traceLogListener" />
        <remove name="Default" />
      </listeners>
    </trace>
  </system.diagnostics>

  <appSettings file="C:\Config\Loosely.CustomerPortal.Backend.config">
    <add key="Gmail.Account" value="youraccount@gmail.com"/>
    <add key="Gmail.Password" value="yourpassword"/>
  </appSettings>
  
</configuration>

Notice we’re logging messages to C:\Logs\Loosely.CustomerPortal.Backend.log, so be sure to create a C:\Logs folder (or change this path to somewhere else you might prefer).

Email Helper

When a new ticket is opened, we want to send a confirmation to the customer’s email address that they supplied. Create a new EmailHelper class to handle the down and dirty SMTP communication:

using System.Configuration;

namespace Loosely.CustomerPortal.Backend
{
  class EmailHelper
  {
    readonly static string gmailAccount = ConfigurationManager.AppSettings.Get("Gmail.Account");
    readonly static string gmailPassword = ConfigurationManager.AppSettings.Get("Gmail.Password");

    public static void Send(string customerEmail, string subject, string messageBody)
    {
      var client = new System.Net.Mail.SmtpClient("smtp.gmail.com", 587);
      client.EnableSsl = true;
      client.Credentials = new System.Net.NetworkCredential(gmailAccount, gmailPassword);
      client.Send(gmailAccount, customerEmail, subject, messageBody);
    }
  }
}

Nothing magical here. We just pull the Gmail credentials out of our config file and then open a secure connection to Gmail’s SMTP server to send a message. Obviously, in a true production app, this code would be written to connect to the appropriate SMTP server and likely not use Gmail like this. Gmail works well for a simple example, however.
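One thing the helper above does not do is complain when a setting is missing: AppSettings simply hands back null for a key it cannot find, and you would only discover the problem when the SMTP call fails. If you would rather fail fast with a clear message, a small guard along these lines is one option. RequiredSettings is a hypothetical helper, not part of the sample project; it accepts any NameValueCollection, which is the type ConfigurationManager.AppSettings returns.

```csharp
using System;
using System.Collections.Specialized;

public static class RequiredSettings
{
    // Returns the value stored under key, or throws if it is missing or blank.
    public static string Get(NameValueCollection settings, string key)
    {
        var value = settings[key]; // null when the key is not present
        if (string.IsNullOrWhiteSpace(value))
            throw new InvalidOperationException("Missing required appSetting: " + key);
        return value;
    }

    public static void Main()
    {
        // Stand-in for ConfigurationManager.AppSettings so the sketch runs on its own.
        var settings = new NameValueCollection { { "Gmail.Account", "youraccount@gmail.com" } };

        Console.WriteLine(Get(settings, "Gmail.Account"));
        try
        {
            Get(settings, "Gmail.Password");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

In the backend you would call it as RequiredSettings.Get(ConfigurationManager.AppSettings, "Gmail.Account").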

A Consumer for TicketOpened

Now we need a consumer class to which MassTransit can send TicketOpened messages. Add a new class called TicketOpenedConsumer:

using Loosely.Bus.Contracts;
using MassTransit;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" + 
                      "Message: " + envelope.Message.Message);

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" + 
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

This is the real meat of the backend service. The Consume method will be called for every TicketOpened message that MassTransit picks up off the bus for us. In this example, we’re simply logging the information from the ticket and then sending the confirmation email to the customer.

The Service Class

Next let’s create a class that we’ll use to host our service. We’ll furnish this class to TopShelf, which will call the Start method when the service is started and the Stop method when the service is stopped. Create a new class called TicketService:

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

The Start method is the perfect place to put our bus initialization code. Notice how we are adding a subscription in MassTransit and providing it our TicketOpenedConsumer class. We’re also making it a permanent subscription.

The Startup Glue

Now we have all the classes we need. Open the Program.cs file and put the following TopShelf configuration code into the Main() function:

using System.Diagnostics;
using Topshelf;

namespace Loosely.CustomerPortal.Backend
{
  class Program
  {
    static void Main(string[] args)
    {
      HostFactory.Run(x =>
      {
        x.Service<TicketService>(s =>
        {
          s.ConstructUsing(name => new TicketService());
          s.WhenStarted(ts => ts.Start());
          s.WhenStopped(ts => ts.Stop());
        });
        x.RunAsLocalSystem();

        x.SetDescription("Loosely Coupled Labs Customer Portal Backend");
        x.SetDisplayName("Loosely.CustomerPortal.Backend");
        x.SetServiceName("Loosely.CustomerPortal.Backend");
      });
    }
  }
}

I’ll refer you to the TopShelf documentation for details on how TopShelf works. For this example, you just need to know that this code wires up our TicketService class to the TopShelf framework. The great thing about TopShelf is you can just run the executable and it will run your program as a standard console app. If you run it with the “install” command line parameter, it will install it as a Windows service.

Let’s Do This

Right-click on the solution and choose “Set Startup Projects…” and make both Loosely.CustomerPortal.Backend and Loosely.CustomerPortal.WebApp startup apps. Run the solution and you’ll get a web browser with the web app and a console window running your new service.

Try submitting a few tickets. You should see the emails (assuming you used your own email address to create the ticket) as well as log entries in the C:\Logs\Loosely.CustomerPortal.Backend.log file:

image

image

Install the Service

Now, let’s make the backend an actual Windows service. Open a command prompt as administrator (as administrator is important). Change to the directory where the Loosely.CustomerPortal.Backend.exe was built from Visual Studio. This will likely be the Loosely.CustomerPortal\Loosely.CustomerPortal.Backend\bin\Debug folder. Run the following command:

> Loosely.CustomerPortal.Backend.exe install

Now you should be able to launch the Windows services MMC snap-in (services.msc) and see the new Loosely.CustomerPortal.Backend Windows service and fire it up!

image

With the service running, you will want to change your Visual Studio solution back to having only the web app as the startup project.

What’s Next?

Now that we’ve built this app, I’d like to refer back to it in future blog posts so we can refine it to be more robust and “enterprise-ready”. Let’s use it to look at things like message retry logic, sagas, and multiple subscribers (for scale or for different functions). As always, let me know if there’s anything specific you’d like to see me write about.

Creating a RabbitMQ Cluster for Use with MassTransit
https://looselycoupledlabs.com/2014/07/creating-a-rabbitmq-cluster-for-use-with-masstransit/
Wed, 09 Jul 2014 01:09:17 +0000

In my last post, A Simple MassTransit Publish/Subscribe Example, we looked at how to build basic publishers and subscribers using MassTransit and RabbitMQ. In the example, however, we were only using a single RabbitMQ instance on a single machine. In the real world, your publishers are most likely to live on separate machines from your subscribers. To do that, we need to set up a RabbitMQ cluster.

RabbitMQ Clustering

A RabbitMQ cluster (or broker) can consist of multiple nodes, each running an instance of the RabbitMQ application. All of the nodes will share configuration information, such as users, exchanges, and queues. Recall from our last discussion that an exchange is like a post office and a queue is like an actual mailbox containing the resting place of messages. The actual queue, or mailbox, only lives on the node where the queue was created, but the knowledge of the queue’s existence is shared by the cluster, as is the full routing information contained in the exchanges. (Note that queues can be replicated for high-availability requirements, which we may cover in a future post.)

A Common Clustering Pattern

A pattern I like to use for clustering RabbitMQ nodes is to have a node live on each machine that participates in the overall application. For example, I will install instances of RabbitMQ on each web server that makes up a web application and will be publishing messages related to the application. Then, I will install an instance of RabbitMQ on each machine that will be running a service that will subscribe to the messages. The diagram below illustrates this pattern.

RabbitMQ Cluster Diagram

Setting Up the Cluster

For the purposes of this example, we are going to assume you have two Windows machines (virtual or otherwise) named Publisher and Subscriber. Install RabbitMQ on each of these machines, referring to the installation instructions in my previous blog post.

Erlang Cookie

The first thing that needs to be done is to set up the Erlang cookie on each machine so that the cookies are the same. (Remember, RabbitMQ is an Erlang application.) ASP.NET developers may be familiar with the concept of creating a common Machine Key for a cluster of web servers. The Erlang cookie is analogous to the machine key. It is a shared key that allows multiple Erlang nodes to communicate with each other. All the nodes in a RabbitMQ cluster must have the same Erlang cookie.

When installing Erlang on Windows and installing RabbitMQ as a Windows Service, the Erlang cookie is placed in two locations – C:\Users\Current User and C:\Windows. The RabbitMQ clustering guide recommends making sure all of the cookies on the machine are the same. Copy the cookie from C:\Windows on the Publisher machine to C:\Users\Current User on the same machine and then to both C:\Users\Current User and C:\Windows on the Subscriber machine. All four cookies should have identical content (you can inspect with Notepad to be sure).
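If you would rather not eyeball four files in Notepad, a few lines of C# can do the comparison for you. This is only a sketch: CookieCheck is a hypothetical helper, and it assumes you pass it the full path of each cookie file (the file is named .erlang.cookie) in a location the current user is allowed to read, for example after copying the Subscriber’s cookies somewhere reachable.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class CookieCheck
{
    // True only when at least one path is given, every file exists,
    // and all of the files contain exactly the same bytes.
    public static bool AllIdentical(IEnumerable<string> paths)
    {
        var contents = paths
            .Select(p => File.Exists(p) ? File.ReadAllBytes(p) : null)
            .ToList();

        if (contents.Count == 0 || contents.Any(c => c == null))
            return false;

        return contents.All(c => c.SequenceEqual(contents[0]));
    }

    public static void Main(string[] args)
    {
        // Usage: pass the full path of each .erlang.cookie file as an argument.
        Console.WriteLine(AllIdentical(args)
            ? "Cookies match"
            : "Cookies differ or are missing");
    }
}
```

Comparing raw bytes rather than text means a stray trailing newline in one copy will be reported as a mismatch, which is what you want here.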

Restart the RabbitMQ service on both machines.

Final Cluster Setup

If you’ve got the Windows Firewall turned on, be sure to allow incoming TCP traffic on ports 4369 and 25672 on both the Publisher and Subscriber machines.
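A quick way to sanity-check the firewall rules is to try opening a TCP connection to each port from the other machine. The sketch below does that with the standard TcpClient class; PortCheck and CanConnect are hypothetical names, and the three-second timeout is an arbitrary choice. Keep in mind that a successful connection only tells you that something is listening and the firewall let you through, not that clustering is configured correctly.

```csharp
using System;
using System.Net.Sockets;

public static class PortCheck
{
    // Tries to open a TCP connection to host:port within the given timeout.
    public static bool CanConnect(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                // Wait returns false on timeout and throws if the connect fails.
                return client.ConnectAsync(host, port).Wait(timeout) && client.Connected;
            }
            catch (AggregateException)
            {
                // Connection refused, host not found, and similar failures.
                return false;
            }
        }
    }

    public static void Main(string[] args)
    {
        // Usage: pass the name of the other machine, e.g. Publisher or Subscriber.
        var host = args.Length > 0 ? args[0] : "localhost";

        foreach (var port in new[] { 4369, 25672 })
        {
            var ok = CanConnect(host, port, TimeSpan.FromSeconds(3));
            Console.WriteLine(host + ":" + port + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```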

Now, from the Subscriber machine, open up the “RabbitMQ Command Prompt (sbin dir)” item (added by the RabbitMQ installer). Run the following commands:

> rabbitmqctl stop_app
> rabbitmqctl join_cluster rabbit@Publisher

What we are doing here is telling the instance of RabbitMQ running on the Subscriber machine that we want to join a cluster that is hosted by the Publisher machine (which is currently a cluster of one). Restart the RabbitMQ Windows service after running this command.

Inspecting Your Cluster

Now, on either the Publisher or the Subscriber, open the web-based management console at http://localhost:15672/. (Recall from our previous post that the web-based management console is a plugin we need to install.) On the first page, you will see the details of our newly created, multi-node cluster!

rabbitmq-cluster

MassTransit in Action

Now let’s put our cluster to the test. Build the sample applications that we created in the previous post. Copy the contents of TestPublisher\bin\Debug to a folder on the Publisher. Likewise, copy the contents of TestSubscriber\bin\Debug to a folder on the Subscriber.

Launch TestSubscriber.exe on the Subscriber machine first. Then, launch TestPublisher on the Publisher machine. Finally, send a message from the Publisher, and watch it appear on the Subscriber.

rabbitmq-cluster-example

Notice how we didn’t have to modify any code in the sample application? This is because we set up the code to connect to RabbitMQ on localhost. Because each local instance of RabbitMQ on the two machines was part of a single cluster, they shared information about the exchanges and queues necessary to make the MassTransit service bus work its magic, as if there were only a single RabbitMQ instance!

What’s Next?

Next up, we’ll take a look at scaling this up to a real-world example. We’ll create an ASP.NET web application that can publish messages and a custom Windows service that can be our subscriber of the messages. Down the road, we’ll look at having multiple subscribers for the same message type in two different patterns: one for scaling out message processing and another for having different processing occur in separate Windows services.

As always, please send your suggestions for things you’d like to learn how to do with MassTransit.
