Blog
Processing commands with Hangfire and MediatR
Introduction
In the previous post about processing multiple instance aggregates of the same type I suggested to consider using eventual consistency approach. In this post I would like to present one way to do this.
Setup
In the beginning let me introduce stack of technologies/patterns:
- Command pattern - I am using commands but they do not look like theses described in GoF book. They just simple classes with data and they implement
IRequest
marker interface of MediatR.
public class RecalculateCustomerDiscountCommand : IRequest
{
public int CustomerId { get; private set; }
public RecalculateCustomerDiscountCommand(int customerId)
{
this.CustomerId = customerId;
}
}
- Mediator pattern. I am using this pattern because I want to decouple my clients classes (commands invokers) from commands handlers. Simple but great library created by Jimmy Bogard named MediatR implements this pattern very well. Here is simple usage.
// Simple MediatR usage - send requests
public class ClientClass
{
private readonly IMediator mediator;
public ClientClass(IMediator mediator)
{
this.mediator = mediator;
}
public async Task RecalculateCustomerDiscounts(List<int> customerIds)
{
foreach (int customerId in customerIds)
{
await mediator.Send(new RecalculateCustomerDiscountCommand(customerId));
}
}
}
And handler:
// MediatR simple handler
public class RecalculateCustomerDiscountHandler : IAsyncRequestHandler<RecalculateCustomerDiscountCommand>
{
public Task Handle(RecalculateCustomerDiscountCommand command)
{
// Get customer aggregate, processing.
}
}
- Hangfire. Great open-source library for processing and scheduling background jobs even with GUI monitoring interface. This is where my commands are scheduled, executed and retried if error occured.
Problem
For some of my uses cases, I would like to schedule processing my commands, execute them parallel with retry option and monitor them. Hangfire gives me all these kind of features but I have to have public method which I have to pass to Hangifre method (for example BackgroundJob.Enqueue
). This is a problem - with mediator pattern I cannot (and I do not want) pass public method of handler because I have decoupled it from invoker. So I need special way to integrate MediatR with Hangfire without affecting basic assumptions.
Solution
My solution is to have three additional classes:
CommandsScheduler
- serializes commands and sends them to Hangfire.
// CommandsScheduler
public class CommandsScheduler
{
private readonly CommandsExecutor commandsExecutor;
public CommandsScheduler(CommandsExecutor commandsExecutor)
{
this.commandsExecutor = commandsExecutor;
}
public string SendNow(IRequest request, string description = null)
{
var mediatorSerializedObject = this.SerializeObject(request, description);
return BackgroundJob.Enqueue(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject));
}
public string SendNow(IRequest request, string parentJobId, JobContinuationOptions continuationOption, string description = null)
{
var mediatorSerializedObject = this.SerializeObject(request, description);
return BackgroundJob.ContinueWith(parentJobId, () => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), continuationOption);
}
public void Schedule(IRequest request, DateTimeOffset scheduleAt, string description = null)
{
var mediatorSerializedObject = this.SerializeObject(request, description);
BackgroundJob.Schedule(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), scheduleAt);
}
public void Schedule(IRequest request, TimeSpan delay, string description = null)
{
var mediatorSerializedObject = this.SerializeObject(request, description);
var newTime = DateTime.Now + delay;
BackgroundJob.Schedule(() => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), newTime);
}
public void ScheduleRecurring(IRequest request, string name, string cronExpression, string description = null)
{
var mediatorSerializedObject = this.SerializeObject(request, description);
RecurringJob.AddOrUpdate(name, () => this.commandsExecutor.ExecuteCommand(mediatorSerializedObject), cronExpression, TimeZoneInfo.Local);
}
private MediatorSerializedObject SerializeObject(object mediatorObject, string description)
{
string fullTypeName = mediatorObject.GetType().FullName;
string data = JsonConvert.SerializeObject(mediatorObject, new JsonSerializerSettings
{
Formatting = Formatting.None,
ContractResolver = new PrivateJsonDefaultContractResolver()
});
return new MediatorSerializedObject(fullTypeName, data, description);
}
}
CommandsExecutor
- responods to Hangfire jobs execution, deserializes commands and sends them to handlers using MediatR.
// CommandsExecutor
public class CommandsExecutor
{
private readonly IMediator mediator;
public CommandsExecutor(IMediator mediator)
{
this.mediator = mediator;
}
[DisplayName("Processing command {0}")]
public Task ExecuteCommand(MediatorSerializedObject mediatorSerializedObject)
{
var type = Assembly.GetAssembly(typeof(RecalculateCustomerDiscountCommand)).GetType(mediatorSerializedObject.FullTypeName);
if (type != null)
{
dynamic req = JsonConvert.DeserializeObject(mediatorSerializedObject.Data, type);
return this.mediator.Send(req as IRequest);
}
return null;
}
}
MediatorSerializedObject
- wrapper class for serialized/deserialized commands with additional properties - command type and additional description.
// MediatorSerializedObject
public class MediatorSerializedObject
{
public string FullTypeName { get; private set; }
public string Data { get; private set; }
public string AdditionalDescription { get; private set; }
public MediatorSerializedObject(string fullTypeName, string data, string additionalDescription)
{
this.FullTypeName = fullTypeName;
this.Data = data;
this.AdditionalDescription = additionalDescription;
}
/// <summary>
/// Override for Hangfire dashboard display.
/// </summary>
/// <returns></returns>
public override string ToString()
{
var commandName = this.FullTypeName.Split('.').Last();
return $"{commandName} {this.AdditionalDescription}";
}
}
Finally with this implementation we can change our client clasess to use CommandsScheduler
:
// ClientClass uses CommandScheduler
public class ClientClass
{
private readonly CommandsScheduler commandsScheduler;
public ClientClass(CommandsScheduler commandsScheduler)
{
this.commandsScheduler = commandsScheduler;
}
public void RecalculateCustomerDiscounts(List<int> customerIds)
{
foreach (int customerId in customerIds)
{
commandsScheduler.SendNow(new RecalculateCustomerDiscountCommand(customerId));
}
}
}
And our commands are scheduled, invoked and monitored by Hangfire. I sketched sequence diagram which shows this interaction:
Processing commands with MediatR and Hanfire
Additionally, we can introduce interface for CommandsScheduler
- ICommandsScheduler
. Second implementation will not use Hangfire at all and only will execute MediatR requests directly - for example in development process when we do not want start Hangfire Server.
Summary
I presented the way of processing commands asynchronously using MediatR and Hangfire. With this approach we have:
- Decoupled invokers and handlers of commands.
- Scheduling commands mechanism.
- Invoker and handler of command may be other processes.
- Commands execution monitoring.
- Commands execution retries mechanism.
These benefits are very important during development using eventual consistency approach. We have more control over commands processing and we can react quickly if problem will appear.
Image credits: storyset on Freepik.