Architecture et programmation réactives avec Akka et Scala – Partie 3
Précédemment dans le deuxième blog post de la série “Architecture et programmation réactives avec Akka et Scala”, on avait exploré Akka Streams
dans la pratique en introduisant son architecture de base fonctionnant grâce au système d’acteur et en présentant les notions de modularité et compositions des étages grâce à des exemples de code.
Dans cette troisième et dernière partie, on va se pencher sur un cas d’utilisation concret : faire fonctionner un flux d’exécution réactif prenant comme point de départ un simple fichier texte.
Manipulation réactive sur un fichier
Pour cet exemple, considérons un scénario simple :
- Dans une première étape on va lire le contenu d’un fichier depuis le disque et effectuer les actions nécessaires pour compter le nombre de caractères par ligne lue.
- Dans une seconde étape on va écrire le résultat obtenu dans un autre fichier qui indiquera par un message le nombre de caractères trouvé pour telle ou telle ligne.
L’esprit de cet exercice est de mieux cerner le style de programmation réactive car le même résultat peut très bien être obtenu avec un style impératif ou fonctionnel.
Le but étant d’appréhender comment façonner un mini système réactif avec une architecture simple.
Reformulons autrement les étapes de notre exemple :
- Lire le contenu depuis un fichier origine résidant sur le disque.
- Compter le nombre de caractères pour chaque ligne composant ce fichier
- Écrire le résultat obtenu pour chaque ligne dans un fichier cible
Code de la manipulation
On commence par traduire l’énoncé précédent en code Scala
, par la suite on décortique chaque ligne de code afin d’expliquer le rôle qu’elle joue :
val srcFilePath = Paths.get("local_resources/source-file.txt") val linesSource = FileIO.fromPath(srcFilePath) .mapConcat(identity) // byteStr => byteStr .map(byte => byte.toChar) .splitAfter( ch => ch == '\n') .filterNot(ch => ch =='\n') .fold(0)( (acc,ch) => acc + 1 ) .concatSubstreams .zipWithIndex .map(tpl => s"Line ${tpl._2 + 1} has ${tpl._1} Chars") .map(str => ByteString.fromString(str + "\n")) val snkFilePath = Paths.get("local_resources/sink-file.txt") val linesSink = FileIO.toPath(snkFilePath) val runResult = linesSource.runWith(linesSink)
- Ligne 1
Scala
étant un langage de la familleJVM
, il accepte et intègre naturellement tout code écrit enJava
.
Grâce à cette propriété, on a utilisé dans cette ligne l’utilitairePaths
disponible nativement dansJava 8
pour retrouver un fichier depuis son chemin d’accès.
En passant en argument à la méthodeget
le chemin"local_resources/source-file.txt"
du fichier origine on obtient en retour lePath
de celui-ci.
- Ligne 2
FileIO
est un utilitaire deAkka Streams
qui permet de créer des flux à partir d’un objetPath
représentant un chemin d’accès vers un fichier. On fait appel dans cette ligne à la méthodefromPath
avec comme argument lePath
créé précédemment.
Le résultat retourné sera de typeSource
, pour être plus précis il s’agira d’unSource[ByteString]
.ByteString
est une classe particulière deAkka
dont le but est d’apporter une structure de donnée optimisée pour les accès et les opérations sur les listes/suites d’octet (Byte
).
Pour rendre la chose plus claire on pourra dire qu’on va avoir affaire à un objet de type<strong>Source[Seq[Byte]]</strong>
.
- Ligne 3
La méthodemapConcat
peut être vue comme l’homologue deflatMap
desStream
s deJava 8
.
C’est-à-dire simap
retourne un résultat dont le type estGen[T]
,flatMap
permet d’apporter un effet de désencapsulation en retournant uniquement<strong>T</strong>
.
Par analogie, dans notre cas, si on appliquait uniquementmap
à laSource[ByteString]
(équivalent àSource[Seq[Byte]]
) on ne pourrait pas accéder aux objetsByte
.
On fait appel alors à la méthodemapConcat
qui retourneraSource[Byte]
en aval du flux (au lieu de se contenter juste deSource[Seq[Byte]]
).identity
n’est qu’une sorte de raccourci dansScala
pour l’expression lambdax ⇒ x
. On la passe en argument car on n’a pas besoin d’apporter une modification surByteString
, on ne fera qu’extraire les objets de typeByte
grâce àmapConcat
.
- Ligne 4
Puisque on opère désormais sur des objetsByte
on appellemap
pour convertir cesByte
en caractères avec la méthodetoChar
.
- Ligne 5
L’appel àsplitAfter
permet, suite à l’évaluation d’un prédicat (expression retournant un booléen), de scinder le flux en cours en plusieurs sous flux.
⇒ Si le prédicat retournetrue
pour l’élément en cours alors tous les éléments suivants seront émis dans un nouveau sous flux.
⇒ Si le prédicat retournefalse
alors l’élément en cours restera dans le même sous flux.
Pour notre cas de figure, étant donné qu’on va procéder ligne par ligne, il faudra placer chaque ligne dans un sous flux.
Pour ce faire, il faudra vérifier grâce au caractère de fin de ligne\n
qu’on est bien passé sur tous les caractères de la ligne courante et qu’à la prochaine émission on entamera un nouveau sous flux avec la ligne suivante.
- Ligne 6
À partir de ce stade, toutes les opérations seront effectuées sur les sous flux en cours d’exécution. On ne prend plus en compte le flux parent.filterNot
va grâce à un prédicat filtrer les objets transitant et ne garder que ceux qui ne satisfont pas la condition du prédicat.
Ici on éjecte de chaque sous flux le caractère de fin de ligne\n
.
- Ligne 7
L’opérationfold
va s’exécuter sur chaque sous flux dans lequel sera émis caractère par caractère le contenu d’une ligne donnée.
La pièce la plus importante dans l’opérationfold
est la fonction(acc,ch) ⇒ acc + 1
. Passée en argument elle prend comme paramètres un accumulateuracc
et l’objetch
de typeChar
transitant actuellement dans le sous flux. Ici pour chaque objetChar
reçu on va incrémenter de1
la valeur de l’accumulateuracc
puis renvoyer la valeur résultante.
Cette valeur résultante sera affectée àacc
lors du prochain traitement. Donc une fois le sous flux consommé, l’accumulateuracc
contiendra le nombre total des objets qui y ont transité.
On peut déjà conclure queacc
sera nécessairement de typeInt
.
Si on a bien suivi l’explication on doit se demander quelle sera la valeur initiale deacc
? C’est tout simplement indiqué avec le premier argument defold
: il s’agit de0
.
Le résultat final retourné parfold
étant un objet de typeInt
, on aura en aval un flux sous la forme d’unSource[Int]
.
- Ligne 8
On fusionne avecconcatSubstreams
les sous flux afin de retrouver un flux unique formé par les objets qui étaient contenus dans chaque sous flux.
À noter qu’on conserve l’ordre lors de cette fusion : le énième sous flux créé sera le énième à être fusionné. Ce qui veut dire qu’on conserve l’ordre avec lequel ont été lues les lignes du fichier origine.
Cet étage va émettre des objetsInt
dont la valeur n’est autre que le nombre de caractères de chaque ligne.
- Ligne 9
L’opérationzipWithIndex
est assez simple : elle permet de regrouper dans un couple (tuple de 2 éléments) chaque objet avec son ordre ou index dans le flux (comme le ferait le zip d’une ouverture éclair).
Puisqu’on a conservé l’ordre dans l’opération précédente, chaque index va correspondre en fin de compte au numéro de la ligne du fichier origine.
- Ligne 10
Désormais le flux va opérer sur des tuples composés chacun d’un premierInt
correspondant à la longueur en caractères d’une ligne et d’un secondInt
correspondant au numéro de la ligne.
On utilise la variable tupletpl
pour construire un objetString
formant un message ayant le format suivantLine (idx) has (n) Chars
.
⇒idx sera remplacé par le second élément du tupletpl._2 + 1
(On ajoute+ 1
car l’index commence à la valeur0
).
⇒n sera remplacé par la longueur de la ligne en caractères qu’on extrait de tpl._1
On remarquera l’utilisation des habituels préfixes
et expression d’interprétation${ … }
pour la construction de l’objet String.
- Ligne 11
Lors de cette dernière opération sur l’étageSource
de départ on reconstruit unByteString
depuis les objetsString
s en amont, l’objectif étant de préparer le terrain à l’étape d’écriture dans le fichier cible.
Car identiquement à la lecture du fichier origine qui renvoie uneSource[ByteString]
, l’écriture requiert d’avoir un étageSink[ByteSource]
.
Ne pas oublier de concaténer le caractère de fin de ligne\n
pour inscrire un message par ligne dans le fichier cible.
- Ligne 14
On définit lePath
du fichier cible depuis le chemin d’accès"local_resources/sink-file.txt"
.
- Ligne 15
On instancie un objetSink
pointant vers lePath
précédent et ce grâce toujours à l’utilitaireFileIO
.
- Ligne 18
On raccordelinesSource
de typeSource[ByteString]
qu’on a construit depuis le fichier source aveclinesSink
de typeSink[ByteString]
qu’on vient d’instancier.
Encore une nouvelle méthode introduite :runWith
. Elle fait en sorte d’exécuter directement le flux avec leSink
passé en argument et aussi de garder la valeur de matérialisation de ce dernier.
Autrement dit, pour un objetSink
nommésnk
l’appelrunWith(snk)
est équivalent àtoMat(snk)(Keep.right).run()
.
Après ces longues explications, il est temps d’exécuter l’analyse du fichier source source-file.txt
et de “savourer” le résultat obtenu dans le fichier cible sink-file.txt
😃
source-file.txt
AAAAA BBBBBBB CCCCCCCC DDDDDDDDDD
sink-file.txt
Line 1 has 5 Chars Line 2 has 7 Chars Line 3 has 8 Chars Line 4 has 10 Chars
Si c’est plutôt satisfaisant comme résultat on ne pourra nier que l’explication et l’analyse du code restent assez longues.
Rien de tel alors qu’une bonne illustration claire et démonstrative pour aider à la digestion de toutes ces informations :
Récapitulation et horizons
Malgré son air de complexité, la programmation réactive avec Akka
peut devenir rapidement un outil simple à utiliser une fois les notions introduites bien appréhendées.
La construction d’une application ou d’un système réactif répondra à une méthodologie plutôt intuitive en manipulant des étages ou modules à raccorder pour former une sorte de chaîne de production industrielle.
Se basant sur le système d’acteur enveloppant lui-même un pool de thread
s Java
natifs, Akka
apporte une exécution multi-thread
ée qui garde les avantages du parallélisme et élimine les inconvénients du blocage. On parlera alors d’exécution asynchrone.
Les étages Source
, Flow
et Sink
représentent les pierres angulaires dans l’élaboration de l’architecture d’un système réactif, qu’elle soit simple (i.e. graphe linéaire) ou complexe (i.e. graphe/réseau).
Ces étages ont la particularité de se raccorder intégralement ou partiellement pour former des modules composés pouvant être partagés et réutilisés par de tierces parties puisqu’ils ne sont que l’image ou le calque d’un flux et non pas son exécution.
Lors de cette présentation, l’illustration du système réactif s’était restreinte à un périmètre local dans lequel si une application existait, elle n’interagirait qu’avec un nombre limité d’utilisateurs et surtout avec aucun système extérieur.
Sur le terrain, une réelle application réactive est censée évoluer dans un écosystème réactif où elle sollicite certaines applications et est elle-même sollicitée par d’autres.
Globalement, les interactions sont réalisées grâce à des communications en HTTP
ce qui implique pour une application réactive :
- La réception de multiples requêtes
HTTP
formera un point de consommation de ces requêtes, donc il aura la forme d’unSink[HttpRequest]
- L’émission de multiples réponses
HTTP
formera un point de production de réponses, donc il aura la forme d’unSource[HttpResponse]
À partir de ces deux affirmations, en raccordant ces derniers Sink
puis Source
, on pourrait conclure qu’une application évoluant dans un écosystème réactif interagissant en HTTP
aura la forme d’un flux <strong>Flow[HttpRequest,HttpRespnse,_]</strong>
. Ensuite, entre la réception d’un objet HttpRequest
et l’émission d’un objet HttpResponse
, plusieurs étages peuvent se raccorder afin d’appliquer certains traitements requis par la logique du métier.
En étendant ce procédé sur tout un ensemble d’applications réactives, on commence ainsi à dessiner les traits d’une architecture ou d’un mode de construction d’un système réactif étendu composé par ce même ensemble d’applications réactives interagissant afin atteindre un but précis.
© SOAT
Toute reproduction interdite sans autorisation de la société SOAT