Accueil Nos publications Blog BigData temps réel, Azure Stream Analytics et Architecture Lambda

BigData temps réel, Azure Stream Analytics et Architecture Lambda

Gordon FreemanCollecter des données en masse pour le BigData est un point important, comme nous l’avions vu dans notre article précédent dédié à Azure EventHub.
Néanmoins, le plus important est de transformer ces données brutes en une donnée consolidée qui permet une analyse et donc une prise de décision.

Dans cet article, nous allons utiliser Azure Stream Analytics, équivalent PaaS d’Apache Storm, afin d’analyser les données produites par les flux d’informations relevées par le simulateur SoCars.

Cet article est la seconde partie d’une série dédiée à l’IoT et au BigData sur Microsoft Azure. Retrouvez l’introduction de cette série pour plus de détails ainsi que l’index des articles.

Présentation de Stream Analytics

Introduction

Comme son concurrent Amazon AWS, Microsoft Azure propose des produits qui s’intègrent entre eux. Dans le précédent article, nous avions donc vu EventHub et IoTHub, et bien ces deux produits ont été conçus pour s’intégrer avec Stream Analytics.

Stream Analytics est un produit qui permet l’analyse de ces flux de données en temps réel. Pour cela, trois étapes principales sont nécessaires : définir la ou les entrées, définir la ou les sorties et modéliser l’analyse.
Le langage de modélisation est un dérivé du SQL, adapté pour travailler sur des notions de fenêtre temporelle.

Il existe deux types d’entrées possibles : les flux de données et les références. Le premier est la source principale de données sur lequel travailler, le second sert de données auxiliaires pour de la correlation et sa fréquence de changement est faible.
Dans le cas d’une source flux, elle peut provenir d’un bus EventHub, d’un bus IotHub ou d’un élément Blob du Azure Storage. Pour une entrée référence, ça ne peut être qu’un Blob Azure Storage.

Les sorties sont plus diverses : une base de données SQL, une table ou un blob Azure Storage, un bus EventHub, une file ou une rubrique Azure Service Bus, une base NoSQL DocumentDB ou enfin PowerBI.

D’un point de vue transformation, la requête la plus simple possible est la suivante :


SELECT * FROM [my-source]

Elle joue le rôle de passthrough (passe-plat) pour recopier l’entrée vers la sortie. Cela permet de faire une adaptation de protocole, comme par exemple historiser tous les évènements EventHub dans une table NoSQL.

Le langage SQL pour Stream Analytics

Il existe une référence du langage SQL pour Stream Analytics sur le site de Microsoft, toutefois nous allons présenter quelques points importants.

Le SELECT permet de faire une projection des champs de la requête. On peut y appeler des champs de la source ainsi que des opérations sur ceux-ci, telles que des fonctions sur les chaînes comme SUBSTRING, CONCAT, des fonctions mathématiques comme ABS ou des fonctions d’agrégation comme SUM ou COUNT. Cependant, ces dernières doivent se faire, comme en SQL traditionnel, avec une clause GROUP BY.
Il est possible de préciser dans le SELECT une clause INTO qui précise dans quelle sortie la donnée doit se déverser (ce qui est utile dans les cas où plusieurs sorties sont possibles).

FROM indique la source, qui peut être une entrée (uniquement un flux) ou une requête temporaire (cf. WITH ci-dessous).

Il est possible de faire des jointures avec JOIN. Elles peuvent être entre deux flux ou un flux avec une donnée référence.
Dans le cas d’une jointure entre un flux et une donnée référence, la syntaxe est semblable à une jointure SQL standard. Dans le cas d’une jointure entre flux de données, la clause ON doit obligatoirement inclure une jointure temporelle (cf. sous-titre suivant).

Pour filtrer les données, les clauses WHERE et HAVING sont disponibles et leur syntaxe est identique à la syntaxe SQL habituelle.

Enfin, la dernière clause est GROUP BY qui diffère légèrement. En effet, en plus d’accepter un nom de colonne sur lequel regrouper les lignes, il est obligatoire de préciser une fenêtre temporelle de regroupement. Cette notion de fenêtre temporelle étant un élément-clé de Stream Analytics, nous allons l’aborder tout de suite.

