Comparatif Apache Flink et Apache Spark : redondance ?

Image non disponible

Apache Flink est un Top Level Project Apache depuis décembre 2014.

Anciennement nommé Stratosphere et projet de recherche par Data Artisans, il a été créé en 2009 (comme Spark).

Dans cet article nous allons comparer Spark et Flink, deux projets Apache répondant au même besoin : fournir un framework de traitements distribués en mémoire (fast data).

Vous pouvez donner votre avis sur ce comparatif et nous faire part de vos expériences sur le forum Big Data : 1 commentaire Donner une note à l'article (5)

Article lu   fois.

L'auteur

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Introduction

Image non disponible

Dans cette catégorie, on peut citer (uniquement chez Apache) : ListeFWK

Image non disponible

Parmi ces solutions, Spark et Flink semblent très proches :

  • compatibilité Hadoop ;
  • remplacement de MapReduce ;
  • capables de traiter des flux de données temps réel ou des batchs ;
  • favorisent les traitements en mémoire (sur disque si nécessaire) ;
  • Lambda architecture (plateforme de batch et de streaming).

Cependant on peut noter quelques différences :

  • Spark Streaming est une extension de Spark ;
  • Flink a été conçu dès le départ pour le temps réel ;
  • Spark a été écrit initialement en Scala et supporte Java grâce à des wrappers ;
  • Flink a été écrit initialement en Java et supporte Scala grâce à des wrappers ;
  • Flink peut exécuter des traitements Hadoop directement (idéal pour une transition en douceur).

Concernant le streaming, cette différence est avant tout conceptuelle, car souvent on va borner le flux temps réel pour produire des résultats intermédiaires.

Pour Flink, un batch est un Stream borné dans le temps, dans le nombre d'éléments à traiter, etc.

Apache Flink comprend :

  • des API en Java/Scala pour le traitement par batch et en temps réel ;
  • un moteur de transformation des programmes en flots de données parallèles ;
  • un moteur d'exécution pour la distribution des traitements sur un cluster.

II. Écosystème Flink

Image non disponible

Pour rappel, voici l'écosystème Spark :

Image non disponible

III. Comparaison Spark/Flink

Image non disponible

III-A. Gestion des clusters

Options disponibles avec Spark :

  • Standalone, Mesos, Yarn, cloud (EC2, Google DataFlow…).

Options disponibles avec Flink :

  • Standalone, Yarn, cloud (EC2, Google DataFlow…), Apache TEZ.

Flink est plus lié à Hadoop (et surtout à Yarn) que Spark.

III-B. Liste des opérations

Extrait de la liste des opérateurs disponibles (Flink) :

Image non disponible

Les opérateurs sont nombreux et s'il y a quelques petites différences entre Flink et Spark, elles sont mineures.

III-C. Exemple de code (batch)

Spark

 
Sélectionnez
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile(input);
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split("\\W+")));
JavaPairRDD<String, Integer> counts = words
    .mapToPair(w -> new Tuple2<String, Integer>(w, 1))
    .reduceByKey((x, y) -> x + y);
counts.saveAsTextFile(output);

Flink

 
Sélectionnez
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile (input);
DataSet<Tuple2<String, Integer>> counts= text
    .map(l -> l.split("\\W+")) 
    .flatMap (
        (String[] tokens, Collector<Tuple2<String, Integer>> out) ->
            { Arrays.stream(tokens)
            .filter(t -> t.length() > 0)
            .forEach(t -> out.collect(new Tuple2<>(t, 1)));
            })
    .groupBy(0)
    .sum(1);
counts.writeAsText(output);
env.execute("Word Count Example");

Les codes sont très similaires, on notera cependant quelques différences.

Flink a absolument besoin d'un « sink » (point de sortie) qui peut être :

  • affichage du résultat dans la console ;
  • sauvegarde dans un fichier ;

III-D. Streaming (micro batch)

Dans Flink, tout comme Spark, le choix entre batch et streaming se fait au travers :

  • de l'environnement d'exécution (StreamingContext vs StreamExecutionEnvironment) ;
  • un type dédié (DStream vs DataStream).

Liste des connecteurs streaming :

  • File System (HDFS, S3, Azure…) ;
  • JDBC ;
  • Cache (Memcached) ;
  • NoSQL (Hbase, Redis…) ;
  • Kafka ;
  • Rabbit MQ ;
  • Flume ;
  • Twitter ;

