RabbitMQ – Loosely Coupled Labs (ARCHIVED)
https://looselycoupledlabs.com
A Blog Loosely Related to System Architecture by David Prothero

MassTransit 3 Update: A Simple Publish/Subscribe Example
https://looselycoupledlabs.com/2015/07/masstransit-3-update-a-simple-publishsubscribe-example/
Sat, 04 Jul 2015 22:54:48 +0000

With the announcement of MassTransit 3, we learned that there were many changes to the API that were coming. The changes are all welcome ones, making MassTransit 3 simpler to work with and now completely asynchronous. What follows is this blog’s inaugural post, but updated to work with the current pre-release version of MassTransit 3.

Whenever there is an MT3 specific update, I will call it out like this.

When I first sat down to learn how to use MassTransit, I found it difficult to just get a simple example that published a message onto the bus with another process that subscribed to messages of the same type working. Hopefully, this primer will get you on the bus quicker.

Setting Up Your Environment

The first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others, but I find that RabbitMQ is really the way to go. That’s especially true when using the publish/subscribe pattern. The reason for this is that RabbitMQ has a complete routing framework built-in and MassTransit will leverage this when persisting your subscriptions. When creating a cluster of RabbitMQ servers for availability, this routing information is replicated to all the nodes.

MT3 has dropped support for MSMQ (read about that) but has officially adopted Azure Service Bus as an in-the-box supported transport.

In this article, you’re going to run RabbitMQ on your local Windows development box. Both our publisher and subscriber will connect to the same RabbitMQ instance. In a future post, I’ll detail how to set up multiple RabbitMQ instances in a cluster.

Installing RabbitMQ

RabbitMQ requires the Erlang runtime, so that’s the first thing you need to download and install. Head over to Erlang.org’s download page and get the latest binary release for Windows (it’s likely you’ll want the 64-bit version). It’s a simple setup wizard, so you’ll have Erlang installed on your machine in short order.

Next, download the latest version of RabbitMQ for Windows. Again, it’s an easy setup wizard that you can quickly fly through. Just accept the defaults.

Enabling the RabbitMQ Web Management Interface

One RabbitMQ feature that I found extremely useful (but which isn’t enabled by default) is the web-based management interface. With this, you can see the exchanges and queues that are set up by MassTransit in RabbitMQ. To enable this, find the “RabbitMQ Command Prompt (sbin dir)” item that the RabbitMQ installer added to your Start menu and launch it. From the command line, run the following command:

> rabbitmq-plugins enable rabbitmq_management

It will confirm that the plugin and its dependencies have been enabled and instruct you to restart RabbitMQ. When installed on Windows, RabbitMQ runs as a Windows service. You can use the Services MMC snap-in to restart it or just run the following command:

> net stop RabbitMQ
...
> net start RabbitMQ

Now go to http://localhost:15672/ to open the management console. The default credentials to log in are guest/guest (you can change the credentials from the Admin tab).

There’s not much to see yet, but we’ll set the stage. Go to the Exchanges tab. You’ll see the following default RabbitMQ exchanges:

image

An exchange is something you can send messages to. It cannot hold messages. It’s merely a set of routing instructions that tell RabbitMQ where to deliver the message. We’ll come back here in a little while.

Now click on the Queues tab. Nothing here yet. Queues can actually hold messages and are where applications can actually pick up messages.

So, to use a real world analogy, an Exchange is like the local Post Office, and a Queue is like your mailbox. The only thing that an Exchange can do that most traditional Post Offices don’t do is actually make multiple copies of a message to be delivered to multiple mailboxes.
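
To make the analogy a bit more concrete, here is a toy in-memory model of an exchange and its bound queues. This is only a sketch: ToyExchange, Bind, and Publish are names invented for this illustration and are not RabbitMQ or MassTransit APIs, and real exchanges apply routing rules that this model ignores. It shows just two ideas from the analogy: the exchange holds nothing itself, and every bound queue gets its own copy.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: these types are invented for illustration and are not
// RabbitMQ or MassTransit APIs.
class ToyExchange
{
  private readonly List<Queue<string>> _boundQueues = new List<Queue<string>>();

  // A binding tells the exchange which queue ("mailbox") should receive copies.
  public void Bind(Queue<string> queue)
  {
    _boundQueues.Add(queue);
  }

  // The exchange stores nothing: it copies the message into every bound queue
  // and returns the number of copies delivered (0 means the message was dropped).
  public int Publish(string message)
  {
    foreach (var queue in _boundQueues)
      queue.Enqueue(message);
    return _boundQueues.Count;
  }
}

static class ToyExchangeDemo
{
  static void Main()
  {
    var exchange = new ToyExchange();

    // No bindings yet, so this message has nowhere to go.
    Console.WriteLine("Copies delivered: " + exchange.Publish("nobody is listening"));

    var mailboxA = new Queue<string>();
    var mailboxB = new Queue<string>();
    exchange.Bind(mailboxA);
    exchange.Bind(mailboxB);

    // With two bindings, each mailbox receives its own copy.
    Console.WriteLine("Copies delivered: " + exchange.Publish("hello"));
    Console.WriteLine("Mailbox A: " + mailboxA.Count + ", Mailbox B: " + mailboxB.Count);
  }
}
```

Keep this picture in mind when we look at the bindings MassTransit creates later on.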

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubExample

To view the code for MT3, select the “mt3” branch in this GitHub repository.

Creating a Contract

I like to use the concept of a “contract” for my messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don’t need to know anything about the implementation of this interface on either side. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

The original article used a separate Configuration class to handle the common configuration tasks for both the publisher and subscriber. MT3’s configuration code is much simpler and, for clarity, I’ve moved the appropriate code into the publisher and subscriber.

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for some text and then publishes that text as part of a SomethingHappened message. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts project.

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for text, and publish that text onto the bus. Real quick first, however, it’s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.RabbitMq package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

To get the pre-release version of MassTransit 3 you need to add the -Pre flag to the Install-Package commands in the Package Manager Console:

Install-Package -Pre MassTransit.RabbitMq
Install-Package -Pre MassTransit.Log4Net

using System;
using Contracts;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x => 
        x.Host(new Uri("rabbitmq://localhost/"), h => { }));
      var busHandle = bus.Start();
      var text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();
        if (text == "quit") break; // don't publish the exit command itself

        var message = new SomethingHappenedMessage()
        {
          What = text, When = DateTime.Now
        };
        bus.Publish<SomethingHappened>(message);
      }

      busHandle.Stop().Wait();
    }
  }
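}

In MT3, most operations are asynchronous, including the Publish() method. This simple example does not await the call to publish. Be aware that this is fire-and-forget: if the publish fails, the exception on the returned Task goes unobserved, so in production code you would normally await it.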

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. Just put a catch for the string “quit” so we can exit the publisher when we’d like.

MT3 now requires an explicit call to start the bus, returning a handle that you can use to later stop the bus. Notice the Wait() method chained to the Stop() method. This is because the Stop() method is asynchronous. If you were calling Stop() inside an async method, you could await it. However, since this is a simple Console app, we are just blocking the thread until the shutdown is complete.
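
The difference between awaiting and blocking can be sketched without MassTransit at all. In the snippet below, StopAsync is a hypothetical stand-in for an asynchronous shutdown call such as busHandle.Stop(); the 100 ms delay and the "stopped" result are made up for the example.

```csharp
using System;
using System.Threading.Tasks;

static class ShutdownSketch
{
  // Hypothetical stand-in for an asynchronous shutdown such as busHandle.Stop().
  public static async Task<string> StopAsync()
  {
    await Task.Delay(100); // pretend to flush and close connections
    return "stopped";
  }

  // Inside an async method you can simply await the shutdown.
  public static async Task<string> ShutDownFromAsyncCode()
  {
    return await StopAsync();
  }

  // A synchronous Main cannot use await, so it blocks the thread instead.
  static void Main()
  {
    var stopTask = StopAsync();
    stopTask.Wait();
    Console.WriteLine(stopTask.Result);
  }
}
```

Blocking with Wait() is fine in a console app like this one. In UI or ASP.NET code, blocking on a Task can deadlock, so prefer await there.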

If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like…. However, nobody is listening yet!

What’s Going on in RabbitMQ So Far?

If you go back into the RabbitMQ web interface and jump over to the Exchanges tab, you’ll see we have a couple new arrivals.

image

MT3 doesn’t require creating a queue if you aren’t hosting an endpoint. So, for our publisher, the MtPubSubExample_TestPublisher queue will not be created as it was in the original post.

Contracts:SomethingHappened is a new exchange created for the SomethingHappened message type. When we published this message, MassTransit automatically created this exchange. Click on it and scroll down to the Bindings section, and you’ll see there are no bindings yet:

image

That’s because nobody has subscribed to SomethingHappened messages yet. They go to the exchange and then die because there’s no queue to route them to.

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add a project reference to Contracts (there is no separate Configuration project in the MT3 version) and then add the MassTransit.RabbitMq and MassTransit.Log4Net NuGet packages.

Don’t forget the -Pre switch to get the pre-release version of MT3.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using System;
using System.Threading.Tasks;
using Contracts;
using MassTransit;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : IConsumer<SomethingHappened>
  {
    public Task Consume(ConsumeContext<SomethingHappened> context)
    {
      Console.Write("TXT: " + context.Message.What);
      Console.Write("  SENT: " + context.Message.When);
      Console.Write("  PROCESSED: " + DateTime.Now);
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
      return Task.FromResult(0);
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

The Consume method is meant to be asynchronous, so it returns a Task. In this example, we aren’t making any other asynchronous calls, so we just use the Task.FromResult() helper method to return a Task with a zero result. If you were doing something asynchronous in the Consume method you could use async/await:

public async Task Consume(ConsumeContext<SomethingHappened> context)
{
  await SomeAsynchronousMethod(context.Message);
}

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using System;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      Log4NetLogger.Use();
      var bus = Bus.Factory.CreateUsingRabbitMq(x =>
      {
        var host = x.Host(new Uri("rabbitmq://localhost/"), h => { });

        x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e =>
          e.Consumer<SomethingHappenedConsumer>());
      });
      var busHandle = bus.Start();
      Console.ReadKey();
      busHandle.Stop().Wait();
    }
  }
}

Notice we are now passing a queue name to the new ReceiveEndpoint MT3 method. Make sure not to share this queue name with other applications.

Now right-click on the MtPubSubExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run.

Type some messages into the publisher. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish a few more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in RabbitMQ Now?

Go back into the RabbitMQ web interface and go back to the Exchanges tab. You’ll see a new exchange called MtPubSubExample_TestSubscriber, but first click on the Contracts:SomethingHappened exchange and scroll down to the Bindings section. You’ll see we now have a binding.

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up this binding for us. Click on the MtPubSubExample_TestSubscriber here, and you’ll see you’re taken to the setup page for an exchange called MtPubSubExample_TestSubscriber. Scroll down to Bindings, and you’ll see we’re bound to a queue named the same as the exchange (in the binding diagrams, exchanges show up as rectangles with rounded corners, whereas queues have straight corners).

image

The web interface is great in how it shows the predecessor in addition to the successor in the path. Click the MtPubSubExample_TestSubscriber queue here, and you’ll be taken to the queue setup page for that queue. If you haven’t fired up the TestSubscriber app since we published those last three messages, you should see that there are three messages in the queue:

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 6-minute lag (the 6 minutes we were poking around in RabbitMQ before starting up the subscriber again).

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit 3. I will work on updating more posts like this one.

Until then…

Monitoring RabbitMQ
https://looselycoupledlabs.com/2014/08/monitoring-rabbitmq/
Wed, 20 Aug 2014 03:52:58 +0000

We’ve talked a lot about using MassTransit with RabbitMQ in a variety of scenarios on this blog. We talked about error handling, which is something you need to know how to implement in a real world MassTransit + RabbitMQ deployment. Another important aspect to a production deployment is monitoring. Let’s take a look at how we can leverage the RabbitMQ Management HTTP API (installed with the RabbitMQ Management Plugin) to query our queues so we can monitor the health of our service bus.

Enabling the RabbitMQ Management Plugin

In A Simple MassTransit Publish/Subscribe Example, I detailed how to install RabbitMQ on Windows along with the RabbitMQ Management Plugin. You can refer to that post for the full details, but it’s important to know the RabbitMQ Management Plugin isn’t enabled out of the box. One simple command will enable it:

> rabbitmq-plugins enable rabbitmq_management

After a restart of the RabbitMQ service, visiting http://localhost:15672/ should bring you to the management GUI (default credentials are guest/guest). The HTTP API is available at http://localhost:15672/api/.

Simple REST API Call

Type the following URL into your browser to get a list of queues in your RabbitMQ instance:

http://localhost:15672/api/queues

You will need to supply the guest/guest credentials to access the API method. You should see the list of queues returned as JSON. If you want to get data for a single, specific queue, use the URL format /api/queues/vhost/queue-name. The default vhost in RabbitMQ is “/” which URL encoded is %2F, so the URL to retrieve info for the queue named MtPubSubExample_TestSubscriber would be http://localhost:15672/api/queues/%2F/MtPubSubExample_TestSubscriber and that would return the following JSON:

{
   "memory":21856,
   "messages":1,
   "messages_details":{
      "rate":0.0
   },
   "messages_ready":1,
   "messages_ready_details":{
      "rate":0.0
   },
   "messages_unacknowledged":0,
   "messages_unacknowledged_details":{
      "rate":0.0
   },
   "idle_since":"2014-08-16 9:30:50",
   "consumer_utilisation":"",
   "policy":"",
   "exclusive_consumer_tag":"",
   "consumers":0,
   "backing_queue_status":{
      "q1":0,
      "q2":0,
      "delta":[
         "delta",
         "undefined",
         0,
         "undefined"
      ],
      "q3":1,
      "q4":0,
      "len":1,
      "pending_acks":0,
      "target_ram_count":"infinity",
      "ram_msg_count":0,
      "ram_ack_count":0,
      "next_seq_id":180224,
      "persistent_count":1,
      "avg_ingress_rate":0.0,
      "avg_egress_rate":0.0,
      "avg_ack_ingress_rate":0.0,
      "avg_ack_egress_rate":0.0
   },
   "state":"running",
   "incoming":[

   ],
   "deliveries":[

   ],
   "consumer_details":[

   ],
   "name":"MtPubSubExample_TestSubscriber",
   "vhost":"/",
   "durable":true,
   "auto_delete":false,
   "arguments":{

   },
   "node":"rabbit@PROWIN1"
}

In this snippet of JSON you can see the queue has a single message in it. Now, let’s see what we can do with this new knowledge to keep tabs on our queues.

What to Monitor

At a minimum, I would monitor the number of ready messages in your queues. What constitutes an alerting threshold obviously will depend on your application. If you expect to have fairly real-time processing of your messages, and have the necessary horsepower in your consumer(s) to keep up, then you might alert if the queue goes over 10 unprocessed messages. You just need to decide how sensitive you need it to be. I’ll show you how to get the data out.
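
As a rough sketch of what such a threshold check might look like, the helper below classifies a ready-message count against two limits. Everything here is an assumption made up for illustration: the QueueHealth enum, the Evaluate method, and the limits of 10 and 100. The numeric values of the enum follow the common Nagios plugin exit code convention (0 for OK, 1 for warning, 2 for critical), which is handy if your monitoring tool reads the process exit code.

```csharp
using System;

// Illustrative only: the names and limits below are assumptions,
// not part of the sample project.
enum QueueHealth { Ok = 0, Warning = 1, Critical = 2 }

static class QueueThresholds
{
  // Returns Critical or Warning when the ready count goes over the given limits.
  public static QueueHealth Evaluate(int messagesReady, int warnOver, int criticalOver)
  {
    if (messagesReady > criticalOver) return QueueHealth.Critical;
    if (messagesReady > warnOver) return QueueHealth.Warning;
    return QueueHealth.Ok;
  }

  static void Main()
  {
    // Hypothetical limits: warn over 10 unprocessed messages, go critical over 100.
    foreach (var depth in new[] { 0, 11, 250 })
      Console.WriteLine(depth + " ready -> " + Evaluate(depth, 10, 100));
  }
}
```

Pick limits that reflect how quickly your consumers normally drain the queue; a batch-oriented system may tolerate thousands of ready messages where a near-real-time one tolerates only a handful.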

Which queues should you monitor? Any queue you expect to receive messages likely warrants monitoring. This means, for example, in our simple publish/subscribe example, we would monitor the MtPubSubExample_TestSubscriber queue. We also will want to monitor the corresponding error queue (e.g. MtPubSubExample_TestSubscriber_error). In our customer portal example, we would monitor Loosely_CustomerPortal_Backend and Loosely_CustomerPortal_Backend_error.

How to Monitor

This is largely up to you and what monitoring tools you already have in place. Most monitoring tools such as Nagios, NewRelic, AlertSite, and System Center will have some notion of a custom counter or metric that you can have the system monitor and send alerts when they reach a certain threshold. If your selected monitoring tool has a RabbitMQ plugin, you’re in luck. Otherwise, you will need to write a small script to supply the data points to your monitoring system. Let’s take a look at how to do this in C#.

Checking Your RabbitMQ Queues in C#

The code for this sample is available on github at https://github.com/dprothero/Loosely.RabbitMq.Monitor. Clone that repository or follow along here to build the project from scratch. I am using Visual Studio 2013.

First, create a new C# Console project. We can call it Loosely.RabbitMq.Monitor and name the solution something like Loosely.RabbitMq.Utilities. Install the NuGet package Json.NET to your project. That will help us read the JSON response from the web service.

Add a new class file to your project and name it QueueInfo.cs. Delete the QueueInfo class declaration (leaving just the namespace declaration). Copy the JSON code above to your clipboard and then position your cursor within the namespace declaration in QueueInfo.cs. From the Edit menu, select Paste Special, and then Paste JSON as Classes. This will create C# class declarations that will follow your expected JSON. Rename the newly pasted class named Rootobject to QueueInfo. You should end up with the classes below in QueueInfo.cs:

namespace Loosely.RabbitMq.Monitor
{
  public class QueueInfo
  {
    public int memory { get; set; }
    public int messages { get; set; }
    public Messages_Details messages_details { get; set; }
    public int messages_ready { get; set; }
    public Messages_Ready_Details messages_ready_details { get; set; }
    public int messages_unacknowledged { get; set; }
    public Messages_Unacknowledged_Details messages_unacknowledged_details { get; set; }
    public string idle_since { get; set; }
    public string consumer_utilisation { get; set; }
    public string policy { get; set; }
    public string exclusive_consumer_tag { get; set; }
    public int consumers { get; set; }
    public Backing_Queue_Status backing_queue_status { get; set; }
    public string state { get; set; }
    public object[] incoming { get; set; }
    public object[] deliveries { get; set; }
    public object[] consumer_details { get; set; }
    public string name { get; set; }
    public string vhost { get; set; }
    public bool durable { get; set; }
    public bool auto_delete { get; set; }
    public Arguments arguments { get; set; }
    public string node { get; set; }
  }

  public class Messages_Details
  {
    public float rate { get; set; }
  }

  public class Messages_Ready_Details
  {
    public float rate { get; set; }
  }

  public class Messages_Unacknowledged_Details
  {
    public float rate { get; set; }
  }

  public class Backing_Queue_Status
  {
    public int q1 { get; set; }
    public int q2 { get; set; }
    public object[] delta { get; set; }
    public int q3 { get; set; }
    public int q4 { get; set; }
    public int len { get; set; }
    public int pending_acks { get; set; }
    public string target_ram_count { get; set; }
    public int ram_msg_count { get; set; }
    public int ram_ack_count { get; set; }
    public int next_seq_id { get; set; }
    public int persistent_count { get; set; }
    public float avg_ingress_rate { get; set; }
    public float avg_egress_rate { get; set; }
    public float avg_ack_ingress_rate { get; set; }
    public float avg_ack_egress_rate { get; set; }
  }

  public class Arguments
  {
  }

}

If you wanted, you could clean up the property and class names to better match C# naming conventions. Just be sure to add the necessary JsonProperty attribute so JSON.NET can deserialize the JSON into the correct object properties. For example:

[JsonProperty("avg_ack_egress_rate")]
public float AvgAckEgressRate { get; set; }

Note that JSON.NET is case insensitive when deserializing, so you don’t need these attributes if you are only adjusting the case of the identifier (e.g. “name” to “Name”).
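
To illustrate, here is a trimmed-down variant of the class with C#-style names. QueueSummary is a hypothetical name chosen for this sketch, and it maps only three of the fields; by default Json.NET ignores JSON members that have no matching property, so the rest of the response is simply skipped.

```csharp
using System;
using Newtonsoft.Json;

// A trimmed-down, renamed variant of QueueInfo for illustration only.
public class QueueSummary
{
  // No attribute needed: "name" and "Name" differ only by case.
  public string Name { get; set; }

  // Attributes are needed here because the C# names drop the underscores.
  [JsonProperty("messages_ready")]
  public int MessagesReady { get; set; }

  [JsonProperty("messages_unacknowledged")]
  public int MessagesUnacknowledged { get; set; }
}

static class QueueSummaryDemo
{
  static void Main()
  {
    // A shortened version of the JSON shown earlier.
    var json = "{\"name\":\"MtPubSubExample_TestSubscriber\"," +
               "\"messages_ready\":1,\"messages_unacknowledged\":0,\"durable\":true}";

    var summary = JsonConvert.DeserializeObject<QueueSummary>(json);
    Console.WriteLine(summary.Name + ": " + summary.MessagesReady + " ready");
  }
}
```

If you rename the properties this way, remember to update any code that reads them to use the new names.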

Next, we could just hard-code the following in Program.cs to get the message count for the MtPubSubExample_TestSubscriber queue:

using Newtonsoft.Json;
using System;
using System.Net;

namespace Loosely.RabbitMq.Monitor
{
  class Program
  {
    static void Main(string[] args)
    {
      var client = new WebClient();
      client.Credentials = new NetworkCredential("guest", "guest");
      var data = client.DownloadString("http://localhost:15672/api/queues/%2F/MtPubSubExample_TestSubscriber");
      var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);

      // Property names match the generated QueueInfo class shown above.
      Console.WriteLine("Queue: " + queueInfo.name);
      Console.WriteLine("Queue Depth: " + queueInfo.messages_ready.ToString());

      Console.Write("\r\nPress any key to continue.");
      Console.ReadKey();
    }
  }
}

The code should be pretty straight forward. We make a request to the HTTP API, deserialize the JSON data into a plain old C# object (POCO), and then output the properties we are interested in. However, to make this really useful, we should refactor this code so it can be command line driven. Then, we can simply pass parameters on the command line to tell the program the instance of RabbitMQ to monitor (base URL for the API), the credentials to connect to the server, and the queue we want to interrogate.

For command line argument parsing in a console application, I like to use the NDesk.Options NuGet package. It makes supporting command line parameters (-x or --xxxx or /XXXX) super easy. Add the NDesk.Options package to your project. With that added, we can modify our Program.cs file to the following:

using NDesk.Options;
using Newtonsoft.Json;
using System;
using System.Net;

namespace Loosely.RabbitMq.Monitor
{
  class Program
  {
    static void Main(string[] args)
    {
      string baseUrl = "http://localhost:15672/api",
             userName = "guest",
             password = "guest",
             queueName = null,
             vhost = "/";

      var p = new OptionSet () {
        { "u|baseUrl=",  v => baseUrl = v },
        { "U|user=",     v => userName = v },
        { "p|password=", v => password = v },
        { "q|queue=",    v => queueName = v },
        { "v|vhost=",    v => vhost = v }
      };
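      p.Parse(args);

      // The queue name has no default, so stop early if it was not supplied.
      if (string.IsNullOrEmpty(queueName))
      {
        Console.Error.WriteLine("Please supply a queue name with -q or --queue.");
        return;
      }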
      
      var client = new WebClient();
      if(userName != null)
        client.Credentials = new NetworkCredential(userName, password);

      var url = baseUrl + "/queues/" + Uri.EscapeDataString(vhost) + "/" + Uri.EscapeDataString(queueName);
      var data = client.DownloadString(url);
      var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);

      // Here is where we report the message count to our monitoring system.
      // Replace the console output/wait with your code.
      Console.WriteLine("Queue: " + queueInfo.name);
      Console.WriteLine("Queue Depth: " + queueInfo.messages_ready.ToString());

      Console.Write("\r\nPress any key to continue.");
      Console.ReadKey();
    }
  }
}

To try out different command line arguments when running from within Visual Studio, go to the Debug project properties page:

image

Conclusion

As you can see, it is fairly easy to interrogate RabbitMQ to get information about the status of its queues (as well as any number of other objects). If you want to make extensive use of the HTTP API, I would suggest taking a look at Mike Hadlow’s EasyNetQ.Management.Client library, available on NuGet. This is a complete C# wrapper around the HTTP API, making it trivial to work with the API from C#.

I hope you found value in this blog post. Please don’t hesitate to contact me if you have any questions or suggestions for future blog posts. Until then…

Scaling Out Subscribers With MassTransit
https://looselycoupledlabs.com/2014/08/scaling-out-subscribers-with-masstransit/
Mon, 04 Aug 2014 00:41:33 +0000

On this blog, we’ve been looking at the publish/subscribe messaging pattern using MassTransit and RabbitMQ. So far, we’ve dealt with a single publisher and a single subscriber. We looked at how we can have those two roles live on separate servers. Finally, we looked at how to handle errors in the subscriber.

What happens, now, when your subscriber can’t process messages as fast as the messages are being published on the bus? This is a special situation, to be sure, but it certainly is possible in high message volume environments. It can be compounded if the messages themselves are fairly expensive to process.

Open the Flood Gates

Let’s take our simple publish/subscribe example and tweak it so we can have it dump a large number of messages onto the bus extremely quickly. Originally, the example prompted for a string and published that string as a single SomethingHappened message. Instead, let’s prompt for a number of messages that should be put onto the bus.

using Configuration;
using Contracts;
using System;
using System.Threading.Tasks;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter number of messages to generate (quit to exit): ");
        text = Console.ReadLine();

        int numMessages = 0;
        if (int.TryParse(text, out numMessages) && numMessages > 0)
        {
          Parallel.For(0, numMessages, i =>
          {
            var message = new SomethingHappenedMessage() { What = "message " + i.ToString(), When = DateTime.Now };
            bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
          });
        }
        else if(text != "quit")
        {
          Console.WriteLine("\"" + text + "\" is not a number.");
        }
      }

      bus.Dispose();
    }
  }
}

We’re using the System.Threading.Tasks.Parallel.For method to be able to simultaneously publish multiple messages onto the bus.

Now in our subscriber, let’s have it simulate 250 milliseconds of processing time with a call to System.Threading.Thread.Sleep. Also, because MassTransit will run 4 threads per CPU for our consumer and the messages will be flying in, we’ll condense our Console output to a single WriteLine call instead of multiple calls to Write so as to avoid the output from multiple messages getting jumbled together.

using Contracts;
using MassTransit;
using System;
using System.Threading;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.WriteLine("TXT: " + message.Message.What +
                        "  SENT: " + message.Message.When.ToString() +
                        "  PROCESSED: " + DateTime.Now.ToString() + 
                        " (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");

      // Simulate processing time
      Thread.Sleep(250);
    }
  }
}

Running the Test

The test code for this example can be found on github.

Make sure both TestPublisher and TestSubscriber are set up as startup projects and run the project in Visual Studio. Try publishing 1,000 messages:

image

As you could probably see, the prompt on the publisher returned well before the subscriber finished processing the messages. This means we were able to publish messages to the bus faster than we could process them. This could be a problem if you don’t expect any “lulls” in publishing which would allow the subscriber to catch up.

We can further illustrate the backlog by looking at the graph for the MtPubSubExample_TestSubscriber queue in the RabbitMQ management interface (found at http://localhost:15672/ – see this post for details). You have to have the interface up and be watching the graph while your publisher/subscriber test is actually running:

image

Here you can see that the publisher hit a peak of 200 messages per second, while the subscriber sustained a steady rate of about 40 messages per second.

With a spike and then nothing for a time, perhaps slow and steady wins the race for our subscriber. Try 10,000 messages and watch the RabbitMQ graphs:

image

This more dramatically illustrates the problem. The number of queued messages (top graph) is continuing to go up with no relief in sight. And the bottom graph shows we’re publishing messages at a rate of 259 per second, but we only process them at a rate of around 40 per second. Again, since the publish storm eventually passes, the subscriber does eventually catch up.

Let’s look at a couple ways we can increase the throughput of our subscriber.

Option 1: Increase the Prefetch Count

If you do the math on the rate of 40 messages per second that we observe, you will arrive at what appears to be 10 simultaneous threads processing messages (each message takes a quarter of a second). However, the default number of threads that MassTransit can use for consumers is actually the number of processors in your machine multiplied by 4. So, on my 8 core machine, that would be 32 threads. Why are we only observing 10?

The reason is the number of messages the RabbitMQ transport will “prefetch” from the queue. The default for this is 10, so we can only process 10 messages simultaneously. To increase this, you include a “prefetch=X” parameter in the query string of the queue URL. For example:

x.UseRabbitMq();
x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName + "?prefetch=32");

Now that the prefetch count is set to 32 to match the maximum thread count of 32, we should observe a processing rate of 128 messages per second (32 threads, each handling 4 messages per second).
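As a back-of-the-envelope check, the arithmetic above can be written down: the number of messages in flight is the smaller of the prefetch count and the consumer thread limit, and each thread finishes 1 / 0.25 = 4 messages per second. The sketch below is purely illustrative; ThroughputEstimate and its parameter names are made up for this example and are not part of MassTransit.

```csharp
using System;

static class ThroughputEstimate
{
  // Hypothetical helper, not a MassTransit API: estimates the steady-state
  // messages per second for a single subscriber process.
  public static double MessagesPerSecond(int prefetch, int threadLimit, double secondsPerMessage)
  {
    // Only as many messages can be in flight as the smaller of the two limits.
    int concurrent = Math.Min(prefetch, threadLimit);
    return concurrent / secondsPerMessage;
  }
}
```

With the default prefetch of 10 and 32 threads, MessagesPerSecond(10, 32, 0.25) gives 40, which matches the rate in the graphs. Raising the prefetch to 32 gives MessagesPerSecond(32, 32, 0.25), or 128.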

Option 2: Increase the Thread Count

We can also tell MassTransit to allow more threads to be used to consume messages. You put a call to SetConcurrentConsumerLimit in your bus initialization code. Below we bump the thread count to 64 (doubling the number of threads):

using Configuration;
using MassTransit;
using System;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestSubscriber", x =>
      {
        x.SetConcurrentConsumerLimit(64);
        x.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }
  }
}

Don’t forget to also increase your prefetch setting (see option 1 above) to match. Now we’re processing 256 messages per second! That’s pretty close to our 259 per second we observed being published onto the bus.

However, at some point, your machine is going to run out of processing power. Perhaps it already has. We’re just sleeping the thread here for 250ms, so the ceiling is pretty high on how many threads we could run, but if there was real processing happening, we might be maxing out the CPU on the machine. As any good architect knows, don’t scale up, scale out!

Option 3: Run More Subscribers

Try dumping another 10,000 messages onto the bus. While you’ve got one subscriber running, you can simply execute another instance of the TestSubscriber executable and it will start processing messages too, effectively doubling your processing rate!

Having multiple subscribers connected to the same RabbitMQ queue is what’s called the “competing consumer” pattern. RabbitMQ will make sure each consumer gets unique messages in essentially a round-robin fashion.

Again, however, if we’re already maxing out the CPU on the subscriber machine, what we really need is to run another subscriber on another machine with its own available resources. If we use our current example code, however, we will get duplicate messages because each machine is connecting to localhost for its RabbitMQ instance. (Don’t forget to make sure all the RabbitMQ instances are in a cluster.) Since each instance will have its own queue, MassTransit will treat each queue as a separate subscriber rather than as one of a set of competing consumers. Each queue will get a copy of the same message routed to it. Remember this diagram from our clustering article illustrating how each machine has its own RabbitMQ instance:

Clearly, this is not what we want. In order to be competing consumers, the two subscriber processes must be connected to the same RabbitMQ instance and queue. What we need is an architecture more like the following:

cluster-2

Obviously, this makes the RabbitMQ server a single point of failure and a dependency for the two subscriber machines. If high availability is a requirement, then you would need to look into some type of virtual IP address based clustering (like keepalived on Linux or NLB on Windows). You will also need to implement highly available queues in RabbitMQ so that the queues are replicated across your multiple instances.

Implementing the Centralized RabbitMQ Server

Obviously, the first step is to install RabbitMQ on a new server. We’ll call this machine “submaster” (for subscription master). Instructions for installing RabbitMQ can be found in this blog post. Then, join the RabbitMQ instance on submaster to a cluster with the RabbitMQ instance on your publisher machine. Instructions for creating a RabbitMQ cluster can be found in this blog post. We should have these nodes in our RabbitMQ cluster:

image

In order to connect to a remote RabbitMQ instance, we need to do some security housekeeping. First, we need to modify the Windows Firewall on the submaster machine to allow inbound connections on the default RabbitMQ port, 5672. Next, we need to create a new user in RabbitMQ that the subscriber can use to log in. We’ll call it “testsubscriber” and give it a password of “test”. On the Admin tab of the RabbitMQ management interface, you can begin adding a new user:

image

Type in the username, password, administrator tag, and click Add user. Initially, the user won’t have any permissions:

image

Click on the testsubscriber user and then click “Set permission” as seen here:

image

Now we need to modify our Configuration.BusInitializer class to be able to connect to a specific machine name instead of hard-coding localhost as well as utilize our username and password. We’ll have it read these items from our App.config. Remember, our publisher can still use localhost (which doesn’t require username/password), but our subscriber needs to connect to the submaster machine with some credentials.

First, add a reference to System.Configuration to the Configuration project. Then modify the BusInitializer class to allow reading the machine name, username, and password from configuration:

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using System;
using System.Configuration;

namespace Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        var serverName = GetConfigValue("rabbitmq-server-name", "localhost");
        var userName = GetConfigValue("rabbitmq-username", "");
        var password = GetConfigValue("rabbitmq-password", "");
        var queueUri = "rabbitmq://" + serverName + "/MtPubSubExample_" + queueName + "?prefetch=64";

        if (userName != "")
        {
          x.UseRabbitMq(r =>
          {
            r.ConfigureHost(new Uri(queueUri), h =>
            {
              h.SetUsername(userName);
              h.SetPassword(password);
            });
          });
        }
        else
          x.UseRabbitMq();

        x.ReceiveFrom(queueUri);
        moreInitialization(x);
      });

      return bus;
    }

    private static string GetConfigValue(string key, string defaultValue)
    {
      string value = ConfigurationManager.AppSettings[key];
      return string.IsNullOrEmpty(value) ? defaultValue : value;
    }
  }
}

Since we’re only going to deviate from the default of localhost on our subscriber, open the TestSubscriber project and add the following lines into the App.config:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>

  <appSettings>
    <add key="rabbitmq-server-name" value="submaster" />
    <add key="rabbitmq-username" value="testsubscriber" />
    <add key="rabbitmq-password" value="test" />
  </appSettings>
</configuration>

Running the Test

On my dev machine, I fired up one instance of TestSubscriber with the above configuration. Then, on the publisher machine, I started up both TestPublisher and TestSubscriber. Here it is running after pushing 100,000 messages onto the bus:

Capture

Lots of blinking lights. The more interesting thing is to observe the messages processed per second in RabbitMQ:

image

Wrap Up

So now you can see how it would be possible to scale out your message processing. Perhaps in a future post, we’ll take a look at leveraging the cloud. It should be possible to monitor the number of messages in your queue and spin up new cloud workers to pick up the slack and then shut them down when the queue quiets back down. Until then…

Error Handling in MassTransit Consumers
https://looselycoupledlabs.com/2014/07/error-handling-in-masstransit-consumers/
Sun, 27 Jul 2014 02:18:36 +0000

Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then, we built a Windows Service, using the TopShelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation email. (I recommend you review the blog post if you aren’t familiar with it as we are going to build on that application here.)

But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, things can and will go wrong. Let’s look at how we can leverage the message queuing infrastructure to handle what may be transient errors as well as perhaps more permanent failures.

When It All Goes Horribly Wrong

So what will happen to our TicketOpened messages if there’s an error in the TicketOpenedConsumer? In our example, we’re only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. As you know, there are a number of things that could go wrong. Let’s start by looking at what the default MassTransit behavior is when an exception occurs in your consumer.

MassTransit Default Error Handling

To see what MassTransit does, let’s inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application in my previous blog post. The final code is in the same repository, but in the error-handling branch.

Here’s the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" + 
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" + 
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

We just check to see if the text of the message contains the word “poison” and, if it does, throw an exception. Now we can run the app, open a ticket, and type “poison” into the message field to get our consumer to throw the exception:

image

Take a look at the log file (C:\Logs\Loosely.CustomerPortal.Backend.log) and you’ll see these entries:

image

What’s going on here? By default, MassTransit retries any message whose consumer throws an exception exactly 4 more times. There’s no delay between retries (we’ll look at that later). Since our exception isn’t really transient, the message is tried 5 times without success. Next question… where’s the TicketOpened message now?

Go into the RabbitMQ management interface (see this post for instructions – should be at http://localhost:15672) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it:

image

Click on the error queue and scroll down to the “Get messages” section. Set Requeue to ‘No’ and click “Get Message(s).” This will remove the message from the queue and display it to us. You can see our poison message in JSON format:

image

Sidebar: Changing Default Retry Limit

If you want to change MassTransit’s default retry limit of 5 to something else, add the call to SetDefaultRetryLimit shown below to the Loosely.CustomerPortal.Backend.TicketService class, within your bus initializer code.

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1);
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

That will set the retry limit to 1.

Requeuing Error Messages

If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn’t running. Then, open up the “RabbitMQ Command Prompt (sbin dir)” item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:

> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management

After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into “Shovel Management.” Click “Add a new shovel” and name it something like “Temporary Error Move.” Set the Source Queue to “Loosely_CustomerPortal_Backend_error” and the Destination Queue to “Loosely_CustomerPortal_Backend.” Click “Add shovel.”

This starts a shovel that runs in the background and will move all messages in the error queue back to the primary queue:

image

Now go back to the Admin tab, Shovel Management, and click on your “Temporary Error Move” shovel. From there, click on the “Delete this shovel” button. If you don’t delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.

Obviously, when we start up our consumer again, it will try 5 times and fail again, moving it back to the error queue. What we have with our “poison” message is really a permanent failure.

Transient Versus Permanent Failures

With a permanent failure, we’re talking about a message that just can’t be processed – at least not with the code written the way it currently is. Perhaps there’s a message that invokes a code path that throws an exception due to a coding error. In this case, these messages would end up in the error queue and should probably stay there until the error is corrected.

Perhaps the error is such an edge case that we won’t fix it and so we’re ok with the occasional message going to the error queue (we should write something to periodically clean up the error queue). It just depends on your business requirements. If, however, the message is mission critical, then the likely scenario would be to fix the bug, redeploy the new code, move the error messages back into the primary queue, and then let them get processed.

Transient Failures

What about the examples of failures mentioned earlier? A down email or database server? A deadlock condition in the SQL Server? These could be considered transient failures – meaning, if we just were to retry later, the message could likely be processed just fine with no modifications to the message or the consumer code.

As we saw, MassTransit has a bit of a blunt method to try to account for transient failures… it tries the message 5 times. Perhaps in a deadlock situation, this would work great, but probably not in a network or server outage situation. You’d likely expect those to last a little longer. What would be ideal is if we could have the message retry after some timeout delay. Perhaps we could even escalate the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then perhaps fail.

NServiceBus, a commercial analog to MassTransit, has this retry delay ability built into it (called “second-level retries”). However, MassTransit does not. We will have to roll our own, but it won’t be difficult.

Roll Your Own Retry Delay Logic

Assuming this is a pattern you want to implement for a larger application with multiple message types, you will probably want to build the retry delay logic into a common helper class. However, for this example, let’s just build the logic into our TicketOpenedConsumer class.

Here’s the new TicketOpenedConsumer class with progressive retry delay logic:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
      {
        {0, 60}, {60, 300}, {300, -1}
      };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;
      
      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" + 
                          nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if(sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip", 
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
            "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));

    }
  }
}

So let’s take a look at a few lines of code in isolation and discuss what’s happening. First, we set up a dictionary to indicate what we’d like the progression of delays to be.

static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
  {
    {0, 60}, {60, 300}, {300, -1}
  };

The key is the last number of seconds delayed and the value is the next delay value to use. We start with 0 as you can see in the initialization code:

int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];

Then we check for a header on the message called “loosely.retry-delay-seconds.” Yes, I just made that up. Headers are meta-data you can attach to a message and can contain whatever string data you’d like. When we want to retry a message later, we’ll add a header with the number of seconds we just delayed so the next time through the code can know the next delay value to use if the message fails again.
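To make the progression concrete, here is a small stand-alone sketch of how the header value walks through the dictionary. RetryProgression and NextDelay are hypothetical names used only for illustration. Unlike the consumer code in this post, the sketch uses TryGetValue so that an unexpected header value stops the retries instead of throwing a KeyNotFoundException; that behavior is my assumption about what you would want, not something the consumer above does.

```csharp
using System.Collections.Generic;

static class RetryProgression
{
  // Same table as in the consumer: last delay (seconds) -> next delay (seconds).
  // A value of -1 means "give up".
  private static readonly Dictionary<int, int> DelayProgression = new Dictionary<int, int>
  {
    {0, 60}, {60, 300}, {300, -1}
  };

  // headerValue is the raw "loosely.retry-delay-seconds" header, or null
  // when the message has never been retried.
  public static int NextDelay(string headerValue)
  {
    int lastDelay;
    // TryParse leaves lastDelay at 0 when the header is missing or not a number.
    int.TryParse(headerValue, out lastDelay);

    int nextDelay;
    // Assumption: a delay value that is not in the table means stop retrying.
    return DelayProgression.TryGetValue(lastDelay, out nextDelay) ? nextDelay : -1;
  }
}
```

A fresh message (no header) gets NextDelay(null) = 60, the first retry carries "60" and gets 300, and the second retry carries "300" and gets -1, at which point the consumer gives up.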

Now we just have a method that can check for some magic strings in our message to see if it should trigger a contrived exception:

CheckForContrivedErrorConditions(envelope);

Within that function, we define three strings (in addition to the original “poison” string) for which we will scan.

“server-disaster”: Simulate mail server down for a very long time.
“server-down”: Simulate mail server down for less than 5 minutes.
“server-blip”: Simulate the mail server down for less than 30 seconds.

Finally, we wrap all of the actual message processing in a try…catch block. If an exception occurs, we check the message to see if it’s a message we know to be a transient condition and if the next retry delay value is not negative one (-1). Negative one will be our trigger to tell us we need to give up on retrying.

Trace.WriteLine("Exception caught.");
if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
  sleepAndRepublish = true;
else throw;

If the condition is met, we set a flag to indicate we want to sleep (delay) for a bit and then republish the message so it will be retried later. If the condition is not met, we re-throw the exception and MassTransit will handle it per normal message processing rules (default being to retry 4 more times and then move to the error queue).

If we do want to sleep and republish, that code is simple:

if(sleepAndRepublish)
{
  Thread.Sleep(nextRetryDelay * 1000);
  envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
    x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
    x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
  });
}

We put the thread to sleep for the prescribed number of seconds (more on that later) and then, after the time has elapsed, we republish the message to the bus with a “loosely.retry-delay-seconds” header value of the amount of time we delayed before republishing the message. That will put the message back on the bus and our consumer will get called again with it. This time, the message will have the special header on it and we’ll know to move onto the next delay value (or stop retrying if that value is -1).

Did You Seriously Just Block the Consumer Thread?

Good catch. Yes, this can have performance implications. MassTransit has a setting called ConcurrentConsumerLimit, which is set to the number of CPUs times 4 (so 16 on a 4 processor machine). We’re essentially “killing” one of these 16 (or however many) threads while we sleep, thus limiting the number of messages we can process while we’re waiting.

But is this really a problem? In this example, our service is only responsible for processing TicketOpened messages. Every TicketOpened message needs to trigger an email to be sent. If the email server is down, then none of the TicketOpened messages are going to be able to successfully be processed. In this case, it probably makes sense for the entire service to slow down and wait until the mail server is back online.

If the service were responsible for processing many different types of messages, then this would certainly be an issue. However, it raises the question of whether it makes sense for a single service to handle different types of messages. In some cases it might, particularly if they all need to be handled in much the same way. But in a lot of cases, it will make more sense to create separate services for your different message types.

What If the Service Crashes While Sleeping?

So we have our consumer sleeping on the consumer thread and the message is “in limbo” while we’re waiting. What happens to the message if the service crashes during the Thread.Sleep? If you send a “server-down” message to get the message to go through the retry logic, take a look at the queue in RabbitMQ:

image

It shows one message in the “Unacked” (unacknowledged) column. This means two things: 1) RabbitMQ won’t deliver the message to any other consumer threads or processes, and 2) it won’t remove the message until it is acknowledged. If the process hosting our consumer service dies before acknowledging the message, RabbitMQ will move the message back to the “Ready” state.

Caveats and Disclaimers

These bloggers, sheesh. Always cutting corners in the code for the “sake of brevity.” It’s difficult to balance a good, crisp article with well-crafted code. First, you don’t see any unit tests. Bad programmer. Next, with a good suite of tests watching your back, the code in this example could be refactored into shorter methods and perhaps a helper class for the retry delay progression. Finally, the call to Thread.Sleep should probably be refactored into a loop to wake up every couple of seconds to see if the service needs to stop.
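As a rough sketch of that last point, here is one way to make the delay interruptible. It is a variation on the loop idea: instead of polling every couple of seconds, it waits on a CancellationToken's wait handle, which wakes up as soon as a stop is requested. InterruptibleDelay is a hypothetical helper name, and the way the token gets wired to the service's Stop method is an assumption left out of the sketch.

```csharp
using System;
using System.Threading;

static class InterruptibleDelay
{
  // Blocks for up to the given delay. Returns true if the full delay elapsed,
  // or false if stopRequested was cancelled first (for example, from the
  // service's Stop method).
  public static bool Wait(TimeSpan delay, CancellationToken stopRequested)
  {
    // WaitOne returns true when the handle is signalled (a stop was requested)
    // and false when the timeout expires without a signal.
    bool stopped = stopRequested.WaitHandle.WaitOne(delay);
    return !stopped;
  }
}
```

In the consumer, a call like InterruptibleDelay.Wait(TimeSpan.FromSeconds(nextRetryDelay), token) could stand in for Thread.Sleep, with the republish only happening when it returns true. What to do with the message when it returns false is a design decision this sketch leaves open.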

Other Options

Of course there are other ways to build delayed retry logic into your services. The method used in this post is just the simplest to illustrate, but you can take this further. For example, take a look at the MassTransit-Quartz project. This uses the open source Quartz.NET scheduler to enable delayed publishing of messages. It does, however, require an ADO.NET database to persist the scheduled jobs so you don’t lose your delayed messages. If you need scheduling and visibility into messages that were delayed, then this is your ticket.

Another pattern that could be implemented is that of moving the delayed messages to another RabbitMQ queue. Then you could write something that periodically polled that queue and moved the messages back into the primary queue after the desired delay.

Next Stop…

Let’s take a look at how we can implement multiple consumers in a couple different use cases. In one case, we might want to spin up multiple, identical consumers on different machines to scale out message processing. In another case, we may want to have completely different consumers subscribing to the same message type but intending to do different things with the messages. After that, we’ll probably take a look at Sagas (chaining multiple message processing steps together) and then… who knows? Send me your thoughts and questions regarding anything you’d like to see here.

A Real-World MassTransit Customer Portal Example
https://looselycoupledlabs.com/2014/07/a-real-world-masstransit-customer-portal-example/
Sat, 19 Jul 2014 04:11:17 +0000

Now that we’ve seen some simple examples of how to use MassTransit with the Publish/Subscribe pattern on multiple machines, let’s build something that resembles a more real-world app. In this article, we’ll build an ASP.NET MVC Customer Portal app where a customer can create a new support ticket. The ticket will be published onto the service bus. We’ll create a Windows Service to be the subscriber of these messages and it will handle the tickets, in this example, sending a confirmation email to the customer.

You can get all the code from this blog post at https://github.com/dprothero/Loosely.CustomerPortal.

This is a big one, so roll up your sleeves…

The Web App

Let’s build a rudimentary front-end application that will be a stand-in for a true customer portal style web site. We’ll build an app that has a single menu option on its home page:

image

When the user clicks “Open a new support ticket,” they will get a very simple form asking for their email address and a description of their problem or question:

image

When the user clicks “Open Ticket,” they will see a confirmation message containing their ticket number:

image

So let’s dig into the code to build this web app.

New ASP.NET MVC Project

Open Visual Studio and choose File… New Project. Select the “ASP.NET Web Application” project template. Give it the name “Loosely.CustomerPortal.WebApp” and name the solution “Loosely.CustomerPortal.”

In the “New ASP.NET Project” dialog, select the “Empty” template and check the “MVC” box under the “Add folders and core references for” heading.

Contracts

Now we need a place to keep our “contracts” for our service bus. A contract is an interface that specifies the format of our message type. Add a new class library to the solution and name it “Loosely.Bus.Contracts.” Add a new file to the class library called TicketOpened and define the following interface:

namespace Loosely.Bus.Contracts
{
  public interface TicketOpened
  {
    string Id { get; }
    string CustomerEmail { get; set; }
    string Message { get; set; }
  }
}

This is the message type we will publish onto the service bus whenever a user of the web application wants to open a new support ticket.

Configuration

Now let’s add another class library named “Loosely.Bus.Configuration” where we’ll keep our common MassTransit configuration code. Add the MassTransit.Log4Net and MassTransit.RabbitMQ NuGet packages to this new class library.

We’ll put the common service bus initialization code into a class called BusInitializer:

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using System;

namespace Loosely.Bus.Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        x.UseRabbitMq();
        x.ReceiveFrom("rabbitmq://localhost/Loosely_" + queueName);
        moreInitialization(x);
      });

      return bus;
    }
  }
}

You may recall, in a previous post, when we did the same thing. In fact, this code is nearly identical. The only thing we’ve changed is the prefix for the queue name. In that earlier post I describe what we’re doing here in detail. In summary, we’re setting up a new instance of a MassTransit service bus that will use RabbitMQ for its transport mechanism.

Put ASP.NET on the Bus

Returning to the Loosely.CustomerPortal.WebApp in our solution, right-click on References and add project references to the Loosely.Bus.Configuration and Loosely.Bus.Contracts projects. Also, add the MassTransit NuGet package to the project.

The best place I’ve found to create and configure MassTransit in an ASP.NET app is in the Global.asax’s Application_Start event handler. Open Global.asax.cs and make sure the code looks like this:

using Loosely.Bus.Configuration;
using MassTransit;
using System.Web.Mvc;
using System.Web.Routing;

namespace Loosely.CustomerPortal.WebApp
{
  public class MvcApplication : System.Web.HttpApplication
  {
    public static IServiceBus Bus {get; set;}

    protected void Application_Start()
    {
      AreaRegistration.RegisterAllAreas();
      RouteConfig.RegisterRoutes(RouteTable.Routes);

      Bus = BusInitializer.CreateBus("CustomerPortal_WebApp", x => { });
    }

    protected void Application_End()
    {
      Bus.Dispose();
    }
  }
}

We’re adding a public, static property to the MvcApplication class that we can use elsewhere in our application to get access to our service bus. In the Application_Start event handler, after the routing code added by Visual Studio, we can use our BusInitializer class to create a new bus and assign it to the static property. Later, when we want to use the bus, we’ll simply use the expression MvcApplication.Bus.

Don’t forget to call Dispose on the bus in the Application_End event.

Ticket Model

Still within the WebApp project, add a new model to the Models folder and call it “Ticket.” This will be the model we will bind our support ticket data entry form to. We’ll also have it implement the TicketOpened interface so we can publish it to our service bus.

using Loosely.Bus.Contracts;
using System;

namespace Loosely.CustomerPortal.WebApp.Models
{
  public class Ticket : TicketOpened
  {
    private string _id;
    
    public string Id { get { return _id; } }
    public string CustomerEmail { get; set; }
    public string Message { get; set; }

    public Ticket()
    {
      _id = Guid.NewGuid().ToString();
    }

    public void Save()
    {
      MvcApplication.Bus.Publish<TicketOpened>(this, x => { 
        x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
      });
    }
  }
}

Because we want to be able to tell the user what their ticket ID is right away, we need some method to generate a statistically unique, random ID. GUIDs work well for this in terms of being easy to implement for a developer, as it’s a single line of code. They aren’t a great user experience, of course, due to their length.

Timeout to Pontificate

The point to remember is that we need a way to generate an identifier that we know should be unique in whatever data storage repository we will be storing the tickets in without having to consult said data storage repository. Remember, the reason we’re using a service bus is to have this web application loosely coupled to whatever backend is used for our ticketing system.

When the rubber meets the road, however, you may be integrating with a ticketing system that wants to assign its own IDs. In that case, you will have to decide whether the requirement to display the ticket ID immediately to the user is worth a round-trip to the ticketing system to get it. There’s no one right answer. Building systems is a constant series of trade-offs.
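If the length of a GUID bothers you, one option is to keep the GUID but show the user a more compact encoding of it. Here’s a minimal sketch, assuming a hypothetical helper class I’m calling TicketIdSketch (it isn’t part of the sample solution). It encodes the GUID’s 16 bytes as URL-safe Base64, which takes 22 characters instead of 36, and it can be reversed without consulting any data store:

```csharp
using System;

// Hypothetical helper, not part of the Loosely.CustomerPortal sample.
static class TicketIdSketch
{
  // Encodes the 16 bytes of a GUID as Base64, drops the trailing "==" padding,
  // and swaps the two characters that are awkward in URLs ('+' and '/').
  public static string Shorten(Guid id)
  {
    return Convert.ToBase64String(id.ToByteArray())
                  .TrimEnd('=')
                  .Replace('+', '-')
                  .Replace('/', '_');
  }

  // Reverses Shorten so a backend can recover the original GUID.
  public static Guid Expand(string shortId)
  {
    var base64 = shortId.Replace('-', '+').Replace('_', '/') + "==";
    return new Guid(Convert.FromBase64String(base64));
  }
}
```

The Ticket constructor could then call TicketIdSketch.Shorten(Guid.NewGuid()) instead of Guid.NewGuid().ToString(). The trade-off is that the result is case-sensitive, which matters if your customers will ever read a ticket ID over the phone.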

Back to the Model…

We also have a Save method that we will call from our controller (coming soon). Instead of what you typically see in a Save method (saving to a database), we’re publishing the Ticket onto the bus. Since the ticket implements the TicketOpened interface from our Contracts assembly, other processes can subscribe to these TicketOpened messages and do something interesting with them.

The Controllers and Views

Now let’s build some UI. Add an empty controller called “HomeController” to the “Controllers” folder. Nothing much is needed in this controller – just return a view that will have our “menu” of options (a menu of one option, that is):

using System.Web.Mvc;

namespace Loosely.CustomerPortal.WebApp.Controllers
{
  public class HomeController : Controller
  {
    // GET: Home
    public ActionResult Index()
    {
      return View();
    }
  }
}

Create a view named Index under the Views/Home folder, leaving the “Use a layout page” option checked. The layout page will give us a basic page template so we don’t have to worry about formatting too much. The view can then simply present our single menu option:

@{
    ViewBag.Title = "Index";
}

<h2>Customer Portal</h2>

<a href="@Url.Content("~/Ticket/Open")">Open a new support ticket</a>

As you can see, the link to open a new support ticket is taking us to /Ticket/Open, which means we need a TicketController with an Open action. Add this controller to the Controllers folder (choose empty again). Below is the code for this controller:

using System.Web.Mvc;

namespace Loosely.CustomerPortal.WebApp.Controllers
{
  public class TicketController : Controller
  {
    [HttpGet]
    public ActionResult Open()
    {
      var ticket = new Models.Ticket();
      return View(ticket);
    }

    [HttpPost]
    public ActionResult Open(Models.Ticket ticket)
    {
      ticket.Save();
      return Redirect("~/Ticket/Opened/" + ticket.Id);
    }

    public ActionResult Opened(string id)
    {
      ViewBag.TicketId = id;
      return View();
    }
  }
}

A few interesting things are going on now. First, we’ve got the Open action that’s tagged with the HttpGet attribute. This action simply creates a new ticket model and binds it to the (soon to be created) view. This view will be the data entry form allowing the user to supply their email and message text.

The next method is also called Open but is tagged with the HttpPost attribute. This is because when the user submits the form we will still be posting to the /Ticket/Open URL (sorry if this is review for you MVC vets). The post action takes in a ticket model that should be populated with the data from the user’s form submission.

We take the ticket the user submits and call the Save method on it (which, as you’ll recall, is what will post the message to the service bus). Following the save, we redirect to the Opened action, passing the ticket ID.

Finally, we have the Opened action which passes the ticket ID into the view via the ViewBag so it can be displayed to the user.

Ticket Views

We need a couple of views for the Ticket controller. Under the Views/Ticket folder, create an empty view named Open and be sure to select “Ticket (Loosely.CustomerPortal.WebApp.Models)” for the model class. This view will contain our data input form that will be bound to the Ticket model:

@model Loosely.CustomerPortal.WebApp.Models.Ticket

@{
    ViewBag.Title = "Open";
}

<h2>Open a Ticket</h2>

@using ( Html.BeginForm() )
{
  <fieldset>
    <legend>Ticket Info</legend>
    <div>@Html.LabelFor(model => model.CustomerEmail)</div>
    <div>@Html.TextBoxFor(model => model.CustomerEmail)</div>

    <div>@Html.LabelFor(model => model.Message)</div>
    <div>@Html.TextBoxFor(model => model.Message)</div>

    <input type="submit" value="Open Ticket" />

  </fieldset>
}

Add another view named Opened under the same Views/Ticket folder. This view will simply display the ticket ID:

@{
    ViewBag.Title = "Opened";
}

<h2>Ticket Opened</h2>

<p>
  Your ticket has been opened.
  Your ticket id is: <strong>@ViewBag.TicketId</strong>
</p>

<p>
  <a href="@Url.Content("~/")">Return Home</a>
</p>

Checkpoint – The Web App Works, Now What?

If you run the web app now, you’ll be able to create a new ticket, and you’ll even get a new ticket ID assigned each time! If you go into the RabbitMQ management interface (see this post for instructions), you will see that there’s an exchange named Loosely.Bus.Contracts:TicketOpened that isn’t connected to any other exchanges or queues. If you’ve been following my blog, you’ll know this is because we don’t have anyone listening for these types of messages yet.

Creating the Backend Service

It’s time to do something with these tickets that are being created by the web app. Let’s create a Windows service that can run in the background on any machine and subscribe to TicketOpened messages that are published to the service bus. We’ll use the open source project TopShelf for creating our Windows service. TopShelf is published by the same trio of geniuses that gave us MassTransit and it makes creating Windows services extremely simple.

Start by adding a new Console application to our solution and name it Loosely.CustomerPortal.Backend. Add project references to Loosely.Bus.Configuration and Loosely.Bus.Contracts, as well as a framework reference to System.Configuration. Finally, add NuGet packages MassTransit and TopShelf to the project.

Backend Configuration

First, let’s set up a little configuration so that TopShelf will log any messages to a log file. We’ll also set up the configuration to log general Trace messages. To send email messages, we’ll use Gmail’s SMTP server, so we also need some place to store our Gmail credentials.

All this configuration can go in the App.config file in the new console application:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <startup>
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
  </startup>
  <system.diagnostics>
    <sharedListeners>
      <add name="traceLogListener" type="System.Diagnostics.TextWriterTraceListener" 
           initializeData="C:\Logs\Loosely.CustomerPortal.Backend.log" />
    </sharedListeners>
    <sources>
      <source name="Default">
        <listeners>
          <add name="traceLogListener" />
          <remove name="Default" />
        </listeners>
      </source>
    </sources>
    <trace autoflush="true" indentsize="4">
      <listeners>
        <add name="traceLogListener" />
        <remove name="Default" />
      </listeners>
    </trace>
  </system.diagnostics>

  <appSettings file="C:\Config\Loosely.CustomerPortal.Backend.config">
    <add key="Gmail.Account" value="youraccount@gmail.com"/>
    <add key="Gmail.Password" value="yourpassword"/>
  </appSettings>
  
</configuration>

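Notice we’re logging messages to C:\Logs\Loosely.CustomerPortal.Backend.log, so be sure to create a C:\Logs folder (or change this path to somewhere else you might prefer). The file attribute on the appSettings element points to an optional external file. Any settings found in that file override the ones listed here, which lets you keep your real Gmail credentials out of source control.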

Email Helper

When a new ticket is opened, we want to send a confirmation to the customer’s email address that they supplied. Create a new EmailHelper class to handle the down and dirty SMTP communication:

using System.Configuration;

namespace Loosely.CustomerPortal.Backend
{
  class EmailHelper
  {
    readonly static string gmailAccount = ConfigurationManager.AppSettings.Get("Gmail.Account");
    readonly static string gmailPassword = ConfigurationManager.AppSettings.Get("Gmail.Password");

    public static void Send(string customerEmail, string subject, string messageBody)
    {
      // SmtpClient is IDisposable; the using block closes the SMTP connection when we're done.
      using (var client = new System.Net.Mail.SmtpClient("smtp.gmail.com", 587))
      {
        client.EnableSsl = true;
        client.Credentials = new System.Net.NetworkCredential(gmailAccount, gmailPassword);
        client.Send(gmailAccount, customerEmail, subject, messageBody);
      }
    }
  }
}

Nothing magical here. We just pull the Gmail credentials out of our config file and then open a secure connection to Gmail’s SMTP server to send a message. Obviously, in a true production app, this code would be written to connect to the appropriate SMTP server and likely not use Gmail like this. Gmail works well for a simple example, however.
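As a rough idea of what “connect to the appropriate SMTP server” could look like, here’s a sketch that reads the host and port from settings instead of hard-coding Gmail. The “Smtp.Host” and “Smtp.Port” keys and the SmtpClientFactory class are my own assumptions for illustration. They don’t exist in the App.config or the sample project, and when the keys are missing the code falls back to the Gmail values used in EmailHelper:

```csharp
using System.Collections.Specialized;
using System.Net;
using System.Net.Mail;

// Hypothetical helper, not part of the sample project.
static class SmtpClientFactory
{
  // "Smtp.Host" and "Smtp.Port" are assumed keys. When they are absent or
  // unparsable, fall back to the Gmail host and port used in EmailHelper.
  public static SmtpClient Create(NameValueCollection settings)
  {
    var host = settings["Smtp.Host"];
    if (string.IsNullOrEmpty(host))
    {
      host = "smtp.gmail.com";
    }

    int port;
    if (!int.TryParse(settings["Smtp.Port"], out port))
    {
      port = 587;
    }

    return new SmtpClient(host, port)
    {
      EnableSsl = true,
      Credentials = new NetworkCredential(settings["Gmail.Account"], settings["Gmail.Password"])
    };
  }
}
```

Since ConfigurationManager.AppSettings is a NameValueCollection, EmailHelper.Send could call SmtpClientFactory.Create(ConfigurationManager.AppSettings) and the rest of the method would stay the same.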

A Consumer for TicketOpened

Now we need a consumer class to which MassTransit can send TicketOpened messages. Add a new class called TicketOpenedConsumer:

using Loosely.Bus.Contracts;
using MassTransit;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" + 
                      "Message: " + envelope.Message.Message);

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" + 
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

This is the real meat of the backend service. The Consume method will be called for every TicketOpened message that MassTransit picks up off the bus for us. In this example, we’re simply logging the information from the ticket and then sending the confirmation email to the customer.

The Service Class

Next let’s create a class that we’ll use to host our service. We’ll furnish this class to TopShelf, which will call the Start method when the service is started and the Stop method when the service is stopped. Create a new class called TicketService:

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

The Start method is the perfect place to put our bus initialization code. Notice how we are adding a subscription in MassTransit and providing it our TicketOpenedConsumer class. We’re also making it a permanent subscription.

The Startup Glue

Now we have all the classes we need. Open the Program.cs file and put the following TopShelf configuration code into the Main() function:

using System.Diagnostics;
using Topshelf;

namespace Loosely.CustomerPortal.Backend
{
  class Program
  {
    static void Main(string[] args)
    {
      HostFactory.Run(x =>
      {
        x.Service<TicketService>(s =>
        {
          s.ConstructUsing(name => new TicketService());
          s.WhenStarted(ts => ts.Start());
          s.WhenStopped(ts => ts.Stop());
        });
        x.RunAsLocalSystem();

        x.SetDescription("Loosely Coupled Labs Customer Portal Backend");
        x.SetDisplayName("Loosely.CustomerPortal.Backend");
        x.SetServiceName("Loosely.CustomerPortal.Backend");
      });
    }
  }
}

I’ll refer you to the TopShelf documentation for details on how TopShelf works. For this example, you just need to know that this code wires up our TicketService class to the TopShelf framework. The great thing about TopShelf is you can just run the executable and it will run your program as a standard console app. If you run it with the “install” command line parameter, it will install it as a Windows service.

Let’s Do This

Right-click on the solution and choose “Set Startup Projects…” and make both Loosely.CustomerPortal.Backend and Loosely.CustomerPortal.WebApp startup apps. Run the solution and you’ll get a web browser with the web app and a console window running your new service.

Try submitting a few tickets. You should see the emails (assuming you used your own email address to create the ticket) as well as log entries in the C:\Logs\Loosely.CustomerPortal.Backend.log file:

image

image

Install the Service

Now, let’s make the backend an actual Windows service. Open a command prompt as administrator (running it as administrator is important). Change to the directory where Visual Studio built Loosely.CustomerPortal.Backend.exe. This will likely be the Loosely.CustomerPortal\Loosely.CustomerPortal.Backend\bin\Debug folder. Run the following command:

> Loosely.CustomerPortal.Backend.exe install

Now you should be able to launch the Windows services MMC snap-in (services.msc) and see the new Loosely.CustomerPortal.Backend Windows service and fire it up!

image

With the service running, you will want to change your Visual Studio solution back to having only the web app as the startup project.

What’s Next?

Now that we’ve built this app, I’d like to refer back to it in future blog posts so we can refine it to be more robust and “enterprise-ready”. Let’s use it to look at things like message retry logic, sagas, and multiple subscribers (for scale or for different functions). As always, let me know if there’s anything specific you’d like to see me write about.

Creating a RabbitMQ Cluster for Use with MassTransit
https://looselycoupledlabs.com/2014/07/creating-a-rabbitmq-cluster-for-use-with-masstransit/
Wed, 09 Jul 2014 01:09:17 +0000

In my last post, A Simple MassTransit Publish/Subscribe Example, we looked at how to build basic publishers and subscribers using MassTransit and RabbitMQ. In the example, however, we were only using a single RabbitMQ instance on a single machine. In the real world, your publishers are most likely to live on separate machines from your subscribers. To do that, we need to set up a RabbitMQ cluster.

RabbitMQ Clustering

A RabbitMQ cluster (or broker) can consist of multiple nodes, each running an instance of the RabbitMQ application. All of the nodes will share configuration information, such as users, exchanges, and queues. Recall from our last discussion that an exchange is like a post office and a queue is like an actual mailbox containing the resting place of messages. The actual queue, or mailbox, only lives on the node where the queue was created, but the knowledge of the queue’s existence is shared by the cluster, as is the full routing information contained in the exchanges. (Note that queues can be replicated for high-availability requirements, which we may cover in a future post.)

A Common Clustering Pattern

A pattern I like to use for clustering RabbitMQ nodes is to have a node live on each machine that participates in the overall application. For example, I will install instances of RabbitMQ on each web server that makes up a web application and will be publishing messages related to the application. Then, I will install an instance of RabbitMQ on each machine that will be running a service that will subscribe to the messages. The diagram below illustrates this pattern.

RabbitMQ Cluster Diagram

Setting Up the Cluster

For the purposes of this example, we are going to assume you have two Windows machines (virtual or otherwise) named Publisher and Subscriber. Install RabbitMQ on each of these machines, referring to the installation instructions in my previous blog post.

Erlang Cookie

The first thing that needs to be done is to set up the Erlang cookie on each machine so that the cookies are the same. (Remember, RabbitMQ is an Erlang application.) ASP.NET developers may be familiar with the concept of creating a common Machine Key for a cluster of web servers. The Erlang cookie is analogous to the Machine Key. It is a key that allows multiple Erlang nodes to communicate with each other. All the nodes in a RabbitMQ cluster must have the same Erlang cookie.

When installing Erlang on Windows and installing RabbitMQ as a Windows Service, the Erlang cookie (a file named .erlang.cookie) is placed in two locations – C:\Users\Current User and C:\Windows. The RabbitMQ clustering guide recommends making sure all of the cookies on the machine are the same. Copy the cookie from C:\Windows on the Publisher machine to C:\Users\Current User on the same machine and then to both C:\Users\Current User and C:\Windows on the Subscriber machine. All four cookies should have identical content (you can inspect with Notepad to be sure).
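If you’d rather not eyeball the cookies in Notepad, a few lines of C# can do the comparison. This is only a sketch: CookieCheck is a hypothetical helper of mine, and it assumes every cookie file is reachable from the machine where you run it (for the other machine, that means copying its cookie over or using a file share you trust).

```csharp
using System.IO;
using System.Linq;

// Hypothetical helper for comparing cookie files; not part of any RabbitMQ tooling.
static class CookieCheck
{
  // Returns true when every listed file exists and all of them have identical
  // content (ignoring leading and trailing whitespace).
  public static bool AllMatch(params string[] paths)
  {
    if (paths.Length == 0 || !paths.All(p => File.Exists(p)))
    {
      return false;
    }

    return paths.Select(p => File.ReadAllText(p).Trim())
                .Distinct()
                .Count() == 1;
  }
}
```

For the two local cookies, you would pass @"C:\Windows\.erlang.cookie" along with the .erlang.cookie path under your user profile folder. Run it from an elevated prompt if the files turn out not to be readable by your account.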

Restart the RabbitMQ service on both machines.

Final Cluster Setup

If you’ve got the Windows Firewall turned on, be sure to allow incoming TCP traffic on ports 4369 and 25672 on both the Publisher and Subscriber machines.

Now, from the Subscriber machine, open up the “RabbitMQ Command Prompt (sbin dir)” item (added by the RabbitMQ installer). Run the following commands:

> rabbitmqctl stop_app
> rabbitmqctl join_cluster rabbit@Publisher

What we are doing here is telling the instance of RabbitMQ running on the Subscriber machine that we want to join a cluster that is hosted by the Publisher machine (which is currently a cluster of one). Restart the RabbitMQ Windows service after running these commands.

Inspecting Your Cluster

Now, on either the Publisher or the Subscriber, open the web-based management console at http://localhost:15672/. (Recall from our previous post that the web-based management console is a plugin we need to install.) On the first page, you will see the details of our newly created, multi-node cluster!

rabbitmq-cluster

MassTransit in Action

Now let’s put our cluster to the test. Build the sample applications that we created in the previous post. Copy the contents of TestPublisher\bin\Debug to a folder on the Publisher. Likewise, copy the contents of TestSubscriber\bin\Debug to a folder on the Subscriber.

Launch TestSubscriber.exe on the Subscriber machine first. Then, launch TestPublisher on the Publisher machine. Finally, send a message from the Publisher, and watch it appear on the Subscriber.

rabbitmq-cluster-example

Notice how we didn’t have to modify any code in the sample application? This is because we set up the code to connect to RabbitMQ on localhost. Because each local instance of RabbitMQ on the two machines was part of a single cluster, they shared information about the exchanges and queues necessary to make the MassTransit service bus work its magic, as if there were only a single RabbitMQ instance!

What’s Next?

Next up, we’ll take a look at scaling this up to a real-world example. We’ll create an ASP.NET web application that can publish messages and a custom Windows service that can be our subscriber of the messages. Down the road, we’ll look at having multiple subscribers for the same message type in two different patterns: one for scaling out message processing and another for having different processing occur in separate Windows services.

As always, please send your suggestions for things you’d like to learn how to do with MassTransit.

A Simple MassTransit Publish/Subscribe Example
https://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/
Sat, 28 Jun 2014 00:23:10 +0000

When I first sat down to learn how to use MassTransit, I found it difficult to get even a simple example working: one process that published a message onto the bus and another that subscribed to messages of the same type. Hopefully, this primer will get you on the bus quicker.

NOTE: There is a newer version of this post for those using MassTransit 3. The post below is for MassTransit 2.x.

Setting Up Your Environment

The first thing you need is a message queuing framework. MassTransit supports MSMQ, RabbitMQ, and others, but I find that RabbitMQ is really the way to go. That’s especially true when using the publish/subscribe pattern. The reason for this is that RabbitMQ has a complete routing framework built-in and MassTransit will leverage this when persisting your subscriptions. When creating a cluster of RabbitMQ servers for availability, this routing information is replicated to all the nodes.

In this article, you’re going to run RabbitMQ on your local Windows development box. Both our publisher and subscriber will connect to the same RabbitMQ instance. In a future post, I’ll detail how to set up multiple RabbitMQ instances in a cluster.

Installing RabbitMQ

RabbitMQ requires the Erlang runtime, so that’s the first thing you need to download and install. Head over to Erlang.org’s download page and get the latest binary release for Windows (it’s likely you’ll want the 64-bit version). It’s a simple setup wizard, so you’ll have Erlang installed on your machine in short order.

Next, download the latest version of RabbitMQ for Windows. Again, it’s an easy setup wizard that you can quickly fly through. Just accept the defaults.

Enabling the RabbitMQ Web Management Interface

One RabbitMQ feature that I found extremely useful (but which isn’t enabled by default) is the web-based management interface. With this, you can see the exchanges and queues that are set up by MassTransit in RabbitMQ. To enable this, find the “RabbitMQ Command Prompt (sbin dir)” item that the RabbitMQ installer added to your Start menu and launch it. From the command line, run the following command:

> rabbitmq-plugins enable rabbitmq_management

It will confirm that the plugin and its dependencies have been enabled and instruct you to restart RabbitMQ. When installed on Windows, RabbitMQ runs as a Windows service. You can use the Services MMC snap-in to restart it or just run the following command:

> net stop RabbitMQ
...
> net start RabbitMQ

Now go to http://localhost:15672/ to open the management console. Default credentials to log in are guest/guest (you can change the credentials from the Admin tab).

There’s not much to see yet, but we’ll set the stage. Go to the Exchanges tab. You’ll see the following default RabbitMQ exchanges:

image

An exchange is something you can send messages to. It cannot hold messages. It’s merely a set of routing instructions that tell RabbitMQ where to deliver the message. We’ll come back here in a little while.

Now click on the Queues tab. Nothing here yet. Queues can actually hold messages and are where applications can actually pick up messages.

So, to use a real world analogy, an Exchange is like the local Post Office, and a Queue is like your mailbox. The only thing that an Exchange can do that most traditional Post Offices don’t do is actually make multiple copies of a message to be delivered to multiple mailboxes.
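To make the analogy concrete, here’s a toy in-memory model of it. To be clear, this is not the RabbitMQ client API and it ignores routing keys entirely. ToyExchange and ToyQueue are names I made up, and the sketch only shows the two behaviors described above: an exchange holds no messages itself, and it copies each message into every queue bound to it.

```csharp
using System.Collections.Generic;

// A mailbox: it actually holds messages until someone picks them up.
class ToyQueue
{
  public readonly string Name;
  public readonly Queue<string> Messages = new Queue<string>();

  public ToyQueue(string name) { Name = name; }
}

// A post office: it only knows which mailboxes to deliver to.
class ToyExchange
{
  private readonly List<ToyQueue> _bindings = new List<ToyQueue>();

  public void Bind(ToyQueue queue)
  {
    _bindings.Add(queue);
  }

  // Copies the message into every bound queue and returns the number of copies made.
  // With no bindings the message is simply dropped, because the exchange stores nothing.
  public int Publish(string message)
  {
    foreach (var queue in _bindings)
    {
      queue.Messages.Enqueue(message);
    }
    return _bindings.Count;
  }
}
```

Publishing to a ToyExchange with no bindings returns 0 and the message is gone. Bind two queues and publish again, and each queue ends up with its own copy.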

Creating the Sample Applications

I used Visual Studio 2013 to create this sample, but it should work in 2012 as well. You can get the entire source from: https://github.com/dprothero/MtPubSubExample

Creating a Contract

I like to use the concept of a “contract” for my messages I want to put onto the service bus. This is an interface definition that both the publisher and subscriber have to agree upon. They don’t need to know anything about the implementation of this interface on either side. To keep the publisher and subscriber as loosely coupled as possible, I like to put my contracts in their own assembly so that this is the only shared dependency.

So, the first step is to create a new solution called MtPubSubExample and a new class library called “Contracts”. To the class library, add a single interface called “SomethingHappened.”

using System;

namespace Contracts
{
  public interface SomethingHappened
  {
    string What { get; }
    DateTime When { get; }
  }
}

SomethingHappened will be the message interface we use for our sample message. Our publisher will create an instance of a class implementing SomethingHappened, set What and When properties, and publish it onto the service bus.

Our subscriber will then set up a subscription (aka Consumer) to listen for all messages of type SomethingHappened. MassTransit will call our Consumer class whenever a SomethingHappened message is received, and we can handle it as we wish, presumably inspecting the What and the When properties.

Shared Configuration Setup Code

When you’re writing a new project from scratch, you go through many permutations and refactor as you go. Initially, this example had the service bus setup code duplicated in both the publisher and subscriber projects. This is fine, particularly if you really aren’t in a position to share much code between the two sides (except the contracts of course). However, in my case, I preferred to use a common class which I’ll call “BusInitializer” to set up my instance of MassTransit and get it configured.

So, add another class library to the MtPubSubExample solution and name it “Configuration”. Before creating our class, it’s time to head to NuGet and pull in MassTransit. The quickest way to get everything you need is to find the MassTransit.RabbitMQ package and install that. Doing so will install all of MassTransit and its dependencies.

You still need one more package. I found that MassTransit doesn’t work unless you install one of the logging integration packages that are designed for it. For me, I selected the Log4Net integration package (MassTransit.Log4Net).

Now, create a new class called “BusInitializer.”

using MassTransit;
using MassTransit.BusConfigurators;
using MassTransit.Log4NetIntegration.Logging;
using System;

namespace Configuration
{
  public class BusInitializer
  {
    public static IServiceBus CreateBus(string queueName, Action<ServiceBusConfigurator> moreInitialization)
    {
      Log4NetLogger.Use();
      var bus = ServiceBusFactory.New(x =>
      {
        x.UseRabbitMq();
        x.ReceiveFrom("rabbitmq://localhost/MtPubSubExample_" + queueName);
        moreInitialization(x);
      });

      return bus;
    }
  }
}

We’re creating a static method called “CreateBus,” which both our publisher and subscriber can use to set up an instance of a bus, using the Log4NetLogger, and connect to a local RabbitMQ instance. Because there may be additional custom setup that the publisher or subscriber may want to do, we allow passing in a lambda expression to perform the additional setup.

Creating the Publisher

We’ll make the publisher a very simple console application that just prompts the user for some text and then publishes that text as part of a SomethingHappened message. Add a new Console Application project called “TestPublisher” to the solution and add a new class called “SomethingHappenedMessage.” This will be our concrete implementation of the SomethingHappened interface. You’ll need to add a project reference to the Contracts (and add one to Configuration too, while you’re at it).

using Contracts;
using System;

namespace TestPublisher
{
  class SomethingHappenedMessage : SomethingHappened
  {
    public string What { get; set; }
    public DateTime When { get; set; }
  }
}

Now, in the Main method of the Program.cs file in your Console Application, you can put in the code to set up the bus, prompt the user for text, and publish that text onto the bus. Real quick first, however, add a NuGet reference to the MassTransit package.

using Configuration;
using Contracts;
using System;

namespace TestPublisher
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestPublisher", x => { });
      string text = "";

      while (text != "quit")
      {
        Console.Write("Enter a message: ");
        text = Console.ReadLine();

        var message = new SomethingHappenedMessage() { What = text, When = DateTime.Now };
        bus.Publish<SomethingHappened>(message, x => { x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent); });
      }

      bus.Dispose();
    }
  }
}

Pretty simple, huh? We put the input capture and message publishing in a loop to make it easy to send multiple messages. The loop condition checks for the string “quit” so we can exit the publisher when we’d like.
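One side effect of the loop above is that the text “quit” is itself published before the loop exits. If you’d rather not send it, here is a minimal sketch of an alternative. PublishLoop is a hypothetical helper of my own (it is not part of MassTransit), and it takes the publish step as a callback so it can run without a bus. In the real publisher, the callback would wrap the bus.Publish call shown above.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of MassTransit): reads lines until "quit"
// or end of input, and hands every other line to the publish callback.
public static class PublishLoop
{
    // Returns the number of lines that were passed to the callback.
    public static int Run(TextReader input, TextWriter output, Action<string> publish)
    {
        int published = 0;

        while (true)
        {
            output.Write("Enter a message: ");
            string text = input.ReadLine();

            // ReadLine returns null at end of input; treat that like "quit".
            if (text == null || text == "quit")
                break;

            publish(text);
            published++;
        }

        return published;
    }
}
```

In Program.cs you would pass Console.In and Console.Out as the reader and writer. Checking for “quit” before publishing is the only behavioral difference from the original loop.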

If you make TestPublisher the startup project of the solution and run it, right now you can publish messages all you like…. However, nobody is listening yet!

What’s Going on in RabbitMQ So Far?

If you go back into the RabbitMQ web interface and jump over to the Exchanges tab, you’ll see we have a couple new arrivals.

image

Contracts:SomethingHappened is a new exchange created for the SomethingHappened message type. When we published this message, MassTransit automatically created this exchange. Click on it and scroll down to the Bindings section, and you’ll see there are no bindings yet:

image

That’s because nobody has subscribed to SomethingHappened messages yet. They go to the exchange and then die because there’s no queue to route them to.
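To make that concrete, here is a toy in-memory model of the behavior. It is only an illustration, not how RabbitMQ is implemented, and the ToyFanoutExchange class is made up for this sketch. It assumes the exchange is a fanout exchange, which is the type MassTransit uses for message type exchanges (you should be able to confirm that in the Type column on the Exchanges tab).

```csharp
using System;
using System.Collections.Generic;

// Toy model of a fanout exchange (illustration only, not RabbitMQ's code).
// Queues are represented as plain lists of message bodies.
public class ToyFanoutExchange
{
    private readonly List<List<string>> _boundQueues = new List<List<string>>();

    // Binding a queue means future messages get copied into it.
    public void Bind(List<string> queue)
    {
        _boundQueues.Add(queue);
    }

    // Copies the message to every bound queue and returns how many queues
    // received it. A return value of 0 means the message was dropped.
    public int Publish(string body)
    {
        foreach (var queue in _boundQueues)
            queue.Add(body);

        return _boundQueues.Count;
    }
}
```

Publishing to a new ToyFanoutExchange before any Bind call returns 0, and the message is kept nowhere. That is the situation our publisher is in right now. Once a queue is bound, each later Publish call adds a copy to that queue.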

The MtPubSubExample_TestPublisher exchange (and the corresponding queue on the Queues tab) was set up by our BusInitializer code. Our publisher isn’t listening for messages sent to it, so this isn’t really being used.

Creating the Subscriber

The final piece of the puzzle! Add another Console Application project to your solution and call it TestSubscriber. Again, add project references to Contracts and Configuration and then add the MassTransit NuGet package.

The first thing we need is a Consumer class to consume the SomethingHappened messages. Add a new class to the console app and call it “SomethingHappenedConsumer.”

using Contracts;
using MassTransit;
using System;

namespace TestSubscriber
{
  class SomethingHappenedConsumer : Consumes<SomethingHappened>.Context
  {
    public void Consume(IConsumeContext<SomethingHappened> message)
    {
      Console.Write("TXT: " + message.Message.What);
      Console.Write("  SENT: " + message.Message.When.ToString());
      Console.Write("  PROCESSED: " + DateTime.Now.ToString());
      Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId.ToString() + ")");
    }
  }
}

This consumer class implements a specific MassTransit interface whose Consume method will be called with the message context and SomethingHappened message each time a message is received. Here we are simply writing the message out to the console.

Finally, in the Main method of Program.cs, we can initialize the bus and, as part of the initialization, instruct MassTransit that we wish to subscribe to messages of type SomethingHappened.

using Configuration;
using MassTransit;
using System;

namespace TestSubscriber
{
  class Program
  {
    static void Main(string[] args)
    {
      var bus = BusInitializer.CreateBus("TestSubscriber", x =>
      {
        x.Subscribe(subs =>
        {
          subs.Consumer<SomethingHappenedConsumer>().Permanent();
        });
      });

      Console.ReadKey();

      bus.Dispose();
    }
  }
}

Now right-click on the MtPubSubExample solution in the solution explorer and choose “Set Startup Projects….” From here, choose the Multiple startup projects option and set the Action for both TestPublisher and TestSubscriber to Start. Now when you run your solution, both the publisher and subscriber will run.

Type some messages into the publisher. You should see them show up immediately in the subscriber window!

image

Now close just the Subscriber sample window and publish three more messages in the Publisher window.

image

Go ahead and close the Publisher window for now. Let’s take a deeper look at where those three messages went.

What’s Going on in RabbitMQ Now?

Go back into the RabbitMQ web interface and return to the Exchanges tab. You’ll see a new exchange called MtPubSubExample_TestSubscriber, but first click on the Contracts:SomethingHappened exchange and scroll down to the Bindings section. You’ll see we now have a binding.

image

So, by creating a subscription from our TestSubscriber, MassTransit automatically set up this binding for us. Click on MtPubSubExample_TestSubscriber in the binding, and you’ll be taken to the setup page for the exchange of that name. Scroll down to Bindings, and you’ll see it is bound to a queue with the same name as the exchange (in the binding diagrams, exchanges show up as rectangles with rounded corners, whereas queues have straight corners).

image

The web interface helpfully shows both the predecessor and the successor in the path. Click the MtPubSubExample_TestSubscriber queue here, and you’ll be taken to the setup page for that queue. If you haven’t fired up the TestSubscriber app since we published those last three messages, you should see that there are three messages in the queue:

image

Fire up the TestSubscriber app, and you should see it process the three messages left in the queue.

image

Notice the timestamp from the message versus the timestamp of when the subscriber actually processed the message. In this case, there was a 6-minute lag (the 6 minutes we were poking around in RabbitMQ before starting up the subscriber again).
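If you want the subscriber to print that lag instead of making you work it out from the two timestamps, a small helper along these lines would do it. MessageLag is a hypothetical name for this sketch. It takes the When value from the message and the time the consumer handled it.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: describes the delay between when a message was
// created (its When value) and when the consumer processed it.
public static class MessageLag
{
    public static string Describe(DateTime sent, DateTime processed)
    {
        TimeSpan lag = processed - sent;

        // Invariant culture keeps the decimal separator a period everywhere.
        return string.Format(CultureInfo.InvariantCulture, "{0:0.0} minutes", lag.TotalMinutes);
    }
}
```

Inside the consumer this could be called with message.Message.When and DateTime.Now. Keep in mind that when the publisher and subscriber run on different machines, the result also includes any difference between their clocks.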

Wrap Up

Hopefully, this post was helpful in getting you off the ground with MassTransit. There’s more to come. In upcoming posts, I’ll dig into how to put this together in the real world. First, your publisher and subscriber are likely to live on separate machines, so we’ll look at how to set up a RabbitMQ cluster to make that work. We’ll set up an ASP.NET application that publishes event messages and then a Windows Service that will subscribe to the messages and log them to a data store. Let me know if you have other examples you would like to see.

Until then…

]]>
https://looselycoupledlabs.com/2014/06/masstransit-publish-subscribe-example/feed/ 5