Notion de temps et de fenêtres temporelles

Le point principal qui différencie une source de données flux d’une source référence est le caractère temporel.
En effet, chaque évènement du flux est horodaté, soit automatiquement par EventHub au moment où le message entre dans le bus, soit par une date fonctionnelle contenue dans le message.
Cet horodatage est crucial dans le fonctionnement, puisque c’est lui qui va être utilisé dans les fenêtres temporelles lors des jointures ou regroupements.

Lorsque l’horodatage est contenu dans le message, il est nécessaire d’utiliser TIMESTAMP BY pour préciser la colonne qui le contient, celle-ci devant être une date compréhensible par le système (ISO-8601).

Lors de la jointure il est obligatoire de préciser la différence maximale entre les deux horodatages via DATEDIFF(unité, source1, source2) BETWEEN min AND MAX.

Lors du regroupement, il est possible de définir trois types de fenêtres :

  • TumblingWindow
  • SlidingWindow
  • HoppingWindow

TumblingWindow est la fenêtre la plus simple, elle « saucissonne » le temps en parts égales. Ainsi pour GROUP BY TumblingWindow(minute, 10), la sortie se fera par bloc de 10 minutes (de 0 à 10, 10 à 20, 20 à 30, etc.) toutes les 10 minutes.

SlidingWindow est une fenêtre glissante : chaque évènement d’entrée déclenche un évènement lors de son entrée dans la fenêtre mais aussi à sa sortie.

HoppingWindow est une évolution de TumblingWindow qui distingue fréquence d’échantillonage et plage d’échantillons. Pour GROUP BY HoppingWindow(minute, 10, 5), la sortie se fera comme pour TumblingWindow, par bloc de 10 minutes mais toutes les 5 minutes.
Une TumbingWindow n’est qu’une HoppingWindow où les deux valeurs sont égales : GROUP BY TumblingWindow(minute, 10)GROUP BY HoppingWindow(minute, 10, 10)

Exemples et application à SoCars

Données d’exemple

Pour rappel, SoCars est une solution intelligente de trafic routier qui sert à illustrer cette suite d’articles. Dans l’article précédent, nous avons vu comment transmettre des évènements dans un bus EventHub.
Ici, le bus est alimenté par un capteur qui transmet la plaque d’immatriculation à chaque fois qu’une voiture entre dans une rue.

Voici une partie d’un jeu de données d’exemple :

Timestamp Plate Street
2016-02-22 14:13:55.745 AA-123-AA QuaiPanhardLevassor
2016-02-22 14:13:55.747 BB-456-BB QuaiPanhardLevassor
2016-02-22 14:15:55.758 AA-123-AA RueTolbiac
2016-02-22 14:16:25.747 BB-456-BB RueTolbiac
2016-02-22 14:16:25.758 AA-123-AA RueFrigos
2016-02-22 14:17:25.747 BB-456-BB AvenueFrance

Qui peut s’écrire sous la forme JSON suivante (pour EventHub) :


[
    {"Timestamp":"2016-02-22T14:13:55.7450000Z", "Plate":"AA-123-AA", "Street":"QuaiPanhardLevassor"},
    {"Timestamp":"2016-02-22T14:13:55.7470000Z", "Plate":"BB-456-BB", "Street":"QuaiPanhardLevassor"},
    {"Timestamp":"2016-02-22T14:15:55.7580000Z", "Plate":"AA-123-AA", "Street":"RueTolbiac"},
    {"Timestamp":"2016-02-22T14:16:25.7470000Z", "Plate":"BB-456-BB", "Street":"RueTolbiac"},
    {"Timestamp":"2016-02-22T14:16:25.7580000Z", "Plate":"AA-123-AA", "Street":"RueFrigos"},
    {"Timestamp":"2016-02-22T14:17:25.7470000Z", "Plate":"BB-456-BB", "Street":"AvenueFrance"}
]

Nous avons également un jeu de données de références (sous la forme d’un CSV) des cartes grises :


