Dites-nous en plus sur votre projet
Aneo est une agence conseil en transformation qui vous accompagne tout au long de la vie de vos projets sur des problématiques de :
Le + d’Aneo : le conseil sans frontière !
Notre richesse, c’est de vous proposer des équipes pluridisciplinaires, chaque membre avec une approche, une expérience et une sensibilité différente : coach agile, formateur soft skills, Data Scientist designer, Architecte etc. On mise sur la complémentarité des disciplines pour vous apporter la meilleure réponse !
Depuis 2015, Aneo est une organisation plate à gouvernance plate. Aplatir la hiérarchie, supprimer les silos, instaurer des bonus collectifs, ce nouveau modèle d’organisation avec comme objectif: apporter plus de valeur à l’entreprise, aux collaborateurs et aux clients.
Le + d’Aneo : l’inattendu !
La sérendipité, c’est « le don de faire par hasard des découvertes fructueuses ». Chez Aneo, nous croyons fermement que les meilleures solutions sont parfois les plus inattendues. Ne rien s’interdire, expérimenter, oser s’entourer de profils atypiques et avoir une obsession : apporter la juste valeur.
But principal du tutoriel :
Ce dont vous avez besoin :
Dans le cadre de ce tutoriel, le flux sera généré automatiquement avec un script python, mais le même principe peut être appliqué à un flux provenant d’une API, datalake, etc. Nous allons simuler des transactions bancaires qui auront le même format : Transaction id, Card id, Card type, Name, Amount, @timestamp
Par exemple:
Transaction id: 123456789 Card id: 4815162342 Card type: Amex Amount: 100.97$ @timestamp : 20-06-2017 13:50:23.345676
Les transactions seront générées manière non périodique, ce qui constituera notre flux de données, puis chaque transaction sera convertie dans un format JSON pour être insérée dans le cluster Couchbase :
Par exemple, la transaction montrée plus haut sera convertie au format JSON avant d’être insérée dans Couchbase :
{ transaction_id : 123456789, card_id : 4815162342, card_type : “Amex”, amount : 100.97, @timestamp : “20-06-2017 13:50:23.345676” }
Configuration de Couchbase :
Il faut définir le bucket dans lequel vous allez insérer les données par défaut le bucket ‘default’. Il faut indexer ce bucket pour pouvoir requêter les documents :
Allez dans « Query » et exécutez la commande :
Maintenant, votre cluster Couchbase est prêt à recevoir le flux de données.
Génération du flux de données et upsertion dans Couchbase :
Les données seront générées sur un nœud distant ou interne à notre cluster.
Pour se faire, nous allons installer la SDK Python de Couchbase sur la machine où nous souhaitons générer les données :
$ apt-get install python $ wget http://packages.couchbase.com/releases/couchbase-release/couchbase-release-1.0-2-amd64.deb $ dpkg -i couchbase-release-1.0-2-amd64.deb $ apt-get update$ apt-get install libcouchbase-dev build-essential python-dev python-pip $ pip install couchbase
Notre générateur de transaction :
Attention, pour que l’upsertion soit possible, il convient de changer l’adresse IP dans le code par celle d’un nœud du cluster Couchbase
# Transaction generator from couchbase.bucket import Bucket from couchbase.n1ql import N1QLQuery from collections import defaultdict import json import time import random import datetime # bind our bucket 'default' on the right port bucket = Bucket('couchbase://192.168.3.174/default') transaction_id = 0 card_id = 0 card_type_list = ["Amex", "CB", "Visa", "Mastercard"] amount = 0 timestamp_event = datetime.datetime.now() while(1): if(random.randint(0, 10)>3): delay = random.uniform(0, 0.05) else: delay = random.uniform(0, 3) timestamp_event += datetime.timedelta(seconds=delay) transaction_id += 1 card_id = random.randint(0,10000) card_type = card_type_list[random.randint(0,3)] amount = random.uniform(0, 1000) # create json transaction = {} transaction['transaction_id'] = transaction_id transaction['card_id'] = card_id transaction['card_type'] = card_type transaction['amount'] = amount transaction['@timestamp'] = str(timestamp_event) print(transaction) # upsert json in couchbase bucket bucket.upsert("transaction_"+str(transaction_id), transaction) time.sleep(delay)
Lorsque vous exécutez ce code vous devriez voir les données s’upserter en temps réel dans Couchbase :
Les données reçues en streaming dans Couchbase sont formalisées en documents NoSQL : le flux de données est représenté par les transactions avec une notion de temporalité.
A la fin de cette étape : les données sont insérées en temps réel dans Couchbase, il est possible de modifier le code d’exemple pour upserter des données de différentes sources : API, base de données, etc.
Great ! On stocke maintenant le flux dans notre cluster Couchbase.
Tous les articles de la série « Montez votre architecture Big Data temps réel » ici.
Crédit : Michel REMISE