Exemple de code (streaming)

 
Sélectionnez
public static void main(String[] args) throws Exception {
// set up the execution environment 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data (10 tweets here) 
DataStream<String> streamSource = env.addSource(new TwitterSource(propertiesPath, 10));
DataStream<Tuple2<String, Integer>> tweets =
// normalize and split each line 
streamSource
    .map(line -> line.toLowerCase().split("\\W+"))
    // convert splitted line in pairs (2-tuples) 
    .flatMap((String[] tokens, Collector<Tuple2<String, Integer>> out) -> {
        // emit the pairs with non-zero-length words 
        Arrays.stream(tokens).filter(t -> t.length() > 0).forEach(t -> out.collect(new Tuple2<>(t, 1)));
        })
    // group by the tuple field "0" and sum up tuple field "1" 
    .groupBy(0).sum(1);
// emit result 
tweets.writeAsText(outputPath);
}

L'API Streaming de Flink est donc différente de celle de Spark et plus proche de celle d'Apache Storm.

III-E. API fonctionnelle

Spark

L'API DataFrames a fait récemment son apparition chez Spark. Le but étant d'offrir une API plus proche du langage SQL et faire la jonction entre Spark et les « Data Analysts ».

L'API DataFrames a été conçue pour les batchs et son utilisation pour les microbatchs demande des manipulations supplémentaires.

Flink

L'API Table est aussi très récente et permet de formaliser les traitements dans une forme proche de la syntaxe SQL.

Cette API est disponible pour le batch et le temps réel et offre une API de haut niveau qui apporte concision et clarté.

Exemple :

 
Sélectionnez
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
TableEnvironment tableEnv = new TableEnvironment();
DataSet<WC> input = ...;
Table table = tableEnv.toTable(input);
Table filtered = table
    .groupBy("word")
    .select("word.count as count, word")
    .filter("count = 2");

DataSet<WC> result = tableEnv.toSet(filtered, WC.class);

III-F. Gestion des graphes

Spark

Pour la gestion des graphes, Spark s'appuie sur la solution.

Flink

Jusqu'à récemment la gestion des graphes avec Flink était déléguée à Apache Spargel (projet générique).

Un projet nommé Gelly a été lancé afin d'offrir à Flink la gestion des graphes, tout en tirant profit des spécificités de Flink (flots itératifs).

Gelly offre :

  1. Utilitaires d'analyse de graphes ;
  2. Traitements itératifs sur les graphes ;
  3. Algorithmes dédiés aux graphes.

Gelly n'est compatible qu'avec des objets héritant des DataSet (Vertex et Edges) et n'est donc compatible qu'avec les batchs et non les flux temps réels.

Parmi les algorithmes :

  • PageRank ;
  • Weakly Connected Components ;
  • Single Source Shortest Paths ;
  • Label propagation.

Gelly est actuellement en bêta et le but à terme étant de remplacer Spargel.

III-G. Machine learning

Spark

Spark utilise la bibliothèque MLlib dont la notoriété est grandissante.

Flink

Flink Machine Learning Library (Flink-ML), orienté pipeline inspiré de scikit-learn (framework de Machine Learning écrit en python).

C'est une nouveauté de la version 0.9.

Concepts :

  • Transformer : comme le nom l'indique, ce composant va transformer les données (format pour le traitement), mais aussi les filtrer ou les échantillonner ;
  • Learner : les learners vont permettre de constituer un modèle dynamique d'apprentissage à partir des données et des algorithmes implémentés ;
  • Model : c'est l'élément obtenu à partir des learners, afin de prédire le comportement de données futures.

Flink-ML vise deux objectifs principaux :

  1. Accessibilité du plus grand nombre au Machine Learning ;
  2. Performances.

Implémentation des principaux algorithmes de Machine Learning optimisés spécifiquement pour Flink.

Flink-ML dispose des algorithmes suivants :

  • Classification ;
  • Regression (Logistic regression) ;
  • Clustering (k-Means) ;
  • Recommendation (Alternating least squares).

Il existe d'autres algorithmes, mais uniquement en Scala pour le moment (est-ce une tentative de séduction des « Data Analysts » ?).

Une intégration avec Mahout DSL comme moteur d'exécution est en cours.

III-H. Maturité du produit

