[Big Data] Hortonworks : utilisation d’un cluster Spark avec HDP 2.x

Ce billet montre comment lancer et utiliser un cluster Spark sur HDP, la plateforme Hadoop de Hortonworks. Des tests seront également effectués via le shell de Spark, dans l’interface Scala (RDD, Hive, ORC, SQL avec Thrift…).

Pour en savoir plus sur :

  • Spark et son utilité, vous pouvez jeter un coup d’œil par ici.
  • Hortonworks et HDP, vous pouvez aller ici.

Avant de commencer…

On supposera qu’un environnement HDP a déjà été installé (via la Sandbox, par exemple, comme vu ici). L’OS de la VM est Linux CentOS.

Pour nos tests, nous accéderons au cluster en mode SSH (ssh root@192.168.86.128 –p 2222, dans notre cas) via un client indépendant (PuTTY, KiTTY,…) ou la VM de la Sandbox.


 Activation du service Spark

Connectez-vous à l’interface Ambari, en tapant, dans un navigateur Web, l’adresse suivante : http://<Adresse_IP_générée_durant_l’installation_de_la_Sandbox> :8080 (soit, dans notre cas : http://192.168.86.128:8080). Ou à la place de l’IP, le nom d’hôte sandbox.hortonworks.com ou sandbox.

Après authentification, allez dans le service Spark, accessible via l’explorateur de services à gauche de l’interface d’accueil :


Dans l’interface d’administration de Spark, cliquez sur Service Actions, puis sélectionnez Start :


Confirmez ensuite le lancement :


Si tout va bien :


Lancement du Spark History Server

Le Spark History Server est automatiquement lancé en même temps que le service Spark.

