Débutant Intermédiaire

Apache Spark contre les lapins crétins

Parmi les outils du Big Data, Apache Spark est le Framework de calcul distribué horizontalement scalable, qui a su assurer la relève, et même dépasser son vénérable prédécesseur, le Framework MapReduce d’Hadoop… Petit tour d’horizon et première prise en main de l’outil, et pour faire écho au précédent article de Cécile sur MapReduce, nous remplacerons les petits poneys par une armée de lapins crétins.

Présentation

Apache Spark est un outil (plus précisément, une collection d’outils) utilisé pour le traitement massif de données. Il contient une librairie “core”, implémentant le paradigme map/reduce, massivement parallèle, au même titre qu’Hadoop MapReduce.

Sur ce framework core de Spark s’appuient un certain nombre de librairies plus haut niveau, en l’occurrence :

Dans cet article, nous nous concentrerons sur la brique core d’Apache Spark.

Historique de Spark

Le projet Spark est né en 2009 dans les laboratoires de recherches du MIT. Opensourcé en 2010, il a rejoint la fondation Apache en 2013. C’est également à cette occasion qu’est née la société Databricks, source principale de contributions au projet opensource. Devenu rapidement un des projets les plus actifs, Spark est aujourd’hui l’un des projets “top level” de la fondation Apache.

Sous le capot

Langage

La programmation fonctionnelle est particulièrement bien adaptée à la manipulation de collections de données ; rien de surprenant donc à ce que /scala ait été choisi pour écrire Spark. C’est donc également dans ce langage que nous pouvons écrire nos programmes Spark. Néanmoins, Java 8 (et les expressions lambda) sont également nativement supportés, ainsi que Python, et R.

Performance et tolérance aux pannes

Les programmes Spark s’appuient sur le concept de RDD (Resilient Distributed Datasets). Même s’il est un peu simpliste de résumer ainsi ce qu’est un RDD, on pourrait le comparer à une vue logique d’une collection de données distribuées, offrant des capacités de tolérance à la panne et de résilience, et permettant à la demande de l’utilisateur :
– la manipulation de ces données via une API (transformation, filtrage, réduction).
– la persistance (sur disque ou RAM) de résultats intermédiaires, permettant leur réutilisation dans les étapes successives d’un même calcul.
– un partitionnement personnalisé de ces données, permettant d’optimiser leur localisation dans le cluster.

L’exploitation de ces capacités par le moteur d’exécution de Spark, ainsi que l’utilisation d’un graphe orienté acyclique pour établir le plan d’exécution des requêtes de transformation de données, aboutirait à des performances jusqu’à 100 fois supérieures à celle d’Hadoop MapReduce !

Mode d’exécution

Il existe plusieurs modes d’exécution de Spark. Bien entendu, l’objectif étant le Big Data, la façon nominale d’utiliser Spark en production sera :
Sur un cluster Spark d’une part, à l’aide d’un gestionnaire de cluster (Spark standalone, YARN , MESOS ou Amazon EC2), tirant profit de la parcellisation des traitements, de la tolérance aux pannes et de la résilience du cluster.
Et d’autre part, depuis un système de stockage distribué (HDFS, HBase, Cassandra…)

Mais même sans disposer d’un cluster Spark, ni d’un système de fichiers distribués , il est possible d’exécuter votre premier programme Spark “en local” :
– pour réaliser un POC
– ou même en pratique, durant la phase de développement

Enfin, on notera que les packages de Spark fournissent aussi un /shell Spark (REPL), permettant d’écrire et d’interpréter des commandes en scala à la volée (très pratique pour tester rapidement l’API).

Les mains dans le cambouis

Après ces quelques notions théoriques de base sur Spark, passons à présent à la pratique. Nous allons écrire notre premier programme Spark en java ; et pour changer du traditionnel “Hello world” consistant à compter le nombre d’occurrences du mot “Spark” dans sa documentation, nous allons rechercher le lapin le plus crétin de tous les lapins crétins !

Le problème

Les cuniculophiles le savent bien, les lapins crétins sont légion, mais tous différents. Nous allons nous baser sur un jeu de données (un très gros fichier texte) les recensant, et les décrivant :

lapins-cretins.txt (1 lapin par ligne) :

petit lapin crétin hurlant déguisé en pilote d essai avec des menottes dans la main gauche et une truelle dans la main droite
grand lapin crétin hurlant déguisé en canard avec une passoire entre les dents et une clef à molette sous le bras
grand lapin crétin qui fait bwaaaa déguisé en lapin de ménage avec un canard en plastique dans la main 
...

(pour ce POC, nous générons un fichier lapins-cretins.txt de test, en utilisant un programme dont les sources sont ici)