Évidemment Flink est moins mature que Spark (bien que les deux projets soient nés en 2009).

Contributeurs respectifs :

  • Spark : 540 contributeurs ;
  • Flink : 94 contributeurs.

Spark dispose d'un net avantage, mais Flink a autant, voire plus de contributeurs que des projets comme Cassandra ou Mesos).

Flink souffre de quelques lacunes :

  • pour utiliser les lambda, il faut absolument utiliser le compilateur Eclipse JDT ;
  • pas de support des génériques dans les chemins de fichiers ;

Partenariats, clients… :

  • Flink est déployable sur Hadoop Data Platform d'HortonWorks et des évolutions sont en cours pour Cloudera (fichier de configuration Hadoop non compatible) ;
  • Cloud Google : disponibilité de Flink comme runtime pour Google Cloud Dataflow ;
  • Clients expérimentant Flink : Amadeus, Spotify…

III-I. Configuration

Le paramétrage dans Flink peut se faire de deux façons :

  • fichier de configuration « flink-conf.yaml » dans le répertoire $Flink_HOME/conf ;
  • ou depuis l'API depuis version 0.9.0 M1.

III-J. Optimisations

L'optimisation des traitements est un point important de Flink. Par analogie, on peut voir l'optimiseur comme celui d'une base de données.

Le meilleur « chemin » est choisi au moment de l'exécution en fonction de paramètres et en privilégiant le traitement le plus rapide.

Pour faire ce choix, l'optimiseur analyse principalement :

  • les types de données ;
  • les fonctions utilisées.

Il intervient à la fois pour les traitements batch et les traitements en temps réel.

Ainsi pour une méthode de type join utilisée dans le programme, Flink peut décider d'utiliser :

  • un partitionnement ou non des DataSet impliqués ;
  • un hash join ou un merge join avec tri ;

Une grosse partie a aussi lieu au moment des itérations :

  1. Mise en cache des données constantes
  2. Les opérations les plus coûteuses sont déplacées en dehors de la boucle ;

La plupart du temps l'optimiseur va faire les bons choix sans que vous ayez à vous en préoccuper, toutefois il est possible de forcer la stratégie d'exécution.

Image non disponible

III-K. Tolérance à la panne

Les systèmes NoSQL sont souvent classés en fonction du respect du théorème CAP ou BASE qui est plus spécifique.

Les systèmes de traitements distribués comme Spark ou Flink sont souvent catalogués selon les garanties de livraison/traitement des messages :

Image non disponible

Idéalement, nous souhaitons un système de type « Exactly once delivery/processing ».

Mais sans rentrer trop dans les détails, sachez que pour ce genre de système :

  • Exactly-once delivery est impossible en conditions dégradées ;
  • Exactly-once processing of messages est possible en conditions dégradées.

Mais le plus important est le traitement unique d'un message, soit le respect de la règle « Exactly-once processing ».

Comparaison Spark/Flink :

  • Spark : « exactly once » en conditions normales et « at least once » en conditions dégradées ;
  • Flink : « exactly once » en condition normales et dégradées (en cours de finalisation).

Les deux systèmes utilisent la même technique : la persistance des messages dans un système externe (Apache Kafka), mais ont une approche différente dans son utilisation :

  • de manière transparente pour Flink ;
  • activé par l'utilisateur pour Spark (Write Ahead Log).

