{"id":115,"date":"2014-07-27T02:18:36","date_gmt":"2014-07-27T02:18:36","guid":{"rendered":"http:\/\/looselycoupledlabs.com\/?p=115"},"modified":"2014-07-28T15:15:54","modified_gmt":"2014-07-28T15:15:54","slug":"error-handling-in-masstransit-consumers","status":"publish","type":"post","link":"https:\/\/looselycoupledlabs.com\/2014\/07\/error-handling-in-masstransit-consumers\/","title":{"rendered":"Error Handling in MassTransit Consumers"},"content":{"rendered":"

<p>Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then we built a Windows Service, using the TopShelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation. (I recommend you review that post if you aren't familiar with it, as we are going to build on that application here.)</p>

<p>But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, that things can and will go wrong. Let's look at how we can leverage the message queuing infrastructure to handle transient errors as well as more permanent failures.</p>

<h1>When It All Goes Horribly Wrong</h1>

<p>So what will happen to our TicketOpened messages if there's an error in TicketOpenedConsumer? In our example we're only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. There are any number of things that could go wrong. Let's start by looking at MassTransit's default behavior when an exception occurs in your consumer.</p>

<h2>MassTransit Default Error Handling</h2>

<p>To see what MassTransit does, let's inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application from my previous blog post. The final code is in the same repository, in the error-handling branch.</p>

<p>Here's the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:</p>

<pre>
using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" +
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}
</pre>

<p>We just check whether the text of the message contains the word "poison" and, if it does, throw an exception. Now we can run the app, open a ticket, and type "poison" into the message field to get our consumer to throw the exception:</p>

\"image\"<\/a><\/p>\n

<p>Take a look at the log file (<strong>C:\Logs\Loosely.CustomerPortal.Backend.log</strong>) and you'll see these entries:</p>

\"image\"<\/a><\/p>\n

<p>What's going on here? By default, when a consumer throws an exception, MassTransit retries the message exactly 4 more times, with no delay between retries (we'll look at that later). Since our exception isn't really transient, it's going to try 5 times without success. Next question: where's the TicketOpened message now?</p>

<p>Go into the RabbitMQ management interface (see this post for instructions; it should be at http://localhost:15672) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it:</p>

\"image\"<\/a><\/p>\n

<p>Click on the error queue and scroll down to the "Get messages" section. Set Requeue to "No" and click "Get Message(s)." This will remove the message from the queue and display it to us. You can see our poison message in JSON format:</p>

\"image\"<\/a><\/p>\n


<h3>Sidebar: Changing Default Retry Limit</h3>

<p>If you want to change MassTransit's default retry limit of 5 to something else, add the SetDefaultRetryLimit call shown below to the Loosely.CustomerPortal.Backend.TicketService class, within your bus initializer code.</p>

<pre>
using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService() { }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1);
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}
</pre>

<p>That will set the retry limit to 1.</p>

<h2>Requeuing Error Messages</h2>

<p>If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn't running. Then open the "RabbitMQ Command Prompt (sbin dir)" item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:</p>

<pre>
> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management
</pre>

<p>After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into "Shovel Management." Click "Add a new shovel" and name it something like "Temporary Error Move." Set the Source Queue to "Loosely_CustomerPortal_Backend_error" and the Destination Queue to "Loosely_CustomerPortal_Backend." Click "Add shovel."</p>

<p>This starts a shovel that runs in the background and will move all messages in the error queue back to the primary queue:</p>

\"image\"<\/a><\/p>\n

<p>Now go back to the Admin tab, Shovel Management, and click on your "Temporary Error Move" shovel. From there, click on the "Delete this shovel" button. If you don't delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.</p>

<p>Obviously, when we start up our consumer again, it will try the message 5 times, fail again, and move it back to the error queue. What we have with our "poison" message is really a <em>permanent</em> failure.</p>


<h1>Transient Versus Permanent Failures</h1>

<p>With a permanent failure, we're talking about a message that simply can't be processed, at least not with the code written the way it currently is. Perhaps the message invokes a code path that throws an exception due to a coding error. In that case, the message ends up in the error queue and should probably stay there until the error is corrected.</p>

<p>Perhaps the error is such an edge case that we won't fix it, and we're OK with the occasional message going to the error queue (though we should write something to periodically clean that queue up). It depends on your business requirements. If, however, the message is mission critical, the likely path is to fix the bug, deploy the new code, move the error messages back into the primary queue, and let them be processed.</p>

<h2>Transient Failures</h2>

<p>What about the failures mentioned earlier? A down email or database server? A deadlock in SQL Server? These could be considered <em>transient</em> failures, meaning that if we simply retried later, the message would likely be processed just fine with no modifications to the message or the consumer code.</p>

<p>As we saw, MassTransit has a rather blunt approach to transient failures: it tries the message 5 times in quick succession. In a deadlock situation this might work great, but probably not during a network or server outage, which you'd expect to last a little longer. What would be ideal is retrying the message after some timeout delay, perhaps even escalating the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then give up.</p>

<p>NServiceBus, a commercial analog to MassTransit, has this retry delay ability built in (it calls them "second-level retries"). MassTransit does not, so we will have to roll our own, but it won't be difficult.</p>

<h2>Roll Your Own Retry Delay Logic</h2>

<p>If this is a pattern you want to use across a larger application with multiple message types, you will probably want to build the retry delay logic into a common helper class. For this example, though, let's just build the logic into our TicketOpenedConsumer class.</p>

<p>Here's the new TicketOpenedConsumer class with progressive retry delay logic:</p>

<pre>
using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
      {
        {0, 60}, {60, 300}, {300, -1}
      };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;

      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" +
                          nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction.
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if (sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip",
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
            "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));
    }
  }
}
</pre>

<p>Let's take a look at a few lines of code in isolation and discuss what's happening. First, we set up a dictionary to define the progression of delays we'd like:</p>

<pre>
static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
  {
    {0, 60}, {60, 300}, {300, -1}
  };
</pre>

<p>The key is the number of seconds we delayed last time, and the value is the next delay to use. We start with 0, as you can see in the initialization code:</p>

<pre>
int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];
</pre>

<p>Then we check for a header on the message called "loosely.retry-delay-seconds." Yes, I just made that name up. Headers are metadata you can attach to a message and can contain whatever string data you'd like. When we retry a message later, we'll add a header with the number of seconds we just delayed, so that the next time through, the code knows which delay value to use if the message fails again.</p>

<p>Next, a method checks for some magic strings in the message to decide whether to trigger a contrived exception:</p>

<pre>
CheckForContrivedErrorConditions(envelope);
</pre>

<p>Within that function, we define three strings (in addition to the original "poison" string) for which we will scan:</p>

<table>
  <tr><td><strong>"server-disaster"</strong></td><td>Simulates the mail server being down for a very long time.</td></tr>
  <tr><td><strong>"server-down"</strong></td><td>Simulates the mail server being down for less than 5 minutes.</td></tr>
  <tr><td><strong>"server-blip"</strong></td><td>Simulates the mail server being down for less than 30 seconds.</td></tr>
</table>

<p>Finally, we wrap all of the actual message processing in a try/catch block. If an exception occurs, we check whether it's one we know to be a transient condition <em>and</em> whether the next retry delay value is <em>not</em> negative one (-1). Negative one is our signal to give up on retrying.</p>

Trace.WriteLine(\"Exception caught.\");\r\nif (ex.Message.Contains(\"server is down\") && nextRetryDelay > -1)\r\n  sleepAndRepublish = true;\r\nelse throw;\r\n<\/pre>\n

<p>If the condition is met, we set a flag indicating we want to sleep (delay) for a bit and then republish the message so it will be retried later. If the condition is not met, we rethrow the exception and MassTransit handles it per its normal rules (by default, retrying 4 more times and then moving the message to the error queue).</p>

<p>If we do want to sleep and republish, that code is simple:</p>

<pre>
if (sleepAndRepublish)
{
  Thread.Sleep(nextRetryDelay * 1000);
  envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
    x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
    x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
  });
}
</pre>

<p>We put the thread to sleep for the prescribed number of seconds (more on that later) and then, after the time has elapsed, republish the message to the bus with a "loosely.retry-delay-seconds" header set to the number of seconds we just delayed. That puts the message back on the bus, and our consumer will be called with it again. This time the message will have the special header on it, so we'll know to move on to the next delay value (or stop retrying if that value is -1).</p>

<h2>Did You Seriously Just Block the Consumer Thread?</h2>

<p>Good catch. Yes, this can have performance implications. MassTransit has a setting called ConcurrentConsumerLimit, which defaults to the number of CPUs times 4 (so 16 on a four-processor machine). We're essentially tying up one of those 16 (or however many) threads while we sleep, limiting the number of messages we can process while we're waiting.</p>
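<p>If you want to tune that limit, it can be set when the bus is created. Here is a minimal sketch against our existing BusInitializer call; it assumes the MassTransit 2.x configurator exposes a SetConcurrentConsumerLimit method, so check the version you're running before relying on the exact name.</p>

<pre>
_bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
{
  // Assumption: the MassTransit 2.x service bus configurator provides
  // SetConcurrentConsumerLimit. Lowering it reduces how many consumer
  // threads can be tied up sleeping at the same time.
  x.SetConcurrentConsumerLimit(4);

  x.Subscribe(subs =>
  {
    subs.Consumer<TicketOpenedConsumer>().Permanent();
  });
});
</pre>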

<p>But is this really a problem? In this example, our service is only responsible for processing TicketOpened messages, and every TicketOpened message needs to trigger an email. If the email server is down, then <em>none</em> of the TicketOpened messages can be processed successfully. In that case, it probably makes sense for the entire service to slow down and wait until the mail server is back online.</p>

<p>If the service were responsible for processing many different types of messages, then this would certainly be an issue. That raises the question, though, of whether a single service should handle different types of messages at all. In some cases it might, particularly if they all need to be handled in much the same way. In many cases, though, it makes more sense to create separate services for your different message types.</p>

<h2>What If the Service Crashes While Sleeping?</h2>

<p>So we have our consumer sleeping on the consumer thread and the message "in limbo" while we wait. What happens to the message if the service crashes during the Thread.Sleep? Send a "server-down" message to push it through the retry logic, then take a look at the queue in RabbitMQ:</p>

\"image\"<\/a><\/p>\n

<p>It shows one message in the "Unacked" (unacknowledged) column. This means two things: 1) RabbitMQ won't deliver the message to any other consumer threads or processes, and 2) it won't remove the message unless it is acknowledged. If the process hosting our consumer dies before acknowledging the message, RabbitMQ moves the message back to the "Ready" state.</p>

<h1>Caveats and Disclaimers</h1>

<p>These bloggers, sheesh. Always cutting corners in the code for the "sake of brevity." It's difficult to balance a crisp article with well-crafted code. First, you don't see any unit tests here. Bad programmer. Next, with a good suite of tests watching your back, the code in this example could be refactored into shorter methods and perhaps a helper class for the retry delay progression. Finally, the call to Thread.Sleep should really be refactored into a loop that wakes up every couple of seconds to check whether the service needs to stop.</p>
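<p>To make that last point concrete, here is one way it could look: a minimal sketch of a small hypothetical helper (not part of the sample repo) whose StopRequested event would be signaled from the TopShelf service's Stop() method.</p>

<pre>
using System;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical helper, for illustration only.
  static class InterruptibleDelay
  {
    // Set this from the service's Stop() method so in-flight delays end quickly.
    public static readonly ManualResetEventSlim StopRequested = new ManualResetEventSlim(false);

    // Drop-in replacement for Thread.Sleep(delaySeconds * 1000) in the consumer.
    public static void Wait(int delaySeconds)
    {
      var deadline = DateTime.UtcNow.AddSeconds(delaySeconds);
      while (DateTime.UtcNow < deadline)
      {
        // Wake up every couple of seconds; Wait returns true as soon as
        // StopRequested is signaled, ending the delay early.
        if (StopRequested.Wait(TimeSpan.FromSeconds(2)))
          return;
      }
    }
  }
}
</pre>

<p>The consumer would then call InterruptibleDelay.Wait(nextRetryDelay) instead of Thread.Sleep, so a pending shutdown isn't held up for the full five-minute delay.</p>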

<h2>Other Options</h2>

<p>Of course, there are other ways to build delayed retry logic into your services; the method used in this post is just the simplest to illustrate. For example, take a look at the MassTransit-Quartz project, which uses the open source Quartz.NET scheduler to enable delayed publishing of messages. It does require an ADO.NET database to persist the scheduled jobs so you don't lose your delayed messages, but if you need scheduling and visibility into delayed messages, this is your ticket.</p>

<p>Another pattern is to move delayed messages to a separate RabbitMQ queue, then write something that periodically polls that queue and moves the messages back into the primary queue after the desired delay.</p>
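<p>As a rough illustration of the "move them back" half of that idea, here is a minimal sketch that drains one queue into another using the RabbitMQ .NET client directly. The queue names in the usage comment are the ones from this post; the helper itself, and the idea of running it from a timer or scheduled task, are assumptions for the sake of the example rather than part of the sample project.</p>

<pre>
using RabbitMQ.Client;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical utility: republishes each message to the default exchange
  // with the destination queue's name as the routing key, which is
  // essentially what the Shovel plugin does for us.
  static class QueueMover
  {
    public static void MoveAll(string sourceQueue, string destinationQueue)
    {
      var factory = new ConnectionFactory { HostName = "localhost" };
      using (var connection = factory.CreateConnection())
      using (var channel = connection.CreateModel())
      {
        BasicGetResult result;
        // BasicGet returns null once the source queue is empty.
        while ((result = channel.BasicGet(sourceQueue, false)) != null)
        {
          channel.BasicPublish("", destinationQueue, result.BasicProperties, result.Body);
          // Only ack (remove) the original after it has been republished.
          channel.BasicAck(result.DeliveryTag, false);
        }
      }
    }
  }
}

// Usage, e.g. from a scheduled task or timer:
// QueueMover.MoveAll("Loosely_CustomerPortal_Backend_error", "Loosely_CustomerPortal_Backend");
</pre>

<p>Because the messages are fetched without auto-acknowledgement and only acked after being republished, a crash mid-move leaves them safely in the source queue, just like the unacked message we saw earlier.</p>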

<h1>Next Stop…</h1>

<p>Let's take a look at how we can implement multiple consumers in a couple of different use cases. In one case, we might want to spin up multiple, identical consumers on different machines to scale out message processing. In another case, we may want completely different consumers subscribing to the same message type but doing different things with the messages. After that, we'll probably take a look at sagas (chaining multiple message-processing steps together) and then… who knows? Send me your thoughts and questions about anything you'd like to see here.</p>
