Accueil Nos publications Blog Architecture et programmation réactives avec Akka et Scala – Partie 3

Architecture et programmation réactives avec Akka et Scala – Partie 3

Précédemment dans le deuxième blog post de la série “Architecture et programmation réactives avec Akka et Scala”,  on avait exploré Akka Streams dans la pratique en introduisant son architecture de base fonctionnant grâce au système d’acteur et en présentant les notions de modularité et compositions des étages grâce à des exemples de code.
Dans cette troisième et dernière partie, on va se pencher sur un cas d’utilisation concret : faire fonctionner un flux d’exécution réactif prenant comme point de départ un simple fichier texte.

Manipulation réactive sur un fichier

Pour cet exemple, considérons un scénario simple :

  • Dans une première étape on va lire le contenu d’un fichier depuis le disque et effectuer les actions nécessaires pour compter le nombre de caractères par ligne lue.
  • Dans une seconde étape on va écrire le résultat obtenu dans un autre fichier qui indiquera par un message le nombre de caractères trouvé pour telle ou telle ligne.

L’esprit de cet exercice est de mieux cerner le style de programmation réactive car le même résultat peut très bien être obtenu avec un style impératif ou fonctionnel.
Le but étant d’appréhender comment façonner un mini système réactif avec une architecture simple.

Reformulons autrement les étapes de notre exemple :

  1. Lire le contenu depuis un fichier origine résidant sur le disque.
  2. Compter le nombre de caractères pour chaque ligne composant ce fichier
  3. Écrire le résultat obtenu pour chaque ligne dans un fichier cible

Code de la manipulation

On commence par traduire l’énoncé précédent en code Scala, par la suite on décortique chaque ligne de code afin d’expliquer le rôle qu’elle joue :

val srcFilePath = Paths.get("local_resources/source-file.txt") 
val linesSource = FileIO.fromPath(srcFilePath) 
                          .mapConcat(identity) // byteStr => byteStr 
                          .map(byte => byte.toChar) 
                          .splitAfter( ch => ch == '\n') 
                          .filterNot(ch => ch =='\n') 
                          .fold(0)( (acc,ch) => acc + 1 ) 
                          .concatSubstreams 
                          .zipWithIndex 
                          .map(tpl => s"Line ${tpl._2 + 1} has ${tpl._1} Chars") 
                          .map(str => ByteString.fromString(str + "\n")) 


val snkFilePath = Paths.get("local_resources/sink-file.txt") 
val linesSink = FileIO.toPath(snkFilePath) 