Les deux systèmes sont confrontés aux mêmes contraintes, la réplication des données en mémoire (redondance pour faciliter le fail over) :

  • Spark conseille de la désactiver (et donc d'utiliser les disques) pour garantir la sémantique exactly-once.

La tolérance à la panne côté Flink est en cours de développement et il est trop tôt pour vérifier si les promesses sont tenues et quelles en sont les limites.

IV. Avantages de Flink

IV-A. Son API très fonctionnelle

L'exemple suivant montre que l'on peut utiliser directement le nom des champs des structures dans les traitements de type filtre ou d'agrégation.

 
Sélectionnez
class Impression {
    public String url;
    public long count;
}
class Page {
    public String url;
    public String topic;
}
DataSet<Page> pages = ...;
DataSet<Impression> impressions = ...;
DataSet<Impression> aggregated = impressions.groupBy("url").sum("count");
pages.join(impressions).where("url").equalTo("url").print();

IV-B. Itérations

Flink propose deux types d'itérations :

  1. Itérations simples ;
  2. Delta-itérations.

La première facilite la gestion de flux successifs et l'agrégation des résultats.

La deuxième vise avant tout les performances.

Les itérations sont de moins en moins coûteuses au fur et à mesure des traitements.

L'inconvénient étant qu'il faut un certain nombre d'itérations avant d'obtenir le résultat final, mais ensuite si les entrées évoluent, seules les nouvelles seront traitées (delta-itérations).

IV-C. Exemple d'itérations

 
Sélectionnez
public class IterateDummyExample {
        static int max_iteration = 20;
        static int max_sequence = 100;
        static final Long iteration = new Long("1");
        public static void main(String[] args) throws Exception {
                final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                // on construit un DataSet de type ([1, (1,1)], [2, (1,2)], ... ) 
                // soit V0, n, V0Exp(n) : valeur initiale, nb factorisation, resultat 
                DataSet<Tuple3<Long, Long, BigDecimal>> initialInput = env.generateSequence(1, max_sequence).map(
                                l -> new Tuple3<Long, Long, BigDecimal>(l, iteration, BigDecimal.valueOf(l)));
                // On définit le nombre d'itérations 
                IterativeDataSet<Tuple3<Long, Long, BigDecimal>> iter = initialInput.iterate(max_iteration - 1);
                // Calcul sur le DataSet à chaque itération 
                DataSet<Tuple3<Long, Long, BigDecimal>> result = iter.closeWith(iter
                                .map(t -> new Tuple3<Long, Long, BigDecimal>(t.f0, t.f1 + 1, BigDecimal.valueOf(t.f0).multiply(t.f2))));
                // Affichage du résultat 
                result.print();
                env.execute("Calcul itératif exposant");
                // System.out.println(env.getExecutionPlan()); 
        }
}

Les itérations existent avec Spark, mais il faut faire des checkpoints (sauvegarde sur disque ou en mémoire).

Cela est disponible à la fois pour les batchs et les microbatchs (RDD ou DStream).

Exemple d'itération avec Spark :

 
Sélectionnez
JavaRDD lines = sc.textFile(...);
JavaRDD points = lines.map(new ParsePoint()).cache();
int ITERATIONS = ...;
for (int i = 0; i < ITERATIONS; i++) {
    // computation with rdd points     . . .
}

V. Performances

Flink affirme être 100 fois plus rapide qu'Hadoop, on a l'habitude avec Spark.

Mais Flink affirme être 2,5 fois plus rapide que Spark, ce qui est moins courant, sur un cas de grep de 1 To de Logs (cf. http://fr.slideshare.net/GyulaFra/flink-apachecon).

D'après mes observations, cet avantage se confirme que ce soit en batch et en streaming.

Pour atteindre ces performances, Flink se base sur trois points :

  1. L'optimiseur qui décide de la meilleure stratégie d'exécution ;
  2. Sa gestion de la mémoire ;
  3. La sérialisation et sa capacité à traiter les données sérialisées (sans les désérialiser donc).

V-A. Gestion de la mémoire

Flink gère lui-même la mémoire (sans la déléguer complètement à la JVM).

C'est dans un espace dédié de la Heap que Flink fait ses traitements.

L'avantage est d'éviter les fameuses OutOfMemoryException et d'être moins impacté par les temps de pause dus au passage du Garbage Collector.

V-B. Sérialisation

Flink propose son propre système de sérialisation et y recourt lourdement.

Cette implémentation est très performante.

Ajouté à sa capacité à opérer des traitements sur des données déjà sérialisées, Flink réduit considérablement les échanges réseau ainsi que l'occupation mémoire.

Avec Spark, la sérialisation est beaucoup plus rare et n'intervient que lorsqu'elle est strictement nécessaire (sauvegarde sur disque…).

Afin d'améliorer les performances, Spark a lancé le projet Tungsten qui a pour objectif une meilleure gestion de la mémoire, les opérations sur les données binaires, etc.

V-C. Plan d'exécution

Un aspect important dans l'optimisation et la compréhension d'un traitement est la possibilité d'afficher le plan d'exécution réel (i.e. après passage de l'optimiseur) d'un programme.

Flink propose deux méthodes pour afficher le plan d'exécution.

V-C-1. Dans l'interface web

Image non disponible

V-C-2. Par l'API

 
Sélectionnez
Remplacer
    env.execute();
par
    System.out.println(env.getExecutionPlan());

Cette méthode permet l'affichage au format JSON des informations sur l'exécution d'un traitement Flink.

Extrait :

 
Sélectionnez
{"id":4,"type":"pact","pact":"GroupReduce","contents":"SUM(1), at main(java8WC.java:23","parallelism":"4",
"predecessors":[{"id":5,"ship_strategy":"Forward"}],"driver_strategy":"Sorted Combine",
"global_properties":[{"name":"Partitioning","value":"RANDOM_PARTITIONED"},{"name":"Partitioning Order","value":"(none)"},{"name":"Uniqueness","value":"not unique"}],
"local_properties":[{"name":"Order","value":"(none)"},{"name":"Grouping","value":"not grouped"},{"name":"Uniqueness","value":"not unique"}],
"estimates":[{"name":"Est. Output Size","value":"(unknown)"},{"name":"Est. Cardinality","value":"26.52 M"}],
"costs":[{"name":"Network","value":"0.0"},{"name":"Disk I/O","value":"0.0"},{"name":"CPU","value":"0.0"},{"name":"Cumulative Network","value":"0.0"},{"name":"Cumulative Disk I/O","value":"754.12 M"},{"name":"Cumulative CPU","value":"0.0"}],
"compiler_hints":[{"name":"Output Size (bytes)","value":"(none)"},{"name":"Output Cardinality","value":"(none)"},{"name":"Avg. Output Record Size (bytes)","value":"(none)"},{"name":"Filter Factor","value":"(none)"}]}

Il faut ensuite utiliser le fichier HTML « tools/planVisualizer.html ».

Image non disponible

On voit :

  • le niveau de parallélisation de chaque étape ;
  • les étapes ajoutées par l'optimiseur (ici un tri sur la clé du DataSet (Hash partition)) ;
  • le coût de chacune des étapes (CPU, réseau, i/o) en double cliquant.

VI. Dashboard

Tout comme Spark, Flink propose une application web de suivi et de déploiement des traitements sur le cluster.

Visualisation d'un traitement et décomposition en étapes :

Image non disponible

Une fois le traitement terminé, il est possible de visualiser les durées de chacune des étapes :

Image non disponible

VII. Roadmap des évolutions

Image non disponible

VIII. Conclusion

Ce n'est pas un énième framework BigData, car Flink apporte de vraies nouveautés en termes de philosophie et d'API.

  • Spark est un framework de batch capable de faire du microbatch
  • Flink est un framework de streaming capable de faire du microbatch

De fait, les cas d'utilisation d'Apache Flink sont sans doute plus proches de Storm que de Spark.

Cependant leurs API et leurs paradigmes étant très proches, difficile de ne pas faire de rapprochement.

Une autre différence importante étant la gestion du cluster :

  • Spark délègue la gestion des ressources à Mesos ;
  • Flink a son propre système de gestion des ressources.

Toutefois la maturité des deux solutions n'est pas comparable :

  • même s'il existe des cas d'utilisation de Flink en production, ils sont encore loin des chiffres de Spark, et ce même si, Flink a été testé avec succès sur pratiquement 200 nœuds.

Enfin, il existe un point qui ne facilite pas l'adoption de Flink, c'est l'absence de REPL (Read-Eval-Print-Loop), la fameuse évolution de Java 9 qui permet de lancer des commandes dans une console et donc facilite l'adoption par des profils non développeurs comme les « Data Scientists ».

Difficile de ne pas aborder le cas des performances qui offre un net avantage à Flink vis-à-vis de Spark dont c'est pourtant un des points forts.

IX. Remerciements

Cet article a été publié avec l'aimable autorisation de Christophe Parageaud. L'article original peut être vu sur le blog de la société Ippon.

Nous remercions également Vincent Viale pour la mise au gabarit, Claude Leloup pour la relecture orthographique de cet article.

Vous pouvez réagir par rapport à cet article. 1 commentaire Donner une note à l'article (5)

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

  

Les sources présentées sur cette page sont libres de droits et vous pouvez les utiliser à votre convenance. Par contre, la page de présentation constitue une œuvre intellectuelle protégée par les droits d'auteur. Copyright © 2015 Christophe Parageaud. Aucune reproduction, même partielle, ne peut être faite de ce site et de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.