Plate, Car, Driver Name, Address
AA-123-AA, Dodge Charger, Dominic Toretto, "1492 N Glendale Blvd, Los Angeles"
BB-456-BB, Nissan Skyline GT-R, Brian O'Conner, "11000 Wilshire Blvd, Los Angeles"

Ainsi qu’un jeu de données sur les routes :


Street, Length, SpeedLimit
QuaiPanhardLevassor, 2000, 60
RueTolbiac, 500, 50
AvenueFrance, 100, 50

Jointure avec une référence

Le premier exemple est une simple jointure pour associer le nom du conducteur avec les évènements :


SELECT [events].[Timestamp], [drivers].[Name], [drivers].[Car], [events].[Street]
FROM [events] TIMESTAMP BY [Timestamp]
JOIN [drivers] ON [events].[Plate] = [drivers].[Plate] 
Timestamp Name Car Street
2016-02-22T14:13:55.745Z Dominic Toretto Dodge Charger QuaiPanhardLevassor
2016-02-22T14:13:55.747Z Brian O’Conner Nissan Skyline GT-R QuaiPanhardLevassor
2016-02-22T14:15:55.758Z Dominic Toretto Dodge Charger RueTolbiac
2016-02-22T14:16:25.747Z Brian O’Conner Nissan Skyline GT-R RueTolbiac
2016-02-22T14:16:25.758Z Dominic Toretto Dodge Charger RueFrigos
2016-02-22T14:17:25.747Z Brian O’Conner Nissan Skyline GT-R AvenueFrance

On remarque ici l’instruction TIMESTAMP BY qui permet de préciser au moteur d’utiliser le champ métier Timestamp comme horodatage.

Regroupement

Voici un exemple de regroupement par TumblingWindow


SELECT DateAdd(minute,-5,System.TimeStamp) as WindowStart,
    System.Timestamp as WindowEnd,
    COUNT(*) 
FROM [events] TIMESTAMP BY [Timestamp]
GROUP BY TumblingWindow(minute, 5)
WindowStart WindowEnd Count
2016-02-22T14:10:00.000Z 2016-02-22T14:15:00.000Z 2
2016-02-22T14:15:00.000Z 2016-02-22T14:20:00.000Z 4

Sans surprise, on retrouve le nombre d’évènements survenus entre le début et la fin de la fenêtre.

Le regroupement par HoppingWindow change légèrement le résultat :


SELECT DateAdd(minute,-10,System.TimeStamp) as WindowStart,
    System.Timestamp as WindowEnd,
    COUNT(*) 
FROM [events] TIMESTAMP BY [Timestamp]
GROUP BY HoppingWindow(minute, 10, 5)
WindowStart WindowEnd Count
2016-02-22T14:05:00.000Z 2016-02-22T14:15:00.000Z 2
2016-02-22T14:10:00.000Z 2016-02-22T14:20:00.000Z 6
2016-02-22T14:15:00.000Z 2016-02-22T14:25:00.000Z 4

On voit bien que la fenêtre englobe tous les évènements dans une fenêtre de 10mn et ce, toutes les 5mn.

Enfin, le dernier cas, avec la fenêtre glissante :


SELECT DateAdd(minute,-5,System.TimeStamp) as WindowStart,
    System.Timestamp as WindowEnd,
    COUNT(*) 
FROM [events] TIMESTAMP BY [Timestamp]
GROUP BY SlidingWindow(minute, 5)
WindowStart WindowEnd Count
2016-02-22T14:08:55.745Z 2016-02-22T14:13:55.745Z 1
2016-02-22T14:08:55.747Z 2016-02-22T14:13:55.747Z 2
2016-02-22T14:10:55.758Z 2016-02-22T14:15:55.758Z 3
2016-02-22T14:11:25.747Z 2016-02-22T14:16:25.747Z 4
2016-02-22T14:11:25.758Z 2016-02-22T14:16:25.758Z 5
2016-02-22T14:12:25.747Z 2016-02-22T14:17:25.747Z 6
2016-02-22T14:13:55.745Z 2016-02-22T14:18:55.745Z 5
2016-02-22T14:13:55.747Z 2016-02-22T14:18:55.747Z 4
2016-02-22T14:15:55.758Z 2016-02-22T14:20:55.758Z 3
2016-02-22T14:16:25.747Z 2016-02-22T14:21:25.747Z 2
2016-02-22T14:16:25.758Z 2016-02-22T14:21:25.758Z 1

