Accueil Nos publications Blog Azure EventHub et IoT Hub, bus de données pour le BigData et l’IoT

Azure EventHub et IoT Hub, bus de données pour le BigData et l’IoT

Pour faire du Big Data, il faut bien sûr de la donnée et si, en plus, le traitement est temps réel, il faut que le moyen de collecte dispose de facultés pour l’ingérer massivement en un temps très court.
Pour ce faire, il existe des produits appelés « broker de messages » qui jouent le rôle de bus de collecte de données. Les principaux sont Apache Kafka, Amazon Kinesis et Azure Event Hub.

C’est ce dernier que nous allons étudier dans cet article.

Cet article est la première partie d’une série dédiée à l’IoT, au BigData sur Microsoft Azure. Retrouvez l’introduction à cette série pour plus de détails.

Principes de fonctionnement

Azure EventHub est un morceau inclus dans la brique Azure Service Bus. Cependant, contrairement aux files et sujets MoM (Message Oriented Middleware) de celui-ci, l’orientation est clairement vers les gros débits, laissant de côté l’aspect transactionnel.

Conceptuellement, c’est un gros tampon où chaque message est numéroté et dont l’ordre d’arrivée est préservé. Après une certaine période, le message expire et sort du tampon. Contrairement à une file, lire un message ne provoque pas la suppression ou l’invisibilité du message dans le tampon.

Comme on parle de gros volumes, il faut parler de sharding. Un bus EventHub est divisé en plusieurs partitions (l’ordre des messages est préservé uniquement à l’intérieur de celles-ci), chaque message pouvant définir une clef de partitionnement qui sera hachée pour définir la partition-cible.

Le bus peut être dépilé pour servir plusieurs usages en parallèle (un qui historise, un qui effectue un traitement X, un qui effectue un traitement Y, etc.). Afin de gérer ceci, et notamment la concurrence au sein d’un même usage, la notion de ConsumerGroup a été introduite : il ne peut y avoir qu’un consommateur par groupe et par partition.

Bon, ces quelques paragraphes étaient un peu théoriques, concrètement voici ce que cela implique…

Je produis de la donnée

En tant que producteur, c’est assez simple : un bus EventHub s’adresse en HTTP ou AMQP. J’envoie un message sur celui-ci en précisant éventuellement une partition. Je peux également grouper mes messages par batch mais chaque batch doit alors s’adresser à la même clef de partition.

Je consomme de la donnée

En tant que consommateur, il n’y a que l’AMQP disponible. Il faut ouvrir une connexion par partition en précisant le ConsumerGroup. Ensuite, nous verrons qu’il existe plusieurs moyens d’interroger le bus : un mode direct et un mode managé. Dans les deux cas, la lecture peut se faire en précisant l’offset auquel commencer la lecture (pour reprendre où l’on s’était arrêté par exemple).

Mise en place

Je crée le bus

La création du bus est aisée. Sur le portail azure, il est possible de créer un EventHub (ou Concentrateur d’évènements) sur un espace de nom Service Bus déjà existant ou créé à la volée.
La création personnalisée permet de régler la rétention des messages dans le bus ainsi que le nombre de partitions.

Je produis de la donnée

Pour pousser de la donnée dans Service Bus, il est possible de faire ceci via une requête HTTP ou via AMQP. Mais tout d’abord, il faut avoir le droit de le faire.
En effet, les bus ne sont pas ouverts, ils sont sécurisés par un mécanisme de clef SAS (similaire à ce qui existe sur le Azure Storage). Dans cet exemple, nous ne détaillerons pas l’obtention de cette clef.

La version HTTP est assez simple, un POST sur l’url du bus avec une autorisation suffit :


POST https://namespace.servicebus.windows.net/hubname/messages?timeout=60&api-version=2014-01 HTTP/1.1
Authorization: SharedAccessSignature sr=namespace.servicebus.windows.net&sig=abcdefg&se=123456789&skn=RootManageSharedAccessKey
Host: namespace.servicebus.windows.net
Content-Type: application/xml; charset=utf-8

{ "Timestamp":"2016-02-08T15:34:18.296411+01:00", "Temperature":"42.0" }

N’importe quel objet sur le terrain qui peut faire du HTTPS peut donc poster dans un bus.

