Error Handling in MassTransit Consumers

Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then, we built a Windows Service, using the Topshelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation. (I recommend you review that blog post if you aren’t familiar with it, as we are going to build on that application here.)

But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, things can and will go wrong. Let’s look at how we can leverage the message queuing infrastructure to handle what may be transient errors as well as perhaps more permanent failures.

When It All Goes Horribly Wrong

So what will happen to our TicketOpened messages if there’s an error in the TicketOpenedConsumer? In our example, we’re only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. As you know, there are a number of things that could go wrong. Let’s start by looking at what the default MassTransit behavior is when an exception occurs in your consumer.

MassTransit Default Error Handling

To see what MassTransit does, let’s inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application in my previous blog post. The final code is in the same repository, but in the error-handling branch.

Here’s the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" + 
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" + 
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}

We just check to see if the text of the message contains the word “poison” and, if it does, throw an exception. Now we can run the app, open a ticket, and type “poison” into the message field to get our consumer to throw the exception:

image

Take a look at the log file (C:\Logs\Loosely.CustomerPortal.Backend.log) and you’ll see these entries:

image

What’s going on here? By default, when a consumer throws an exception, MassTransit retries the message exactly 4 more times, for 5 attempts in total. There’s no delay between retries (we’ll look at that later). Since our exception isn’t really transient, all 5 attempts fail. Next question… where’s the TicketOpened message now?

Go into the RabbitMQ management interface (see this post for instructions – should be at http://localhost:15672) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it:

image

Click on the error queue and scroll down to the “Get messages” section. Set Requeue to ‘No’ and click “Get Message(s).” This will remove the message from the queue and display it to us. You can see our poison message in JSON format:

image

Sidebar: Changing Default Retry Limit

If you want to change MassTransit’s default retry limit of 5 to something else, add the x.SetDefaultRetryLimit(1) line shown below to your bus initializer code in the Loosely.CustomerPortal.Backend.TicketService class.

using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService()  {  }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1);
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}

That will set the retry limit to 1.

Requeuing Error Messages

If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn’t running. Then, open up the “RabbitMQ Command Prompt (sbin dir)” item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:

> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management

After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into “Shovel Management.” Click “Add a new shovel” and name it something like “Temporary Error Move.” Set the Source Queue to “Loosely_CustomerPortal_Backend_error” and the Destination Queue to “Loosely_CustomerPortal_Backend.” Click “Add shovel.”

This starts a shovel that runs in the background and will move all messages in the error queue back to the primary queue:

image

Now go back to the Admin tab, Shovel Management, and click on your “Temporary Error Move” shovel. From there, click on the “Delete this shovel” button. If you don’t delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.

Obviously, when we start up our consumer again, it will try the message 5 times and fail again, and the message will land back in the error queue. What we have with our “poison” message is really a permanent failure.

Transient Versus Permanent Failures

With a permanent failure, we’re talking about a message that just can’t be processed – at least not with the code written the way it currently is. Perhaps there’s a message that invokes a code path that throws an exception due to a coding error. In this case, these messages would end up in the error queue and should probably stay there until the error is corrected.

Perhaps the error is such an edge case that we won’t fix it and so we’re ok with the occasional message going to the error queue (we should write something to periodically clean up the error queue). It just depends on your business requirements. If, however, the message is mission critical, then the likely scenario would be to fix the bug, redeploy the new code, move the error messages back into the primary queue, and then let them get processed.

Transient Failures

What about the examples of failures mentioned earlier? A down email or database server? A deadlock condition in the SQL Server? These could be considered transient failures – meaning, if we just were to retry later, the message could likely be processed just fine with no modifications to the message or the consumer code.

As we saw, MassTransit has a bit of a blunt method to try to account for transient failures… it tries the message 5 times. Perhaps in a deadlock situation, this would work great, but probably not in a network or server outage situation. You’d likely expect those to last a little longer. What would be ideal is if we could have the message retry after some timeout delay. Perhaps we could even escalate the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then perhaps fail.

NServiceBus, a commercial analog to MassTransit, has this retry delay ability built into it (called “second-level retries”). However, MassTransit does not. We will have to roll our own, but it won’t be difficult.

Roll Your Own Retry Delay Logic

Assuming this is a pattern you want to implement for a larger application with multiple message types, you will probably want to build the retry delay logic into a common helper class. However, for this example, let’s just build the logic into our TicketOpenedConsumer class.

Here’s the new TicketOpenedConsumer class with progressive retry delay logic:

using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
      {
        {0, 60}, {60, 300}, {300, -1}
      };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;
      
      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" + 
                          nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if(sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip", 
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
            "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));

    }
  }
}

So let’s take a look at a few lines of code in isolation and discuss what’s happening. First, we set up a dictionary to indicate what we’d like the progression of delays to be.

static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
  {
    {0, 60}, {60, 300}, {300, -1}
  };

The key is the last number of seconds delayed and the value is the next delay value to use. We start with 0 as you can see in the initialization code:

int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];

Then we check for a header on the message called “loosely.retry-delay-seconds.” Yes, I just made that up. Headers are metadata you can attach to a message and can contain whatever string data you’d like. When we want to retry a message later, we’ll add a header with the number of seconds we just delayed, so the next pass through the code knows which delay value to use if the message fails again.
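As a rough sketch of the “common helper class” idea, the header parsing and dictionary lookup could be pulled out of the consumer. RetryDelayProgression, NextDelaySeconds, and GiveUp are names made up for this illustration; they are not part of MassTransit or the sample repository. One deliberate difference from the inline code: an unparseable or unexpected header value gives up on retrying instead of throwing a KeyNotFoundException or starting over at 0.

```csharp
using System;
using System.Collections.Generic;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical helper for illustration only (not part of MassTransit).
  public static class RetryDelayProgression
  {
    // Sentinel meaning "stop retrying", same as the -1 used in the consumer.
    public const int GiveUp = -1;

    // Key: the delay (in seconds) recorded on the message header.
    // Value: the next delay to use if the message fails again.
    private static readonly Dictionary<int, int> Steps = new Dictionary<int, int>
    {
      { 0, 60 }, { 60, 300 }, { 300, GiveUp }
    };

    // headerValue is the raw header string; null or empty means first attempt.
    public static int NextDelaySeconds(string headerValue)
    {
      int lastDelay;
      if (string.IsNullOrEmpty(headerValue))
        lastDelay = 0;
      else if (!int.TryParse(headerValue, out lastDelay))
        return GiveUp; // assumption: a garbled header means we stop retrying

      int next;
      return Steps.TryGetValue(lastDelay, out next) ? next : GiveUp;
    }
  }
}
```

In the consumer, the three initialization lines would then collapse to something like var nextRetryDelay = RetryDelayProgression.NextDelaySeconds(envelope.Headers["loosely.retry-delay-seconds"]);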

Now we just have a method that can check for some magic strings in our message to see if it should trigger a contrived exception:

CheckForContrivedErrorConditions(envelope);

Within that function, we define three strings (in addition to the original “poison” string) for which we will scan.

“server-disaster”: Simulate the mail server down for a very long time.
“server-down”: Simulate the mail server down for less than 5 minutes.
“server-blip”: Simulate the mail server down for less than 30 seconds.

Finally, we wrap all of the actual message processing in a try…catch block. If an exception occurs, we check the exception’s message text to see if it’s an error we know to be a transient condition, and we check that the next retry delay value is not negative one (-1). Negative one is our trigger to give up on retrying.

Trace.WriteLine("Exception caught.");
if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
  sleepAndRepublish = true;
else throw;

If the condition is met, we set a flag to indicate we want to sleep (delay) for a bit and then republish the message so it will be retried later. If the condition is not met, we re-throw the exception and MassTransit will handle it per normal message processing rules (default being to retry 4 more times and then move to the error queue).
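Matching on the exception’s message text keeps the example short, but it is fragile. Here is a minimal sketch of the same decision keyed on exception types instead. MailServerUnavailableException and TransientErrors are hypothetical names for this illustration, and it assumes EmailHelper.Send could be changed to throw such an exception; nothing here comes from MassTransit.

```csharp
using System;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical exception type; assumes the email code throws it on an outage.
  public class MailServerUnavailableException : Exception
  {
    public MailServerUnavailableException(string message) : base(message) { }
  }

  // Hypothetical helper for illustration only.
  public static class TransientErrors
  {
    // True when the failure looks transient AND the progression has not
    // reached the -1 "give up" marker.
    public static bool ShouldSleepAndRepublish(Exception ex, int nextRetryDelay)
    {
      bool transient = ex is MailServerUnavailableException || ex is TimeoutException;
      return transient && nextRetryDelay > -1;
    }
  }
}
```

The catch block would then set sleepAndRepublish when TransientErrors.ShouldSleepAndRepublish(ex, nextRetryDelay) returns true and re-throw otherwise, exactly as before.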

If we do want to sleep and republish, that code is simple:

if(sleepAndRepublish)
{
  Thread.Sleep(nextRetryDelay * 1000);
  envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
    x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
    x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
  });
}

We put the thread to sleep for the prescribed number of seconds (more on that later) and then, after the time has elapsed, we republish the message to the bus with a “loosely.retry-delay-seconds” header set to the number of seconds we just delayed. That puts the message back on the bus, and our consumer will get called again with it. This time, the message will have the special header on it and we’ll know to move on to the next delay value (or stop retrying if that value is –1).

Did You Seriously Just Block the Consumer Thread?

Good catch. Yes, this can have performance implications. MassTransit has a setting called ConcurrentConsumerLimit, which defaults to the number of CPUs times 4 (so 16 on a 4-processor machine). We’re essentially “killing” one of these 16 (or however many) threads while we sleep, thus limiting the number of messages we can process while we’re waiting.

But is this really a problem? In this example, our service is only responsible for processing TicketOpened messages. Every TicketOpened message needs to trigger an email to be sent. If the email server is down, then none of the TicketOpened messages are going to be able to successfully be processed. In this case, it probably makes sense for the entire service to slow down and wait until the mail server is back online.

If the service were responsible for processing many different types of messages, then this would certainly be an issue. However, it raises the question of whether it makes sense for a single service to handle different types of messages. In some cases it might, particularly if they all need to be handled in much the same way. But in a lot of cases, it will make more sense to create separate services for your different message types.

What If the Service Crashes While Sleeping?

So we have our consumer sleeping on the consumer thread and the message is “in limbo” while we’re waiting. What happens to the message if the service crashes during the Thread.Sleep? If you send a “server-down” message to get the message to go through the retry logic, take a look at the queue in RabbitMQ:

image

It shows one message in the “Unacked” (unacknowledged) column. This means two things: 1) RabbitMQ won’t deliver the message to any other consumer threads or processes, and 2) RabbitMQ won’t remove the message until it is acknowledged. If the process hosting our consumer service dies before acknowledging the message, RabbitMQ will move the message back to the “Ready” state.

Caveats and Disclaimers

These bloggers, sheesh. Always cutting corners in the code for the “sake of brevity.” It’s difficult to balance a good, crisp article with well crafted code. First, you don’t see any unit tests. Bad programmer. Next, with a good suite of tests watching your back, the code in this example could be refactored into shorter methods and perhaps a helper class for the retry delay progression. Finally, the call to Thread.Sleep should probably be refactored into a loop to wake up every couple of seconds to see if the service needs to stop.
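Here is one way the Thread.Sleep caveat could be handled, as a sketch. Instead of a loop that wakes up every couple of seconds, it waits on a ManualResetEventSlim with a timeout, which returns as soon as the event is set. InterruptibleDelay is a made-up name, and it assumes the service’s Stop method would set a shared stopRequested event before disposing the bus.

```csharp
using System;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical helper for illustration only.
  public static class InterruptibleDelay
  {
    // Blocks for up to delaySeconds, waking early if stopRequested is set.
    // Returns true if the full delay elapsed, false if a stop was requested.
    public static bool Wait(int delaySeconds, ManualResetEventSlim stopRequested)
    {
      if (delaySeconds <= 0)
        return !stopRequested.IsSet;

      // ManualResetEventSlim.Wait(TimeSpan) returns true when the event
      // was set before the timeout expired.
      bool stopped = stopRequested.Wait(TimeSpan.FromSeconds(delaySeconds));
      return !stopped;
    }
  }
}
```

The consumer would call InterruptibleDelay.Wait(nextRetryDelay, stopRequested) in place of Thread.Sleep and republish only when it returns true. What to do when it returns false is left open here; it depends on how you want an unfinished message treated during shutdown.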

Other Options

Of course there are other ways to build delayed retry logic into your services. The method used in this post is just the simplest to illustrate, but you can take this further. For example, take a look at the MassTransit-Quartz project. This uses the open source Quartz.NET scheduler to enable delayed publishing of messages. It does, however, require an ADO.NET database to persist the scheduled jobs so you don’t lose your delayed messages. If you need scheduling and visibility into messages that were delayed, then this is your ticket.

Another pattern that could be implemented is that of moving the delayed messages to another RabbitMQ queue. Then you could write something that periodically polled that queue and moved the messages back into the primary queue after the desired delay.
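To make the holding-queue idea a little more concrete, here is a small in-memory sketch of the “hold until due, then move back” logic. DelayedMessageHolder is a hypothetical name, and the list is only a stand-in for a second RabbitMQ queue; a real version would read from and publish to actual queues and would need to be thread-safe, which this is not.

```csharp
using System;
using System.Collections.Generic;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical in-memory stand-in for a holding queue (illustration only).
  public class DelayedMessageHolder<T>
  {
    private readonly List<KeyValuePair<DateTime, T>> _held =
      new List<KeyValuePair<DateTime, T>>();

    public int Count { get { return _held.Count; } }

    // Park a message until dueUtc.
    public void Hold(T message, DateTime dueUtc)
    {
      _held.Add(new KeyValuePair<DateTime, T>(dueUtc, message));
    }

    // Called periodically by a poller: removes and returns every message
    // whose due time has passed, in the order the messages were held.
    public List<T> ReleaseDue(DateTime nowUtc)
    {
      var due = new List<T>();
      // Walk backwards so RemoveAt does not shift unvisited items.
      for (int i = _held.Count - 1; i >= 0; i--)
      {
        if (_held[i].Key <= nowUtc)
        {
          due.Add(_held[i].Value);
          _held.RemoveAt(i);
        }
      }
      due.Reverse(); // restore the original hold order
      return due;
    }
  }
}
```

A poller would call ReleaseDue(DateTime.UtcNow) on a timer and republish whatever comes back to the primary queue.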

Next Stop…

Let’s take a look at how we can implement multiple consumers in a couple of different use cases. In one case, we might want to spin up multiple, identical consumers on different machines to scale out message processing. In another case, we may want to have completely different consumers subscribing to the same message type but intending to do different things with the messages. After that, we’ll probably take a look at Sagas (chaining multiple message processing steps together) and then… who knows? Send me your thoughts and questions regarding anything you’d like to see here.

  • udidahan

    Thanks for the NServiceBus call-out.

    BTW – we also have a graphical tool for monitoring the error queue and sending a message back to be reprocessed.

    Incidentally, the approach we recommend is having a single error queue for the entire system as it simplifies monitoring. I can understand why MT went with the approach of having the error queue be “under” the source queue (so that it is clear from which queue it came), but we deal with that by appending a header to the message when we put it in the error queue.

    In any case, great blog post!

    • dprothero

      Thanks Udi! Great to see the contrasts between NServiceBus and MT. Keep em’ coming!

  • Rafal Gwizdala

    Hi there,
    You touched an interesting subject
    However, blocking the consumer thread for the duration of retry delay is imho unacceptable. This is only good if
    a) the delay is no more than few seconds
    b) you don’t have too many failing messages
    otherwise you’ll quickly run out of available threads and processing will stop
    This is basically re-scheduling message to be processed later and it’s imho better to do it using a database – put the message into a database, specify the delivery time and let some daemon publish it when the time comes. This way you can schedule an unlimited number of messages without performance penalty. And this functionality has many other uses beyond retrying failed messages – for example, you can implement all kinds of timeouts with it.

    • dprothero

      “otherwise you’ll quickly run out of available threads and processing will stop”

      Of course it will. That’s why I discuss this in the section “Did You Seriously Just Block the Consumer Thread?”

      It all depends on your use case. If you have a service dedicated to processing one type of message and the inability to process those messages because of an external dependency being down occurs, there’s no reason to be concerned that it will stop all processing. That’s a good thing. No processing can occur anyway, why let the process spin its wheels?

      There will be cases where it’s not a good thing, of course.

      If you’ve got an error that will occur with only a few messages, but most will be OK, then this is definitely not the way to handle it. You will want to use a scheduler like Quartz and the MassTransit.Quartz integration project. Or you can roll your own and use a database or perhaps another queue in RabbitMQ.

      I just don’t believe in adding the additional complexity if it isn’t necessary. I also don’t believe there’s a “one right way” to do anything… it always depends on your use case and related factors. “Only a Sith deals in absolutes.” 😛

      Thanks for the comment!

      • Rafal Gwizdala

        Of course, everything that works is good.

  • Marcus Widblom

    Great post David, this works like a charm.

    I took the opportunity and wrote an extension method to MassTransit IConsumeContext:


    public static void ConsumeWithRetry<TMessage, TException>(this IConsumeContext<TMessage> envelope, Dictionary<int, int> retryPolicy, Action action, ILog logger = null, string exceptionLogMessage = "")
      where TMessage : class, new()
      where TException : Exception
    {
      var retryDelay = 0;
      int.TryParse(envelope.Headers["retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = retryPolicy[retryDelay];
      var sleepAndRepublish = false;

      try
      {
        action();
      }
      catch (TException ex)
      {
        if (logger != null && !string.IsNullOrEmpty(exceptionLogMessage))
          logger.Error(exceptionLogMessage + " Exception message: " + ex.Message);

        if (nextRetryDelay > -1)
        {
          if (logger != null)
            logger.Info(string.Format("Sleeping consumer for {0} seconds and retrying...", nextRetryDelay));

          sleepAndRepublish = true;
        }
        else
        {
          throw;
        }
      }

      if (sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish(envelope.Message, x =>
        {
          x.SetHeader("retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    Cheers!