PC & Mobile

Apprentissage automatique pour la détection d'anomalies et la surveillance de l'état

Apprentissage automatique pour la détection d'anomalies et la surveillance de l'état


Téléchargez le jeu de données:

Pour répliquer les résultats dans l'article d'origine, vous devez d'abord télécharger le jeu de données à partir de la base de données Acoustique et vibrations du système NASA. Voir le document Readme téléchargé pour IMS Bearing Data pour plus d'informations sur l'expérience et les données disponibles.

Chaque ensemble de données est constitué de fichiers individuels qui sont des instantanés de signaux de vibration d’une seconde, enregistrés à des intervalles spécifiques. Chaque fichier comprend 20,480 points avec un taux d'échantillonnage fixé à 20 kHz. Le nom du fichier indique quand les données ont été collectées. Chaque enregistrement (ligne) du fichier de données est un point de données. Des intervalles d'horodatage plus grands (indiqués dans les noms de fichiers) indiquent la reprise de l'expérience le jour ouvrable suivant.

Importer des packages et des bibliothèques:

La première étape consiste à importer des packages et des bibliothèques utiles pour l'analyse:

# Importations communes
importation os
importation pandas comme pd
importation numpy comme np
de apprend importation prétraitement
importation marin comme sns
sns.set (color_codes =Vrai)
importation matplotlib.pyplot comme plt
%matplotlib en ligne

de numpy.random importation la graine
de tensorflow importation set_random_seed

de keras.layers importation Entrée, abandon
de keras.layers.core importation Dense
de keras.models importation Modèle, séquentiel, load_model
de keras importation régularisateurs
de keras.models importation model_from_json

Chargement des données et pré-traitement:

Une hypothèse est que la dégradation des engrenages se produit progressivement dans le temps. Nous utilisons donc un point de données toutes les 10 minutes dans l'analyse suivante. Chaque point de données de 10 minutes est agrégé en utilisant la valeur absolue moyenne des enregistrements de vibration sur les 20,480 points de données de chaque fichier. Nous fusionnons ensuite tous les éléments dans un seul cadre de données.