Cependant, pour obtenir un meilleur débit, le protocole AMQP est préférable. En effet, à chaque requête HTTP la négociation SSL se fait (HTTP étant stateless), ce qui peut peser dans les performances lors d’un fort volume de données. A l’inverse, en AMQP, la négociation ne se fait qu’une fois, le flux TCP restant ouvert après (connexion stateful).
Microsoft propose une implémentation de librairie pour .Net qui prend en charge l’AMQP et l’HTTP. Elle se trouve dans le paquet NuGet WindowsAzure.ServiceBus.
Son utilisation est très simple :


var eventHubClient = Microsoft.ServiceBus.Messaging.EventHubClient.CreateFromConnectionString("EntityPath=hubName;Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=KeyName;SharedAccessKey=KeyValue");
var payload = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new { Temperature = 42.0f, TimeStamp = DateTimeOffset.Now }));
await eventHubClient.SendAsync(new Microsoft.ServiceBus.Messaging.EventData(payload));

Il existe également la méthode SendBatchAsync(IEnumerable<Microsoft.ServiceBus.Messaging.EventData> batch).
Il est aussi possible de préciser la clef de partition : new Microsoft.ServiceBus.Messaging.EventData(payload) { PartitionKey = "myKey" }

Je consomme de la donnée

D’un point de vue consommation, comme indiqué ci-dessus, il existe deux méthodes.
La première version est « au plus près du métal », c’est à dire qu’elle permet d’écouter une partition et un ConsumerGroup précis. C’est au développeur de gérer la consommation de toutes les partitions et la reprise au dernier endroit consommé.


var eventHubClient = Microsoft.ServiceBus.Messaging.EventHubClient.CreateFromConnectionString("EntityPath=hubName;Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=KeyName;SharedAccessKey=KeyValue");
var ri = await eventHubClient.GetRuntimeInformationAsync();
var consumerGroup = eventHubClient.GetDefaultConsumerGroup();
var receiver0 = await consumerGroup.CreateReceiverAsync(ri.PartitionIds[0]);
var msg = await receiver0.ReceiveAsync();
if (msg != null)
{
    var data = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(msg.GetBytes()));
}

Le premier paramètre de CreateReceiverAsync est l’identifiant de partition que l’on récupère à la liste des partitions de GetRuntimeInformationAsync.

Les appels suivants à la méthode ReceiveAsync dépileront la suite des messages, cependant, en cas de redémarrage de l’exécutable, l’information d’où en est le pointeur sera perdue.
Dans ce cas-là, il faut donc sauvegarder manuellement. C’est le concept de checkpoint. Au redémarrage, il suffit de passer le dernier checkpoint en paramètre lors de la création d’un Receiver.


var receiver0 = await consumerGroup.CreateReceiverAsync(ri.PartitionIds[0], GetLastSavedCheckpoint());
var msg = await receiver0.ReceiveAsync().Dump();
if (msg != null)
{
    var data = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(msg.GetBytes()));
    SaveCheckpoint(msg.Offset);
}

Ce code fonctionne très bien pour écouter une partition, cependant, au moins 4 partitions traitent les évènements. Il faut donc faire en sorte de toutes les écouter. Dans une architecture scalable, il faudra répartir la tâche entre plusieurs instances et répartir les partitions sur les différentes instances.
Ce travail peut s’avérer fastidieux dans le cas de la montée ou de la descente en nombre d’instances mais également pour la gestion des pannes : qui pour reprendre la charge de l’instance tombée ? Enfin, il faut s’assurer que les sauvegardes de checkpoints pour chaque partition soient accessibles par toutes les instances.
En bref, pas mal de plomberie.

Afin de prendre en compte ce genre de cas, il existe un autre mode de consommation, basé sur l’interface Microsoft.ServiceBus.Messaging.IEventProcessor.
L’implémentation est assez simple :


public class MyProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var element in events)
        {
            //Do something with the EventData instance
        }

        await context.CheckpointAsync();
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.FromResult(0);
    }
}

On retrouve la notion de checkpoint évoquée ci-dessus.

Microsoft met à disposition une librairie pour motoriser cette implémentation et qui se base sur Azure Storage pour persister les différents checkpoints ainsi que gérer la concurrence.
Son utilisation est très simple :


var host = new EventProcessorHost(<hubName>, <consumerGroup>, <serviceBusConnectionString>, <storageConnectionString>);
await host.RegisterEventProcessorAsync<MyProcessor>();

