Accueil Nos publications Blog RxJava : Écriture de code asynchrone

RxJava : Écriture de code asynchrone

RxJava LogoLes applications sont de plus en plus interconnectées. Une architecture type WOA (Web Oriented Architecture) et l’utilisation des micros-services nécessitent de faire de plus en plus d’appels à différents services web. Comment composer un résultat à partir de ces différents services dans un contexte asynchrone ?

RxJava offre une manière élégante de faire de l’asynchrone et de la composition au sein de son application. Regardons, à travers différents patterns d’utilisation, comment RxJava peut remplacer votre code asynchrone à base de Callback et autre CompletableFuture.

Appel Synchrone

Il existe différentes stratégies pour effectuer un appel à un service distant. Le plus simple à mettre en œuvre est de faire un appel synchrone. L’inconvénient de ce type d’appel est qu’il impose à votre application un couplage temporel fort. L’application distante va avoir un impact sur votre application, qui devra attendre la réponse du service. Ainsi, si ce service est lent, alors votre application sera lente.

List<Data> datas = service.getRemoteDatas();

Exemple d’appel synchrone

Les applications deviennent de plus en plus interconnectées : ces appels synchrones les dégraderont de plus en plus.

Appel Asynchrone

La solution est alors de réaliser des appels asynchrones : ces appels seront effectués dans un autre contexte, nous permettant de continuer à exécuter le code de l’application. Plusieurs approches sont possibles sur la plate-forme Java : l’utilisation de Future, l’utilisation d’API à base de Callback ou la nouvelle API CompletableFuture de Java 8. Chacune ayant des avantages et des inconvénients.

Les Futures sont simples d’utilisation mais vont se révéler complexes à orchestrer, pouvant bloquer trop tôt votre code. Les API basées sur des callbacks sont également faciles à mettre en œuvre. De plus par leurs designs, elles ne peuvent pas bloquer trop tôt. Par contre, il est complexe de chaîner ou de composer des appels asychrones via des callbacks. Généralement, cela aboutit à l’utilisation de Latch ou d’empilement de callbacks, aussi connu sous le nom de “Callback Hell”.

service.getData(dataA -> {
    service.getData(dataB -> {
        service.getData(dataC -> {
            // ...
        });
    });        
});

Exemple d’appel asynchrone à base de callback

La nouvelle API CompletableFuture est une évolution des Future de Java, permettant de chaîner des tâches, gérer des erreurs, …

RxJava, de par ses caractéristiques, couvre un champ d’action plus large que les CompletableFuture : cette bibliothèque est compatible Java 6 (et donc utilisable sur la plate-forme Android), possède une API riche et peut consommer un ensemble de valeurs.

Appel à base d’Observable

Le résultat d’un appel asynchrone peut être vu comme un événement: le résultat va être “émis” dès lors qu’il sera disponible. RxJava va observer et manipuler ces événements à travers la structure Observable.

On peut observer les événements émis via la méthode Subscribe, qui sera notifiée selon le contrat suivant :

onNext* (onError | onCompleted)?

L’observer peut être notifié une ou plusieurs fois d’un résultat, mais ne sera notifié qu’une seule fois à complétion de l’Observable, que ce soit dans un cas nominal ou par une erreur. Dans le cas de plusieurs appels à la méthode onNext, les appels seront séquentiels : chaque valeur pourra être consommée l’une après l’autre. Vous n’aurez pas, au niveau de cet callback, de synchronisation à gérer car elle aura été gérée en amont par RxJava, dans le but de respecter son contrat (un opérateur ne respectant pas ce contrat sera considéré comme buggé). Un appel asynchrone d’une API Rest notifiera, typiquement, une fois sur le onNext lorsque le résultat sera disponible, puis notifiera sur onCompleted pour indiquer que le flux est terminé.

Observable<Result> async = service.appelAsynchrone();
async.subscribe(
    result -> System.out.println(“onNext : “ + result),
    error -> System.err.println(“onError : “ + error),
     () -> System.out.println(“onCompleted”)
);

onNext : Resultat
onCompleted

Un Observable possède une liste d’opérateurs permettant de manipuler les évènements : opérateur de filtre (filter, take, …), de transformation (map), de génération (flatMap) et bien d’autres encore, permettant de manipuler complètement notre flux d’évènements.

