LinkedIn Open Sources un petit composant pour simplifier l’interopérabilité TensorFlow-Spark
Spark-TFRecord permet le traitement des structures TFRecord de TensorFlow dans Apache Spark.
L’interopérabilité de TensorFlow et d’Apache Spark est un défi courant dans les scénarios d’apprentissage automatique du monde réel. TensorFlow est sans doute le cadre d’apprentissage en profondeur le plus populaire du marché, tandis qu’Apache Spark reste l’une des plates-formes de calcul de données les plus largement adoptées avec une grande installation basée sur de grandes entreprises et startups. Il est naturel que les entreprises tentent de combiner les deux. Bien qu’il existe des cadres qui adaptent TensorFlow à Spark, la racine du défi d’interopérabilité est souvent enracinée au niveau des données. TFRecord, la structure de données native de TensorFlow, n’est pas entièrement pris en charge dans Apache Spark. Récemment, les ingénieurs de LinkedIn open source Spark-TFRecord, une nouvelle source de données native de Spark basée sur le TensorFlow TFRecord.
Le fait que LinkedIn ait décidé de résoudre ce problème n’est pas surprenant. Le géant de l’internet a longtemps été un grand adopteur des technologies Spark et a contribué activement aux communautés open source TensorFlow et d’apprentissage automatique. En interne, les équipes d’ingénierie de LinkedIn essayaient régulièrement de mettre en œuvre une transformation entre le format TFRecord natif de TensorFlow et les formats internes de Spark tels que Avro ou Parquet. L’objectif du projet Spark-TFRecord était de fournir les fonctionnalités natives de la structure TFRecord dans les pipelines Spark.
Spark-TFRecord n’est pas le premier projet qui tente de résoudre les problèmes d’interopérabilité des données entre Spark et TensorFlow. Le projet le plus populaire dans cette bobine est le Connecteur Spark-Tensorflow promu par le créateur de Spark Databricks. Nous avons utilisé le Spark-TensorFlow-Connector de nombreuses fois avec divers degrés de succès. D’un point de vue architectural, le connecteur est une adaptation du format TFRecord en Spark SQL DataFrames. Sachant cela, il ne devrait pas être surprenant que le Spark-TensorFlow-Connector fonctionne très efficacement dans les scénarios d’accès aux données relationnelles mais reste très limité dans d’autres cas d’utilisation.
Si vous y réfléchissez, une partie importante d’un workflow TensorFlow est liée aux opérations d’E / S disque plutôt qu’à l’accès à la base de données. Dans ces scénarios, les développeurs finissent par écrire des quantités considérables de code lors de l’utilisation du connecteur Spark-TensorFlow. De plus, la version actuelle du Spark-TensorFlow-Connector manque encore de fonctions importantes telles que la PartitionBy qui sont régulièrement utilisés dans les calculs TensorFlow. Enfin, le connecteur ressemble plus à un pont pour traiter les enregistrements TensorFlow dans des cadres de données Spark SQL plutôt qu’à un format de fichier natif.
En tenant compte de ces limites, l’équipe d’ingénierie de LinkedIn a décidé de relever le défi d’interopérabilité Spark-TensorFlow dans une perspective légèrement différente.
Spark-TFRecord est un TensorFlow TFRecord natif pour Apache Spark. Plus précisément, Spark-TFRecord fournit les routines de lecture et d’écriture des données TFREcord depuis / vers Apache Spark. Au lieu de créer un connecteur pour traiter les structures TFRecord, Spark-TFRecord est construit en tant que jeu de données Spark natif, tout comme Avro, JSON ou Parquet. Cela signifie que toutes les routines d’E / S DataSet et DataFrame de Spark sont automatiquement disponibles dans un Spark-TFRecord.
Une question évidente qui mérite d’être explorée est pourquoi construire une nouvelle structure de données au lieu de simplement versionner le connecteur open source Spark-TensorFlow? Eh bien, il semble que l’adaptation du connecteur aux opérations d’E / S disque nécessite une refonte fondamentale.
Au lieu de suivre cette voie, l’équipe d’ingénierie de LinkedIn a décidé de mettre en œuvre un nouveau Interface Spark FileFormat qui est fondamentalement conçu pour prendre en charge les opérations d’E / S sur disque. La nouvelle interface adapterait les opérations natives de TFRecord à n’importe quel Spark DataFrames. Sur le plan architectural, Spark-TFRecord est composé d’une série de blocs de construction de base qui résument les routines de lecture / écriture et de sérialisation / désérialisation:
· Inférenceur de schéma: Il s’agit du composant le plus proche du connecteur Spark-TensorFlow. Cette interface mappe les représentations TFRecords dans les types de données Spark natifs.
· Lecteur TFRecord: Ce composant lit les structures TFRecord et les transmet au désérialiseur.
· Rédacteur TFRecord: Ce composant reçoit une structure TFRecord du sérialiseur et l’écrit sur le disque.
· Sérialiseur TFRecord: Ce composant convertit Spark InternalRow en structures TFRecord.
· Désérialiseur TFRecord: Ce composant convertit les enregistrements TFR en structures Spark InternalRow.
L’utilisation de Spark-TFRecord de LinkedIn n’est pas différente des autres jeux de données natifs de Spark. Un développeur doit simplement inclure la bibliothèque jar spark-tfrecord et utiliser l’API DataFrame traditionnelle pour lire et écrire des TFRecords comme illustré dans le code suivant:
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ DataFrame, Row }
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._val path = "test-output.tfrecord"
val testRows: Array[Row] = Array(
new GenericRow(Array[Any](11, 1, 23L, 10.0F, 14.0, List(1.0, 2.0), "r1")),
new GenericRow(Array[Any](21, 2, 24L, 12.0F, 15.0, List(2.0, 2.0), "r2")))
val schema = StructType(List(StructField("id", IntegerType),
StructField("IntegerCol", IntegerType),
StructField("LongCol", LongType),
StructField("FloatCol", FloatType),
StructField("DoubleCol", DoubleType),
StructField("VectorCol", ArrayType(DoubleType, true)),
StructField("StringCol", StringType)))
val rdd = spark.sparkContext.parallelize(testRows)
//Save DataFrame as TFRecords
val df: DataFrame = spark.createDataFrame(rdd, schema)
df.write.format("tfrecord").option("recordType", "Example").save(path)
//Read TFRecords into DataFrame.
//The DataFrame schema is inferred from the TFRecords if no custom schema is provided.
val importedDf1: DataFrame = spark.read.format("tfrecord").option("recordType", "Example").load(path)
importedDf1.show()
//Read TFRecords into DataFrame using custom schema
val importedDf2: DataFrame = spark.read.format("tfrecord").schema(schema).load(path)
importedDf2.show()
L’interopérabilité entre Spark et les frameworks d’apprentissage en profondeur comme TensorFlow continuera probablement d’être un domaine difficile pour la plupart des organisations. Cependant, des projets comme Spark-TFRecord de LinkedIn, qui ont été testés à grande échelle, contribuent certainement à simplifier le pont entre ces deux technologies qui sont essentielles à tant d’architectures modernes d’apprentissage automatique.