Montez votre architecture Big Data temps réel
[TUTORIEL 3/4]

  • Le 6 février 2018

Article rédigé par Michel REMISE

Si vous n’avez pas lu l’introduction, c’est ici, et la partie 1 pour la création de notre base de stockage, c’est ici !

Partie 2 : Simulation d’un flux de données et upsertion dans la base de stockage

 But principal du tutoriel :

  • Configuration du cluster Couchbase pour accueillir le flux ~ 5 minutes
  • Génération du flux de données et upsertion dans Couchbase ~ 5 minutes

Ce dont vous avez besoin :

  • Un cluster Couchbase

 

Dans le cadre de ce tutoriel, le flux sera généré automatiquement avec un script python, mais le même principe peut être appliqué à un flux provenant d’une API, datalake, etc. Nous allons simuler des transactions bancaires qui auront le même format : Transaction id, Card id, Card type, Name, Amount, @timestamp

Par exemple:

Transaction id: 123456789
Card id: 4815162342
Card type: Amex
Amount: 100.97$
@timestamp : 20-06-2017 13:50:23.345676

Les transactions seront générées manière non périodique, ce qui constituera notre flux de données, puis chaque transaction sera convertie dans un format JSON pour être insérée dans le cluster Couchbase :

Par exemple, la transaction montrée plus haut sera convertie au format JSON avant d’être insérée dans Couchbase :

{
     transaction_id : 123456789,
     card_id : 4815162342,
     card_type : “Amex”,
     amount : 100.97,
     @timestamp : “20-06-2017 13:50:23.345676”
}

 

Configuration de Couchbase :

Il faut définir le bucket dans lequel vous allez insérer les données par défaut le bucket ‘default’. Il faut indexer ce bucket pour pouvoir requêter les documents :

Allez dans « Query » et exécutez la commande :

 

 Maintenant, votre cluster Couchbase est prêt à recevoir le flux de données.

 

Génération du flux de données et upsertion dans Couchbase :

Les données seront générées sur un nœud distant ou interne à notre cluster.

Pour se faire, nous allons installer la SDK Python de Couchbase sur la machine où nous souhaitons générer les données :

$ apt-get install python
$ wget http://packages.couchbase.com/releases/couchbase-release/couchbase-release-1.0-2-amd64.deb
$ dpkg -i couchbase-release-1.0-2-amd64.deb
$ apt-get update$ apt-get install libcouchbase-dev build-essential python-dev python-pip
$ pip install couchbase

 

Notre générateur de transaction :

Attention, pour que l’upsertion soit possible, il convient de changer l’adresse IP dans le code par celle d’un nœud du cluster Couchbase

# Transaction generator

from couchbase.bucket import Bucket
from couchbase.n1ql import N1QLQuery
from collections import defaultdict
import json
import time
import random
import datetime

# bind our bucket 'default' on the right port
bucket = Bucket('couchbase://192.168.3.174/default')

transaction_id = 0
card_id = 0
card_type_list = ["Amex", "CB", "Visa", "Mastercard"]
amount = 0

timestamp_event = datetime.datetime.now()

while(1):

    if(random.randint(0, 10)>3):
        delay = random.uniform(0, 0.05)
    else:
        delay = random.uniform(0, 3)
    timestamp_event += datetime.timedelta(seconds=delay)
    transaction_id += 1
    card_id = random.randint(0,10000)
    card_type = card_type_list[random.randint(0,3)]
    amount = random.uniform(0, 1000)

     # create json
    transaction = {}
    transaction['transaction_id'] = transaction_id
    transaction['card_id'] = card_id
    transaction['card_type'] = card_type
    transaction['amount'] = amount
    transaction['@timestamp'] = str(timestamp_event)

     print(transaction)

     # upsert json in couchbase bucket
    bucket.upsert("transaction_"+str(transaction_id), transaction)

     time.sleep(delay)

 

Lorsque vous exécutez ce code vous devriez voir les données s’upserter en temps réel dans Couchbase :

Les données reçues en streaming dans Couchbase sont formalisées en documents NoSQL : le flux de données est représenté par les transactions avec une notion de temporalité.

 

A la fin de cette étape : les données sont insérées en temps réel dans Couchbase, il est possible de modifier le code d’exemple pour upserter des données de différentes sources : API, base de données, etc.

 

Great ! On stocke maintenant le flux dans notre cluster Couchbase.

Tous les articles de la série « Montez votre architecture Big Data temps réel » ici.

Crédit : Michel REMISE

Les prochaines occasions de se rencontrer

L’expérience de paiement en assurance

Participer