Pour déterminer le lapin le plus crétin, il faudra pour chaque lapin, appliquer un algorithme de scoring calculant le niveau de crétinisme, pour en déduire le lapin ayant le score maximum. Pour cet exercice, l’algorithme comptera le nombre d’occurrences des lettres ‘c’, ‘r, ‘é ‘t, ‘i, ‘n’ dans chaque ligne du fichier texte :

private static List lettres = Arrays.asList(‘c’, ‘r’, ‘é’, ‘t’, ‘i’, ‘n’);

public static Integer computeScore(String nom) {
  int score = 0;
  for (int i = 0; i < nom.length(); i++) {
     if (lettres.contains(nom.charAt(i))) 
        score++;
  }
  return score;
}

Solution simple en java standard

Si l’on ne prenait pas en compte la forte volumétrie, ce problème aurait une solution simple, consistant à :

  • calculer le score pour chaque lapin de la collection (opération parallélisable)
  • réduire la collection en recherchant le score maximal

On pourrait ainsi écrire en java core (voir Sources complètes ici) :

@AllArgsConstructor 
public class LapinScore implements Comparable  {
                String nom;
        Integer score;

        public static Integer computeScore(String nom) {
           ...
        }

        @Override
        public int compareTo(LapinScore other) {
            return score.compareTo(other.score);
        }

        public static LapinScore max(LapinScore l1, LapinScore l2) {
            return (l1.compareTo(l2) > 0) ? l1 : l2;
        }

    }

public class LapinLePlusCretinWithJava {

   public static void main(String[] args) throws IOException {
       long deb = System.currentTimeMillis();
       Stream lineStream = Files.lines(Paths.get("/home/my/dev/", "lapins-cretins.txt"));

       LapinScore best = lineStream
                           .parallel()
                           .map(line -> new LapinScore(line, LapinScore.computeScore(line)))
                           .reduce(LapinScore::max)
                           .get();

       long fin = System.currentTimeMillis();

       System.out.println(best.nom +  "\nscore : " + best.score);
       System.out.println("duration : " + (fin - deb) + " ms");
   }

}

Cette solution fonctionne, et nous donne le résultat suivant (voir logs) :

Le plus crétin des lapins crétins est :
petit lapin crétin qui fait daaaaaaaaaaa déguisé en pilote de tricycle avec une tronconneuse entre les dents et une balise de chantier dans la main droite
avec un score de : 52
duration : 6221 ms

Celle-ci a néanmoins pour ressource limitante le nombre de CPU de la machine sur laquelle tourne le programme (chaque transformation map() calculant le score peut s’exécuter en parallèle). Le temps de traitement sera donc au final proportionnel à la taille du fichier texte en entrée.

Solution Spark

Pour résoudre ce problème en un temps ”maîtrisable”, même avec un très grand nombre de lapins crétins, il suffit de paralléliser le calcul sur un cluster, scalable horizontalement. Spark va nous permettre d’appliquer le même algorithme que précédemment, à la différence près qu’au lieu de se limiter à une parallélisation sur les cœurs d’une machine, il pourra le faire sur l’ensemble des machines du cluster Spark.

Ecriture du programme Spark

La mauvaise nouvelle est que pour y parvenir, il nous faut réécrire le programme en utilisant l’API Spark. La bonne nouvelle (et c’est aussi une des clés du succès de Spark parmi la communauté des développeurs Big data), c’est que ’API de Spark est très “closure friendly”, comme peut l’être l’API Stream de java 8 ; la réécriture sera donc quasi à l’identique :

public class LapinLePlusCretinWithSparkLocal {

   public static void main(String[] args) throws IOException {
       long deb = System.currentTimeMillis();

       // configuration pour une execution locale sur autant de threads que de cœurs
       SparkConf conf = new SparkConf().setAppName("programme spark recherchant le Lapin crétin le plus crétin");
       conf.setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);

       // init un RDD depuis un fichier txt
       String lapinsFile = "/home/my/dev/lapins-cretins.txt";
       JavaRDD lapinsData = sc.textFile(lapinsFile);

       // execute un job spark (transformation puis action)
       LapinScore best = lapinsData.map(nom -> new LapinScore(nom, LapinScore.computeScore(nom)))
                                   .reduce(LapinScore::max);

       long fin = System.currentTimeMillis();

       System.out.println("Le plus crétin des lapins crétins est :\n" + best.nom + "\navec un score de : " + best.score);
       System.out.println("duration : " + (fin - deb) + " ms");
   }

(Sources complètes ici)

On constate que la partie “intelligente” du programme n’a pas changé (map, reduce). On remarquera cependant les 2 différences suivantes :

  • l’initialisation d’un contexte Spark bien sûr, en début de programme. C’est via ce contexte que l’on configure l’exécution du job. Pour ce POC, nous avons configuré une exécution locale (master “local”), sur tous les cœurs de la machine (“[*]”). Nous n’avons ainsi pas besoin d’un cluster Spark à disposition pour tester notre programme.
  • L’utilisation d’un type JavaRDD, implémentant l’abstraction d’une possible distribution des données sur un cluster, vu comme une seule et même collection.

