Intermédiaire

Facilitez les échanges entre vos microservices en .NET CORE avec MassTransit

Temps de lecture : 9 minutes

Problématique

Dans une architecture orientée messages, il peut être fastidieux de développer les couches techniques.

Au moment d’envoyer un message, on doit penser à vérifier / créer les échangeurs, les "queues", leurs liaisons, gérer les problèmes réseaux… Il faut ensuite binariser notre message puis le débinariser à réception, etc. en faisant attention à l’encodage.

Toutes ces problématiques peuvent prendre du temps et de l’énergie, qui pourraient être employés à développer du code à forte valeur ajoutée.

C’est la promesse que tient MassTransit : ce framework nous permet d’abstraire la couche transport, permettant de nous concentrer sur le code métier.

Si vous souhaitez en savoir plus sur les bases de cet outil en .NET Core, je vous invite à lire la suite de cet article.

Prérequis

  • Avant de continuer cet article il me paraît important de dire qu’une bonne connaissance de .NET et des injections de dépendances aidera à la compréhension.
  • Il me paraît également bon de connaître les principes de base des agents de messages (dit "message broker") et de leur fonctionnement.
  • Être également à l’aise avec la programmation asynchrone.

Features de MassTransit

Abstraction “user-friendly” de la couche de transport de messages.

La documentation officielle est plutôt complète, les APIs faciles à utiliser.
C’est compatible avec plusieurs messages brokers, comme RabbitMQ, Azure Service Bus, Amazon SQS ou Apache Active MQ.
La sérialisation des messages est gérée par MassTransit, avec ou sans encryption.
Il y a également une forte intégration avec différents moteurs d’injection de dépendances : Microsoft, Autofac, Unity…

Gestion de la concurrence d’accès aux données

MassTransit utilise de façon intelligente les APIs asynchrones de .NET Core pour augmenter la productivité sans mettre en danger le serveur.

Gestion des connections et des erreurs réseaux

En cas de panne réseau, MassTransit gère la reconnection automatiquement et s’assure que les échangeurs, “queues” et liaisons sont rétablis.

Politiques d’échecs, de renvoi des messages, et des messages “poison”

MassTransit gère les accusés de réception, offre une possibilité de “retry” en cas d’erreur, voire même de “redelivery” du message. J’expliquerai cela plus bas.
Il est possible aussi de faire du Scheduling d’envoi de message via une intégration avec Quartz.Net, ou un plugin RabbitMQ, ou encore Azure Service Bus.

Sagas et persistence

Le pattern Saga est plus facile à mettre en place avec MassTransit, qui s’interface facilement avec différents ORM, tels que Entity Framework Core, Dapper, MongoDB, NHibernate..

Tests unitaires

MassTransit propose des APIs de tests interfaçables avec les frameworks classiques (NUnit, XUnit..), décorrélés de l’infrastructure choisie grâce à des bus en mémoire.

Bus

Définition

C’est un mot qui revient souvent. Un bus de message est l’autoroute sur laquelle transitent tous les messages. Un bus est composé d’échangeurs, de files (dites "queues"), de liaisons. Si vous n’êtes pas à l’aise avec ces concepts, je vous invite à lire par exemple la documentation de RabbitMQ.

Apport de MassTransit

