Uno de los mayores miedos que nos preocupa cuando hablamos de arquitecturas basadas en eventos, es la pérdida de mensajes. Subiendo el nivel de abstracción, la incosistencia entre sistemas. La típica pregunta que nos surge es: ¿Guardo primero los cambios y luego publico el mensaje? o viceversa. La realidad es que si solo contemplas esas 2 opciones, hay cabida a inconsistencias:
- Si guardas primero, y la posterior publicación falla, los consumidores estarán inconsistentes respecto a la fuente de la verdad.
- Si publicas primero, fallando esta vez la persistencia, los consumidores estarían en una realidad inexistente en la fuente de la verdad.
Entre estas 2 opciones me solía decantar por la primera. Hasta que Modesto nos habló del Outbox Pattern. Defendiendo que ambas operaciones, guardar y publicar, han de ser atómicas ya que lo contrario te podría llevar a errores en algún proceso de negocio.
¿Qué es el Outbox Pattern?
Es un patrón que busca que los cambios en tu modelo de datos y la publicación de los mensajes correspondientes se garanticen de manera atómica en la misma transacción.
¿Cómo lo hace?
Separando la intención de publicar un mensaje de la publicación en sí.
Por un lado, un caso de uso produce unos cambios y crea eventos que guardará transaccionalmente en la misma base de datos.
Por otro lado tienes una pieza que periódicamente acude a tu tabla (por ejemplo: OutboxMessages) y trata de publicar cada mensaje, si lo consigue lo marca como publicado, si no lo consigue lo reintentará en el próximo intervalo de tiempo.
¿Qué valor nos aporta?
- Evitar inconsistencias entre aplicaciones que producen mensajes y aplicaciones consumidoras. Asumiendo la consistencia eventual, claro :)
- Garantía de que el mensaje se publicará, los mensajes no se pierden ya que estará en el Outbox hasta que se publique.
Ejemplo de código
public class CreatePersonAction {
private readonly PersonRepository personRepository;
private readonly MessagePublisher messagePublisher;
private readonly TransactionScope transactionScope;
public CreatePersonAction(PersonRepository personRepository, MessagePublisher messagePublisher, TransactionScope transactionScope) {
this.personRepository = personRepository;
this.messagePublisher = messagePublisher;
this.transactionScope = transactionScope;
}
public async Task Execute(string personName, int personAge) {
await transactionScope.ExecuteAsync(async () => {
var person = Person.Create("<your person id>", personName, personAge);
await personRepository.Create(person);
await messagePublisher.Publish(new PersonCreated(person.Id, person.Name, person.Age));
});
}
}
Esto es un caso de uso para crear una Persona. Al ser un caso de uso, tiene su propio contexto transaccional. Ese contexto transaccional está vinculado a una conexión a base de datos. Dentro de la propia transacción, estamos por una lado persistiendo a la persona mediante el personsRepository.Save
, y por otro lado, publicando sus eventos con el messagePublisher.Publish
.
Si dentro del transactionScope
ocurriese cualquier excepción, se hace un rollback de la transacción abierta. Si por el contrario, todo va bien se hará un commit
de la transacción.
Nuestra interfaz MessagePublisher
tiene una implementación OutboxMessagePublisher
:
public async Task Publish<T>(T message) {
var options = publishingConfiguration.GetBy<T>();
var outboxMessage = new Message(
"<your message id>",
"<your message schema>",,
DateTime.UtcNow,
messageSerializer.Serialize(message),
correlationId.Value,
options);
await outboxMessagesRepository.Append(outboxMessage);
}
No hace más que guardar el mensaje. El resultado de este caso de uso en nuestra aplicación, no sería más que una nueva Persona en nuestra tabla SQL People
y un nuevo Evento PersonCreated
en nuestra tabla del Outbox
.
Finalmente cada 5 segundos (o lo que hayamos configurado), tenemos un API OutboxProcessorAPI
que acude a esta tabla y es capaz de publicar el mensaje en su destino. Que podría ser un Rabbit, un Azure Service Bus, un Stream de mensajes…
Outbox Processor
Nuestra primera implementación fue incluir este OutboxProcessor
como una tarea en background en el propio API. Hasta que nos hicimos la siguiente pregunta ¿Qué pasa si el API escala? Esta pregunta, nos llevó a la conclusión de que el OutboxProcessor debía ser una pieza independiente y única que pudiera leer los mensajes de manera secuencial. Una cosa era asumir idempotencia y otra enviar sistemáticamente los mismos mensajes una y otra vez… 😅
Hablando de tecnologías concretas, usamos netcore
. Nuestro Outbox Processor está implementado con un BackgroundService. Para desplegar on-premise en un IIS, hubieron ciertas riñas entre el IIS y los BackgroundServices, pero no fuímos los primeros en enfrentarnos a ello. Microsoft aunque hablan de esto en la documentación, te recomienda usar WindowsServices en lugar de un API para este tipo de casos, pero nuestro objetivo es tender a kubernetes así que tratamos de huir de dicha recomendación. De momento, nos está funcionando.
Gestión de errores
¿Qué pasa si la publicación de un mensaje falla? La respuesta simple es que lo volverá a intentar en la próxima iteración. En nuestro caso la publicación de mensajes se hace de manera secuencial siempre, si falla uno se para y lo vuelve a intentar en la próxima iteración. Ahora mismo nos sirve, pero quizás más adelante tengamos que invertir más esfuerzo en las políticas de reintentos.
Desarrollo transversal
Algo que para nosotros era clave, era proponer una solución transversal que cualquier equipo pudiera usar sin tener que currarse su propia solución custom. No tenía sentido que cada equipo estuviese definiendo su propia solución a un problema que tendríamos de manera transversal. Por otro lado, esto nos ha permitido no centrar la solución en las necesidades de un equipo sino que hemos tenido que pensar en un contexto más amplio y largo plazista.
Para esto ha sido esencial poner mucho foco en la Developer Experience. Simplificar su uso, pensar detalladamente en qué queríamos entregar. Prácticamente hemos desarrollado esta pieza como si fuéramos un equipo externo, haciendo entregas y recogiendo feedback de los equipos. Este enfoque nos ha permitido entregar algo muy sencillo de usar, pero que nos resuelve un problema que nos afectaba a nivel estratégico.
Conclusión
Ya llevamos con esta solución un tiempo en producción y está haciendo bien su trabajo. De hecho, hace no mucho tuvimos nuestro primer caso en el que el Outbox nos salvó bastante, pues por un error de infrastructura no pudimos publicar miles de mensajes. Y bueno, no perdimos ninguno, estaban todos esperando en el outbox a que solucionaramos el problema.
La adopción del Outbox aunque todavía es temprano, nos está ayudando a tener más confianza en la comunicación asíncrona. Previo al Outbox, cada equipo estaba solucionando este problema a su manera. Esto nos resultaba más caro por: incosistencias, estabamos dando solución al mismo problema múltiples veces, dificultaba rotaciones, soluciones demasiado acopladas…
Abordarlo como un desarrollo transversal, nos ha permitido solventar un problema a nivel estrategia técnica pero sin perjudicar el flujo de entrega de los distintos productos que desarrollamos. Evidentemente, esto tiene sus contras. Es fácil caer en la sobreingeniería y es importante recoger feedback frecuentemente tanto del alcance cómo del código para mantener los pies en la tierra. Para nosotros ha sido esencial facilitar mecanismos a los equipos que han comenzado a usarlo para que nos dieran todo el feedback que pudieran. Sin duda, el feedback recibido ha marcado la diferencia.
Bibliografía
Resilient Eventing in Microservices, using the Outbox Pattern.
The Best Practices for a Great Developer Experience (DX)
Tareas en segundo plano con servicios hospedados en ASP.NET Core
How to auto start and keep an ASP.NET core web application running on IIS