Exécution du programme Spark en local

Nous avons écrit un programme Spark “autoportant”, qui peut s’exécuter simplement en local par la méthode main() depuis l’IDE, sans avoir besoin d’un autre process Spark local ou extérieur. Une simple dépendance vers les jars de Spark est nécessaire pour l’exécuter.

Durant l’exécution du programme (voir logs ), on notera en particulier qu’une console web de monitoring est démarrée :

17/04/27 21:57:12 INFO Utils: Successfully started service 'SparkUI' on port 4040.17/04/27 21:57:12 INFO SparkUI: Started SparkUI at http://192.168.1.11:4040

Cette console nous donne l’état d’avancement du job à l’instant t :
Nous pouvons voir également que les calculs de score d’un lapin ont été parallélisés sur un pool d “executors” ; le fichier d’entrée est splitté puis chaque bloc est traité dans une task :

17/04/27 21:57:13 INFO DAGScheduler: Submitting 29 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at LapinLePlusCretinWithSpark.java:86)
17/04/27 21:57:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 29 tasks
...
17/04/27 21:57:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2138 bytes)
17/04/27 21:57:13 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2138 bytes)
…
17/04/27 21:57:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/04/27 21:57:13 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
…
17/04/27 21:57:13 INFO HadoopRDD: Input split: file:/home/my/dev/lapins-cretins.txt:33554432+33554432
17/04/27 21:57:13 INFO HadoopRDD: Input split: file:/home/my/dev/lapins-cretins.txt:100663296+33554432
…
17/04/27 21:57:16 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2407 bytes result sent to driver
17/04/27 21:57:16 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2416 bytes result sent to driver
…
17/04/27 21:57:34 INFO TaskSetManager: Finished task 28.0 in stage 0.0 (TID 28) in 623 ms on localhost (29/29)

On peut ensuite voir l’action de réduction qui recherche le max parmi tous les scores :

17/04/27 21:57:34 INFO DAGScheduler: ResultStage 0 (reduce at LapinLePlusCretinWithSpark.java:87) finished in 21,459 s
17/04/27 21:57:34 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/27 21:57:34 INFO DAGScheduler: Job 0 finished: reduce at LapinLePlusCretinWithSpark.java:87, took 21,528097 s

Enfin, le driver finit par nous donner le résultat de la réduction :

Le plus crétin des lapins crétins est :
petit lapin crétin qui fait daaaaaaaaaaa déguisé en pilote de tricycle avec une tronconneuse entre les dents et une balise de chantier dans la main droite
avec un score de : 52
duration : 24835 ms

Exécution du programme Spark sur un cluster

Il est très pratique de pouvoir exécuter un programme Spark en local pour tester rapidement son programme. Cependant, le but final est bien évidemment de paralléliser massivement l’exécution sur un très gros volume de données, en utilisant un cluster de machines.

Démarrage d’un cluster spark standalone

Nous allons utiliser un cluster Spark en mode Standalone. Celui-ci est constitué :

  • d’un master
  • de workers, ou slaves (en l’occurrence pour ce POC, nous nous contenterons d’un petit cluster de 2 workers)

Le master sert de point d’entrée pour toute demande d’exécution d’un job. Il a pour rôle de découper le travail en tasks, qu’il soumet aux workers (il sert d’orchestrateur). Plus il y a de workers, plus il y a de mémoire et de cœurs, plus le cluster est capable de paralléliser un job.

La communication au sein d’un cluster est basée sur l’utilisation du framework akka.

Chaque nœud (master et slaves) expose lui aussi une petite interface web de monitoring, donnant quelques informations sur le nœud, le cluster, et les jobs exécutés ou en cours d’exécution sur le cluster.

Nous allons donc démarrer notre cluster Spark standalone.

Tout d’abord, démarrons le nœud master sur la machine octopus (192.168.1.11) :

$SPARK_HOME/sbin/start-master.sh --host 192.168.1.11 --port 7077

(voir logs du master)

Puis démarrons un Worker sur chacune des machines du cluster :

  • Sur kraken (192.168.1.13) :
    $SPARK_HOME/sbin/start-slave.sh spark://192.168.1.11:7077
    

    (voir logs du worker 1)

  • Sur octopus (192.168.1.11) :

    $SPARK_HOME/sbin/start-slave.sh spark://192.168.1.11:7077
    

    (voir logs du worker 2)

NB : dans ce POC, octopus héberge à la fois le master et un worker.

Exécution du programme Spark (driver)

L’exécution d’un programme Spark sur le cluster se fait via l’outil spark-submit, qui joue le rôle de client du nœud master (“driver”) en y envoyant le programme :