val runResult = linesSource.runWith(linesSink)



  • Ligne 1
    Scala étant un langage de la famille JVM, il accepte et intègre naturellement tout code écrit en Java.
    Grâce à cette propriété, on a utilisé dans cette ligne l’utilitaire Paths disponible nativement dans Java 8 pour retrouver un fichier depuis son chemin d’accès.
    En passant en argument à la méthode get le chemin "local_resources/source-file.txt" du fichier origine on obtient en retour le Path de celui-ci.

  • Ligne 2
    FileIO est un utilitaire de Akka Streams qui permet de créer des flux à partir d’un objet Path représentant un chemin d’accès vers un fichier. On fait appel dans cette ligne à la méthode fromPath avec comme argument le Path créé précédemment.
    Le résultat retourné sera de type Source, pour être plus précis il s’agira d’un Source[ByteString].
    ByteString est une classe particulière de Akka dont le but est d’apporter une structure de donnée optimisée pour les accès et les opérations sur les listes/suites d’octet (Byte).
    Pour rendre la chose plus claire on pourra dire qu’on va avoir affaire à un objet de type <strong>Source[Seq[Byte]]</strong>.

  • Ligne 3
    La méthode mapConcat peut être vue comme l’homologue de flatMap des Streams de Java 8.
    C’est-à-dire si map retourne un résultat dont le type est Gen[T], flatMap permet d’apporter un effet de désencapsulation en retournant uniquement <strong>T</strong>.
    Par analogie, dans notre cas, si on appliquait uniquement map à la Source[ByteString] (équivalent à Source[Seq[Byte]]) on ne pourrait pas accéder aux objets Byte.
    On fait appel alors à la méthode mapConcat qui retournera Source[Byte] en aval du flux (au lieu de se contenter juste de Source[Seq[Byte]]).
    identity n’est qu’une sorte de raccourci dans Scala pour l’expression lambda x ⇒ x. On la passe en argument car on n’a pas besoin d’apporter une modification sur ByteString, on ne fera qu’extraire les objets de type Byte grâce à mapConcat.

  • Ligne 4
    Puisque on opère désormais sur des objets Byte on appelle map pour convertir ces Byte en caractères avec la méthode toChar.

  • Ligne 5
    L’appel à splitAfter permet, suite à l’évaluation d’un prédicat (expression retournant un booléen), de scinder le flux en cours en plusieurs sous flux.
    Si le prédicat retourne true pour l’élément en cours alors tous les éléments suivants seront émis dans un nouveau sous flux.
    Si le prédicat retourne false alors l’élément en cours restera dans le même sous flux.
    Pour notre cas de figure, étant donné qu’on va procéder ligne par ligne, il faudra placer chaque ligne dans un sous flux.
    Pour ce faire, il faudra vérifier grâce au caractère de fin de ligne \n qu’on est bien passé sur tous les caractères de la ligne courante et qu’à la prochaine émission on entamera un nouveau sous flux avec la ligne suivante.

  • Ligne 6
    À partir de ce stade, toutes les opérations seront effectuées sur les sous flux en cours d’exécution. On ne prend plus en compte le flux parent.
    filterNot va grâce à un prédicat filtrer les objets transitant et ne garder que ceux qui ne satisfont pas la condition du prédicat.
    Ici on éjecte de chaque sous flux le caractère de fin de ligne \n.

  • Ligne 7
    L’opération fold va s’exécuter sur chaque sous flux dans lequel sera émis caractère par caractère le contenu d’une ligne donnée.
    La pièce la plus importante dans l’opération fold est la fonction (acc,ch) ⇒ acc + 1. Passée en argument elle prend comme paramètres un accumulateur acc et l’objet ch de type Char transitant actuellement dans le sous flux. Ici pour chaque objet Char reçu on va incrémenter de 1 la valeur de l’accumulateur acc puis renvoyer la valeur résultante.
    Cette valeur résultante sera affectée à acc lors du prochain traitement. Donc une fois le sous flux consommé, l’accumulateur acc contiendra le nombre total des objets qui y ont transité.
    On peut déjà conclure que acc sera nécessairement de type Int.
    Si on a bien suivi l’explication on doit se demander quelle sera la valeur initiale de acc ? C’est tout simplement indiqué avec le premier argument de fold: il s’agit de 0.
    Le résultat final retourné par fold étant un objet de type Int, on aura en aval un flux sous la forme d’un Source[Int].

  • Ligne 8
    On fusionne avec concatSubstreams les sous flux afin de retrouver un flux unique formé par les objets qui étaient contenus dans chaque sous flux.
    À noter qu’on conserve l’ordre lors de cette fusion : le énième sous flux créé sera le énième à être fusionné. Ce qui veut dire qu’on conserve l’ordre avec lequel ont été lues les lignes du fichier origine.
    Cet étage va émettre des objets Int dont la valeur n’est autre que le nombre de caractères de chaque ligne.

  • Ligne 9
    L’opération zipWithIndex est assez simple : elle permet de regrouper dans un couple (tuple de 2 éléments) chaque objet avec son ordre ou index dans le flux (comme le ferait le zip d’une ouverture éclair).
    Puisqu’on a conservé l’ordre dans l’opération précédente, chaque index va correspondre en fin de compte au numéro de la ligne du fichier origine.

  • Ligne 10
    Désormais le flux va opérer sur des tuples composés chacun d’un premier Int correspondant à la longueur en caractères d’une ligne et d’un second Int correspondant au numéro de la ligne.
    On utilise la variable tuple tpl pour construire un objet String formant un message ayant le format suivant Line (idx) has (n) Chars.
    ⇒idx sera remplacé par le second élément du tuple tpl._2 + 1 (On ajoute + 1 car l’index commence à la valeur 0).
    ⇒n sera remplacé par la longueur de la ligne en caractères qu’on extrait de tpl._1
    On remarquera l’utilisation des habituels préfixe s et expression d’interprétation ${ …​ } pour la construction de l’objet String.

  • Ligne 11
    Lors de cette dernière opération sur l’étage Source de départ on reconstruit un ByteString depuis les objets Strings en amont, l’objectif étant de préparer le terrain à l’étape d’écriture dans le fichier cible.
    Car identiquement à la lecture du fichier origine qui renvoie une Source[ByteString], l’écriture requiert d’avoir un étage Sink[ByteSource].
    Ne pas oublier de concaténer le caractère de fin de ligne \n pour inscrire un message par ligne dans le fichier cible.

  • Ligne 14
    On définit le Path du fichier cible depuis le chemin d’accès "local_resources/sink-file.txt".

  • Ligne 15
    On instancie un objet Sink pointant vers le Path précédent et ce grâce toujours à l’utilitaire FileIO.

  • Ligne 18
    On raccorde linesSource de type Source[ByteString] qu’on a construit depuis le fichier source avec linesSink de type Sink[ByteString] qu’on vient d’instancier.
    Encore une nouvelle méthode introduite : runWith. Elle fait en sorte d’exécuter directement le flux avec le Sink passé en argument et aussi de garder la valeur de matérialisation de ce dernier.
    Autrement dit, pour un objet Sink nommé snk l’appel runWith(snk) est équivalent à toMat(snk)(Keep.right).run().