Toutefois, dans l’optique d’un lancement manuel, sachez qu’outre un clic sur Spark History Server de l’onglet Summary, vous pouvez également accéder à l’historique des activités relatives aux applications du cluster Spark en tapant, dans un navigateur Web, l’adresse suivante : http://<Adresse_IP_générée_durant_l’installation_de_la_Sandbox> :
18080 (soit, dans notre cas : http://192.168.86.128:18080). A titre informatif, une application est à Spark ce qu’un job est à Hadoop/MapReduce.

Par ailleurs, le Spark History Server peut également être lancé localement au sein du cluster, en tapant la commande shell suivante, au sein du SPARK_HOME ((par défaut, /usr/hdp/<version_de_HDP>/spark) :

./sbin/start-history-server.sh

Vérification des configurations-clés de Spark

A partir de l’onglet Configs de l’interface Ambari dédiée à Spark ou à partir du tableau de bord Spark UI, il est possible de vérifier la configuration de chaque processus participant au fonctionnement du cluster Spark. Dans notre contexte (onglet Configs), on peut notamment vérifier :

  • Les informations générales avancées par défaut (section Advanced spark-defaults) : JVM, YARN, Kerberos, Spark History Server,…



  • Les variables d’environnement (section Advanced spark-env)
    : répertoire local, répertoire de lancement,…


Quelques opérations de test

Pour la suite de nos tests, nous passerons en mode super utilisateur (spark).

su spark

Tests d’un RDD en Scala

Nous allons effectuer un comptage de mots au sein d’un RDD (Resilient Distributed Datasets) dont les données sont récupérées à partir d’un fichier exemple. Cette opération est appelée WordCount.

  • Récupération d’un exemple de fichier et copie dans un dossier dédié (tmp, dans notre cas), au sein de l’espace HDFS.
hadoop fs -copyFromLocal /usr/hdp/current/spark-client/conf/metrics.properties /tmp
  • Vérification de la présence du fichier exemple dans le répertoire tmp.
hdfs dfs -ls -R / | grep metrics.properties


  • Lancement du shell Spark.
spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m


  • Création d’un RDD à partir de la source de données copiée plus haut.
val file = sc.textFile("/tmp/metrics.properties")
  • Lancement de quelques opérations usuelles.

Au sein du shell Spark, il est possible de consulter la liste des commandes disponibles pour le traitement de données. Il suffit de taper le nom du RDD (dans notre cas, file ou plus loin, wordcounts) suivi d’un point, puis de presser sur TAB pour l’auto-complétion, ce qui donnerait quelque chose comme cet extrait :


On y retrouve des méthodes d’action (collect, count, cache,…) et de transformation (map, reduce, countByValue,…).

Ainsi :

  • Exemple de transformations : calcul du nombre de mots contenu dans le RDD file.
val wordcounts = file.flatMap(line =&amp;gt; line.split(" ")).map(word =&amp;gt; (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("/tmp/wordcount")

La commande ci-dessus mappe chaque mot (en tant que clé) à une valeur entière, puis utilise reduceByKey pour réduire leur redondance en comptant leur nombre d’occurrences. Les opérations de transformation vont créer un nouveau RDD à partir de l’original : wordcounts.

La dernière ligne est facultative : elle sert juste à sauvegarder le résultat dans l’espace HDFS pour une réutilisation, par exemple.

  • Premier exemple d’action : affichage des traitements en sortie.
wordcounts.count


  • Deuxième exemple d’action : chargement des données en mémoire via collect() avec restitution de tout le contenu en sortie.
wordcounts.collect().foreach(println)


La méthode collect() est à utiliser à bon escient. En effet, si la mémoire est sous-dimensionnée par rapport à la quantité des données à traiter, il y aura risque de crash. La méthode toArray() peut être utilisée à la place, sachant que dans tous les cas, les deux méthodes convertissent les résultats sous la forme d’un tableau à restituer.

Tests avec Hive UDF (collect_list)

Depuis sa version 0.13.1, Hive supporte une nouvelle UDF (User appelé collect_list() qui permet de retourner une liste d’objets avec des doublons. Dans nos tests, nous allons utiliser le fichier exemple people.txt fournit par Hortonworks comme source de données.

  • Lancement du shell Spark.
spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
  • Création du contexte Hive
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
  • Création d’une table Hive.
hiveContext.sql("CREATE TABLE IF NOT EXISTS MyHiveTable (key INT, value STRING)")


  • Chargement d’un fichier exemple dans la table Hive.
hiveContext.sql("LOAD DATA LOCAL INPATH 'usr/hdp/current/spark-client/examples/src/main/resources/kv1.txt' INTO TABLE MyHiveTable")


  • Appel de l’UDF collect_list.
hiveContext.sql("FROM MyHiveTable SELECT key, collect_list(value) GROUP BY key ORDER BY key").collect.foreach(println)


Si vous souhaitez approfondir l’utilisation des UDFs de Hive, vous pouvez consulter la documentation suivante: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF.

Tests avec le format ORC

Apache ORC (Optimized Row Columnar) est un format de stockage de données optimisé. Il permet de réaliserla lecture, la décompression et le traitement des colonnes requises par une requête, avec support des propriétés ACID (Atomicité, Consistance, Isolation, Durabilité) des transactions.

  • Création d’une table Hive en format ORC.
hiveContext.sql("CREATE TABLE MyORCTable(key INT, value STRING) STORED AS ORC")


  • Remplissage de la table Hive en format ORC.
hiveContext.sql("INSERT INTO TABLE MyORCTable SELECT * FROM MyHiveTable")


  • Consultation du contenu de la table Hive en format ORC.
hiveContext.sql("FROM MyORCTable SELECT *").collect().foreach(println)


Le contenu peut également être consulté à partir de HDFS, en format ORC, via des RDD :

val inputRDD = sc.hadoopFile("/apps/hive/warehouse/MyORCTable", classOf[org.apache.hadoop.hive.ql.io.orc.OrcInputFormat],classOf[org.apache.hadoop.io.NullWritable],classOf[org.apache.hadoop.hive.ql.io.orc.OrcStruct])
val transfRDD = inputRDD.map(pair =&gt; pair._2.toString)
val outputRDD = transfRDD.collect

Tests avec SparkSQL Thrift Server

SparkSQL Thrift Server est une fonctionnalité permettant des accès distants JDBC/ODBC à SparkSQL.

  • Démarrage de Thrift Server, à partir du SPARK_HOME.
./sbin/start-thriftserver.sh --master yarn-client --executor-memory 512m --hiveconf hive.server2.thrift.port=10001


  • Lancement du client JDBC Beeline, à partir du SPARK_HOME.
./bin/beeline


  • Au sein de l’interface Beeline, connexion au Thrift Server.
!connect jdbc:hive2://localhost:10001


Dans notre contexte, la sécurité n’ayant pas été activée, n’importe quel username et mot-de-passe fait l’affaire.

  • Execution de quelques commandes SQL.
    • Listing des tables Hive existantes.
show tables;


On peut noter la présence des 2 tables SQL créées plus haut au cours de nos tests avec Hive UDF et ORC.

  • Affichage du contenu filtré d’une table.
SELECT * FROM MyHiveTable WHERE key=483;


  • Arrêt du Thrift Server.
./sbin/stop-thriftserver.sh


Pour aller plus loin…

Jetez un coup d’œil ici. D’autres articles autour du Big Data et de HDP sont accessibles et continuerons à y être pondus. Vous pouvez également consulter la riche documentation d’Apache Spark ici.


Publicités

Laisser un commentaire

Entrez vos coordonnées ci-dessous ou cliquez sur une icône pour vous connecter:

Logo WordPress.com

Vous commentez à l'aide de votre compte WordPress.com. Déconnexion / Changer )

Image Twitter

Vous commentez à l'aide de votre compte Twitter. Déconnexion / Changer )

Photo Facebook

Vous commentez à l'aide de votre compte Facebook. Déconnexion / Changer )

Photo Google+

Vous commentez à l'aide de votre compte Google+. Déconnexion / Changer )

Connexion à %s