MassTransit gère tout cela, et encapsule ces concepts en un "Endpoint", point de terminaison.
On envoie alors les messages sur une URI. (en rabbitmq://, en https://… selon le fournisseur du bus).

Formats des URIs

URI complète : rabbitmq://localhost/myVirtualHost/myQueue
URI relative d’une file : queue:myQueue
URI relative d’un échangeur : exchanger:myExchanger

Configuration

Via le moteur d’injection de dépendances choisi, il est possible de déclarer un bus de la façon suivante, par exemple avec celui natif de .NET Core :

static void ConfigureServiceCollection(HostBuilderContext hostingContext, IServiceCollection services)
{
    // Autres injections
    // (...)
    // Ajout de MassTransit
    services.AddMassTransit(cfgGlobal =>
    {
        cfgGlobal.AddBus(serviceProvider =>
        {
            return Bus.Factory.CreateUsingRabbitMq(cfgBus =>
            {
                cfgBus.Host(new Uri($"rabbitmq://localhost/virtualHostTest"), cfgRabbitMq =>
                {
                    cfgRabbitMq.Username("guest");
                    cfgRabbitMq.Password("guest");
                });
            });
        });
    });
}

Par la suite on mettra bien évidemment les infos RabbitMQ dans un fichier de config en JSON, injectées alors en IOptions

À noter qu’on peut également tout faire sans injection de dépendances, via Bus.Factory.

public class Program
{
    public static async Task Main()
    {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => cfg.Host("localhost"));

        // Il faut démarrer le bus avant de l'utiliser !
        await busControl.StartAsync();
        try
        {
            do
            {
                // Du code en boucle
            }
            while (true);
        }
        finally
        {
            // On arrête le bus
            await busControl.StopAsync();
        }
    }
}

Démarrage du bus

Arrêtons-nous maintenant sur cette ligne-là.

services.AddHostedService()

En effet, il faut savoir que le bus est à instancier au démarrage de l’application. C’est pourquoi il est PRIMORDIAL de démarrer le bus avant de l’utiliser.
J’insiste car c’est la plupart du temps la raison pour laquelle rien ne se passe quand on lance le programme pour la première fois. ◔_◔

public class BusControlService : IHostedService
{
    private readonly IBusControl busControl;

    public BusControlService(IBusControl busControl)
    {
        this.busControl = busControl;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await this.busControl.StartAsync(cancellationToken);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await this.busControl.StopAsync(cancellationToken);
    }
}

Messages

MassTransit nous simplifie la tâche en n’utilisant que des interfaces. Il n’y a aucune implémentation à fournir pour les messages que vous voulez faire transiter, juste donner les signatures de propriétés.
MassTransit crée dynamiquement des implémentations en Runtime.

public interface MyMessage
{
    string Content { get; }
}

Le pattern Envelope Wrapper est utilisé : les messages sont wrappés avec des en-têtes (tels que MessageId, SourceAddress, des timestamps…). Tout est géré par MassTransit, vous n’avez donc qu’à vous soucier du contenu du message !

Deux types de messages

Avec MassTransit on est donc concentré sur le code métier, et on parle donc ici de messages en tant qu’objet métier, non plus en tant qu’enveloppe d’octets.
Il y a alors deux types de messages :

Commande

C’est un message qui est envoyé (Send), qui donne un ordre à 1 seul destinataire (1 to 1). Si plusieurs applications écoutent ce message, seule la première pourra le traiter, les autres ne le recevront jamais.

Il est donc important de savoir vous voulez envoyer votre message, vers quelle application.

Il est conseillé d’utiliser un nom d’interface supposant un ordre, donc un impératif (ou un infinitif).
Pourquoi pas le suffixer par "Command", bien que cela ne soit pas forcément necessaire.
Ex.: SubmitOrder, CreateCustomerCommand

Événement

C’est un message qui est publié (Publish), qui avertit qu’un événement s’est produit. Tous les abonnés (1 to n) reçoivent le message et peuvent le traiter séparement.

Il est conseillé d’utiliser un nom d’interface supposant une action passée, donc un participe passé.
Pourquoi pas le suffixer par "Event", bien que cela ne soit pas forcément necessaire.
Ex.: OrderSubmitted, CustomerCreatedEvent

Recevoir un message

Consumer

La réception de messages se fait par le biais de Consumers. Il s’agit d’une classe qui implémente IConsumerTMessage est le type du message attendu.
Il y a alors une méthode asynchrone à implémenter, Consume(ConsumeContext context)

Par exemple :

public interface SubmitOrder
{
    ulong CustomerId {get;}
    ulong OrderId {get;}
}

public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await myRepository.Save(context.Message.CustomerId, context.Message.OrderId);
    }
}

ConsumeContext context est le contexte complet du message, il y a dedans le message lui-même (une instance de SubmitOrder) mais également les en-têtes générés par MassTransit ou ceux définis à l’envoi par le producteur du message.