EventProcessorHost permet une scalabalité grâce à un mécanisme de baux. Lorsqu’une instance démarre, elle réserve un bail pour chaque partition qui n’en a pas d’actif, ce bail étant sur le storage. Ainsi, lorsque plusieurs instances démarrent, elles se partagent automatiquement le travail. Si une tombe, son bail expire et son travail est repris par une autre instance, à l’endroit sauvegardé.
Ainsi, il est possible de travailler par exemple avec des instances Azure Cloud Services qui écoutent, une règle d’autoscaling activée sur le CPU : lorsque trop de messages arrivent et saturent les instances de traitement en CPU, d’autres instances sont démarrées pour lisser la charge. Le principe inverse pour descendre en charge fonctionne également.

Sécuriser EventHub

Il existe plusieurs niveaux de clefs pour protéger un EventHub. Il est possible de définir une clef au global pour le namespace ServiceBus, pour l’EventHub ou enfin pour identifier un seul émetteur.
En plus de ceci, les clefs peuvent avoir plusieurs droits : émission, réception et gestion.

Une chaîne de connexion Service Bus ressemble à ceci :


//Chaine au niveau namespace avec paire clef/valeur
"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=abcd12345="

//Chaine au niveau EventHub avec paire clef/valeur
"EntityPath=hubName;Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=abcd12345="

//Chaine au niveau device avec paire clef/valeur
"Publisher=deviceId;EntityPath=hubName;Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=keyName;SharedAccessKey=abcd12345="

C’est à partir de ces clefs qu’il est possible de générer la clef SAS :


Authorization: SharedAccessSignature sr=namespace.servicebus.windows.net&sig=abcdefg&se=123456789&skn=RootManageSharedAccessKey

Cette signature se décompose en plusieurs paramètres :

  • sr qui est la ressource protégée (le namespace, le bus ou le slot pour le device)
  • se qui est l’expiration au format timestamp UNIX
  • skn qui est le nom de la clef
  • sig qui est la signature HMAC-256 des différents paramètres (ressource, expiration) par la valeur de la clef, le tout en Base64

Le plus simple étant de se servir de la librairie .Net pour en générer une :


var sas = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(<keyName>, <keyValue>, <resource>, <expiry>);

Une fois cette signature SAS obtenue, il est possible de l’utiliser via HTTP comme vu au-dessus, ou avec une chaîne de connexion adaptée :


//Chaine au niveau device avec signature SAS
"Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessSignature=SharedAccessSignature sr=sb%3a%2f%2fnamespace.servicebus.windows.net%2fhubName%2fPublishers%2fdeviceId&sig=abcdef12345=&se=123456789&skn=SenderKey;EntityPath=hubName;Publisher=deviceId"

Cet exemple est une chaîne de connexion valable uniquement pour le bus hubName et le device deviceId. Il est possible, et même souhaitable, de n’utiliser pour la publication d’évènements que cette forme de chaîne de connexion. Ainsi, il est possible d’identifier l’émetteur et même de le révoquer (au cas où celui-ci serait compromis).


var nsm = Microsoft.ServiceBus.NamespaceManager.CreateFromConnectionString(manageString);
nsm.RevokePublisher(eventHubName,publisherId);

Combien ça coûte ?

La tarification sur le cloud s’adapte à la consommation, c’est ce qui lui procure un avantage certain mais rend également le calcul des coûts un peu plus complexe.

La page officielle de tarification indique deux niveaux de services, le Basic et le Standard. Ce choix se fait au niveau du namespace Service Bus.
Le niveau Basic ne permet pas les Publisher Policies, à savoir l’identification de l’émetteur et la possibilité de le révoquer, ni d’avoir d’autres ConsumerGroup que celui par défaut.

Le premier axe de tarification est le nombre de messages transitant (l’unité étant le million), le second étant le nombre d’unités de débit (Throughput Unit). Chaque unité autorise un débit entrant de 1Mo/s et sortant de 2Mo/s, partagé par tous les bus EventHub. Il est possible d’augmenter le nombre d’unités dans la limite d’une unité par partition. Ainsi, pour un namespace avec 3 bus de 10 partitions chacun, le maximum est de 30 TU. Ces TU sont facturés à l’heure d’utilisation.

IoT Hub, le petit frère pour l’IoT

Présentation

