Montez votre architecture Big Data temps réel

Écrit par Michel Remise, le 06 février 2018

Prêt à entrer dans l’ère du Big Data en temps réel ? Dans cette série de tutoriels nous couvrirons la partie stockage et processing en traitant un flux avec deux frameworks adaptés pour faire du streaming : Couchbase pour le stockage et Spark pour le processing.

COLLECT : on simulera un flux qui sera traité en streaming

STORE : les données seront insérées de manière pérenne dans Couchbase notre base de stockage distribuée

PROCESS : la couche processing Spark détectera les mutations au sein de la base de stockage, traitera les données et puis restituera les résultats à la volée en les injectant dans Couchbase

VISUALIZE : les résultats peuvent être visualisés en pseudo temps réel, cette partie ne rentre pas dans le cadre du tutoriel

Pourquoi Couchbase ?

C’est une base de stockage distribuée qui est axée sur une architecture unique et centrée sur la mémoire, scalable, haut débit au niveau des flux, et surtout basée sur du NoSQL : tous ces critères rassemblés sont idéals pour faire du stockage en temps réel. Comparé à une base de données relationnelle classique, le NoSQL est bien plus scalable et délivre des performances bien supérieures. En plus, si vous voulez analyser un large volume de données dont la structure peut changer, Couchbase est votre allié.

Pourquoi Spark ?

Spark, dont la réputation n’est plus à refaire, permet des vitesses de traitement ultra-rapides (Map-reduce) et combine SQL, Streaming et analyses Machine Learning pour traiter les données. Grâce à sa librairie « ML », il est possible d’entrainer des modèles. La librairie « Spark streaming » combinée avec le connecteur Couchbase – Spark permet de récupérer les nouvelles données provenant de Couchbase en temps réel et de les confronter au modèle Spark. De la même manière que les données peuvent être récupérées, on pourra insérer les résultats dans Couchbase depuis Spark.

Allons-y ! Il est temps de créer notre architecture Big Data Real Time !

Création du cluster Couchbase

 But principal du tutoriel :

  • Le déploiement d’un cluster Couchbase ~ 15 minutes

Ce dont vous avez besoin :

  • X machines Ubuntu 14.04 LTS (à date Couchbase n’est pas compatible avec Ubuntu 16.04). Plus il y a de données, plus il faut une capacité de stockage importante, vous pouvez utiliser autant de nœuds que vous le souhaitez

Déploiement du cluster Couchbase :

Avant de passer à la pratique, il convient de rappeler l’enjeu principal de cette partie : on souhaite monter un cluster Couchbase qui constituera notre base de stockage distribuée, elle sera composée de plusieurs machines, appelées nœuds. Notre cluster doit être scalable et supporter le flux de données entrant. Pour rappel, Couchbase Server est une database NoSQL basée sur une architecture distribuée qui concilie performance, scalabilité et disponibilité. Entre autres, elle permet aux développeurs de construire des bases de stockage distribuées facilement et rapidement en utilisant la puissance du SQL et la flexibilité JSON. Un cluster Couchbase n’a pas de nœuds maître et esclaves, tous les nœuds sont égaux, et les données sont répliquées automatiquement pour éviter un single point of failure. Ainsi, si un nœud n’est plus opérationnel, les données seront toujours accessibles ! Nous utilisons Couchbase dans ce tutoriel car il est adapté pour stocker en temps réel.

Nous utiliserons 3 machines dans le même sous réseau sur lesquelles nous allons déployer notre cluster Couchbase.

Les adresses IP et noms de vos machines diffèrent, il convient donc de les remplacer en concordance avec votre configuration tout au long de ce tutoriel.

Etape 0 : preconfiguration

Tous les nœuds de votre cluster doivent être visibles, pour cela nous allons configurer le fichier /etc/hosts sur chacune de vos machines :

Ouvrez le fichier /etc/hosts :

$ vi /etc/hosts

Et remplacez son contenu par :

127.0.0.1 localhost192.168.3.174
node1.local node1192.168.3.175
node2.local node2192.168.3.176
node3.local node3