Bien que non recommandé, il est possible d’être consumer de plusieurs messages, en implémentant plusieurs IConsumer différents.

Cycle de vie d’un consumer

Il faut voir le consumer comme un contrôleur d’une WebApi. Une véritable pipeline entre en jeu, composée de plusieurs middlewares, qu’ils soient de MassTransit ou personalisés. Vous trouverez le schéma complet sur la documentation officielle.

Configuration

Pour fonctionner, un consumer doit être déclaré dans MassTransit, puis connecté à un point de terminaison de réception. Cela est fait au moment de la configuration du bus.
Ce point de terminaison est celui sur lequel il faut lui envoyer les commandes. Pour les événements, MassTransit se chargera de les lui acheminer tout seul.

services.AddMassTransit(cfgMassTransit =>
{
    cfgMassTransit.AddConsumer<CreateBaseCoffeeCommandConsumer>();
    cfgMassTransit.AddBus(serviceProvider =>
    {
        return Bus.Factory.CreateUsingRabbitMq(cfgBus =>
        {
            // Configuration du bus
            // (...)
            // Ajout du Endpoint
            cfgBus.ReceiveEndpoint("myQueueName", cfgEndpoint =>
            {
                cfgEndpoint.ConfigureConsumer<MyMessageConsumer>(serviceProvider);
            });
        });
    });
});

Il est possible ensuite dans la configuration du Consumer d’ajouter / modifier des middlewares, mais c’est pour des cas très spécifiques.

Enregistrement par bulk

MassTransit propose cette petite feature dont je suis plutôt friand.
Il est possible d’enregistrer tous les consumers d’un même namespace d’un coup, et de créer automatiquement tous les endpoints via un nom relié au message.
Cela simplifie énormément la maintenance, puisqu’il n’y a plus à toucher à la configuration quand on ajoute / enlève un consumer.

services.AddMassTransit(cfgGlobal =>
{
    cfgGlobal.AddConsumersFromNamespaceContaining<OneOfMyConsumer>();
    cfgGlobal.AddBus(serviceProvider =>
            {
                return Bus.Factory.CreateUsingRabbitMq(cfgBus =>
                {
                    // Configuration du bus
                    // (...)

                    // Enregistre tous les endpoints de tous les consumers d'un coup en nommant la queue associée au type de message au format kebab-case
                    cfgBus.ConfigureEndpoints(serviceProvider, KebabCaseEndpointNameFormatter.Instance);
                });
            });
});

Messages non remis, ou "dead-letter"

Si aucun consumer n’est paramétré pour un type de message, alors celui-ci n’est jamais remis. MassTransit appelle ça des messages "SKIPPED", et sont alors reroutés dans une "queue" (créée à la volée si non existante) qui porte le même nom que la "queue" où est arrivé le message, suffixé de "_skipped".
Il est possible de dire à MassTransit de changer ce comportement (par exemple un reroutage manuel, ou de simplement les détruire.)

Envoyer un message

Selon le type de message, la méthode à suivre n’est pas la même.
Quel que soit le type, il est même possible de différer l’envoi via le Scheduler enregistré au moment de la configuration, en fournissant un DateTime ou un TimeSpan en plus du message.

Commandes

On a besoin pour ça de l’interface ISendEndpointProvider. Celle-ci contient la méthode GetSendEndpoint(Uri), qui nous fournit à la volée un ISendEndpoint, qui aura alors la méthode asynchrone Send(object).

Et c’est là que la magie de MassTransit opère : pas besoin d’instancier de classe qui implémente l’interface de notre message, on peut directement passer un objet de type anonyme ! MassTransit fera lui-même le travail.
Par exemple :

// Envoi immédiat
await sendEndpoint.Send<SubmitOrder>(new { OrderId = 5, CustomerId = 9});
// Envoi dans une heure
await sendEndpoint.ScheduleSend<SubmitOrder>(TimeSpan.FromHours(1), new { OrderId = 5, CustomerId = 9});

Nouveau message

