Opérationnalisation des pipelines ML sur Apache Mesos et Hadoop à l’aide d’Airflow
Une architecture pour mettre en production des modèles ML à NEW YORKER
Commençons par les exigences du modèle que nous souhaitons opérationnaliser, ainsi que celles découlant de notre infrastructure actuelle et de notre architecture d’ingénierie des données.
Pour répondre aux exigences énoncées ci-dessus, nous avons conçu l’architecture suivante.
Nous avons décidé de conteneuriser la formation du modèle et le calcul d’inférence et d’utiliser DC / OS pour l’exécuter. Cela présente deux avantages. Tout d’abord, il préserve l’indépendance du cadre d’apprentissage automatique choisi par nos Data Scientists. Deuxièmement, nous pouvons tirer parti des GPU existants sur le cluster DC / OS pour accélérer les futures approches d’apprentissage en profondeur.
Ayant un conteneur paramétré pour la formation et l’inférence du modèle en place, nous pouvons maintenant procéder au déploiement et à l’orchestration de plusieurs instances de celui-ci pour former un pipeline approprié à l’aide d’Airflow:
Aucun opérateur de métronome n’existait pour Airflow au meilleur de nos connaissances au début du projet. Nous en avons donc créé un.
Dans la définition du travail Metronome, nous pouvons spécifier l’utilisation du CPU, de la RAM et du GPU pour le conteneur. Par conséquent, nous pouvons évoluer verticalement. En ce qui concerne la mise à l’échelle horizontale, nous le faisons en parallélisant la formation et l’inférence du modèle pour les groupes de pays. Le degré de parallélisme n’est cependant pas illimité et est limité par les ressources de cluster disponibles. Afin de ne pas utiliser toutes les ressources, nous limitons le nombre de conteneurs exécutés simultanément à l’aide de la fonction de pool de ressources Airflow. Le nom du pool de ressources est défini dans toutes les instances de MetronomeOperator, comme nous le verrons dans le code DAG ci-dessous. La taille du pool de ressources est donc égale au degré de parallélisme.
Vous trouverez ci-dessous la version simplifiée du DAG Airflow:
default_args = {
"owner": "ny-data-science",
"start_date": datetime(2020, 1, 1),
"provide_context": True
}dag = DAG(
"markdown-pricing-dag",
schedule_interval="0 8 * * 1", # every Monday at 8am
dagrun_timeout=timedelta(days=1),
default_args=default_args,
max_active_runs=1,
catchup=True
)# Example list of countries for the model training and inference
COUNTRY_LIST = ["DEU", "AUT", "NLD", "FRA", "ESP", "ITA"]# Job template we will use in the MetronomeOperator
METRONOME_JOB_TEMPLATE = """
{{
"id": "{}",
"description": "Markdown Pricing ML Job",
"run": {{
"cpus": 4,
"mem": 32768,
"gpus": 0,
"disk": 0,
"ucr": {{
"image": {{
"id": "registry-dns-address:1234/markdown-pricing:latest-master",
"forcePull": true
}}
}},
"env": {{
"COUNTRY_GROUP": "{}",
"DB_USER": "{}",
"DB_PASSWORD": "{}"
}}
}}
}}
"""# Creates a Metronome job on DC/OS by instantiating the MetronomeOperator with the job template above
# and setting the country group and other environment variables
def create_metronome_job_for_country_group(country_group, index):
return MetronomeOperator(
task_id=f"metronome_operator_{index}",
metronome_job_json=
METRONOME_JOB_TEMPLATE.format(f"markdown-job-{index}",
country_group,
Variable.get("markdown_erp_db_user"), Variable.get("markdown_erp_db_pwd")),
dcos_http_conn_id="dcos_master",
dcos_robot_user_name=Variable.get("robot_user_name_dcos"),
dcos_robot_user_pwd=Variable.get("robot_user_pwd_dcos"),
dag=dag,
pool="markdown_metronome_job_pool",
retries=3
)# Get the resource pool size (in slots) for the MetronomeOperator instances from Airflow configuration
metronome_job_pool_size = get_pool_size(pool_name="markdown_metronome_job_pool")# Split the country list into groups into N parts of approximately equal length for parallelization purposes.
# N is here the size of the Metronome job pool.
# Given the COUNTRY_LIST defined above and N = 3, the function will return: [["DEU","AUT"], ["NLD","FRA"], ["ESP","ITA"]]
country_groups = split_country_list(COUNTRY_LIST, metronome_job_pool_size)# Iterates through the country groups and creates a Metronome job for each of those groups
metronome_countries_jobs = [create_metronome_job_for_country_group(country_group=country_group, index=index) for
index, country_group in enumerate(country_groups)]# HDFS sensor on the latest training data
training_data_sensor = NYHDFSSensor(
task_id="training_data_sensor",
filepaths=[f"/data/production/markdown_training_data/{get_current_date()}/_SUCCESS"],
hdfs_conn_id="hdfs_conn_default",
retries=1440,
retry_delay=timedelta(minutes=1),
timeout=0,
dag=dag)# Create DAG
training_data_sensor >> metronome_countries_jobs
Nous avons montré une architecture qui nous permet d’orchestrer facilement des pipelines d’apprentissage automatique dans un environnement de cluster mixte Mesos et Hadoop à l’aide d’Airflow. Nous tirons parti de l’orchestration des conteneurs et de la planification des ressources de Mesos pour faire évoluer la formation et l’inférence du modèle à la fois horizontalement et verticalement, tout en profitant de l’accélération matérielle disponible fournie par les GPU. Nous utilisons les puissantes fonctionnalités d’Airflow, telles que des capteurs et des DAG dynamiques, pour gérer efficacement l’ensemble du flux de travail à travers les clusters.