A la fin de cette étape vous pouvez vérifier que les nœuds pingent bien entre eux.

Etape 1 : installation de Couchbase sur tous les nœuds

Effectuez ces opérations sur tous les nœuds.

Téléchargez le fichier d’installation de Couchbase Server 4.5.1 (à date) :

$ wget http://packages.couchbase.com/releases/4.5.1/couchbase-server-enterprise_4.5.1-ubuntu14.04_amd64.deb

Installez OpenSSL :

$ apt-get install libssl1.0.0

Installez Couchbase Server 4.5.1:

$ dpkg -i couchbase-server-enterprise_4.5.1-ubuntu14.04_amd64.deb

Etape 2 : création du cluster Couchbase

Nous allons créer le cluster Couchbase, en commençant par un nœud et en rattachant progressivement les autres nœuds au cluster

Dans votre navigateur allez à l’adresse :

192.168.3.174:8091

Une interface devrait apparaitre.

Step 1 of 5 : allouez de la RAM pour vos données et Index, définissez le hostname

Step 2 of 5 : Cliquez sur Next

Step 3 of 5 : Cliquez sur Next

Step 4 of 5 : Acceptez les conditions générales et cliquez sur Next

Step 5 of 5 : Configurez votre nom administrateur et mot de passe puis cliquez sur Next

A ce stade : vous avez créé un cluster Couchbase avec un nœud. Il faut maintenant rattacher les autres nœuds au cluster.

Dans votre navigateur allez à l’adresse (les opérations sont à effectuer pour le nœud 2 et 3 dans notre cas) :

192.168.3.175:8091

Cliquez sur « Join a cluster now », indiquez l’adresse IP du cluster le username et le mot de passe, puis cliquez sur Next.

Allez dans le volet « Server Nodes » puis « Pending Rebalance » et cliquez sur « Rebalance »

A la fin de cette étape : le cluster Couchbase a été créé. Pour vérifier que votre cluster est bien fonctionnel, allez dans le volet « Overview » et vérifiez qu’il y a 3 serveurs actifs.

Great ! Maintenant que nous avons notre cluster nous pouvons générer un flux de données et l’upserter en temps réel dans la base de stockage.

Simulation d’un flux de données et upsertion dans la base de stockage

 But principal du tutoriel :

  • Configuration du cluster Couchbase pour accueillir le flux ~ 5 minutes
  • Génération du flux de données et upsertion dans Couchbase ~ 5 minutes

Ce dont vous avez besoin :

  • Un cluster Couchbase

Dans le cadre de ce tutoriel, le flux sera généré automatiquement avec un script python, mais le même principe peut être appliqué à un flux provenant d’une API, datalake, etc. Nous allons simuler des transactions bancaires qui auront le même format : Transaction id, Card id, Card type, Name, Amount, @timestamp

Par exemple :

Transaction id: 123456789
Card id: 4815162342
Card type: Amex
Amount: 100.97$
@timestamp : 20-06-2017 13:50:23.345676

Les transactions seront générées manière non périodique, ce qui constituera notre flux de données, puis chaque transaction sera convertie dans un format JSON pour être insérée dans le cluster Couchbase.

Par exemple, la transaction montrée plus haut sera convertie au format JSON avant d’être insérée dans Couchbase :

{
     transaction_id : 123456789,
     card_id : 4815162342,
     card_type : “Amex”,
     amount : 100.97,
     @timestamp : “20-06-2017 13:50:23.345676”
}

Configuration de Couchbase :

Il faut définir le bucket dans lequel vous allez insérer les données par défaut le bucket ‘default’. Il faut indexer ce bucket pour pouvoir requêter les documents :

Allez dans « Query » et exécutez la commande.

Maintenant, votre cluster Couchbase est prêt à recevoir le flux de données.

Génération du flux de données et upsertion dans Couchbase :

Les données seront générées sur un nœud distant ou interne à notre cluster.

Pour se faire, nous allons installer la SDK Python de Couchbase sur la machine où nous souhaitons générer les données :