Si on souhaite envoyer un nouveau message de type commande, on peut récupérer un ISendEndpointProvider par injection de dépendance (DI) dans n’importe quelle classe.
Si on est dans un programme sans DI, on peut utiliser le Bus directement, mais c’est déconseillé.

Consumer

Dans le cadre d’un consumer, le Context est un ISendEndpointProvider lui-même, on peut donc directement l’utiliser.

var sendEndPoint = await context.GetSendEndpoint(new Uri("queue:somewhere"));
await sendEndPoint.Send<SubmitOrder>(new { OrderId = 5, CustomerId = 9});

Événement

On a besoin pour ça de l’interface IPublishEndpoint. Celle-ci contient la méthode asynchrone Publish(object).

Par exemple :

// Envoi immédiat
await publishEndpoint.Publish<OrderSubmitted>(new { OrderId = 5, CustomerId = 9});
// Envoi dans une heure
await publishEndpoint.SchedulePublish<OrderSubmitted>(TimeSpan.FromHours(1), new { OrderId = 5, CustomerId = 9});

Nouveau message

Si on souhaite envoyer un nouveau message de type événement, on peut récupérer un IPublishEndpoint par injection de dépendance dans n’importe quelle classe directement.
Si on est dans un programme sans DI, on peut utiliser le Bus directement, mais c’est déconseillé.

Consumer

Dans le cadre d’un consumer, le Context est un IPublishEndpoint lui-même, on peut donc directement l’utiliser.

await context.Publish<OrderSubmitted>(new { OrderId = 5, CustomerId = 9});

Request / Response

MassTransit propose ce pattern alternatif, qui ressemble fortement à un appel HTTP dans le sens où l’appellant attend la réponse de l’appelé avant de continuer, cela peut être utile dans des architectures simples qui n’ont pas besoin d’être totalement asynchrones, par contre c’est à éviter dans le cas d’un processus distribué complexe.
Du côté de la réception, cela se fait avec un Consumer, mais on utilise alors la méthode ResponseAsync à la place.
Du côté de l’appelant, cela se fait via l’interface IRequestClient, injectable, si elle a été configurée.
Hors injection, MassTransit propose une factory.

Configuration

Tout simplement dans la configuration de MassTransit.

services.AddMassTransit(cfgMassTransit =>
{
    // Ajout du bus
    cfgMassTransit.AddBus(...);

    // Enregistrement du client dans le moteur de DI.
    cfgMassTransit.AddRequestClient<TMessageRequest>();
});

Gestion des erreurs

Dans une architecture impliquant plusieurs applications, passant par le réseau, on sait que forcément à un moment donné quelque chose va mal se passer. MassTransit nous fournit de la résilience grâce à sa gestion des erreurs.

Middleware

Lorsqu’un consumer génère une exception avant la fin de la méthode Consume, le message reçu n’est alors pas "acknowledge", c’est-à-dire qu’il n’y a pas d’accusé de réception positif. Ce qui se passe ensuite dépend de ce qu’on a configuré.
Par défaut, sans politique de retry, un événement de type Fault associé est publié.

public interface Fault<TMessage>
    where TMessage : class
{
    Guid FaultId { get; }
    Guid? FaultedMessageId { get; }
    DateTime Timestamp { get; }
    ExceptionInfo[] Exceptions { get; }
    HostInfo Host { get; }
    TMessage Message { get; }
}

Ce message peut à son tour être consommé par un Consumer ! Qui pourrait à son tour faire une exception bien entendu…

Par défaut, MassTransit reroute dans une "queue" (créée à la volée si non existante) qui porte le même nom que la "queue" où est arrivé le message, suffixé de "_error".
Il est possible de dire à MassTransit de changer ce comportement (par exemple un reroutage manuel, ou de simplement les détruire.)

Redirection

On peut tout d’abord changer ce message en commande en indiquant lors de l’envoi du message initial un endpoint vers lequel rediriger les messages Fault associés.

await endpoint.Send<SubmitOrder>(new { OrderId = 27 }, context => context.FaultAddress = new Uri("rabbitmq://localhost/order_faults"));