On remarque que le nombre de lignes en sortie est (2×n)−1 (où n est le nombre de lignes d’entrée). En effet, chaque ligne d’entrée déclenche une ligne pour l’entrée dans la fenêtre mais aussi pour sa sortie.
Ainsi, la première ligne du jeu d’entrée provoque la ligne 1 et 7 en sortie.
Attention : On pourrait s’attendre à ce qu’il y ait 2×n lignes en sortie. Le −1 vient du fait qu’il n’est pas possible de faire une agrégation sur une agrégation vide (lorsque le dernier évènement sort de la fenêtre).
Il serait possible de faire un COUNT mais une fonction comme AVG provoquerait une division par zéro et personne ne souhaite ça.

Fonctions sur partition

De la même manière qu’en SQL, il est possible d’utiliser des fonctions analytiques sur partition à l’aide de la clause OVER PARTITION BY. C’est notamment utilisé en conjugaison avec la fonction LAG et dont voici un exemple d’utilisation :


SELECT    
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [InTimestamp],
    [events].[Timestamp] as [OutTimestamp]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE ISFIRST(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0

On considère, dans cet exemple, à chercher l’heure à laquelle une voiture est entrée dans une rue et à quel moment elle en est sortie (qui est en fait le moment où elle est entrée dans la rue suivante).
LAG permet de remonter dans les évènements précédents. On précise qu’on cherche le dernier évènement par voiture (d’où le PARTITION BY [events].[Plate]).
Il existe également la fonction ISFIRST qui permet ici d’éliminer les lignes où LAG ne retourne rien (lorsque c’est le premier évènement de la partition).

Plate Street InTimestamp OutTimestamp
AA-123-AA QuaiPanhardLevassor 2016-02-22T14:13:55.745Z 2016-02-22T14:15:55.758Z
BB-456-BB QuaiPanhardLevassor 2016-02-22T14:13:55.747Z 2016-02-22T14:16:25.747Z
AA-123-AA RueTolbiac 2016-02-22T14:15:55.758Z 2016-02-22T14:16:25.758Z
BB-456-BB RueTolbiac 2016-02-22T14:16:25.747Z 2016-02-22T14:17:25.747Z

Cette requête commence à être intéressante d’un point de vue fonctionnel. En effet, elle permet de calculer le temps qu’a mis une voiture pour parcourir la rue. Il serait donc tentant de faire la différence entre les deux dernières colonnes.
C’est possible car la fonction DATEDIFF existe et qu’il est possible d’imbriquer un LAG à l’intérieur :


SELECT    
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [InTimestamp],
    [events].[Timestamp] as [OutTimestamp],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as Duration
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0

On peut cependant noter que la requête devient peu lisible avec notamment une répétition du LAG(Timestamp).

Requêtes par étapes

Il est possible et fort pratique de faire les requêtes en plusieurs étapes, ce qui améliore la lisibilité et la maintenabilité. Cela peut se faire grâce à la clause WITH.
En reprenant notre requête au-dessus :


WITH [q1] AS
(SELECT    
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [InTimestamp],
    [events].[Timestamp] as [OutTimestamp]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0)

SELECT System.Timestamp as [Timestamp],
    [q1].[Plate],
    [q1].[Street],
    DATEDIFF(second, [q1].[InTimestamp], [q1].[OutTimestamp]) as [Duration]
FROM [q1]

Cas concret #1 : Vitesse moyenne par rue

A l’aide des quelques bases vues ci-dessus, nous allons pousser pour produire des statistiques intéressantes pour analyser le trafic.

Le premier chiffre intéressant s’obtient en calculant le temps moyen pour parcourir les différentes rues tous les quarts d’heure.


WITH [q1] AS
(SELECT    
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0)

SELECT DATEADD(minute, -15, System.TimeStamp) as [WindowStart],
    System.Timestamp as [WindowEnd],
    [q1].[Street],
    AVG([q1].[Duration])
FROM [q1]
GROUP BY [Street], TumblingWindow(minute, 15)
WindowStart WindowEnd Street Avg
2016-02-22T14:15:00.000Z 2016-02-22T14:30:00.000Z QuaiPanhardLevassor 135
2016-02-22T14:15:00.000Z 2016-02-22T14:30:00.000Z RueTolbiac 45

Cas concret #2: Détection d’anomalies

Il serait bien utile de pouvoir comparer la durée du parcours de chaque voiture dans une rue par rapport au temps moyen constaté. Ainsi, il serait possible de déclencher une alerte si le temps constaté est largement supérieur.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0),