Diagramme Marble d’un filtre
Diagramme Marble d’un filtre

Cette class Observable est une monade, et possède donc toutes les caractéristiques de cette dernière : une opération de transformation (map), une opération de génération (flatMap) et pourra se composer avec d’autres Observable, pour donner un nouvel Observable. On va donc pouvoir, grâce à ces différentes propriétés, “programmer” un Observable à partir d’un autre Observable, en lui ajoutant un filtre, une opération de transformation ou encore en le composant avec un autre Observable.

service.appelAsynchrone()
    .filter(r -> r.value >= 50) 
    .flatMap(r -> service.autreAppelAsynchrone(r.name)) 
    .map(r -> r.toString())
    .subscribe(r -> System.out.println(r));

Création d’un Observable via différents opérateurs

L’abstraction offerte par l’Observable et le flux poussé de RxJava offrent la possibilité de composer des sources de natures complètement différentes, à condition de représenter ces flux sous forme d’évènements. La source d’un Observable pourra être un appel asynchrone, des clics sur un bouton ou encore le résultat d’un long calcul. Cette source pourra aussi être la composition de différents Observables sous-jacents, eux aussi de natures différentes : tout ce que l’utilisateur manipulera sera un Observable, qu’importe sa source et/ou nature sous-jacente.

Utilisation d’Observable

Ci-dessous, différents scénarios d’utilisation de RxJava à travers différents appels asynchrones. Ces appels cibleront soit l’API Rest swapi.co (API sur l’univers StarWars), via Retrofit, soit une base MongoDB via le driver Mongo utilisant RxJava.

Le code de ces différents exemples est disponible sur le GitHub de SOAT.

Chaînage d’appels asynchrones

Pour mettre en avant la capacité d’enchaîner des appels asynchrones, sans avoir de phénomène de “Callback Hell”, le code ci-dessous va faire une succession de différents appels.