$ apt-get install python
$ wget http://packages.couchbase.com/releases/couchbase-release/couchbase-release-1.0-2-amd64.deb
$ dpkg -i couchbase-release-1.0-2-amd64.deb
$ apt-get update$ apt-get install libcouchbase-dev build-essential python-dev python-pip
$ pip install couchbase

Notre générateur de transaction :

Attention, pour que l’upsertion soit possible, il convient de changer l’adresse IP dans le code par celle d’un nœud du cluster Couchbase

# Transaction generator

from couchbase.bucket import Bucket
from couchbase.n1ql import N1QLQuery
from collections import defaultdict
import json
import time
import random
import datetime

# bind our bucket 'default' on the right port
bucket = Bucket('couchbase://192.168.3.174/default')

transaction_id = 0
card_id = 0
card_type_list = ["Amex", "CB", "Visa", "Mastercard"]
amount = 0

timestamp_event = datetime.datetime.now()

while(1):

    if(random.randint(0, 10)>3):
        delay = random.uniform(0, 0.05)
    else:
        delay = random.uniform(0, 3)
    timestamp_event += datetime.timedelta(seconds=delay)
    transaction_id += 1
    card_id = random.randint(0,10000)
    card_type = card_type_list[random.randint(0,3)]
    amount = random.uniform(0, 1000)

     # create json
    transaction = {}
    transaction['transaction_id'] = transaction_id
    transaction['card_id'] = card_id
    transaction['card_type'] = card_type
    transaction['amount'] = amount
    transaction['@timestamp'] = str(timestamp_event)

     print(transaction)

    # upsert json in couchbase bucket
    bucket.upsert("transaction_"+str(transaction_id), transaction)

     time.sleep(delay)

Lorsque vous exécutez ce code vous devriez voir les données s’upserter en temps réel dans Couchbase.

Les données reçues en streaming dans Couchbase sont formalisées en documents NoSQL : le flux de données est représenté par les transactions avec une notion de temporalité.

A la fin de cette étape : les données sont insérées en temps réel dans Couchbase, il est possible de modifier le code d’exemple pour upserter des données de différentes sources : API, base de données, etc.

Great ! On stocke maintenant le flux dans notre cluster Couchbase.

Récupération et traitement des données avec Spark en streaming

But principal du tutoriel :

  • Déploiement du cluster Spark ~ 5 minutes
  • Configuration du connecteur Couchbase / Spark ~ 10 minutes
  • Récupération du flux de données en streaming dans Spark ~ 10 minutes

Ce dont vous avez besoin :

  • Un cluster Couchbase
  • Le flux de données upserté en temps réel dans Couchbase (voir tutoriel précédent)

Déploiement du cluster Spark :

Un cluster Spark se compose d’un nœud master et plusieurs nœuds workers. Le master s’occupe uniquement de la gestion du cluster et les workers sont les exécuteurs des jobs MapReduce pour la parallélisation.

Pour exécuter un traitement sur un cluster Spark, il faut soumettre une application dont le traitement sera piloté par un driver. Deux modes d’exécution sont possibles :

  • mode client : le driver est créé sur la machine qui soumet l’application
  • mode cluster : le driver est créé à l’intérieur du cluster

Etape 1 : Installation de Spark sur tous les nœuds du cluster

La première étape est de télécharger Spark sur chacun des nœuds de votre cluster.

$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
$ tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz

A date la version de Spark est la 2.1.0, vous pouvez choisir une version plus récente si vous le souhaitez.

Etape 2 : Installation de SBT et Scala sur tous les nœuds

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ apt-get update$ apt-get install sbt
$ apt-get install default-jdk$ apt-get install scala
$ apt-get update

Etape 3 : Démarrage du nœud master

$ spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh

Options :

  • -i IP, --ip IP : adresse IP sur laquelle se binder
  • -p PORT, --port PORT : port utilisé pour la communication interne
  • --webui-port PORT : port pour l’interface Web

