{"id":115,"date":"2014-07-27T02:18:36","date_gmt":"2014-07-27T02:18:36","guid":{"rendered":"http:\/\/looselycoupledlabs.com\/?p=115"},"modified":"2014-07-28T15:15:54","modified_gmt":"2014-07-28T15:15:54","slug":"error-handling-in-masstransit-consumers","status":"publish","type":"post","link":"https:\/\/looselycoupledlabs.com\/2014\/07\/error-handling-in-masstransit-consumers\/","title":{"rendered":"Error Handling in MassTransit Consumers"},"content":{"rendered":"
Previously, we built a simple Customer Portal application where a user could use an ASP.NET MVC app to open a new customer service ticket. The website created a TicketOpened message and published it to the MassTransit service bus. Then we built a Windows Service, using the Topshelf library, that subscribed to TicketOpened messages and handled creating the ticket and emailing the customer a confirmation. (I recommend you review that blog post if you aren't familiar with it, as we're going to build on that application here.)

## When It All Goes Horribly Wrong

But what happens when something goes wrong? Blog posts usually assume the happy path when showing code examples in order to keep them easily digestible. We all know, however, that things can and will go wrong. Let's look at how we can leverage the message queuing infrastructure to handle transient errors as well as more permanent failures.
### MassTransit Default Error Handling

So what will happen to our TicketOpened messages if there's an error in TicketOpenedConsumer? In our example we're only sending an email, but the email server could be down. If we were persisting to a data store, that could be down, or maybe there was a SQL deadlock. Any number of things could go wrong. Let's start by looking at MassTransit's default behavior when an exception occurs in your consumer.

To see what MassTransit does, let's inject a way to get the consumer to throw an exception. Start by cloning the https://github.com/dprothero/Loosely.CustomerPortal repository (master branch) or by building the application from my previous blog post. The final code for this post is in the same repository, in the error-handling branch.

Here's the new Consume method in our Loosely.CustomerPortal.Backend.TicketOpenedConsumer class:
```csharp
using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Diagnostics;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      // Here is where you would persist the ticket to a data store of some kind.
      // For this example, we'll just write it to the trace log.
      Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                      "Id: " + envelope.Message.Id + "\r\n" +
                      "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                      "Message: " + envelope.Message.Message);

      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      // Send email confirmation to the customer.
      var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                        "We will respond to your inquiry ASAP.\n\n" +
                        "Your Message:\n" + envelope.Message.Message;

      EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);
    }
  }
}
```
We just check whether the text of the message contains the word "poison" and, if it does, throw an exception. Now we can run the app, open a ticket, and type "poison" into the message field to get our consumer to throw the exception.
Take a look at the log file (**C:\Logs\Loosely.CustomerPortal.Backend.log**) and you'll see the exception logged for each attempt.

What's going on here? By default, MassTransit retries any message that causes an exception in its consumer exactly 4 more times, with no delay between retries (we'll look at that later). Since our exception isn't really transient, it's going to try 5 times without success. Next question… where's the TicketOpened message now?
Go into the RabbitMQ management interface (it should be at http://localhost:15672; see my earlier post for setup instructions) and click on the Queues tab. Notice we have our normal Loosely_CustomerPortal_Backend queue, but we also have a Loosely_CustomerPortal_Backend_error queue, and it should have 1 message in it.
Click on the error queue and scroll down to the "Get messages" section. Set Requeue to "No" and click "Get Message(s)." This removes the message from the queue and displays it, and you can see our poison message in JSON format.
#### Sidebar: Changing Default Retry Limit

If you want to change MassTransit's default retry limit of 5 to something else, add the SetDefaultRetryLimit call shown below to the Loosely.CustomerPortal.Backend.TicketService class, within your bus initializer code.
```csharp
using Loosely.Bus.Configuration;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  class TicketService
  {
    IServiceBus _bus;

    public TicketService() { }

    public void Start()
    {
      _bus = BusInitializer.CreateBus("CustomerPortal_Backend", x =>
      {
        x.SetDefaultRetryLimit(1); // override the default retry limit of 5
        x.Subscribe(subs =>
        {
          subs.Consumer<TicketOpenedConsumer>().Permanent();
        });
      });
    }

    public void Stop()
    {
      _bus.Dispose();
    }
  }
}
```

That will set the retry limit to 1.
### Requeuing Error Messages

If you end up with messages in the error queue, you may want to move them back to the primary queue to be processed. In RabbitMQ this can be accomplished using the Shovel plugin. First, make sure your consumer process isn't running. Then open the "RabbitMQ Command Prompt (sbin dir)" item from your Start menu and run the following two commands to install the Shovel and corresponding management plugins:
```
> rabbitmq-plugins enable rabbitmq_shovel
> rabbitmq-plugins enable rabbitmq_shovel_management
```
After restarting the RabbitMQ Windows Service, take a look in the RabbitMQ management interface. Navigate to the Admin tab and go into "Shovel Management." Click "Add a new shovel" and name it something like "Temporary Error Move." Set the Source Queue to "Loosely_CustomerPortal_Backend_error" and the Destination Queue to "Loosely_CustomerPortal_Backend." Click "Add shovel."

This starts a shovel that runs in the background and moves all messages in the error queue back to the primary queue.

Now go back to the Admin tab, Shovel Management, and click on your "Temporary Error Move" shovel. From there, click the "Delete this shovel" button. If you don't delete the shovel, it will continue to move messages from the error queue back into the primary queue… essentially creating an infinite retry loop.
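The Shovel plugin is the operationally simplest route, but the same move can be scripted. Here's a minimal sketch using the RabbitMQ.Client NuGet package (not part of the sample solution, and API details vary slightly between client versions); it drains the error queue one message at a time, republishing each to the primary queue via the default exchange:

```csharp
using RabbitMQ.Client;

class DrainErrorQueue
{
  static void Main()
  {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      while (true)
      {
        // Pull one message off the error queue. No auto-ack, so we
        // won't lose the message if the publish below fails.
        var result = channel.BasicGet("Loosely_CustomerPortal_Backend_error", false);
        if (result == null) break; // queue is empty

        // Republish to the primary queue via the default exchange,
        // preserving the original properties (headers, content type, etc.).
        channel.BasicPublish("", "Loosely_CustomerPortal_Backend",
                             result.BasicProperties, result.Body);
        channel.BasicAck(result.DeliveryTag, false);
      }
    }
  }
}
```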
Of course, when we start up our consumer again, it will try the message 5 times, fail again, and move it back to the error queue. What we have with our "poison" message is really a *permanent* failure.
## Transient Versus Permanent Failures

With a permanent failure, we're talking about a message that just can't be processed, at least not with the code written the way it currently is. Perhaps the message invokes a code path that throws an exception due to a coding error. In that case, these messages end up in the error queue and should probably stay there until the error is corrected.

Or perhaps the error is such an edge case that we won't fix it, and we're OK with the occasional message going to the error queue (though we should write something to periodically clean up the error queue). It just depends on your business requirements. If the message is mission critical, however, the likely scenario is to fix the bug, redeploy the new code, move the error messages back into the primary queue, and let them be processed.
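If you do go the route of letting edge-case messages accumulate, the cleanup job mentioned above can be as simple as a scheduled purge. A minimal sketch, again assuming the RabbitMQ.Client package; ErrorQueueJanitor is a made-up name, and you should only run something like this if your business requirements allow discarding these messages:

```csharp
using System;
using RabbitMQ.Client;

class ErrorQueueJanitor
{
  // Discard everything currently sitting in the error queue.
  // Run this from a scheduled task (e.g. Windows Task Scheduler).
  static void Main()
  {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      // QueuePurge returns the number of messages removed.
      uint purged = channel.QueuePurge("Loosely_CustomerPortal_Backend_error");
      Console.WriteLine("Purged " + purged + " message(s) from the error queue.");
    }
  }
}
```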
### Transient Failures

What about the examples of failures mentioned earlier? A down email or database server? A deadlock in SQL Server? These could be considered *transient* failures: if we simply retried later, the message would likely be processed just fine, with no modifications to the message or the consumer code.

As we saw, MassTransit's built-in handling of transient failures is a blunt instrument: it tries the message 5 times in quick succession. In a deadlock situation this might work great, but probably not during a network or server outage, which you'd expect to last a little longer. What would be ideal is retrying the message after some timeout delay, perhaps even escalating the delay if subsequent retries fail. For example, try 1 minute later on the first retry, then 5 minutes later on the second retry, and then give up.

NServiceBus, a commercial analog to MassTransit, has this retry-delay ability built in (it calls these "second-level retries"). MassTransit, however, does not. We will have to roll our own, but it won't be difficult.
### Roll Your Own Retry Delay Logic

If this is a pattern you want to implement across a larger application with multiple message types, you'll probably want to build the retry delay logic into a common helper class, as sketched below. For this example, though, let's just build the logic into our TicketOpenedConsumer class.
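If you do extract a helper, it might look something like this. This is just one possible shape, using the same header-and-republish technique the consumer below implements; RetryHelper and ConsumeWithRetry are made-up names, not MassTransit APIs:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using MassTransit;

namespace Loosely.CustomerPortal.Backend
{
  // Hypothetical shared helper so multiple consumers can reuse the
  // progressive retry delay logic described in this post.
  static class RetryHelper
  {
    const string DelayHeader = "loosely.retry-delay-seconds";

    // Key: the delay (seconds) used last time; value: the next delay to use.
    static readonly Dictionary<int, int> DelayProgression =
      new Dictionary<int, int>() { { 0, 60 }, { 60, 300 }, { 300, -1 } };

    public static void ConsumeWithRetry<T>(IConsumeContext<T> envelope, Action consume)
      where T : class
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers[DelayHeader], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];

      try
      {
        consume();
      }
      catch (Exception)
      {
        // A real implementation would inspect the exception here to decide
        // whether the failure is actually transient before retrying.
        if (nextRetryDelay < 0) throw; // progression exhausted; let it error out

        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<T>(envelope.Message, x =>
        {
          x.SetHeader(DelayHeader, nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }
  }
}
```

A consumer's Consume method could then wrap its body in `RetryHelper.ConsumeWithRetry(envelope, () => { ... })`.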
Here's the new TicketOpenedConsumer class with progressive retry delay logic:

```csharp
using Loosely.Bus.Contracts;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Loosely.CustomerPortal.Backend
{
  class TicketOpenedConsumer : Consumes<TicketOpened>.Context
  {
    static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
    {
      {0, 60}, {60, 300}, {300, -1}
    };

    public void Consume(IConsumeContext<TicketOpened> envelope)
    {
      int retryDelay = 0;
      int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
      var nextRetryDelay = DelayProgression[retryDelay];
      bool sleepAndRepublish = false;

      try
      {
        // Here is where you would persist the ticket to a data store of some kind.
        // For this example, we'll just write it to the trace log.
        Trace.WriteLine("=========== NEW TICKET ===========\r\n" +
                        "Id: " + envelope.Message.Id + "\r\n" +
                        "Email: " + envelope.Message.CustomerEmail + "\r\n" +
                        "Message: " + envelope.Message.Message + "\r\n" +
                        "Current/Next Retry Delay: " + retryDelay.ToString() + "/" +
                        nextRetryDelay.ToString() + "\r\n" +
                        "Current Time: " + DateTime.Now.ToString());

        CheckForContrivedErrorConditions(envelope);

        // Send email confirmation to the customer.
        var messageBody = "Ticket ID " + envelope.Message.Id + " has been opened for you! " +
                          "We will respond to your inquiry ASAP.\n\n" +
                          "Your Message:\n" + envelope.Message.Message;

        EmailHelper.Send(envelope.Message.CustomerEmail, "Ticket Opened", messageBody);

        // Here is where you would commit any open database transaction.
        Trace.WriteLine("Message committed.");
      }
      catch (Exception ex)
      {
        Trace.WriteLine("Exception caught.");
        if (ex.Message.Contains("server is down") && nextRetryDelay > -1)
          sleepAndRepublish = true;
        else throw;
      }

      if (sleepAndRepublish)
      {
        Thread.Sleep(nextRetryDelay * 1000);
        envelope.Bus.Publish<TicketOpened>(envelope.Message, x => {
          x.SetHeader("loosely.retry-delay-seconds", nextRetryDelay.ToString());
          x.SetDeliveryMode(MassTransit.DeliveryMode.Persistent);
        });
      }
    }

    private void CheckForContrivedErrorConditions(IConsumeContext<TicketOpened> envelope)
    {
      if (envelope.Message.Message.Contains("poison"))
        throw (new Exception("Something bad has happened!"));

      if (envelope.Message.Message.Contains("server-blip"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-blip",
          "server-online(blipped)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-down"))
      {
        envelope.Message.Message = envelope.Message.Message.Replace("server-down",
          "server-blip(downed)");
        throw (new Exception("The mail server is down."));
      }

      if (envelope.Message.Message.Contains("server-disaster"))
        throw (new Exception("The mail server is down."));
    }
  }
}
```
Let's take a look at a few lines of this code in isolation and discuss what's happening. First, we set up a dictionary indicating what we'd like the progression of delays to be:

```csharp
static private Dictionary<int, int> DelayProgression = new Dictionary<int, int>()
{
  {0, 60}, {60, 300}, {300, -1}
};
```
The key is the number of seconds the message was last delayed, and the value is the next delay to use (-1 meaning there is no next delay: the exception is rethrown and the message goes to the error queue). We start with 0, as you can see in the initialization code:

```csharp
int retryDelay = 0;
int.TryParse(envelope.Headers["loosely.retry-delay-seconds"], out retryDelay);
var nextRetryDelay = DelayProgression[retryDelay];
```

Here we check for a header on the message called "loosely.retry-delay-seconds." (Yes, I just made that name up.) Headers are metadata you can attach to a message and can contain whatever string data you'd like. When we retry a message later, we'll add a header with the number of seconds we just delayed, so that the next time through, the code knows which delay value to use if the message fails again.
Then we have a method that checks for some magic strings in our message to see if it should trigger a contrived exception:

```csharp
CheckForContrivedErrorConditions(envelope);
```

Within that function, we define three strings (in addition to the original "poison" string) for which we will scan. Because the consumer republishes the mutated message on each retry, a message containing "server-blip" fails once and then succeeds, "server-down" fails twice before succeeding, and "server-disaster" fails on every attempt, exhausting the delay progression.