Dans l'exemple suivant, j'utilise les données du test de défaillance 2nd Gear (voir le document readme pour plus d'informations sur cette expérience).

data_dir = '2nd_test'
merged_data = pd.DataFrame ()

pour nom de fichier dans os.listdir (data_dir):
print (nom de fichier)
dataset = pd.read_csv (os.path.join (rép_données, nom_fichier), sep = ' t')
dataset_mean_abs = np.array (dataset.abs (). mean ())
dataset_mean_abs = pd.DataFrame (dataset_mean_abs.reshape (1,4))
dataset_mean_abs.index = [filename]
merged_data = merged_data.append (dataset_mean_abs)

merged_data.columns = ['Bearing 1','Bearing 2','Bearing 3','Bearing 4']

Après avoir chargé les données de vibration, nous transformons l'index au format date-heure (en utilisant la convention suivante), puis nous trions les données par index dans l'ordre chronologique avant d'enregistrer le jeu de données fusionné dans un fichier .csv.

merged_data.index = pd.to_datetime (merged_data.index, format = '% Y.% m.%ré.% H.% M.% S ')
merged_data = merged_data.sort_index ()
merged_data.to_csv ('merged_dataset_BearingTest_2.csv')
merged_data.head ()
Cadre de données résultant: «merged_data»

Définir les données de train / test:

Avant de configurer les modèles, nous devons définir les données de train / test. Pour ce faire, nous effectuons une scission simple en nous entraînant sur la première partie du jeu de données (qui devrait représenter des conditions de fonctionnement normales) et en effectuant des tests sur les autres parties du jeu de données menant à la défaillance du roulement.

dataset_train = merged_data['2004-02-12 11:02:39':'2004-02-13 23:52:39']
dataset_test = merged_data['2004-02-13 23:52:39':]
dataset_train.plot (figsize = (12,6))
Données d'entraînement: conditions de fonctionnement normales

Normaliser les données:

J'utilise ensuite des outils de prétraitement de Scikit-learn pour mettre à l'échelle les variables d'entrée du modèle. Le «MinMaxScaler» rééchelle simplement les données pour les faire correspondre à la plage [0,1].

scaler = pré-traitement.MinMaxScaler ()

X_train = pd.DataFrame (scaler.fit_transform (dataset_train),
columns = dataset_train.columns,
index = dataset_train.index)

# Données d'entraînement aléatoire
X_train.sample (frac = 1)

X_test = pd.DataFrame (scaler.transform (dataset_test),
colonnes = dataset_test.columns,
index = dataset_test.index)

Modèle de type PCA pour la détection d'anomalies:

Comme il est souvent difficile de traiter des données de capteur de grandes dimensions, il existe plusieurs techniques pour réduire le nombre de variables (réduction de dimensionnalité). L’une des principales techniques est l’analyse en composantes principales (ACP). Pour une introduction plus détaillée, je me réfère à mon article original sur le sujet.

Dans un premier temps, compressons les lectures du capteur en deux composantes principales.

de sklearn.decomposition importation PCA
pca = PCA (n_components = 2, svd_solver = 'complet')
X_train_PCA = pca.fit_transform (X_train)
X_train_PCA = pd.DataFrame (X_train_PCA)
X_train_PCA.index = X_train.index

X_test_PCA = pca.transform (X_test)
X_test_PCA = pd.DataFrame (X_test_PCA)
X_test_PCA.index = X_test.index

La métrique de distance de Mehlanobis:

La distance de Mahalanobis est largement utilisée dans les techniques d’analyse par grappes et de classification. Pour utiliser la distance de Mahalanobis pour classer un point de test appartenant à l'une des N classes, on commence par estimer la matrice de covariance de chaque classe, généralement à partir d'échantillons connus pour appartenir à chaque classe. Dans notre cas, comme nous ne souhaitons classer que les «normales» et les «anomalies», nous utilisons des données d'apprentissage qui ne contiennent que des conditions de fonctionnement normales pour calculer la matrice de covariance. Ensuite, à partir d'un échantillon test, nous calculons la distance de Mahalanobis à la classe «normale» et classons le point test comme une «anomalie» si la distance est supérieure à un certain seuil.

Pour une introduction plus détaillée à ces aspects techniques, vous pouvez consulter mon article précédent, qui couvre ces sujets plus en détail.

Définissez les fonctions utilisées dans le modèle PCA:

Calculez la matrice de covariance:

def cov_matrix (data, verbose =Faux):
covariance_matrix = np.cov (data, rowvar =Faux)
si is_pos_def (covariance_matrix):
inv_covariance_matrix = np.linalg.inv (covariance_matrix)
si is_pos_def (inv_covariance_matrix):
revenir covariance_matrix, inv_covariance_matrix
autre:
print ("Erreur: La matrice inverse de la covariance n'est pas positive définitive!")
autre:
print ("Erreur: la matrice de covariance n’est pas positive définitive!")

Calculez la distance de Mahalanobis:

def MahalanobisDist (inv_cov_matrix, mean_distr, data, verbose =Faux):
inv_covariance_matrix = inv_cov_matrix
vars_mean = mean_distr
diff = data - vars_mean
md = []
pour je dans gamme (len (diff)):
md.append (np.sqrt (diff[i].dot (inv_covariance_matrix) .dot (diff[i])))
revenir Maryland

Détection des valeurs aberrantes:

def MD_detectOutliers (dist, extreme =Faux, verbeux =Faux):
k = 3. si extrême autre 2
seuil = np.mean (dist) * k
valeurs aberrantes = []
pour je dans gamme (len (dist)):
si dist[i] > = seuil:
outliers.append (i) # index de la valeur aberrante
revenir np.array (valeurs aberrantes)

Calculez la valeur de seuil pour classer le point de donnée en anomalie:

def MD_threshold (dist, extreme =Faux, verbeux =Faux):
k = 3. si extrême autre 2
seuil = np.mean (dist) * k
revenir seuil

Vérifiez si la matrice est positive définie:

def is_pos_def (A):
si np.allclose (A, A.T):
essayer:
np.linalg.cholesky (A)
revenir Vrai
sauf np.linalg.LinAlgError:
revenir Faux
autre:
revenir Faux

Configurer le modèle PCA:

Définissez le train / test à partir des deux principales composantes:

data_train = np.array (X_train_PCA.values)
data_test = np.array (X_test_PCA.values)

Calculez la matrice de covariance et son inverse en vous basant sur les données de l'ensemble d'apprentissage:

cov_matrix, inv_cov_matrix = cov_matrix (train de données)

Nous calculons également la valeur moyenne pour les variables d'entrée dans l'ensemble d'apprentissage, car elles sont utilisées ultérieurement pour calculer la distance de Mahalanobis par rapport aux points de données dans l'ensemble d'essai.

mean_distr = data_train.mean (axe = 0)

En utilisant la matrice de covariance et son inverse, nous pouvons calculer la distance de Mahalanobis pour les données d'apprentissage définissant les «conditions normales» et trouver la valeur de seuil pour marquer les points de données sous forme d'anomalie. On peut ensuite calculer la distance de Mahalanobis pour les points de données de l'ensemble de test et la comparer au seuil d'anomalie.

dist_test = MahalanobisDist (inv_cov_matrix, mean_distr, data_test, verbose =Faux)
dist_train = MahalanobisDist (inv_cov_matrix, mean_distr, data_train, verbose =Faux)
seuil = MD_threshold (dist_train, extreme = Vrai)

Valeur seuil pour signaler une anomalie:

Le carré de la distance de Mahalanobis au centroïde de la distribution doit suivre une distribution 2 si l'hypothèse des variables d'entrée distribuées normales est remplie. C'est également l'hypothèse qui sous-tend le calcul ci-dessus de la «valeur seuil» pour signaler une anomalie. Comme cette hypothèse n'est pas nécessairement remplie dans notre cas, il est utile de visualiser la distribution de la distance de Mahalanobis afin de définir une bonne valeur de seuil pour le signalement des anomalies. Encore une fois, je me réfère à mon article précédent, pour une introduction plus détaillée de ces aspects techniques.

Nous commençons par visualiser le carré de la distance de Mahalanobis, qui devrait alors idéalement suivre une distribution 2.

plt.figure ()
sns.distplot (np.square (dist_train),
bacs = 10,
kde = Faux)
plt.xlim ([0.0,15])
Distance de la place de la Mahalanobis

Puis visualisez la distance de Mahalanobis elle-même:

plt.figure ()
sns.distplot (dist_train,
bacs = 10,
kde = Vrai,
couleur = 'vert');
plt.xlim ([0.0,5])
plt.xlabel ('Mahalanobis dist')

À partir des distributions ci-dessus, la valeur de seuil calculée de 3,8 pour signaler une anomalie semble raisonnable (définie comme 3 écarts-types par rapport au centre de la distribution)

Nous pouvons ensuite enregistrer la distance de Mahalanobis, ainsi que la valeur de seuil et la variable «indicateur d'anomalie» pour les données de test dans un cadre de données:

anomalie = pd.DataFrame ()
anomalie['Mob dist']= test_dist
anomalie['Thresh'] = seuil
# Si Mob dist au-dessus du seuil: marquer comme anomalie
anomalie['Anomaly'] = anomalie['Mob dist'] > anomalie['Thresh']
anomaly.index = X_test_PCA.index
anomalie.head ()
Cadre de données résultant des données de test

Sur la base des statistiques calculées, toute distance au-dessus de la valeur de seuil sera signalée comme une anomalie.

Nous pouvons maintenant fusionner les données dans une seule image de données et les enregistrer dans un fichier .csv:

anomaly_alldata = pd.concat ([anomaly_train, anomaly])
anomaly_alldata.to_csv ('Anomaly_distance.csv')

Vérification du modèle PCA sur des données de test:

Nous pouvons maintenant tracer la métrique d'anomalie calculée (Mob dist) et vérifier quand elle franchit le seuil d'anomalie (notez l'axe des y logarithmique).

anomaly_alldata.plot (logy =Vrai, figsize = (10,6), ylim = [1e-1,1e3], couleur = ['green','red'])

La figure ci-dessus montre que le modèle est capable de détecter l'anomalie environ 3 jours avant la défaillance réelle du roulement.

Autre approche: modèle Autoencoder pour la détection d'anomalies

L'idée de base est d'utiliser un réseau de neurones auto-codeur pour «compresser» les lectures du capteur en une représentation de petite dimension, qui capture les corrélations et les interactions entre les différentes variables. (Essentiellement le même principe que le modèle PCA, mais ici, nous permettons également des non-linéarités parmi les variables d'entrée).

Pour une introduction plus détaillée aux autoencodeurs, vous pouvez consulter mon article précédent, qui couvre le sujet plus en détail.

Définition du réseau Autoencoder:

Nous utilisons un réseau de neurones à 3 couches: la première couche a 10 nœuds, la couche intermédiaire a 2 nœuds et la troisième couche a 10 nœuds. Nous utilisons l'erreur quadratique moyenne comme fonction de perte et formons le modèle à l'aide de l'optimiseur «Adam».

graine (10)
set_random_seed (10)
act_func = 'elu'

# Couche d'entrée:
modèle = séquentiel ()
# Première couche cachée, connectée au vecteur d’entrée X.
model.add (Dense (10, activation = act_func,
kernel_initializer = 'glorot_uniform',
kernel_regularizer = regularizers.l2 (0.0),
input_shape = (X_train.shape[1],)
)
)

model.add (Dense (2, activation = act_func,
kernel_initializer = 'glorot_uniform'))

model.add (Dense (10, activation = act_func,
kernel_initializer = 'glorot_uniform'))

model.add (Dense (X_train.shape[1],
kernel_initializer = 'glorot_uniform'))

model.compile (loss = 'mse', optimizer = 'adam')

# Modèle de train pour 100 époques, taille de lot de 10:
NUM_EPOCHS = 100
BATCH_SIZE = 10

Montage du modèle:

Pour garder une trace de la précision pendant l’entraînement, nous utilisons 5% des données d’entraînement pour la validation après chaque époque (validation_split = 0.05).

history = model.fit (np.array (X_train), np.array (X_train),
batch_size = BATCH_SIZE,
epochs = NUM_EPOCHS,
validation_split = 0.05,
verbeux = 1)

Visualiser la perte de formation / validation:

plt.plot (histoire.histoire['loss'],
«b»,
label = 'Perte de formation')
plt.plot (histoire.histoire['val_loss'],
'r',
label = 'perte de validation')
plt.legend (loc = 'en haut à droite')
plt.xlabel ('Epochs')
plt.ylabel ('Perte, [mse]')
plt.ylim ([0,.1])
plt.show ()
Perte de formation / validation

Répartition de la fonction de perte dans l’entraînement:

En traçant la distribution de la perte calculée dans l'ensemble d'apprentissage, on peut l'utiliser pour identifier une valeur de seuil appropriée pour identifier une anomalie. Ce faisant, vous pouvez vous assurer que ce seuil est défini au-dessus du «niveau de bruit» et que toutes les anomalies signalées doivent être statistiquement significatives au-dessus du bruit de fond.

X_pred = model.predict (np.array (X_train))
X_pred = pd.DataFrame (X_pred,
colonnes = X_train.columns)
X_pred.index = X_train.index

marqué = pd.DataFrame (index = X_train.index)
marqué['Loss_mae'] = np.mean (np.abs (X_pred-X_train), axis = 1)

plt.figure ()
sns.distplot (marqué['Loss_mae'],
bacs = 10,
kde = Vrai,
couleur = 'bleu');
plt.xlim ([0.0,.5])
Répartition des pertes, set d'entraînement

Essayons un seuil de 0,3 pour signaler une anomalie à partir de la distribution des pertes ci-dessus. Nous pouvons ensuite calculer la perte dans l'ensemble de test pour vérifier quand la sortie dépasse le seuil d'anomalie.

X_pred = model.predict (np.array (X_test))
X_pred = pd.DataFrame (X_pred,
colonnes = X_test.columns)
X_pred.index = X_test.index

marqué = pd.DataFrame (index = X_test.index)
marqué['Loss_mae'] = np.mean (np.abs (X_pred-X_test), axis = 1)
marqué['Threshold'] = 0,3
marqué['Anomaly'] = marqué['Loss_mae'] > marqué['Threshold']
marqué.head ()

Nous calculons ensuite les mêmes mesures également pour l'ensemble d'apprentissage et fusionnons toutes les données dans un seul cadre de données:

X_pred_train = model.predict (np.array (X_train))
X_pred_train = pd.DataFrame (X_pred_train,
colonnes = X_train.columns)
X_pred_train.index = X_train.index

score_train = pd.DataFrame (index = X_train.index)
marqué_train['Loss_mae'] = np.mean (np.abs (X_pred_train-X_train), axis = 1)
marqué_train['Threshold'] = 0,3
marqué_train['Anomaly'] = marqué_train['Loss_mae'] > score_train['Threshold']

marqué = pd.concat ([scored_train, scored])

Résultats du modèle Autoencoder:

Après avoir calculé la distribution des pertes et le seuil d'anomalie, nous pouvons visualiser la sortie du modèle dans le temps qui a précédé la défaillance du roulement:

score.plot (logy =Vrai, figsize = (10,6), ylim = [1e-2,1e2], couleur = ['blue','red'])

Résumé:

Les deux approches de modélisation donnent des résultats similaires, dans la mesure où elles sont en mesure de signaler le dysfonctionnement du roulement à venir bien avant la défaillance. La principale différence réside essentiellement dans la manière de définir une valeur de seuil appropriée pour signaler les anomalies, afin d'éviter de nombreux faux positifs dans des conditions de fonctionnement normales.

J'espère que ce tutoriel vous a inspiré pour essayer vous-même ces modèles de détection d'anomalies. Une fois que vous avez correctement configuré les modèles, il est temps de commencer à expérimenter avec les paramètres de modèle, etc. et de tester la même approche sur de nouveaux jeux de données. Si vous rencontrez des cas d'utilisation intéressants, merci de me le faire savoir dans les commentaires ci-dessous.

S'amuser!

Autres articles:

Si vous avez trouvé cet article intéressant, vous pourriez aussi aimer certains de mes autres articles:

  1. Comment utiliser l'apprentissage automatique pour la détection des anomalies et la surveillance des conditions
  2. Comment (ne pas) utiliser Machine Learning pour la prévision de séries chronologiques: éviter les pièges
  3. Comment utiliser l'apprentissage automatique pour optimiser la production: utiliser les données pour améliorer les performances
  4. Comment enseignez-vous la physique aux systèmes d'IA?
  5. Pouvons-nous construire des réseaux de cerveau artificiels utilisant des aimants nanométriques?
Show More

SupportIvy

SupportIvy.com : Un lieu pour partager le savoir et mieux comprendre le monde. Meilleure plate-forme de support gratuit pour vous, Documentation &Tutoriels par les experts.

Related Articles

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Close
Close