Par défaut :

  • le port pour la communication interne est le port 7077 pour le master, port aléatoire pour les workers
  • le port pour l’interface Web le port 8080 pour le master, port 8081 par défaut pour le premier worker, puis 8082, 8083, etc...

Vous pouvez donc ouvrir un navigateur à l’adresse de votre nœud master sur le port 8080 pour suivre l’état de votre cluster.

Etape 4 : Démarrage des workers

$ sudo spark-2.1.0-bin-hadoop2.7/bin/spark-class
org.apache.spark.deploy.worker.Worker spark://<adresse-master>:7077 --cores 2 --memory 4G

Options :

  • -c CORES, --cores CORES : nombre de cores alloués
  • -m MEM, --memory MEM : quantité de mémoire allouée

* : un worker utilise par défaut toute la mémoire de la machine, moins d’1 Go est laissé à l’OS.

** : un nœud master peut aussi être un worker

Votre cluster Spark est maintenant créé, nous allons lui permettre de recevoir le flux de données provenant de Couchbase grâce à un connecteur.

Configuration du connecteur Couchbase / Spark :

Je vous propose de suivre le quickstart de Spark pour vous familiariser avec le déploiement d’une application avec SBT : https://spark.apache.org/docs/latest/quick-start.html. A l’issue de ce tutoriel vous serez familier avec le déploiement d’une application Scala sur votre cluster Spark.

La librairie du connecteur que nous allons utiliser est disponible à l’adresse : https://github.com/couchbase/couchbase-spark-connector

Pour ajouter le connecteur Spark / Couchbase le plus simple est d’ajouter cette ligne dans votre build.sbt :

libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.2.0"

Lors du build de votre application, avant qu’elle soit déployée dans le cluster Spark, le connecteur Couchbase / Spark sera automatiquement installé. Pour pouvoir l’utiliser dans votre code scala il suffira d’inclure cette ligne d’import :

import com.couchbase.spark.streaming._

Maintenant il est temps de créer le code scala qui nous permettra de récupérer les données de Couchbase en streaming :

import com.couchbase.spark._
import com.couchbase.spark.sql._
import com.couchbase.spark.streaming._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument, Document}
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import scala.util.parsing.json._

object StreamingExample {

   Logger.getLogger("org").setLevel(Level.OFF)
   Logger.getLogger("akka").setLevel(Level.OFF)

   val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("StreamingExample")
      .set("com.couchbase.bucket.default", "")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint(".")

   def main(args: Array[String]): Unit = {

       val stream = ssc
                 .couchbaseStream(from = FromBeginning, to = ToInfinity)
                 .filter(_.isInstanceOf[Mutation])
                 .map(m => new String(m.asInstanceOf[Mutation].content))
                 .map(s =>
                     (

JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("Amount").getOrElse(-1), //3 double                    

JSON.parseFull(s).get.asInstanceOf[Map[String,String]].get("@timestamp").getOrElse(-1) //4 string
                     )
                 )
                 .map( d => (d._1.toString.toDouble, d._2.toString))

       stream.foreachRDD(rdd => {rdd.foreach(println)})       // Start the Stream and await termination
      ssc.start()
      ssc.awaitTermination()
  }
}

Ce code permet de détecter les mutations au sein de la base de stockage puis d’en récupérer le contenu. Ensuite nous filtrons ce dont nous avons besoin (Amount et @timestamps) pour le processer par la suite en temps réel. La récupération du contenu des documents n’est pas forcément explicite dans la documentation Couchbase, en effet seule la détection des mutations est traitée. Avec ce code vous pouvez récupérer le contenu des documents JSON qui sont déversés en temps réel dans la base de stockage distribuée.

Vous avez à présent une meilleure visibilité sur ce qu’est une architecture temps réel, l’architecture Couchbase / Spark que vous venez de créer vous permet maintenant de traiter un flux de données. Comme vous avez pu le constater, sa mise en place est relativement simple. Ensuite, si vous souhaitez aller plus loin, vous pourrez traiter ce flux ou même créer votre propre modèle machine learning grâce à la librairie ML de Spark !