[q2] AS
(SELECT DATEADD(minute, -1, System.TimeStamp) as [WindowStart],
    System.Timestamp as [WindowEnd],
    [Street],
    AVG([Duration]),
    MAX([Duration])
FROM [q1]
GROUP BY [Street], TumblingWindow(minute, 1))

SELECT System.Timestamp as Timestamp,
    [q1].[Plate],
    [q1].[Street],
    [q1].[Duration],
    [q2].[Avg]
FROM [q1]
JOIN [q2] ON [q1].[Street] = [q2].[Street]
    AND DATEDIFF(second, [q2], [q1]) BETWEEN 0 AND 60
WHERE [q1].[Duration] >= 2 * [q2].[Avg]

La requête se déroule en trois temps. Les deux premiers sont la requête du cas concret #1. La fenêtre est rabaissée pour correspondre aux données de l’exemple.
La dernière requête combine les deux flux : pour chaque temps calculé, on le met en regard de la moyenne du créneau précédent.
On ne retient que les valeurs où le temps constaté est deux fois supérieur à la moyenne.

Timestamp Plate Street Duration Avg
2016-02-22T14:17:25.747Z BB-456-BB RueTolbiac 60 30

Pour comprendre un peu mieux le résultat, on peut modifier légèrement la requête et afficher les sorties intermédiaires en ajoutant les deux lignes suivantes :


SELECT * INTO query1 FROM [q1]
SELECT * INTO query2 FROM [q2]

Query1 :

Timestamp Plate Street Duration
2016-02-22T14:15:55.758Z AA-123-AA QuaiPanhardLevassor 120
2016-02-22T14:16:25.747Z BB-456-BB QuaiPanhardLevassor 150
2016-02-22T14:16:25.758Z AA-123-AA RueTolbiac 30
2016-02-22T14:17:25.747Z BB-456-BB RueTolbiac 60

Query2 :

WindowStart WindowEnd Street Avg Max
2016-02-22T14:15:00.000Z 2016-02-22T14:16:00.000Z QuaiPanhardLevassor 120 120
2016-02-22T14:16:00.000Z 2016-02-22T14:17:00.000Z QuaiPanhardLevassor 150 150
2016-02-22T14:16:00.000Z 2016-02-22T14:17:00.000Z RueTolbiac 30 30
2016-02-22T14:17:00.000Z 2016-02-22T14:18:00.000Z RueTolbiac 60 60

On retrouve, dans le premier tableau, les durées de parcours de chaque voiture. Dans le second tableau, on voit la moyenne par rue de chaque fenêtre temporelle (avec une durée de la fenêtre de 1mn).

Lors du premier évènement de Query1, à 14:15:55.758, on va essayer de joindre un évènement de Query2 où la fenêtre est 14:14‐14:15 et la rue identique. Comme il n’y en a pas (ca n’est pas un OUTER JOIN), aucune ligne n’est retournée.

Pour la deuxième ligne de Query1, il existe une ligne correspondante dans Query2 (la première ligne) où les fenêtres correspondent et la rue également; la jointure peut donc se faire. Cependant, elle n’apparaît pas dans le résultat de sortie à cause du WHERE.

La troisième ligne de Query1 est similaire à la première ligne, avec aucune ligne à joindre.

La dernière ligne de Query1 se joint à la troisième ligne de Query2. La clause WHERE étant positive, on la retrouve en sortie.