spark-submit --class fr.soat.LapinLePlusCretinWithSparkCluster --master spark://192.168.1.11:7077 target/best-lapin-cretin-spark-1.0-SNAPSHOT.jar 

On précisera a spark-submit :

  • le programme driver à exécuter avec l’option –class (voir sources du programme LapinLePlusCretinWithSparkCluster)
  • le jar dans lequel se trouve ce programme (on construira un uber-jar, à l’exception des libs Spark, qui sont déjà présentes dans le cluster)
  • l’URL du master du cluster cible, avec l’option –master. L’utilisation d’une URL spark:// précise à Spark que le cluster cible est un cluster Spark standalone.
Résultat

Nous pouvons constater à travers les logs du master et des workers, que le travail a une fois encore été découpé en tasks, mais qu’elles ont été exécutées par et sur les workers (le fichier lapins-cretins.txt ayant été préalablement copié “à la main” sur le disque local de chaque nœud)

Une fois l’exécution terminée (voir logs du driver), le résultat du calcul effectué sur le cluster est retourné au driver et s’affiche :

17/04/28 21:50:05 INFO DAGScheduler: ResultStage 0 (reduce at LapinLePlusCretinWithSparkCluster.java:91) finished in 11,452 s
17/04/28 21:50:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
17/04/28 21:50:05 INFO DAGScheduler: Job 0 finished: reduce at LapinLePlusCretinWithSparkCluster.java:91, took 11,589377 sLe plus crétin des lapins crétins est :
petit lapin crétin qui fait daaaaaaaaaaa déguisé en pilote de tricycle avec une tronconneuse entre les dents et une balise de chantier dans la main droite
avec un score de : 52
duration : 14466 ms

Conclusion

Nous venons de voir à travers cet exemple comment utiliser Apache Spark et son API core pour effectuer un calcul distribué sur un gros volume de données. Nous avons en outre constaté que le temps de traitement diminue en utilisant un cluster Spark. Le gain reste cependant modeste — d’autant plus, en comparaison de la solution java / API Stream parallèle ! — et cela pour 2 raisons :

  • l’utilisation de Spark est ici disproportionnée pour un fichier d’entrée de seulement 1Go (soit environ 7 millions de lignes) ; les volumes habituellement traités dans un programme Spark se comptant plutôt en tera ou petaoctets. Sur un aussi petit volume de données, l’overhead induit par Spark (découpage en tâches, agrégation des résultats intermédiaires, appels réseau…) n’est pas ou peu contrebalancé par le gain inhérent à la parallélisation sur le cluster.
  • nous avons utilisé un tout petit cluster (2 workers) ; le niveau de parallélisation est donc très faible. Un cluster minimal aurait plutôt une taille de 5 à 20 workers, jusqu’à quelques centaines pour les plus gros (le plus gros cluster Spark identifié par la fondation Apache ayant été utilisé à ce jour aurait atteint la taille de 8000 workers !)

Bien entendu, il s’agissait ici d’un POC, mais on imagine bien les possibilités que peut offrir un “vrai “cluster Spark, dans un contexte réellement “big data” (notamment la facilité que l’on aura à scaler horizontalement en ajoutant des workers dans le cluster pour traiter plus de données).

Cependant : ne perdons pas de vue le coût intrinsèque et la complexité liée à toute architecture distribuée en production :

  • la gestion et l’administration du cluster (l’allocation des ressources, la gestion des droits, de la sécurité…).
  • la maintenance des nœuds et les problématiques devops (mises à jour OS, JRE, Spark, etc.).
  • les besoins en monitoring (suivi des jobs, mais aussi charge et santé du cluster et de chaque nœud : utilisation disque, pannes hardware, coupures réseau…).
  • l’accès aux données : par un système de fichiers distribués (HDFS, S3…) ou non (NFS), une base noSQL ou une simple copie réseau (scp, rsync…).

Enfin, et pour conclure sur le sujet, voici 5 bonnes raisons d’adopter Spark comme moteur de calcul distribué :

  • son API plus simple et plus riche que MapReduce, avec ses paradigmes fonctionnels.
  • de meilleures performances.
  • le support de différents langages (/scala, Java , Python, R).
  • son adoption massive depuis quelques années et sa communauté active (même s’il reste à ce jour une grosse part de projets legacy utilisant Hadoop MapReduce)
  • et enfin, son écosystème riche (SparkSQL, MLib… que nous avons rapidement mentionné) et son intégration aux outils les plus populaires du big data (HDFS, Cassandra, HBase, YARN, Mesos, Kafka… Pour n’en citer que quelques uns)

© SOAT
Toute reproduction interdite sans autorisation de la société SOAT

Nombre de vue : 203

AJOUTER UN COMMENTAIRE