Montez votre architecture Big Data temps réel
[TUTORIEL 4/4]

  • Le 6 février 2018

Article rédigé par Michel REMISE

 

Si vous n’avez pas lu l’introduction, c’est ici, la partie 1 pour la création de notre base de stockage, c’est ici, et la partie 2 pour la génération du flux de données et upsertion en temps réel dans Couchbase, c’est ici !

Partie 3 : récupération et traitement des données avec Spark en streaming

But principal du tutoriel :

  • Déploiement du cluster Spark ~ 5 minutes
  • Configuration du connecteur Couchbase / Spark ~ 10 minutes
  • Récupération du flux de données en streaming dans Spark ~ 10 minutes

Ce dont vous avez besoin :

  • Un cluster Couchbase
  • Le flux de données upserté en temps réel dans Couchbase (voir tutoriel précédent)

 

Déploiement du cluster Spark :

Un cluster Spark se compose d’un nœud master et plusieurs nœuds workers. Le master s’occupe uniquement de la gestion du cluster et les workers sont les exécuteurs des jobs MapReduce pour la parallélisation.

Pour exécuter un traitement sur un cluster Spark, il faut soumettre une application dont le traitement sera piloté par un driver. Deux modes d’exécution sont possibles :

  • mode client : le driver est créé sur la machine qui soumet l’application
  • mode cluster : le driver est créé à l’intérieur du cluster

Etape 1 : Installation de Spark sur tous les nœuds du cluster

La première étape est de télécharger Spark sur chacun des nœuds de votre cluster.

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
$ tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz

A date la version de Spark est la 2.1.0, vous pouvez choisir une version plus récente si vous le souhaitez.

 

Etape 2 : Installation de SBT et Scala sur tous les nœuds

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ apt-get update$ apt-get install sbt
$ apt-get install default-jdk$ apt-get install scala
$ apt-get update

 

Etape 3 : Démarrage du nœud master

$ spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh

Options :

  • -i IP, –ip IP : adresse IP sur laquelle se binder
  • -p PORT, –port PORT : port utilisé pour la communication interne
  • –webui-port PORT : port pour l’interface Web

Par défaut :

  • le port pour la communication interne est le port 7077 pour le master, port aléatoire pour les workers
  • le port pour l’interface Web le port 8080 pour le master, port 8081 par défaut pour le premier worker, puis 8082, 8083, etc…

Vous pouvez donc ouvrir un navigateur à l’adresse de votre nœud master sur le port 8080 pour suivre l’état de votre cluster.

 

Etape 4 : Démarrage des workers

$ sudo spark-2.1.0-bin-hadoop2.7/bin/spark-class
org.apache.spark.deploy.worker.Worker spark://<adresse-master>:7077 --cores 2 --memory 4G

Options :

  • -c CORES, –cores CORES : nombre de cores alloués
  • -m MEM, –memory MEM : quantité de mémoire allouée

* : un worker utilise par défaut toute la mémoire de la machine, moins d’1 Go est laissé à l’OS.

** : un nœud master peut aussi être un worker

Exemple d’un cluster avec 3 nœuds workers, la mémoire et le nombre de cores se cumulent :

 

Votre cluster Spark est maintenant créé, nous allons lui permettre de recevoir le flux de données provenant de Couchbase grâce à un connecteur.

 

Configuration du connecteur Couchbase / Spark :

Je vous propose de suivre le quickstart de Spark pour vous familiariser avec le déploiement d’une application avec SBT : https://spark.apache.org/docs/latest/quick-start.html. A l’issue de ce tutoriel vous serez familier avec le déploiement d’une application Scala sur votre cluster Spark.

La librairie du connecteur que nous allons utiliser est disponible à l’adresse : https://github.com/couchbase/couchbase-spark-connector

Pour ajouter le connecteur Spark / Couchbase le plus simple est d’ajouter cette ligne dans votre build.sbt :

libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.2.0"

Lors du build de votre application, avant qu’elle soit déployée dans le cluster Spark, le connecteur Couchbase / Spark sera automatiquement installé. Pour pouvoir l’utiliser dans votre code scala il suffira d’inclure cette ligne d’import :

import com.couchbase.spark.streaming._

 

Maintenant il est temps de créer le code scala qui nous permettra de récupérer les données de Couchbase en streaming :

import com.couchbase.spark._
import com.couchbase.spark.sql._
import com.couchbase.spark.streaming._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument, Document}
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import scala.util.parsing.json._

object StreamingExample {

   Logger.getLogger("org").setLevel(Level.OFF)
   Logger.getLogger("akka").setLevel(Level.OFF)

   val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamingExample")
      .set("com.couchbase.bucket.default", "")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint(".")

   def main(args: Array[String]): Unit = {

       val stream = ssc
                 .couchbaseStream(from = FromBeginning, to = ToInfinity)
                 .filter(_.isInstanceOf[Mutation])
                 .map(m => new String(m.asInstanceOf[Mutation].content))
                 .map(s =>
                     (

JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("Amount").getOrElse(-1), //3 double                    

JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("@timestamp").getOrElse(-1) //4 string
                     )
                 )
                 .map( d => (d._1.toString.toDouble, d._2.toString))

       stream.foreachRDD(rdd => {rdd.foreach(println)})       // Start the Stream and await termination
      ssc.start()
      ssc.awaitTermination()
  }
}

 

Ce code permet de détecter les mutations au sein de la base de stockage puis d’en récupérer le contenu. Ensuite nous filtrons ce dont nous avons besoin (Amount et @timestamps) pour le processer par la suite en temps réel. La récupération du contenu des documents n’est pas forcément explicite dans la documentation Couchbase, en effet seule la détection des mutations est traitée. Avec ce code vous pouvez récupérer le contenu des documents JSON qui sont déversés en temps réel dans la base de stockage distribuée.

Vous avez à présent une meilleure visibilité sur ce qu’est une architecture temps réel, l’architecture Couchbase / Spark que vous venez de créer vous permet maintenant de traiter un flux de données. Comme vous avez pu le constater, sa mise en place est relativement simple. Ensuite, si vous souhaitez aller plus loin, vous pourrez traiter ce flux ou même créer votre propre modèle machine learning grâce à la librairie ML de Spark !

Tous les articles de la série « Montez votre architecture Big Data temps réel » ici.

Crédit : Michel REMISE

Les prochaines occasions de se rencontrer

ANEO invité au Salon du Management

Participer

Nous serons au 4ème séminaire du club des DOP

Participer

Nous serons à la Flowcon 2018 !

Participer

Nous serons aux Journées Calcul & Données ’18

Participer