Accueil Nos publications Blog [SQL Server] Comment utiliser le système de file d’attente de SQL Server Service Broker

[SQL Server] Comment utiliser le système de file d’attente de SQL Server Service Broker

sql-server-logo-300x110Comme l’a montré cet article, SQL Service Broker permet de notifier nos applications de changements dans notre base de données. Parmi les différentes méthodologies à notre disposition, il existe un système de file d’attente qui va nous permettre d’empiler et dépiler des messages personnalisés afin de pouvoir déporter leurs traitement dans une application tierce ou une autre couche de notre application.

Activer SQL Server Service Broker

SQL Server Service Broker doit être activé au niveau de notre base de données. Après avoir créé notre base de données, il suffit pour cela d’éxécuter le script suivant :

    ALTER DATABASE CONTACTS SET ENABLE_BROKER

Si besoin, le script suivant permet de vérifier que le service broker est bien activé :

    SELECT is_broker_enabled, * FROM sys.databases WHERE name = 'CONTACTS';

Avant de pouvoir créer notre file d’attente SQL Server, nous avons besoin de plusieurs éléments pour mettre en place la structure nécessaire :

  • Un type de message qui définira le modèle de données que nous utiliserons pour l’insertion dans la file d’attente.
  • Un contrat de service lié au type de message.
  • Deux services entre lesquels nous allons démarrer une conversation afin de pouvoir poster / lire les messages.

Type de message

Un type de message correspond à un ensemble de données / propriétés / attributs qui définissent le modèle de données qui sera insérée dans notre file d’attente.
Nous allons donc créer un modèle au format XML qui correspondra à notre structure à insérer. Imaginons donc que l’on veut poster les informations d’un contact (ID, prénom, nom et date de naissance). Notre schéma sera :

    CREATE XML SCHEMA COLLECTION ContactSchema AS
    N'<?xml version="1.0" encoding="UTF-16" ?>
        <xsd:schema xmlns:xsd="https://www.w3.org/2001/XMLSchema"
                    elementFormDefault="qualified">
            <xsd:complexType name="ContactType">
                <xsd:sequence>
                    <xsd:element name="ContactID" type="xsd:integer" />
                    <xsd:element name="ContactFirstName" type="xsd:string" />
                    <xsd:element name="ContactLastName" type="xsd:string" />
                    <xsd:element name="ContactBirthDate" type="xsd:date" />
                </xsd:sequence>
            </xsd:complexType>

            <xsd:element name="Contact" type="ContactType" />
        </xsd:schema>' ;

