Avancé

Sagas et Machine à états au sein de votre architecture micro-services grâce à MassTransit

Temps de lecture : 7 minutes

Définition

La saga est un processus distribué à long terme sur plusieurs services, orchestrée par un coordinateur.
Elle est initialisée par un événement, qui contient un identifiant de corrélation, qui se retrouve ensuite dans tous les messages qui composent cette transaction. C’est bien souvent un Guid, mais ça peut être autre chose.
Ce coordinateur garantit la cohérence des données, et est capable de commander des actions de compensations en cas d’erreurs partielles dans le système. Par définition, une Saga est donc "stateful".

Dans cet article nous prendrons pour exemple la commande d’un café : on donne nos choix, on paie, la machine à café sort la base du café, puis on ajoute les toppings.

Prérequis

  • Une bonne connaissance de .NET et des injections de dépendances aidera à la compréhension.
  • Il me parait également bon de connaître les principes de base des agents de messages (dit "message broker") et de leur fonctionnement.
  • Être également à l’aise avec la programmation asynchrone.
  • Avoir lu le précédent article sur MassTransit

Fonctionnement d’une Saga

Saga

Un événement en provenance d’un service ou d’une UI lance la Saga, qui à son tour commande une action à un autre service, qui émet à son tour un événement pour signaler la fin de sa tâche. La Saga capte cet événement et réagit en fonction, en lançant la commande suivante telle qu’elle a été programmée.
La Saga peut être vue comme un chef d’orchestre. Elle ne joue aucun instrument (elle ne fait aucune action sur le système lui-même), mais elle commande aux différents acteurs (musiciens) de faire ce pour quoi ils sont programmmés (jouer leur instrument).

Ce fonctionnement a été décrit dans un document de Princeton University, USA en 1986, quand l’informatique n’en était qu’à ses balbutiements.
Il a été ensuite décrit également par Arnon Rotem-Gal-Oz plus tard dans ce document en 2012.

Machine à états

La machine a états est une modélisation des états et des transitions inhérentes à une saga. Chaque instance est liée à un identifiant de corrélation, qui démarre à un état initial et qui transitionne vers l’état final au fur et à mesure des événements captés et des commandes envoyées.

Exemple du Coffeeshop

Pour la suite on va prendre l’exemple d’un coffeeshop. Vous entrez dans le coffeeshop, vous commandez votre café. Puis vous payez et une fois fait, la base du café est réalisée puis les toppings sont ajoutés.

Nous retranscrivons cela via une Saga :

Saga

Une UI recueille les commandes et les paiements, un service de machine à café pose la base et un service dédié s’occupe des toppings.
Pour la démo, le paiement peut être refusé, la Saga ordonne alors à l’UI de demander un nouveau paiement. Dans une vraie application, on peut considérer le paiement dans un seul contexte et laisser les Retry Policies gérer cela.

La Saga se termine quand le café est prêt.

Intégration dans MassTransit : Messages

Il est important que tous les messages contiennent un identifiant de corrélation.
Pour cela rien de plus simple, il suffit que toutes vos interfaces de messages aient également CorrelatedBy. Cela ajoute alors la propriété CorrelationId à votre message.

 public interface RequestPaymentCommand : CorrelatedBy<Guid>
 {
    float Amount { get; set; }
 }

Consumer Saga

Saga sans vraiment être une machine à états, cette classe est en fait un aggrégat de consumers. Elle implémente ISaga, qui lui impose un CorrelationId.
On déclare qu’un événement l’initialise, par l’interface InitiatedBy. MassTransit enregistre alors automatiquement le CorrélationId dans l’instance de la Saga à l’arrivée du message, garantissant que tous les messages suivants avec le même identifiant de corrélation arrivent dans la même instance.
Tous les événements suivants sont orchestrés par la Saga, de fait, la Saga implémente logiquement Orchestrates.

Implémentation