Note: Il est important de comprendre à quel moment chaque ligne « apparaît » et est disponible pour une jointure : pour Query1, c’est au moment indiqué par la colonne Timestamp, pour Query2 c’est la colonne WindowEnd.
En effet, il n’est possible d’agréger les évènements d’une fenêtre que lorsque celle-ci est terminée. C’est pour cela qu’on fait une jointure de chaque évènement de Query1 avec la fenêtre précédente de Query2, car la fenêtre en cours n’est pas disponible car pas terminée.
Les évènements de sortie se produisent donc au même moment que les évènements d’entrée.

Il est toutefois possible d’inverser la clause de jointure DATEDIFF BETWEEN pour qu’à chaque fin de fenêtre, la fenêtre soit mise en regard des évènements qui se sont produits pendant celle-ci.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0),

[q2] AS
(SELECT DATEADD(minute, -1, System.TimeStamp) as [WindowStart],
    System.Timestamp as [WindowEnd],
    [Street],
    AVG([Duration]),
    MAX([Duration])
FROM [q1]
GROUP BY [Street], TumblingWindow(minute, 1))

SELECT System.Timestamp as [OutputTimestamp],
    [q1].[Timestamp] as [Q1 Timestamp],
    [q2].[WindowStart],
    [q2].[WindowEnd],
    [q1].[Plate],
    [q1].[Street],
    [q1].[Duration],
    [q2].[Avg]
FROM [q1]
JOIN [q2] ON [q1].[Street] = [q2].[Street]
    AND DATEDIFF(second, [q1], [q2]) BETWEEN 0 AND 60
Output Timestamp Q1 Timestamp Window Start Window End Plate Street Duration Avg
14:16:00.000Z 14:15:55.758Z 14:15:00.000Z 14:16:00.000Z AA-123-AA QuaiPanhardLevassor 120 120
14:17:00.000Z 14:16:25.747Z 14:16:00.000Z 14:17:00.000Z BB-456-BB QuaiPanhardLevassor 150 150
14:17:00.000Z 14:16:25.758Z 14:16:00.000Z 14:17:00.000Z AA-123-AA RueTolbiac 30 30
14:18:00.000Z 14:17:25.747Z 14:17:00.000Z 14:18:00.000Z BB-456-BB RueTolbiac 60 60

Bien sûr, avec une fenêtre si petite et notre cas d’exemple, c’est peu intéressant, mais avec une fenêtre de 10mn voici ce que l’on obtient :

Window Start Window End Plate Street Duration Avg
2016-02-22T14:10:00.000Z 2016-02-22T14:20:00.000Z AA-123-AA QuaiPanhardLevassor 120 135
2016-02-22T14:10:00.000Z 2016-02-22T14:20:00.000Z BB-456-BB QuaiPanhardLevassor 150 135
2016-02-22T14:10:00.000Z 2016-02-22T14:20:00.000Z AA-123-AA RueTolbiac 30 45
2016-02-22T14:10:00.000Z 2016-02-22T14:20:00.000Z BB-456-BB RueTolbiac 60 45

Et là, on peut observer des données plus intéressantes. Il est à noter que, désormais, chaque évènement d’entrée influe sur la valeur des aggregations (ie. 150 est utilisé pour calculer la moyenne de 135), contrairement au cas précédent (puisqu’on se servait d’une fenêtre qui n’incluait pas l’évènement).

Le chiffre n’est donc pas tout à fait le même et certaines utilisations n’ont plus de sens. En effet, on aurait pu, au lieu de comparer la durée avec la moyenne, comparer la durée avec le max de la fenêtre précédente et retourner les valeurs strictement supérieures.
Ici, ça n’aurait pas de sens puisque aucune valeur ne peut être strictement supérieure au max.

Pour aller plus loin, on pourrait se dire que le choix de 2×AVG() est arbitraire. Pourquoi 2 ?
On peut donc, si la distribution des données respecte à peu près une gaussienne, se servir de l’écart type et de la largeur à mi-hauteur pour détecter les anomalies.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0),