Le message Fault associé sera alors une commande envoyée sur le endpoint "order_faults" au lieu d’être publié.

Retry

Il est possible de mettre en place une politique de retry, par consumer, par endpoint, ou les deux à la fois.
Si une politique de retry est activée, le message Fault est alors envoyé que si toutes les tentatives ont échoué. En attendant, le message reste dans la file.
Il existe plusieurs politiques de retry, et elles sont cumulables :

Immediate(int retryLimit)
On retente immédiatement, un nombre donné de fois.

Interval(int retryLimit, TimeSpan delay)
On retente à intervalle régulier, un nombre donné de fois.

Intervals(params TimeSpan[] delays)
On retente à des intervalles spécifiés dans le tableau en entrée.

Exponential(int retryLimit, TimeSpan minInterval, TimeSpan maxInterval, TimeSpan intervalGrowth)
On retente à des intervalles de plus en plus espacés, jusqu’à une certaine limite en temps et en nombre.

Incremental(int retryLimit, TimeSpan minInterval, TimeSpan intervalGrowth)
On retente à des intervalles de plus en plus espacés, jusqu’à une certaine limite en nombre.

cfgEndpoint.UseRetry(cfgRetry =>
{
    cfgRetry.Interval(3, TimeSpan.FromSeconds(5)).Intervals(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), TimeSpan.FromHours(1));
});

Ici on retentera 3 fois toutes les 5 secondes, mais aussi une fois à T+30s, une autre fois à T+1min, et une autre fois à T+1h.

cfgEndpoint.ConfigureConsumer<MyMessageConsumer>(serviceProvider, cfgConsumer =>
{
    cfgConsumer.UseRetry(cfgRetry =>
    {
        cfgRetry.Interval(3, TimeSpan.FromSeconds(5));
    });
});
cfgEndpoint.UseRetry(cfgRetry =>
{
    cfgRetry.Intervals(TimeSpan.FromSeconds(30), TimeSpan.FromSeconds(60), TimeSpan.FromHours(1));
});

Ici la première politique ne s’appliquera qu’au consumer de MyMessage, tandis que la seconde à tout l’endpoint.

Redelivery

Parfois, l’erreur est trop grave pour se permettre de retenter dans un court instant, et de garder le message en file. Il est donc possible de mettre en place une relivraison du message. Cette fois-ci, le message sera retiré de la file, et relivré plus tard. Il faut par contre avoir un Scheduler d’enregistré dans la configuration.

La configuration est la même pour pour le Retry, on utilise cfgEndpoint.UseScheduledRedelivery() à la place.

White/Black list

Que ce soit pour le retry ou le redelivery, il est possible de dire qu’on ne veut traiter que certaines exceptions, ou au contraire traiter toutes les exceptions sauf certaines. Attention, on doit choisir entre un système de liste blanche ou noire, mais il n’est pas possible de faire les deux à la fois par politique.

cfgEndpoint.UseRetry(cfgRetry =>
{
    cfgRetry.Interval(3, TimeSpan.FromSeconds(5)).Ignore<ArgumentException>();
    cfgRetry.Interval(2, TimeSpan.FromSeconds(30)).Handle<ArgumentException>();
});

Ici la première politique (3 fois maximum toutes les 5s) concernera toutes les exceptions sauf ArgumentException, et la seconde (2 fois maximum toutes les 30s) ne concernera que ArgumentException.

Démonstration

J’ai présenté un webinar le 7 mai 2020 autour de MassTransit, je vous invite à le revoir.
J’ai également poussé un repository GitHub public dans lequel j’ai mis en application ce qu’il y a dans cet article.

Sagas

Je présenterai les Sagas, intégrées à MassTransit dans un prochain article !

Liens utiles

Officiels

Documentation
GitHub
Webinars
Discord

SOAT

Webinar du 7 mai 2020
Mon Github de démonstration

© SOAT
Toute reproduction interdite sans autorisation de la société SOAT

Nombre de vue : 210

AJOUTER UN COMMENTAIRE