database.getCollection("people")
        .find()
        .first() // (1)
        .flatMap(doc -> remoteApi.people(doc.getInteger("peopleId"))) // (2)
        .flatMap(people -> remoteApi.planet(people.getHomeworldId())) // (3)
        .subscribe((planet) -> System.out.println("Planet name => " + planet.getName()); // (4)

Le code récupère, en asynchrone, le premier document d’une collection MongoDB (1). Les données venant de ce document sont ensuite utilisées comme argument pour un second appel asynchrone (2), appel récupérant les informations d’un personnage. Les informations de ce personnage seront utilisées comme argument pour un dernier appel asynchrone (3). Ce dernier appel donnera les informations relatives à une planète, informations qui seront alors affichées sur la sortie standard (4).

Cet exemple s’appuie fortement sur l’opérateur flatMap. Cet opérateur permet, à partir d’une valeur, de créer un nouvel Observable. Ici, ce nouvel Observable représentera un nouvel appel asynchrone.

Composition d’appels asynchrones

Autre scénario d’utilisation : la composition d’appels asynchrones. Ce type de code, sans RxJava, va nécessiter la mise en place de latch pour pouvoir créer une réponse à partir du résultat de différents appels.

L’exemple ci-dessous récupère une personne de Star Wars (ici, Luke Skywalker) (1). A partir des données de ce dernier, nous allons récupérer les informations de tous les véhicules (2a) et de tous les vaisseaux (2b) qu’il est capable de piloter.

remoteApi.people(1) // (1)
    .flatMap(luke -> {

       Observable<String> vehicles = Observable.from(luke.getVehiclesIds())
                                               .flatMap(remoteApi::vehicle)
                                               .map(vehicle -> 
                                                   luke.getName() + " can drive " + vehicle.getName()); // (2a)


      Observable<String> starships = Observable.from(luke.getStarshipsIds())
                                               .flatMap(remoteApi::starship)
                                               .map(starship -> 
                                                    luke.getName() + " can fly with " + starship.getName()); // (2b)

     return Observable.merge(vehicles, starships); // (3)
 }).subscribe(System.out::println); // (4)

Techniquement, après le premier appel qui récupère les informations de Luke Skywalker (1), le code va construire un nouvel Observable, qui sera créé (3) à partir de la composition de deux Observables sous-jacents : le premier fera des appels asynchrones pour récupérer toutes les informations des véhicules que Luke utilise (2a) tandis que le second fera des appels asynchrones pour obtenir les informations des vaisseaux que Luke pilote (2b).

Il est à noter que, pour récupérer toutes les informations des véhicules et des vaisseaux, un seul appel au service swapi.co ne suffit pas : ici, c’est bien un appel par véhicule/vaisseau qui est opéré. Chaque réponse étant ensuite transformée en chaîne de caractères. Nous serons donc notifiés pour chaque vaisseau et véhicule que Luke pilote (4) et non une unique fois avec l’ensemble des véhicules. Il est possible, toutefois, d’avoir ce dernier comportement en utilisant l’opérateur toList qui a pour rôle d’agréger les réponses.

Fallback sur erreurs

Les exemples précédents portent des scénarios nominaux. Mais comment traiter les erreurs ? L’exemple ci-dessous va forcer une erreur. L’opérateur single (1) notifie une erreur s’il existe plus d’un résultat, ou s’il y a une absence de résultat. Dans notre cas, c’est l’absence de résultat qui notifiera une erreur car on va requêter une collection mongoDB vide. RxJava propose l’opérateur retry pour, quand celui-ci est notifié d’une erreur, re-souscrire au flux (2), et rejouer les opérations en amont. Si, à la suite de ça, le flux continue à échouer, alors il est possible de fallbacker sur un flux alternatif (3).

database.getCollection("emptyCollection")
        .find()
        .first()
        .single() // (1)
        .map(doc -> new People(doc.getString("name")))
        .retry(1) // (2)
        .onErrorResumeNext(remoteApi.people(-1) // (3)
                .doOnError((e) -> System.err.println("got this exception : " + e.getMessage() + ". Will fallback with default People"))
                .onErrorReturn((e) -> new People("Default People"))) // (4)
        .subscribe(System.out::println);

Ce flux alternatif (représenté par un Observable) peut être l’émission de valeurs en dur, ou encore être un autre appel asynchrone. Donc, dans notre exemple, s’il n’y a pas de donnée dans notre base, on va récupérer les informations via notre API Rest. Ce flux alternatif pourrait également alimenter notre base : on aurait alors un comportement proche d’un cache applicatif.

Ce flux alternatif étant un Observable, on peut également configurer un comportement si ce même flux tombe également en erreur (chute du site distant par exemple). Ici, nous allons émettre une valeur par défaut (4).

Point d’attention

RxJava propose de nouveaux concepts : penser en termes de flux. Il est préférable de découvrir ce nouveau paradigme sur de petits exemples avant de les mettre en pratique sur un projet de plus grosse envergure, où le debugging sera beaucoup plus fastidieux.

Les concepts, les différentes sémantiques, les nombreux opérateurs, les schedulers et autres notions font que l’apprentissage de RxJava est dense. D’ailleurs, cet article n’aborde qu’une petite facette de ce qu’il est possible de réaliser avec. Mais vous serez récompensés par la suite en ayant la capacité de gérer plus facilement des cas complexes de code asynchrone.

On peut s’abstraire des problèmes de concurrences via RxJava, mais ils ne disparaissent pas pour autant. Ainsi, si vous devez écrire un nouvel opérateur, vous devrez gérer les problématiques de concurrence, sous peine de casser le contrat Rx, qui aboutira à un comportement erratique des opérateurs suivants.

Des Future à RxJava aux Reactives Streams

RxJava permet d’écrire des scénarios complexes d’exécution de code asynchrone. Certains éditeurs de base de données l’ont bien compris : le driver de CouchBase utilise déjà des Observable dans son driver asynchrone. MongoDB, de son coté, a publié dernièrement une version de son driver s’appuyant sur RxJava.

Ces flux asynchrones vont prendre d’autant plus d’essor dans nos APIs avec la version 1.0.0 des Reactives Stream, qui permet l’interopérabilité entre différentes implémentations de flux asynchrones (comme RxJava, AkkaStream, …).

L’adoption de flux asynchrone dans vos applications est un investissement sur le futur : en effet, il est actuellement discuté de l’intégration ce type de flux dans Java 9, à travers la future interface java.util.concurrent.Flow.

Ressources

© SOAT
Toute reproduction interdite sans autorisation de l’auteur

Pour aller plus loin