[q2] AS
(SELECT DATEADD(minute, -10, System.TimeStamp) as [WindowStart],
    System.Timestamp as [WindowEnd],
    [Street],
    AVG([Duration]),
    STDEV([Duration]),
    AVG([Duration]) - 1.1775 * STDEV([Duration]) as FwhmLo,
    AVG([Duration]) + 1.1775 * STDEV([Duration]) as FwhmHi    
FROM [q1]
GROUP BY [Street], TumblingWindow(minute, 10))

SELECT
    [q2].[WindowStart],
    [q2].[WindowEnd],
    [q1].[Plate],
    [q1].[Street],
    [q1].[Duration],
    [q2].[Avg],
    [q2].[FwhmLo],
    [q2].[FwhmHi]
FROM [q1]
JOIN [q2] ON [q1].[Street] = [q2].[Street]
    AND DATEDIFF(second, [q1], [q2]) BETWEEN 0 AND 600
WHERE [q1].[Duration] <= [q2].[FwhmLo]
    OR [q1].[Duration] >= [q2].[FwhmHi]

Ainsi, on récupère les véhicules qui sont allé beaucoup plus vite et plus lentement que la majorité des autres véhicules.
Ces valeurs peuvent être dirigées dans une file Service Bus pour qu’un opérateur puisse les voir apparaître sur une console de monitoring et examiner la situation.

Cas concret #3 : détection de fraude

Dans notre cas, on considère qu’une fraude sur la route est un dépassement de la vitesse maximum autorisée. On dispose du temps qu’il a fallu pour qu’une voiture parcoure une rue, la longueur de cette rue et donc la vitesse moyenne dans la rue.
On peut ainsi la comparer à la vitesse limite et dresser des contraventions.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0)

SELECT System.Timestamp as [Timestamp],
    [q1].[Plate],
    [q1].[Street],
    3.6 * [roads].[Length] / [q1].[Duration] as [Speed],
    [roads].[SpeedLimit]
FROM [q1]
JOIN [roads] ON [q1].[Street] = [roads].[Street]

La sortie comporte une colonne avec la vitesse constatée et une avec la vitesse maximale autorisée. En ajoutant une clause WHERE comme ci-après, on obtient les excès de vitesse.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [events].[Plate],
    LAG(Street) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)) as [Street],
    DATEDIFF(second, LAG(Timestamp) OVER (PARTITION BY [events].[Plate] LIMIT DURATION(hour, 1)), [events].[Timestamp]) as [Duration]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE IsFirst(hour, 1) OVER (PARTITION BY [events].[Plate]) = 0),

[q2] AS
(SELECT System.Timestamp as [Timestamp],
    [q1].[Plate],
    [q1].[Street],
    3.6 * [roads].[Length] / [q1].[Duration] as [Speed],
    [roads].[SpeedLimit]
FROM [q1]
JOIN [roads] ON [q1].[Street] = [roads].[Street]
WHERE 3.6 * [roads].[Length] / [q1].[Duration] > [roads].[SpeedLimit] * 1.1)

SELECT CONCAT('Monsieur ',
        [drivers].[Name],
        ', Il a été constaté qu''avec votre ',
        [drivers].[Car],
        ' immatriculée ',
        [drivers].[Plate],
        ' vous avez été repéré ',
        [q2].[Street],
        ' le ',
        SUBSTRING(CAST(System.Timestamp as nvarchar(max)), 0, 11),
        ' à ',
        SUBSTRING(CAST(System.Timestamp as nvarchar(max)), 12, 8),
        ', circulant à une vitesse de ',
        [q2].[Speed],
        'km/h au lieu des ',
        [q2].[SpeedLimit],
        'km/h autorisés.') as Message,
        [drivers].[address]
FROM [q2]
JOIN [drivers] ON [q2].[Plate] = [drivers].[Plate]
Message Address
Monsieur Dominic Toretto, Il a été constaté qu’avec votre Dodge Charger immatriculée AA-123-AA vous avez été repéré RueTolbiac le 2016-02-22 à 14:16:25, circulant à une vitesse de 60km/h au lieu des 50km/h autorisés. 1492 N Glendale Blvd, Los Angeles