Après ces longues explications, il est temps d’exécuter l’analyse du fichier source source-file.txt et de “savourer” le résultat obtenu dans le fichier cible sink-file.txt 😃



source-file.txt

AAAAA
BBBBBBB
CCCCCCCC
DDDDDDDDDD

sink-file.txt

Line 1 has 5 Chars
Line 2 has 7 Chars
Line 3 has 8 Chars
Line 4 has 10 Chars



Si c’est plutôt satisfaisant comme résultat on ne pourra nier que l’explication et l’analyse du code restent assez longues.
Rien de tel alors qu’une bonne illustration claire et démonstrative pour aider à la digestion de toutes ces informations :





Récapitulation et horizons

Malgré son air de complexité, la programmation réactive avec Akka peut devenir rapidement un outil simple à utiliser une fois les notions introduites bien appréhendées.
La construction d’une application ou d’un système réactif répondra à une méthodologie plutôt intuitive en manipulant des étages ou modules à raccorder pour former une sorte de chaîne de production industrielle.

Se basant sur le système d’acteur enveloppant lui-même un pool de threads Java natifs, Akka apporte une exécution multi-threadée qui garde les avantages du parallélisme et élimine les inconvénients du blocage. On parlera alors d’exécution asynchrone.

Les étages Source, Flow et Sink représentent les pierres angulaires dans l’élaboration de l’architecture d’un système réactif, qu’elle soit simple (i.e. graphe linéaire) ou complexe (i.e. graphe/réseau).
Ces étages ont la particularité de se raccorder intégralement ou partiellement pour former des modules composés pouvant être partagés et réutilisés par de tierces parties puisqu’ils ne sont que l’image ou le calque d’un flux et non pas son exécution.

Lors de cette présentation, l’illustration du système réactif s’était restreinte à un périmètre local dans lequel si une application existait, elle n’interagirait qu’avec un nombre limité d’utilisateurs et surtout avec aucun système extérieur.
Sur le terrain, une réelle application réactive est censée évoluer dans un écosystème réactif où elle sollicite certaines applications et est elle-même sollicitée par d’autres.

Globalement, les interactions sont réalisées grâce à des communications en HTTP ce qui implique pour une application réactive :

  • La réception de multiples requêtes HTTP formera un point de consommation de ces requêtes, donc il aura la forme d’un Sink[HttpRequest]
  • L’émission de multiples réponses HTTP formera un point de production de réponses, donc il aura la forme d’un Source[HttpResponse]

À partir de ces deux affirmations, en raccordant ces derniers Sink puis Source, on pourrait conclure qu’une application évoluant dans un écosystème réactif interagissant en HTTP aura la forme d’un flux <strong>Flow[HttpRequest,HttpRespnse,_]</strong>. Ensuite, entre la réception d’un objet HttpRequest et l’émission d’un objet HttpResponse, plusieurs étages peuvent se raccorder afin d’appliquer certains traitements requis par la logique du métier.

En étendant ce procédé sur tout un ensemble d’applications réactives, on commence ainsi à dessiner les traits d’une architecture ou d’un mode de construction d’un système réactif étendu composé par ce même ensemble d’applications réactives interagissant afin atteindre un but précis.

© SOAT

Toute reproduction interdite sans autorisation de la société SOAT