Dites-nous en plus sur votre projet
Aneo est une agence conseil en transformation qui vous accompagne tout au long de la vie de vos projets sur des problématiques de :
Le + d’Aneo : le conseil sans frontière !
Notre richesse, c’est de vous proposer des équipes pluridisciplinaires, chaque membre avec une approche, une expérience et une sensibilité différente : coach agile, formateur soft skills, Data Scientist designer, Architecte etc. On mise sur la complémentarité des disciplines pour vous apporter la meilleure réponse !
Depuis 2015, Aneo est une organisation plate à gouvernance plate. Aplatir la hiérarchie, supprimer les silos, instaurer des bonus collectifs, ce nouveau modèle d’organisation avec comme objectif: apporter plus de valeur à l’entreprise, aux collaborateurs et aux clients.
Le + d’Aneo : l’inattendu !
La sérendipité, c’est « le don de faire par hasard des découvertes fructueuses ». Chez Aneo, nous croyons fermement que les meilleures solutions sont parfois les plus inattendues. Ne rien s’interdire, expérimenter, oser s’entourer de profils atypiques et avoir une obsession : apporter la juste valeur.
But principal du tutoriel :
Ce dont vous avez besoin :
Déploiement du cluster Spark :
Un cluster Spark se compose d’un nœud master et plusieurs nœuds workers. Le master s’occupe uniquement de la gestion du cluster et les workers sont les exécuteurs des jobs MapReduce pour la parallélisation.
Pour exécuter un traitement sur un cluster Spark, il faut soumettre une application dont le traitement sera piloté par un driver. Deux modes d’exécution sont possibles :
Etape 1 : Installation de Spark sur tous les nœuds du cluster
La première étape est de télécharger Spark sur chacun des nœuds de votre cluster.
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz $ tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz
A date la version de Spark est la 2.1.0, vous pouvez choisir une version plus récente si vous le souhaitez.
Etape 2 : Installation de SBT et Scala sur tous les nœuds
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823 $ apt-get update$ apt-get install sbt $ apt-get install default-jdk$ apt-get install scala $ apt-get update
Etape 3 : Démarrage du nœud master
$ spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh
Options :
Par défaut :
Vous pouvez donc ouvrir un navigateur à l’adresse de votre nœud master sur le port 8080 pour suivre l’état de votre cluster.
Etape 4 : Démarrage des workers
$ sudo spark-2.1.0-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker spark://<adresse-master>:7077 --cores 2 --memory 4G
Options :
* : un worker utilise par défaut toute la mémoire de la machine, moins d’1 Go est laissé à l’OS.
** : un nœud master peut aussi être un worker
Exemple d’un cluster avec 3 nœuds workers, la mémoire et le nombre de cores se cumulent :
Votre cluster Spark est maintenant créé, nous allons lui permettre de recevoir le flux de données provenant de Couchbase grâce à un connecteur.
Configuration du connecteur Couchbase / Spark :
Je vous propose de suivre le quickstart de Spark pour vous familiariser avec le déploiement d’une application avec SBT : https://spark.apache.org/docs/latest/quick-start.html. A l’issue de ce tutoriel vous serez familier avec le déploiement d’une application Scala sur votre cluster Spark.
La librairie du connecteur que nous allons utiliser est disponible à l’adresse : https://github.com/couchbase/couchbase-spark-connector
Pour ajouter le connecteur Spark / Couchbase le plus simple est d’ajouter cette ligne dans votre build.sbt :
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.2.0"
Lors du build de votre application, avant qu’elle soit déployée dans le cluster Spark, le connecteur Couchbase / Spark sera automatiquement installé. Pour pouvoir l’utiliser dans votre code scala il suffira d’inclure cette ligne d’import :
import com.couchbase.spark.streaming._
Maintenant il est temps de créer le code scala qui nous permettra de récupérer les données de Couchbase en streaming :
import com.couchbase.spark._ import com.couchbase.spark.sql._ import com.couchbase.spark.streaming._ import org.apache.log4j.Logger import org.apache.log4j.Level import org.apache.spark.streaming._ import org.apache.spark.{SparkConf, SparkContext} import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument, Document} import com.couchbase.client.java.document.json.{JsonArray, JsonObject} import scala.util.parsing.json._ object StreamingExample { Logger.getLogger("org").setLevel(Level.OFF) Logger.getLogger("akka").setLevel(Level.OFF) val conf = new SparkConf() .setMaster("local[*]") .setAppName("StreamingExample") .set("com.couchbase.bucket.default", "") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint(".") def main(args: Array[String]): Unit = { val stream = ssc .couchbaseStream(from = FromBeginning, to = ToInfinity) .filter(_.isInstanceOf[Mutation]) .map(m => new String(m.asInstanceOf[Mutation].content)) .map(s => ( JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("Amount").getOrElse(-1), //3 double JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("@timestamp").getOrElse(-1) //4 string ) ) .map( d => (d._1.toString.toDouble, d._2.toString)) stream.foreachRDD(rdd => {rdd.foreach(println)}) // Start the Stream and await termination ssc.start() ssc.awaitTermination() } }
Ce code permet de détecter les mutations au sein de la base de stockage puis d’en récupérer le contenu. Ensuite nous filtrons ce dont nous avons besoin (Amount et @timestamps) pour le processer par la suite en temps réel. La récupération du contenu des documents n’est pas forcément explicite dans la documentation Couchbase, en effet seule la détection des mutations est traitée. Avec ce code vous pouvez récupérer le contenu des documents JSON qui sont déversés en temps réel dans la base de stockage distribuée.
Vous avez à présent une meilleure visibilité sur ce qu’est une architecture temps réel, l’architecture Couchbase / Spark que vous venez de créer vous permet maintenant de traiter un flux de données. Comme vous avez pu le constater, sa mise en place est relativement simple. Ensuite, si vous souhaitez aller plus loin, vous pourrez traiter ce flux ou même créer votre propre modèle machine learning grâce à la librairie ML de Spark !
Tous les articles de la série « Montez votre architecture Big Data temps réel » ici.
Crédit : Michel REMISE