Le message est formaté, l’adresse prête à être imprimée sur l’enveloppe, le traitement automatisé de constatation des infractions routières est opérationnel !

Cas concret #4 : Compter les voitures

Stream Analytics ne couvre malheureusement pas tous les cas d’utilisation. Prenons le cas où l’on souhaite compter le nombre de véhicules dans une rue.
Il est égal à la différence entre le nombre de voitures entrées et le nombre de voitures sorties. Puisqu’on a uniquement les entrées, il faut également en déduire les sorties, d’où le UNION dans la requête suivante.


WITH [q1] AS
(SELECT
    System.Timestamp as [Timestamp],
    [Plate],
    [Street],
    1 as [In],
    0 as [Out]
FROM [events] TIMESTAMP BY [Timestamp]
UNION
SELECT
    System.Timestamp as [Timestamp],
    [Plate],
    LAG([Street]) OVER (PARTITION BY [Plate] LIMIT DURATION(hour, 1)) as [Street],
    0 as [In],
    1 as [Out]
FROM [events] TIMESTAMP BY [Timestamp]
WHERE ISFIRST(hour, 1) OVER (PARTITION BY [Plate]) = 0)

SELECT 
    System.Timestamp as [Timestamp],
    [Street],
    SUM([In]) as [In],
    SUM([Out]) as [Out],    
    SUM([In]) - SUM ([Out]) as Delta
FROM [q1]
GROUP BY [Street], TumblingWindow(minute, 5)
windowstart windowend street in out delta
14:10:00.000Z 14:15:00.000Z QuaiPanhardLevassor 2 0 2
14:15:00.000Z 14:20:00.000Z AvenueFrance 1 0 1
14:15:00.000Z 14:20:00.000Z QuaiPanhardLevassor 0 2 -2
14:15:00.000Z 14:20:00.000Z RueFrigos 1 0 1
14:15:00.000Z 14:20:00.000Z RueTolbiac 2 2 0

Cependant, on peut s’apercevoir que ces valeurs ne sont que relatives à la période. Il n’existe aucun moyen de faire un aggrégateur de type count(t) = count(t-1) + delta qui nous permettrait d’avoir le nombre absolu de voitures.

Architecture Lambda

En BigData, on parle souvent d’architecture lambda. Aucun rapport avec ce cher Gordon Freeman, le lambda indique une bifurcation pour les données.

Le premier chemin est le chemin froid, c’est à dire celui qui archive la donnée pour pouvoir travailler dessus ultérieurement (par un batch nocturne, par exemple).

Le chemin chaud est le chemin emprunté par la donnée lorsqu’elle est traitée en temps réel. Ce qui est le cas ici, avec Stream Analytics.

Cependant, développer du code de plomberie qui, pour un évènement, l’historisera dans un stockage et qui le fera suivre pour le traitement temps réel n’apporte pas de valeur ajoutée (voire même est source de bugs).

Stream Analytics répond à cette problématique de manière élégante puisqu’il supporte plusieurs stockages permettant d’historiser (Azure Storage, DocumentDB, etc.). Comme il est possible de définir plusieurs sorties au sein d’une même requête, une architecture lambda est vraiment simple à mettre en place !


SELECT *
INTO [coldpath] /* this can be a DB, storage, etc. */
FROM [events]

SELECT DateAdd(minute,-5,System.TimeStamp) as WindowStart,
    System.Timestamp as WindowEnd,
    COUNT(*) 
INTO [hotpath] /* this can be another hub, a queue, etc. */
FROM [events] TIMESTAMP BY [Timestamp]
GROUP BY TumblingWindow(minute, 5)

Voilà, une architecture lambda en quelques lignes !

Conclusion

Nous avons vu dans cet article les possibilités d’Azure Stream Analytics, son intégration avec notamment Event Hub et quelques cas concrets très intéressants.
Outil très puissant, nous verrons dans les articles suivants son utilité dans le reste de la chaîne BigData, et notamment son intégration avec PowerBI pour des dashboards interactifs et avec Azure Machine Learning pour pousser encore plus loin la détection d’anomalies.