public class CoffeeMachineSaga : ISaga,
// Evènement initiateur de la saga
InitiatedBy<OrderSubmittedEvent>,
// Evènements gérés par la saga
Orchestrates<PaymentAcceptedEvent>,
Orchestrates<PaymentRefusedEvent>,
Orchestrates<BaseCoffeeFinishedEvent>,
Orchestrates<ToppingsAddedEvent>
{
    public Guid CorrelationId { get; set; }
    // Autres propriétés de ma saga nécessaires à la logique métier
    // telles que le type de café, les toppings...
    // On peut ici aussi rajouter une propriété pour stocker l'état de la saga
    public string State {get; private set;} = "Not Started"
    // (...)

    // URIs des endpoints pour l'envoi des commandes
    // (...)

    public async Task Consume(ConsumeContext<OrderSubmittedEvent> context)
    {
        // On enregistre les données du message dans l'instance
        // (...)
        // On envoie une demande de paiment
        var sendEndpoint = await context.GetSendEndpoint(requestPaymentEndpoint);
        await sendEndpoint.Send<RequestPaymentCommand>(new { this.CorrelationId, this.Amount });
        this.State = "AwaitingPayment";
    }

    // On réagit au paiment accepté en demandant la base du café
    public async Task Consume(ConsumeContext<PaymentAcceptedEvent> context)
    {
        var sendEndpoint = await context.GetSendEndpoint(createBaseCoffeeEndpoint);
        await sendEndpoint.Send<CreateBaseCoffeeCommand>(new { this.CorrelationId, CoffeeType = this.CoffeeTypeRequested, NoTopping = string.IsNullOrWhiteSpace(this.ToppingsRequested) });
        this.State = "Paid";
    }

    // On réagit au paiement refusé en demandant un nouveau paiement
    public async Task Consume(ConsumeContext<PaymentRefusedEvent> context)
    {
        var sendEndpoint = await context.GetSendEndpoint(requestPaymentEndpoint);
        await sendEndpoint.Send<RequestPaymentCommand>(new { this.CorrelationId, this.Amount });
    }

    // Le café de base est fini, sans topping
    public async Task Consume(ConsumeContext<BaseCoffeeFinishedEvent> context)
    {
        // Si on a demandé des toppings
        if (!string.IsNullOrWhiteSpace(this.ToppingsRequested))
        {
            // On les demande
            var sendEndpoint = await context.GetSendEndpoint(addToppingsEndpoint);
            await sendEndpoint.Send<AddToppingsCommand>(new { this.CorrelationId, Toppings = this.ToppingsRequested.Split(",").Select(t => Enum.Parse<Topping>(t)) });
            this.State = "BaseCoffeeOK";
        }
        else
        {
            // Sinon on est arrivé à la fin de la saga ici
            this.State = "Ended";
            // On peut s'imaginer que la saga enregistre quelque chose en base, envoie une commande pour un service externe...
            // (...)
        }
    }

    // Les toppings ont été ajoutés
    public async Task Consume(ConsumeContext<ToppingsAddedEvent> context)
    {
        // La saga est terminée
        this.State = "Ended";
        // On peut s'imaginer que la saga enregistre quelque chose en base, envoie une commande pour un service externe...
        // (...)
    }
}

Configuration

 services.AddMassTransit(cfgMassTransit =>
 {
    // Ajout du bus
    cfgMassTransit.AddBus(...);
    // Autres configurations
    // (...)

    // Enregistrement de la saga
    cfgMassTransit.AddSaga<CoffeeMachineSaga>().InMemoryRepository();
 });

On peut changer InMemoryRepository() par une implémentation avec Dapper, EntityFramework…

Saga State Machine (machine à états)

Initilisation

Pour les machines à états, il nous faut d’abord un ensemble de propriétés qui vont évoluer tout au long de la vie de l’instance de la saga, dans une classe dédiée, que l’on suffixe souvent par State. Elle doit contenir le CorrelationId ainsi qu’une propriété qu’on définira comme l’état ("State"), qui peut être un enum, un int ou une string.
En Enum ou Int, il faut alors définir tous les states possibles dès le constructeur de la Saga, ce qui peut être contraignant. On va donc prendre ici un champ de type String.

public class CoffeeState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public string CustomerName { get; set; }
    public string ToppingsRequested { get; set; }
    public CoffeeType CoffeeTypeRequested { get; set; }
    public float Amount { get; set; }
}

Puis on peut commencer à créer notre Saga et sa logique, en lui disant quelle classe de State on prend, ainsi que la propriété dans laquelle enregistrer le state. De base, une machine à états a deux états : Initial et Final.

Dans le cas de notre coffeeshop, on en a d’autres, on peut donc préparer aussi à l’avance les States intermédiaires.

public class CoffeeStateMachine : MassTransitStateMachine<CoffeeState>
{
    public CoffeeStateMachine()
    {
        InstanceState(x => x.CurrentState);

        public State AwaitingPayment { get; private set; }
        public State Paid { get; private set; }
        public State BaseCoffeeOK { get; private set; }
    }
}

Configuration

services.AddMassTransit(cfgMassTransit =>
{
    // Autres configurations
    // (...)

    // Configuration du bus
    cfgMassTransit.AddBus(registrationContext => Bus.Factory.CreateUsingRabbitMq(cfgBus =>
    {
        // Config RabbitMQ
        // (...)

        // Configuration du endpoint de la Saga
        var repository = registrationContext.Container.GetService<ISagaRepository<CoffeeState>>();
        cfgBus.ReceiveEndpoint("state-machine", e => e.StateMachineSaga(registrationContext.Container.GetService<CoffeeStateMachine>(), repository));
    }));

    // Enregistrement de la Saga dans le DI
    cfgMassTransit.AddSagaStateMachine<CoffeeStateMachine, CoffeeState>().InMemoryRepository();
});