EventHub est très efficace pour collecter de la télémétrie, cependant, la transmission d’information est à sens unique, du device vers le cloud. Il est intéressant d’avoir un canal dans le sens inverse. Par exemple, si à la suite d’une collecte de données (telle que le trafic automobile), le système souhaite prendre une décision (changer la couleur du feu de circulation par exemple), EventHub est bien inutile ici.
Il est possible d’utiliser les files ServiceBus avec l’inconvénient d’utiliser deux parties de Service Bus certes proches mais néanmoins différentes.

Microsoft propose donc un produit dérivé, Azure IoT Hub, permettant une communication bi-directionnelle. Il remplace EventHub pour la transmission du device vers le cloud, permet la communication cloud vers device et propose également une gestion fine des devices. En effet, là où EventHub offrait la possibilité d’obtenir une signature SAS unique pour chaque device, IoT Hub propose un vrai registre de gestion des devices.

Gestion des devices

Chaque dispositif qui envoie des évènements ou reçoit des commandes doit s’authentifier sur le bus et doit donc posséder une clef. Pour cela, il faut l’enregistrer auprès du registre. Ce registre contient la liste des devices ainsi que des métadonnées associées, telles que l’état de la connexion, la date du dernier signe de vie du device, etc.
Il est également possible de révoquer un dispositif compromis.

Communication bi-directionnelle

Le SDK est disponible pour .Net, Java, NodeJS et C (ANSI C99) et a été éprouvé sur plusieurs plateformes et OS différents.
Les ressources sont disponibles sur cette page.

En .Net, voici un exemple pour écouter et envoyer des messages, côté device :


// Define the connection string to connect to IoT Hub
private const string DeviceConnectionString = "<replace>";

static void Main(string[] args)
{  
  // Create the IoT Hub Device Client instance
  DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(DeviceConnectionString);

  // Send an event
  SendEvent(deviceClient).Wait();

  // Receive commands in the queue
  ReceiveCommands(deviceClient).Wait();

  Console.WriteLine("Exited!\n");
}

// Create a message and send it to IoT Hub.
static async Task SendEvent(DeviceClient deviceClient)
{
  string dataBuffer;
  dataBuffer = Guid.NewGuid().ToString();
  Message eventMessage = new Message(Encoding.UTF8.GetBytes(dataBuffer));
  await deviceClient.SendEventAsync(eventMessage);
}

// Receive messages from IoT Hub
static async Task ReceiveCommands(DeviceClient deviceClient)
{
  Console.WriteLine("\nDevice waiting for commands from IoTHub...\n");
  Message receivedMessage;
  string messageData;
  while (true)
  {
    receivedMessage = await deviceClient.ReceiveAsync(TimeSpan.FromSeconds(1));

    if (receivedMessage != null)
    {
      messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
      Console.WriteLine("\t{0}> Received message: {1}", DateTime.Now.ToLocalTime(), messageData);
      await deviceClient.CompleteAsync(receivedMessage);
    }
  }
}

D’un point de vue traitement des évènements, le bus IoT Hub est vu comme un bus EventHub, le fonctionnement avec EventProcessorHost est donc identique.
Pour envoyer une commande vers un device, le code suivant montre un exemple :


static string connectionString = "{iot hub connection string}";

var serviceClient = ServiceClient.CreateFromConnectionString(connectionString);
var commandMessage = new Message(Encoding.ASCII.GetBytes("Cloud to device message."));
await serviceClient.SendAsync("myFirstDevice", commandMessage);

Il est également possible d’écouter les accusés de réception des commandes, afin de savoir si le device a pris en compte la commande ou si elle a expiré :


var feedbackReceiver = serviceClient.GetFeedbackReceiver();
var feedbackBatch = await feedbackReceiver.ReceiveAsync();
if(feedbackBatch != null)
{
    //Do something
}

Il est à noter qu’en plus de HTTP et AMQP, il est possible de mettre en place une passerelle MQTT pour les dispositifs prenant en charge ce protocole.

Conclusion

Nous avons vu le concept ainsi que le principe de fonctionnement d’Azure EventHub et IoT Hub.
Toutefois, nous n’avons pas encore vu les produits en action dans un cas concret.
Les briques Azure étant conçues pour s’imbriquer facilement, nous verrons dans les articles suivants les différentes manières d’utiliser EventHub et IotHub, ainsi que les applications métiers.