Nous pouvons maintenant créer le type de message associé à ce schéma.

    CREATE MESSAGE TYPE [//CONTACTMESSAGETYPE]
    AUTHORIZATION [dbo] 
    WITH VALIDATION = VALID_XML WITH SCHEMA COLLECTION ContactSchema

La validation du type de message est facultative. Il est possible de valider uniquement le bon formatage du XML (VALIDATION = WELL_FORMED_XML), ou de ne pas se soucier de l’entrée (VALIDATION = NONE). On valide ici le fait que le message a une structure XML correcte et qui correspond à notre schéma ContactSchema.

Contrat, file d’attente et services

Un contrat nous permet de dire quel type de message sera utilisé en entrée et en sortie de la file d’attente. Nous allons créer un contrat pour insérer des messages du type que nous avons créé précédemment.

    CREATE CONTRACT [//INITIATORCONTACTCONTRACT]
    AUTHORIZATION [dbo]
    (
        [//CONTACTMESSAGETYPE] SENT BY INITIATOR
    )

Il est temps maintenant de créer la file d’attente SQL.

    CREATE QUEUE ContactMessageQueue WITH STATUS = ON , RETENTION = OFF  ON [PRIMARY] 

En définissant le paramètre STATUS à ON, on indique que notre file d’attente est disponible et que des messages peuvent y être postés. Le paramètre RETENTION permet de garder les messages d’une conversation dans la file d’attente pendant toute la durée de la conversation. Si le paramètre est passé à ON, aucun message ne sera supprimé avant que l’on ferme la conversation. En le mettant à OFF, on s’assure que chaque message sera supprimé de la file d’attente une fois dépilé. De nombreux autres paramètres existent pour la création de files d’attente, je vous invite à consulter la MSDN pour en prendre connaissance (il est par exemple possible de lancer l’exécution d’une procédure stockée à chaque insertion de message).

On peut maintenant créer les services qui se chargeront de gérer les conversations pour l’insertion des messages dans la file d’attente.

    CREATE SERVICE [//CONTACTMESSAGE_ENQUEUE_SERVICE]
    AUTHORIZATION [dbo]  
    ON QUEUE ContactMessageQueue

Ce service sera notre initiateur (INITIATOR), c’est à dire le service sur lequel on démarrera la conversation et par lequel nous allons insérer les données dans la file d’attente.

    CREATE SERVICE [//CONTACTMESSAGE_DEQUEUE_SERVICE]
    AUTHORIZATION [dbo]  
    ON QUEUE ContactMessageQueue

Celui-ci sera notre cible (TARGET) par lequel on va récupérer les messages présents dans la file d’attente.

Notre file d’attente est maintenant prête à gérer des messages. Voyons comment les empiler et les dépiler.

Poster un message dans une file d’attente

La première étape pour l’insertion de messages dans la file d’attente est d’ouvrir une conversation entre les deux services que nous avons créés. Le script suivant se charge de cette ouverture :

    declare @handler uniqueidentifier

    BEGIN DIALOG @handler
    FROM SERVICE [//CONTACTMESSAGE_ENQUEUE_SERVICE]
    TO SERVICE '//CONTACTMESSAGE_DEQUEUE_SERVICE'
    ON CONTRACT [//INITIATORCONTACTCONTRACT]
    WITH ENCRYPTION = OFF;

Le handler récupéré est le handler du service initiateur. Il nous permettra d’insérer des messages dans la file d’attente.

Pour correspondre au schéma que nous avons créé, imaginons notre classe Contact comme telle :

    public class Contact
    {
        [XmlElement("ContactID")]
        public int Id { get; set; }
        [XmlElement("ContactFirstName")]
        public string FirstName { get; set; }
        [XmlElement("ContactLastName")]
        public string LastName { get; set; }
        [XmlElement("ContactBirthDate")]
        public DateTime BirthDate { get; set; }

        public string Serialize()
        {
            var serializer = new XmlSerializer(this.GetType());

            TextWriter writer = new StringWriter();
            serializer.Serialize(writer, this);

            return writer.ToString();
        }

        public static Contact Deserialize(string xml)
        {
            StringReader reader = new StringReader(xml);
            var serializer = new XmlSerializer(typeof(Contact));
            return serializer.Deserialize(reader) as Contact;
        }
    }

La méthode Serialize nous permettra d’obtenir un XML correspondant au type de données attendu par la conversation. On peut donc poster cette donnée dans la file d’attente. La validation XML que nous avons mise en place sur le schéma sera effectuée lors de l’insertion dans la file d’attente.

    SEND ON CONVERSATION @handler
    MESSAGE TYPE [//CONTACTMESSAGETYPE] (@contactXml)

Notez bien que l’identifiant que nous utilisons pour l’insertion est celui généré à l’ouverture de la conversation. Il correspond à l’identifiant de la conversation pour notre service initiateur, et non pas notre service cible. Nous reviendrons plus loin sur quelques données générées par SQL Server lors de l’ouverture d’une conversation qui peuvent être utiles pour récupérer les messages d’une conversation précise.

Lire un message dans la file d’attente

Nous avons donc une première application qui est chargée d’insérer des messages. On peut maintenant passer au traitement chargé de la lecture des messages dans la file d’attente.

Pour récupérer un message, nous utiliserons l’instruction RECEIVE couplée à une instruction WAITFOR pour temporiser la récupération.

    WAITFOR (
    RECEIVE message_type_name, message_body
    FROM ContactMessageQueue)
    , TIMEOUT 60000;

Cette instruction récupérera tous les messages présents concernant une conversation. Le paramètre TIMEOUT (en millisecondes) correspond au temps d’attente maximum de récupération des messages.

Il est bien sûr possible de récupérer les messages d’une conversation spécifique. Pour cela, quelques points sont à éclaircir. Comme dit plus haut, l’identifiant récupéré lors de l’ouverture de la communication est celui du service initiateur. On ne peut donc pas se baser sur cette donnée pour récupérer les messages attendus par le service cible. A chaque nouvelle conversation, SQL Server génère plusieurs informations dans les tables systèmes qui vont nous être utiles. Un groupe de conversation va être ouvert dans la table sys.conversation_groups. On va donc avoir un identifiant généré pour le service initiateur (récupéré via le @handler lors de l’ouverture de la conversation), et un autre pour le service cible. C’est ce dernier qui nous intéresse, car il va nous permettre de récupérer les messages d’une conversation précise. La requête suivante permet de le récupérer :

    -- @handler correspond à l'identifiant récupéré lors de l'ouverture de la conversation
    SELECT
        target_endpoint.conversation_handle AS target_conversation_handle
    FROM sys.conversation_endpoints initiator_endpoint
    JOIN sys.conversation_endpoints target_endpoint
     ON  initiator_endpoint.conversation_id = target_endpoint.conversation_id
        AND initiator_endpoint.is_initiator = 1
        AND target_endpoint.is_initiator = 0
        WHERE initiator_endpoint.conversation_handle = @handler

Notre RECEIVE ressemblera donc plutôt à :

    WAITFOR (
        RECEIVE message_type_name, message_body
        FROM ContactMessageQueue
        WHERE conversation_handle = @target_conversation_handle)
        , TIMEOUT 60000;

En ayant placé le paramètre RETENTION = OFF lors de la création de la file d’attente, chaque message récupéré sera automatiquement supprimé de la file d’attente sans attendre la fermeture de la conversation.

A savoir qu’en cas d’erreur lors de l’insertion d’un message (par exemple, quand les données insérées ne correspondent pas au schéma attendu), le champ message_type_name du message présent sera par défaut “https://schemas.microsoft.com/SQL/ServiceBroker/Error”. Dans ce cas, le message contenant la cause de l’erreur sera présent dans le champ message_body de la file d’attente. Ces messages sont également récupérés lors de l’instruction RECEIVE, il faudra donc gérer leurs traitement côté application.

Dans le cas d’un message valide, le contenu du message est présent dans le champ message_body en varbinary. Il faut donc le convertir en XML afin de récupérer le contenu inséré à un format consommable par la méthode Deserialize de notre classe Contact. Voici à quoi ressemble notre instruction WAITFOR au final :

    WAITFOR (
        RECEIVE message_type_name, message_body, convert(xml, message_body) 
        FROM ContactMessageQueue
        WHERE conversation_handle = @target_conversation_handle)
        , TIMEOUT 60000;

Une fois tous les messages d’une conversation récupérés, il faut fermer la conversation. On réutilise pour cela les identifiants initiateur et cible générés à l’ouverture de la conversation, et on utilise l’instruction END CONVERSATION.

    END CONVERSATION @handler --Fermeture du endpoint initiator
    END CONVERSATION @conversation_handle --Fermeture du endpoint target

Plus aucun message ne peut être échangé sur cette conversation, les lignes correspondantes sont supprimées des tables systèmes, à vous d’être certains que tous les messages attendus ont bien été insérés et récupérés avant de fermer la conversation afin d’éviter la perte de données (en gérant ça via un type de message spécifique par exemple).

Conclusion

Nous avons vu comment utiliser le service broker de SQL Server. Ces fonctionnalités sont particulièrement utiles pour créer une couche d’asynchronisme dans une application ou pour permettre de déporter le traitement des données insérées dans une autre application. Par exemple, on pourrait imaginer utiliser un webservice dédié à notre création de contacts qui se chargerait d’insérer les messages, pendant que notre application web lance un nouveau thread qui se met en écoute sur notre file d’attente pour récupérer les messages insérés, tout en couplant ça avec SignalR afin de notifier notre client à chaque message inséré.