Bindings des events

On a identifié les événements qui vont avoir lieu et vont faire vivre la Saga. Nous allons donc les déclarer et les relier à la saga via l’ID de Corrélation

public class CoffeeStateMachine : MassTransitStateMachine<CoffeeState>
{
    public CoffeeStateMachine()
    {
        // (...)

        // On indique à la Saga quelle est la propriété de corrélation dans les messages pour chaque événement par 'CorrelateById'
        // Pour l'event 'OrderSubmittedEvent', qui est l'événement déclencheur, on indique par SelectId la propriété d'où provient la corrélation.
        Event(() => OrderSubmittedEvent, x => x.CorrelateById(context => context.Message.CorrelationId).SelectId(context => context.Message.CorrelationId));
        Event(() => PaymentAcceptedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
        Event(() => PaymentRefusedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
        Event(() => BaseCoffeeFinishedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));
        Event(() => ToppingsAddedEvent, x => x.CorrelateById(context => context.Message.CorrelationId));

        // (...)
    }

    // (...)

    public Event<OrderSubmittedEvent> OrderSubmittedEvent { get; private set; }
    public Event<PaymentAcceptedEvent> PaymentAcceptedEvent { get; private set; }
    public Event<PaymentRefusedEvent> PaymentRefusedEvent { get; private set; }
    public Event<BaseCoffeeFinishedEvent> BaseCoffeeFinishedEvent { get; private set; }
    public Event<ToppingsAddedEvent> ToppingsAddedEvent { get; private set; }
}

Ajout de l’orchestration

Maintenant qu’on a déclaré nos événements et comment notre instance de Saga pouvait s’y retrouver dans les messages, on doit à présent implémenter l’orchestration.

Cela se fait dans le constructeur, sous un format plutôt explicite :

// Quand on est à l'état initial, on attend qu'un type d'événement : OrderSubmittedEvent
Initially(When(OrderSubmittedEvent)
            // Action qui est lancée à la réception de l'événement
            .Then(x =>
            {
                x.Instance.CustomerName = x.Data.CustomerName;
                x.Instance.CoffeeTypeRequested = x.Data.CoffeeType;
                x.Instance.ToppingsRequested = string.Join(",", x.Data.Toppings);
                x.Instance.Amount = CoffeePriceCalculator.Compute(x.Data.CoffeeType, x.Data.Toppings);
            })
            // On envoie une commande...
            .SendAsync(requestPaymentEndpoint, context => context.Init<RequestPaymentCommand>(new { context.Instance.CorrelationId, context.Instance.Amount }))
            // ... et on change d'état
            .TransitionTo(AwaitingPayment));

// Pendant cet état-là, on attend deux types d'événements : PaymentAcceptedEvent et PaymentRefusedEvent
During(AwaitingPayment,
        // En cas de paiement accepté
        When(PaymentAcceptedEvent)
        .SendAsync(createBaseCoffeeEndpoint, context => context.Init<CreateBaseCoffeeCommand>(new { context.Instance.CorrelationId, CoffeeType = context.Instance.CoffeeTypeRequested, NoTopping = string.IsNullOrWhiteSpace(context.Instance.ToppingsRequested) }))
        .TransitionTo(Paid),
        // En cas de paiement refusé
        When(PaymentRefusedEvent)
        .SendAsync(requestPaymentEndpoint, context => context.Init<RequestPaymentCommand>(new { context.Instance.CorrelationId, context.Instance.Amount })));

During(Paid, When(BaseCoffeeFinishedEvent)
            // Deux possibilités : si toppings demandés, on a un état en plus où il faut aller, sinon on peut passer au dernier état.
            .IfElse(context => !string.IsNullOrWhiteSpace(context.Instance.ToppingsRequested), x => x
                .SendAsync(addToppingsEndpoint, context => context.Init<AddToppingsCommand>(new { context.Instance.CorrelationId, Toppings = context.Instance.ToppingsRequested.Split(",").Select(t => Enum.Parse<Topping>(t)) }))
                .TransitionTo(BaseCoffeeOK),
            // Sinon, on prévient que c'est fini et la saga se termine.
            x => x.Finalize()));

During(BaseCoffeeOK, When(ToppingsAddedEvent)
                    // Fin de la saga : passage à l'état Final
                    .Finalize());

Je vous invite à comparer le code entre la ConsumerSaga et la SagaStateMachine. Vous retrouverez l’implémentation des consumers des commandes émises par la Saga dans mon Github de démonstration.

Liens utiles

Officiels

Documentation
GitHub, Webinars, Discord

Sagas

Princeton University, USA
Arnon Rotem-Gal-Oz

SOAT

Article : 1ere partie
Webinar du 7 mai 2020
Mon Github de démonstration

© SOAT
Toute reproduction interdite sans autorisation de la société SOAT

Nombre de vue : 146

AJOUTER UN COMMENTAIRE