Apache Spark avec Kubernetes et accès rapide S3 – Yifeng Jiang
Pour tirer le meilleur parti d’Apache Spark, nous avons besoin de deux choses:
- Un stockage distribué pour stocker les données
- Un planificateur pour exécuter les exécuteurs Spark sur un cluster informatique
Les gens ont fait cela différemment sur site et dans le cloud. Avec sur site, la plupart utilisent Spark avec Hadoop, ou en particulier HDFS pour le stockage et YARN pour le planificateur. Dans le cloud, la plupart utilisent le stockage d’objets comme Amazon S3 pour le stockage et un service cloud natif distinct comme Amazon EMR ou Databricks pour le planificateur. Et si nous pouvions utiliser Spark dans une architecture unique sur promesse ou dans le cloud?
Entrez Spark avec Kubernetes et S3. Les points forts de cette architecture incluent:
- Architecture unique pour exécuter Spark sur le cloud hybride.
- Faites évoluer, utilisez le calcul et le stockage indépendamment.
- Provisionnement, déploiement et mise à niveau rapides.
- Pas besoin de Hadoop, qui est complexe à utiliser et à exploiter.
Dans ce blog, je vais vous expliquer comment exécuter Spark avec Kubernetes en utilisant le Spark sur Kubernetes Operator. Je décrirai également les configurations pour un accès rapide aux données S3 en utilisant Connecteur S3A et Committers S3A. Cette architecture fonctionne à la fois pour le stockage d’objets cloud et pour le stockage d’objets compatible S3 sur site comme FlashBlade S3.
Suivez ceci Guide de démarrage rapide pour installer l’opérateur. Assurez-vous d’activer le webhook dans l’installation.
helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubatorhelm install incubator/sparkoperator --namespace spark-operator --set enableWebhook=true
Créez un compte de service spark et reliez-les dans l’espace de noms par défaut de Kubernetes en utilisant ceci manifeste.
kubectl create -f manifest/spark-rbac.yaml
Exécutez l’exemple Spark Pi pour tester l’installation. Cela créera deux modules Spark dans Kubernetes: un pour le pilote, un autre pour un exécuteur.
kubectl apply -f examples/spark-pi.yaml
Grâce à l’opérateur Spark, avec quelques commandes, j’ai pu déployer un travail Spark simple exécuté sur Kubernetes. Ma prochaine tâche consiste à accéder aux données dans S3 dans mon travail Spark. Hadoop’s Connecteur S3A offre des E / S hautes performances contre Amazon S3 et des implémentations de stockage d’objets compatibles, y compris FlashBlade S3.
Création d’une image Docker avec le dernier connecteur S3A
L’opérateur Spark utilise une image Docker Spark prédéfinie de Google Cloud. Cependant, l’image n’inclut pas le connecteur S3A. Bien qu’il soit possible de personnaliser et d’ajouter S3A, l’image Spark par défaut est construite avec Hadoop 2.7, qui est connu pour avoir une implémentation S3A inefficace et lente. J’ai donc décidé de créer ma propre image Docker avec Spark et le dernier connecteur S3A.
Je vais omettre les détails du processus de construction car c’est simple, mais les points clés sont d’utiliser le binaire Spark-without-Hadoop pré-construit et Hadoop fourni par l’utilisateur. Mon fichier Docker est disponible sur mon Github.
Mon image Docker avec Spark 2.4.5, Hadoop 3.2.1 et la dernière S3A est disponible sur Docker Hub:
docker pull uprush/apache-spark:2.4.5
Configuration du connecteur S3A
La configuration S3A minimale pour que Spark accède aux données dans S3 est la suivante:
"spark.hadoop.fs.s3a.endpoint": "192.168.170.12"
"spark.hadoop.fs.s3a.access.key": "S3_ACCESS_KEY"
"spark.hadoop.fs.s3a.secret.key": "S3_SECRET_KEY"
"spark.hadoop.fs.s3a.connection.ssl.enabled": "false"
Dans un cluster Spark normal, cela est placé dans le spark-default.conf,
ou core-site.xml
fichier si vous utilisez Hadoop. Avec Spark Operator, il est configuré sous spec.sparkConf
dans le fichier YAML de votre application. Se référer au Doc API pour une description complète de l’API Spark Operator.
Spark et S3: l’exemple DogLover
Un exemple de travail est le meilleur pour montrer comment cela fonctionne. Voici un exemple de code pour lire et écrire des données dans S3 à partir d’un programme Spark appelé DogLover. J’ai collecté des tweets d’amateurs de chiens sur Twitter, en utilisant l’API Twitter, et les stocke sous forme de fichiers JSON dans un bucket S3 sur FlashBlade. Le programme DogLover Spark est un travail ETL simple, qui lit les fichiers JSON de S3, effectue l’ETL à l’aide de Spark Dataframe et réécrit le résultat dans S3 en tant que fichier Parquet, tout au long du connecteur S3A.
Pour gérer le cycle de vie des applications Spark dans Kubernetes, l’opérateur Spark ne permet pas aux clients d’utiliser spark-submit
directement pour exécuter le travail. Au lieu de cela, je télécharge le fichier jar sur S3, et dans mon doglover.yaml
fichier de spécification, je laisse l’opérateur Spark télécharger à partir de là et exécuter le programme sur Kubernetes.
spec:
type: Scala
mode: cluster
image: "uprush/apache-spark:2.4.5"
imagePullPolicy: Always
mainClass: com.uprush.example.DogLover
mainApplicationFile: "s3a://deephub/doglover/doglover_2.12-0.1.0-SNAPSHOT.jar"
sparkVersion: "2.4.5"
Je peux ensuite soumettre le travail Spark comme ceci:
kubectl create -f doglover.yaml
Après quelques secondes, mon travail Spark s’exécutait sur Kubernetes.
Une fois le travail terminé, nous devrions voir un octet zéro _SUCCESS
fichier avec plusieurs fichiers Parquet dans le répertoire de sortie S3.
Mon architecture ressemble à ceci:
À mon avis, c’est une meilleure façon de soumettre des travaux Spark car ils peuvent être soumis à partir de n’importe quel client Kubernetes disponible. Cela rend mon travail moins dépendant de l’infrastructure, donc plus portable. Par exemple, pour exécuter le même travail dans AWS, je peux d’abord répliquer mes données de FlashBlade S3 vers Amazon S3 à l’aide Réplication d’objets FlashBlade. Je peux ensuite facilement exécuter le même travail Spark de la même manière dans un cluster Kubernetes dans le cloud AWS.
Lorsque vous travaillez avec S3, Spark s’appuie sur les validateurs de sortie Hadoop pour écrire de manière fiable la sortie dans le stockage d’objets S3. Le FileOutputCommitter traditionnel est conçu pour HDFS, donc lors de l’utilisation avec S3, il est connu pour être inefficace, lent et moins fiable car il repose sur une opération HDFS atomique «renommer», qui n’est pas disponible pour le stockage d’objets. Dans Hadoop 3, les nouveaux Committers S3A «zéro-renommage» sont créés pour résoudre ces problèmes en tirant parti de la fonctionnalité S3 native du cloud. En savoir plus sur Engager le travail dans S3 avec les Committers S3A.
Netflix a contribué à un committer S3A appelé Committer de mise en scène, qui possède un certain nombre de fonctionnalités attrayantes
- N’exige aucune exigence du magasin d’objets de destination.
- Connu pour fonctionner.
Le committer écrit les sorties de tâche dans un répertoire temporaire appelé répertoire de tentative de tâche sur le système de fichiers local. Lors de la validation de la tâche, le validateur énumère les fichiers dans le répertoire de tentative de tâche. Chaque fichier est téléchargé sur S3 à l’aide du API de téléchargement en plusieurs parties. Les informations nécessaires pour valider le téléchargement sont enregistrées dans un répertoire de transfert HDFS et validées via ce protocole: lorsque le travail est validé, les parties de téléchargement en attente des tâches réussies sont toutes validées.
Configurations minimales de committer de transfert S3A pour Spark sur Kubernetes (sans HDFS):
"spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a":"org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory"
"spark.hadoop.fs.s3a.committer.name": "directory"
"spark.sql.sources.commitProtocolClass": "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
"spark.sql.parquet.output.committer.class": "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter"
En fait, le répertoire intermédiaire ne doit pas nécessairement être en HDFS, il peut également s’agir d’un volume NFS qui est partagé avec tous les pods Spark. Dans mon cas, j’utilise FlashBlade NFS parce que je ne veux pas avoir de dépendance HDFS.
Créer un PV intermédiaire et le monter sur tous les pods Spark est facile à utiliser Pure Service Orchestrator (PSO).
Pour utiliser FlashBlade NFS comme PV, créez un staging-pvc.yaml
fichier spec et spécifiez la classe de stockage pure-file
.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-staging-share
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 1Ti
storageClassName: pure-file
Appliquez le fichier de spécifications pour créer un PVC.
kubectl create -f staging-pvc.yaml
Je crée ensuite un PV et le monte sur tous les pods Spark sous le répertoire de travail du committer, qui est /home/spark/tmp
dans mon cas.
spec:
volumes:
- name: "staging-vol"
persistentVolumeClaim:
claimName: data-staging-share
driver:
volumeMounts:
- name: "staging-vol"
mountPath: "/home/spark/tmp"
executor:
instances: 2
volumeMounts:
- name: "staging-vol"
mountPath: "/home/spark/tmp"
Enfin, je configure S3A pour utiliser ce PV.
"spark.hadoop.fs.s3a.committer.tmp.path": "file:///home/spark/tmp/staging"
Ce n’est pas obligatoire, mais il est souvent préférable d’utiliser un PV pour le répertoire tampon du committer S3A. Le répertoire tampon est un répertoire du système de fichiers local pour les données écrites par le committer. Étant donné que le validateur intermédiaire écrit sa sortie dans le système de fichiers local et télécharge uniquement les données lors des validations de tâches, il est important de s’assurer que suffisamment de stockage local est disponible pour stocker les sorties générées par toutes les tâches non validées s’exécutant sur l’hôte. Les petits hôtes / VM peuvent manquer de disque. Pour éviter cette situation, je configure S3A pour utiliser le même PV que ci-dessus pour le répertoire tampon. Même s’il s’agit d’un stockage à distance, les performances ne posent aucun problème car FlashBlade est très rapide.
"spark.hadoop.fs.s3a.buffer.dir": "/home/spark/tmp/buffer"
Avec ceux-ci, je peux utiliser le committer de staging S3A efficace, rapide et fiable pour écrire des données sur S3 à partir de Spark fonctionnant sur Kubernetes.
L’exemple des amoureux des chiens avec S3A Committer
Avec la configuration ci-dessus, mon architecture se transforme en:
Mettez tout au-dessus dans le doglover.yaml fichier spec, réexécutez le travail. Contrairement à avant, cette fois le _SUCCESS
le fichier créé par le travail n’est pas zéro octet. Il contient des métriques et des compteurs du committer S3A.
cat _SUCCESS
{
"name" : "org.apache.hadoop.fs.s3a.commit.files.SuccessData/1",
"timestamp" : 1588673361792,
"date" : "Tue May 05 10:09:21 UTC 2020",
"hostname" : "doglover-driver",
"committer" : "directory",
"description" : "Task committer attempt_20200505100916_0000_m_000000_0",
"metrics" : {
"stream_write_block_uploads" : 0,
"files_created" : 1,
...
Cela indique que le committer S3A est correctement configuré et que Spark écrivait plus efficacement sur S3. Faire référence à mon deck pour les détails des committers S3A et leur caractère de performance.
Avec Spark sur Kubernetes, et en mettant des données dans S3, j’ai pu facilement et rapidement faire monter et descendre les tâches Spark de manière portable. J’ai également pu exécuter mes tâches Spark avec de nombreuses autres applications telles que Presto et Apache Kafka dans le même cluster Kubernetes, en utilisant le même stockage FlashBlade. Et je n’avais pas besoin de gérer un cluster Hadoop pour tout cela. Kubernetes, FlashBlade et PSO offrent ensemble une solution désagrégée simple, évolutive et hautes performances pour exécuter des systèmes d’analyse modernes tels que Spark comme un service.
L’avantage d’utiliser Spark avec Kubernetes et S3 est certainement énorme, mais il a des limites. Cette architecture est idéale pour les types de charge de travail de type «ignorer et oublier» comme les lots ETL. Bien que ce ne soit peut-être pas la meilleure architecture Spark pour des choses comme l’intelligence d’affaires (BI) et le backend de bloc-notes, car je ne pouvais pas trouver un moyen facile de faire fonctionner le Thrift Server ou la session Spark via Spark Operator. Mais je sais qu’il existe de meilleures façons de les faire sur Kubernetes avec Spark et d’autres solutions. Restez à l’écoute.
Ça prend juste une étincelle!