Intelligence artificielle

Simulateur de camion autonome avec PyTorch – détecteurs précis et uniques

Simulateur de camion autonome avec PyTorch - détecteurs précis et uniques


C’est la suite d’un précédent article où je fais un tour complet de la construction d’un simulateur de camion autonome à l’aide de fast.ai, mais ces méthodes peuvent en définitive fonctionner dans tous les cas où vous avez besoin de peaufiner des modèles pré-entraînés ou de développer des modèles prédictifs. et des cours ensemble.

Mon objectif est maintenant de passer en revue certains des aspects les plus techniques des processus de formation et d’inférence et d’expliquer en détail la façon dont ils sont mis en œuvre dans PyTorch. Vous pouvez également référencer la base de code dans ce référentiel Github.

Rappelez-vous du dernier message qu'il y a deux réseaux de neurones à l'œuvre ici.

  1. Le DNN pour prédire le sens de rotation.
  2. Le DNN pour prédire les boîtes englobantes et les classes de voitures, de personnes, etc.

Les deux réseaux commencent par un réseau resnet34 pré-entraîné et sont configurés en fonction de la tâche appropriée.

Un resnet34 pré-entraîné peut être obtenu auprès de torchvision.models

importation torchvision.models comme des modèlesarch = models.resnet34 (pretrained =Vrai)

Tous les modèles de pré-entraînement ont été pré-entraînés sur le jeu de données Imagenet de classe 1000.

Pour peaufiner un réseau de pré-entraînement, nous commençons essentiellement avec un ensemble de poids contenant déjà beaucoup d’informations sur le jeu de données Imagenet. Nous pouvons donc faire cela de deux manières. Une solution consisterait à geler toutes les premières couches en définissant require_grad =Faux et alors seulement require_grad =Vrai pour les couches finales. Une autre solution consisterait simplement à utiliser tous les poids comme initialisation et à poursuivre la formation sur nos nouvelles données de formation.

Pour l’option 1 où nous gèlons les couches précédentes et n’entraînons que les couches finales, nous pouvons définir: require_grad =Faux pour tous les calques, puis supprimez et remplacez les derniers calques (chaque fois que vous affectez un calque à un réseau, il définit automatiquement la requiert_grad attribuer à Vrai).

classe Aplatir (nn.Module):
def __en soi):
super (Aplatir, soi-même) .__ init __ ()
def transmettre (auto, x):
x = x.view (x.size (0), -1)
revenir X
classe normaliser (nn.Module):
def __en soi):
super (normaliser, auto) .__ init __ ()
def transmettre (auto, x):
x = F. normaliser (x, p = 2, dim = 1)
revenir X
layer_list = list (arch.children ())[-2:]
arch = nn.Sequential (*liste (arch.children ())[:-2])
arch.avgpool = nn.AdaptiveAvgPool2d (taille_sortie = (1,1))
arch.fc = nn.Sequential (
Aplatir(),
nn.Linear (in_features = layer_list[1].in_features,
out_features = 3,
biais =Vrai),
normaliser()
)
arch = arch.to (périphérique)

Si vous regardez l’architecture de resnet34, vous pouvez voir que le dernier bloc de conv est suivi d’un AdaptiveAvgPool2d et un Linéaire couche.

(2): BasicBlock (
(conv1): Conv2d (512, 512, taille du noyau = (3, 3), stride = (1, 1), remplissage = (1, 1), biais =Faux)
(bn1): BatchNorm2d (512, eps = 1e-05, moment = 0,1, affine = nn.Sequential, track_running_stats =Vrai)
(relu): ReLU (inplace =Vrai)
(conv2): Conv2d (512, 512, taille du noyau = (3, 3), stride = (1, 1), remplissage = (1, 1), biais =Faux)
(bn2): BatchNorm2d (512, eps = 1e-05, moment = 0,1, affine = True, track_running_stats =Vrai)
)
)
(avgpool): AdaptiveAvgPool2d (taille_sortie = (1, 1))
(fc): linéaire (in_features = 512, out_features = 1000, biais =Vrai)
)

On peut enlever les deux dernières couches avec nn.Sequential (*liste (arch.children ())[:-2]) , puis rattachez-les à la fin avec arch.avgpool = nn.AdaptiveAvgPool2d (taille_sortie = (1,1)) et un autre nn.sequential avec unAplatir, Linéaire, et normaliser couches. Nous voulons finalement prédire 3 classes: gauche, droite, droite - alors notre out_features sera 3.

Nous allons maintenant créer notre dataset et notre chargeur de données pour le modèle de directions. Puisque nos données sont simplement des images et des classes [left, right, straight], nous pourrions simplement utiliser la classe de jeu de données de torche intégrée, mais j'aime utiliser une classe personnalisée malgré tout, car je peux voir exactement comment les données sont extraites plus facilement.

classe InstructionsDataset (Dataset):
"" "Jeu de données d'itinéraire." ""
def __init __ (self, csv_file, root_dir, transform = None):
"" "
Args:
csv_file (string): chemin d'accès au fichier csv avec des étiquettes.
root_dir (string): Répertoire avec toutes les images.
transform (callable, optionnel): transform optionnel
"" "
self.label = pd.read_csv (csv_file)
self.root_dir = root_dir
self.transform = transformer
def __len __ (auto):
revenir len (self.label)
def __getitem __ (self, idx):
img_name = os.path.join (self.root_dir,
self.label.iloc[idx, 0])
image = io.imread (img_name + '. jpg')
échantillon = image
label = self.label.iloc[idx, 1]
si self.transform:
échantillon = self.transform (échantillon)
revenir échantillon, étiquette

Les noms de mes images dans le fichier csv n’ont pas d’extension. img_name + ’. jpg’ .

tensor_dataset = DirectionsDataset (csv_file = 'data / labels_directions.csv',
root_dir = 'data / train3 /',
transform = transforms.Compose ([[
transforms.ToTensor (),
transforms.Normalize (
(0,5, 0,5, 0,5), (0,5, 0,5, 0,5))]))
dataloader = DataLoader (tensor_dataset, batch_size = 16, shuffle =Vrai)

Nous sommes donc prêts à commencer à former le modèle.

def train_model (modèle, critère, optimiseur, ordonnanceur, 
chargeur de données, num_epochs = 25):
depuis = time.time ()
FT_losses = []
best_model_wts = copy.deepcopy (model.state_dict ())
best_acc = 0.0
iters = 0
pour époque dans gamme (num_epochs):
impression('Epoch {} / {}'. Format (epoch, num_epochs - 1))
impression('-' * dix)
scheduler.step ()
model.train () # Définit le modèle en mode de formation
running_loss = 0.0
running_corrects = 0
# Itérer sur les données.
pour i (entrées, étiquettes) dans énumérer (dataloader):
#set_trace ()
input = inputs.to (device)
labels = labels.to (périphérique)
# zéro les gradients de paramètres
optimizer.zero_grad ()
# vers l'avant
# piste historique si seulement dans le train
model.eval () # Définit le modèle pour évaluer le mode
avec torch.no_grad ():
sorties = modèle (entrées)
#set_trace ()
_, preds = torch.max (sorties, 1)

sorties = modèle (entrées)
perte = critère (sorties, libellés)

# en arrière + optimiser uniquement si en phase d'entraînement
loss.backward ()
optimizer.step ()
FT_losses.append (loss.item ())
# statistiques
running_loss + = loss.item () * inputs.size (0)
running_corrects + = torch.sum (preds == labels.data)
#set_trace ()
iters + = 1

si iters% 2 == 0:
impression('Prev Loss: {: .4f} Acc. Préc.: {: .4f}'. Format (
loss.item (), torch.sum (preds == labels.data) / inputs.size (0)))

epoch_loss = running_loss / dataset_size
epoch_acc = running_corrects.double () / dataset_size
print ('Perte: {: .4f} Acc: {: .4f}'. format (
epoch_loss, epoch_acc))
# copier le modèle en profondeur
si epoch_acc> best_acc:
best_acc = epoch_acc
best_model_wts = copy.deepcopy (model.state_dict ())
time_elapsed = time.time () - depuis
impression('La formation est terminée en {: .0f} m {: .0f} s'.format (
time_elapsed // 60, time_elapsed% 60))
impression('Best val Acc: {: 4f}'. Format (best_acc))
# charge les meilleurs poids de modèle
model.load_state_dict (best_model_wts)
revenir modèle, FT_losses

Dans cette boucle d’entraînement, nous pouvons suivre les meilleurs poids du modèle si la précision de cette époque est la meilleure à ce jour. Nous pouvons également suivre les pertes à chaque itération et à chaque époque et les renvoyer à la fin pour tracer et voir à quoi cela ressemble pour le débogage ou la présentation.

Gardez à l’esprit que le modèle est en cours d’entraînement à chaque itération et que si vous arrêtez la boucle d’entraînement, il conservera ces poids et que l’entraînement pourra se poursuivre en exécutant simplement le programme. train_modèle () commande à nouveau. Pour recommencer depuis le début, revenez en arrière et réinitialisez les poids avec l'architecture pré-entraînée.

critère = nn.CrossEntropyLoss ()
# Observez que tous les paramètres sont optimisés
optimizer_ft = optim.SGD (arch.parameters (), lr = 1e-2, momentum = 0.9)
# Decay LR par un facteur de * gamma * à chaque * étape_size *
exp_lr_scheduler = lr_scheduler.StepLR (optimizer_ft, step_size = 7, gamma = 0.1)
arch, FT_losses = train_model (arch, critère, optimizer_ft, exp_lr_scheduler, chargeur de données, num_epochs = 5)
échantillon de données

De nouveau, nous utiliserons une architecture resnet34 pré-entraînée. Cependant, cette fois-ci, nous devrons l'éditer plus substantiellement pour générer à la fois les prédictions de classe et les valeurs du cadre de sélection. De plus, il s’agit d’un problème de prédiction multi-classes, il peut donc y avoir une boîte englobante ou 15, voire 1 ou 15 classes.

Nous allons créer un tête personnalisée à l’architecture de la même manière que nous avons remplacé les couches dans le modèle de directions.

classe StdConv (nn.Module):
def __init __ (self, nin, nout, stride = 2, drop = 0.1):
super () .__ init __ ()
self.conv = nn.Conv2d (nin, nout, 3, stride = stride, padding = 1)
self.bn = nn.BatchNorm2d (nout)
self.drop = nn.Dropout (déposer)

def transmettre (auto, x):
revenir self.drop (self.bn (F.relu (self.conv (x))))

def flatten_conv (x, k):
bs, nf, gx, gy = x.size ()
x = x.permute (0,2,3,1) .contiguous ()
revenir x.view (bs, -1, nf // k)

classe OutConv (nn.Module):
def __init __ (self, k, nin, biais):
super () .__ init __ ()
self.k = k
self.oconv1 = nn.Conv2d (nin, (len (id2cat) +1) * k, 3, remplissage = 1)
self.oconv2 = nn.Conv2d (nin, 4 * k, 3, remplissage = 1)
self.oconv1.bias.data.zero _ (). add_ (biais)

def transmettre (auto, x):
revenir [flatten_conv(selfoconv1(x)selfk)[flatten_conv(selfoconv1(x)selfk)
flatten_conv (self.oconv2 (x), self.k)]baisse = 0.4classe SSD_MultiHead (nn.Module):
def __init __ (auto, k, biais):
super () .__ init __ ()
self.drop = nn.Dropout (déposer)
self.sconv0 = StdConv (512,256, stride = 1, drop = drop)
self.sconv1 = StdConv (256,256, drop = drop)
self.sconv2 = StdConv (256,256, drop = drop)
self.sconv3 = StdConv (256,256, drop = drop)
self.out0 = OutConv (k, 256, biais)
self.out1 = OutConv (k, 256, biais)
self.out2 = OutConv (k, 256, biais)
self.out3 = OutConv (k, 256, biais)
def transmettre (auto, x):
x = self.drop (F.relu (x))
x = self.sconv0 (x)
x = self.sconv1 (x)
o1c, o1l = self.out1 (x)
x = self.sconv2 (x)
o2c, o2l = self.out2 (x)
x = self.sconv3 (x)
o3c, o3l = self.out3 (x)
revenir [torch.cat([o1c,o2c,o3c], dim = 1),
torch.cat ([o1l,o2l,o3l], dim = 1)]

Nous souhaitons donc maintenant connecter cette tête personnalisée à l’architecture resnet34 et nous disposons d’une fonction pratique pour le faire.

classe ConvnetBuilder ():
def __init __ (self, f, c, is_multi, is_reg, ps =Aucun,
xtra_fc =Aucun, xtra_cut = 0,
custom_head =Aucunpré-entraînés =Vrai):
self.f, self.c, self.is_multi, self.is_reg, self.xtra_cut = f, c, is_multi, is_reg, xtra_cut
xtra_fc = [512]
ps = [0.25]* len (xtra_fc) + [0.5]
self.ps, self.xtra_fc = ps, xtra_fc
couper, self.lr_cut = [8,6] # spécifique à l'archive resnet_34
cut- = xtra_cut
layers = cut_model (f (pré-entrainé), cut)
self.nf = nombre_features (couches) * 2
self.top_model = nn.Sequential (* couches)
n_fc = len (self.xtra_fc) +1
self.ps = [self.ps]* n_fc
fc_layers = [custom_head]
self.n_fc = len (fc_layers)
self.fc_model = nn.Sequential (* fc_layers) .to (périphérique)
self.model = nn.Sequential (* (layers + fc_layers)). to (périphérique)
def cut_model (m, cut):
revenir liste (m.chenfants ())[:cut] si coupé [m]
def nombre de fonctionnalités (m):
c = enfants (m)
si len (c) == 0: retourne None
pour l en inverse (c):
if hasattr (l, 'num_features'): retourne l.num_features
res = num_features (l)
si res n'est pas None: return res
def enfants (m): revenir m si isinstance (m, (liste, tuple)) autre liste (m.chenfants ())

En utilisant cette ConvnetBuilder classe, nous pouvons combiner la tête personnalisée et l’architecture resnet34.

k = len (anchor_scales)
head_reg4 = SSD_MultiHead (k, -4.)
f_model = models.resnet34
modelss = ConvnetBuilder (f_model, 0, 0, 0, custom_head = head_reg4)

k est 9

Nous pouvons maintenant accéder au modèle via le modèle attribuer sur maquettes .

La fonction de perte doit pouvoir accepter à la fois les classifications (classes) et les valeurs continues (cadres de sélection) et générer une seule valeur de perte.

def ssd_loss (pred, targ, print_it =Faux):
lcs, lls = 0., 0.
pour b_c, b_bb, bbox, clas dans zip (* pred, * targ):
loc_loss, clas_loss = ssd_1_loss (b_c, b_bb, bbox, clas, print_it)
lls + = loc_loss
lcs + = clas_loss
si imprime le:
impression(f'loc: {lls.data.item ()}, clas: {lcs.data.item ()} ')
revenir lls + lcs
def ssd_1_loss (b_c, b_bb, bbox, clas, print_it =Faux):
bbox, clas = get_y (bbox, clas)
a_ic = actn_to_bb (b_bb, ancres)
overlaps = jaccard (bbox.data, anchor_cnr.data)
gt_overlap, gt_idx = map_to_ground_truth (overlaps, print_it)
gt_clas = clas[gt_idx]
pos = gt_overlap> 0.4
pos_idx = torch.nonzero (pos)[:,0]
gt_clas[1-pos] = len (id2cat)
gt_bbox = bbox[gt_idx]
loc_loss = ((a_ic[pos_idx] - gt_bbox[pos_idx]) .abs ()). mean ()
clas_loss = loss_f (b_c, gt_clas)
revenir loc_loss, clas_loss
def one_hot_embedding (labels, num_classes):
revenir torch.eye (num_classes)[labels.data.long().cpu()]
classe BCE_Loss (nn.Module):
def __init __ (self, num_classes):
super () .__ init __ ()
self.num_classes = num_classes
def transmettre (self, pred, targ):
t = one_hot_embedding (targ, self.num_classes + 1)
t = V (t[:,:-1].contiguous ()). cpu ()
x = pred[:,:-1]
w = self.get_weight (x, t)
revenir F.binary_cross_entropy_with_logits (x, t, w, size_average = False) /self.num_classes

def get_weight (self, x, t): revenir Aucun

loss_f = BCE_Loss (len (id2cat))def get_y (bbox, clas):
bbox = bbox.view (-1,4) / sz
bb_keep = ((bbox[:,2]-bbox[:,0])> 0) .nonzero ()[:,0]
revenir bbox[bb_keep], clas[bb_keep]
def actn_to_bb (actn, ancres):
actn_bbs = torch.tanh (actn)
actn_centers = (actn_bbs[:,:2]/ 2 * grid_sizes) + ancres[:,:2]
actn_hw = (actn_bbs[:,2:]/ 2 + 1) * ancres[:,2:]
revenir hw2corners (actn_centers, actn_hw)
def intersecter (box_a, box_b):
max_xy = torch.min (box_a[:, None, 2:], box_b[[Aucun,:, 2:])
min_xy = torch.max (box_a[:, None, :2], box_b[[Aucun,:,: 2])
inter = torch.clamp ((max_xy - min_xy), min = 0)
revenir entre[:, :, 0] * inter[:, :, 1]
def box_sz (b): revenir ((b[:, 2]-b[:, 0]) * (b[:, 3]-b[:, 1]))def jaccard (box_a, box_b):
inter = intersecter (box_a, box_b)
union = box_sz (box_a) .unsqueeze (1) + box_sz (box_b) .unsqueeze (0) - inter
revenir inter / union

Nous pouvons tester la fonction de perte sur une sortie batch de notre modèle bbox une fois que nous avons configuré notre jeu de données et notre chargeur de données.

Ici, nous avons réellement besoin d’une classe d’ensembles de données personnalisée pour utiliser ces types de données.

classe BboxDataset (Dataset):
"" "Jeu de données Bbox." ""
def __init __ (self, csv_file, root_dir, transform =Aucun):
"" "
Args:
csv_file (string): chemin d'accès au fichier csv avec des cadres de sélection.
root_dir (string): Répertoire avec toutes les images.
transform (callable, optionnel): transform optionnel.
"" "
self.label = pd.read_csv (csv_file)
self.root_dir = root_dir
self.transform = transformer
self.sz = 224
def __len __ (auto):
revenir len (self.label)
def __getitem __ (self, idx):
img_name = os.path.join (self.root_dir,
self.label.iloc[idx, 0])
image = io.imread (img_name)
échantillon = image

h, w = sample.shape[:2]; new_h, new_w = (224,224)
bb = np.array ([float(x)[float(x)pour X dans self.label.iloc[idx, 1].Divisé(' ')], dtype = np.float32)
bb = np.reshape (bb, (int (bb.shape[0]/ 2), 2))
bb = bb * [new_h / h, new_w / w]
bb = bb.flatten ()
bb = T (np.concatenate ((np.zeros ((189 * 4) - len (bb)), bb), axis = None)) # 189 est 21 * 9 où 9 = k

si self.transform:
échantillon = self.transform (échantillon)
revenir échantillon, bb
mbb.csv

Cette classe d’ensembles de données personnalisée traite des boîtes de limitation, mais nous voulons une classe d’ensemble de données qui traitera à la fois des classes et des boîtes de limitation.

bb_dataset = BboxDataset (csv_file = 'data / pascal / tmp / mbb.csv',
root_dir = 'data / pascal / VOCdevkit2 / VOC2007 / JPEGImages /',
transform = transforms.Compose ([[
transforms.ToPILImage (),
transforms.Resize ((224,224)),
transforms.ToTensor (),
transforms.Normalize (
(0,5, 0,5, 0,5), (0,5, 0,5, 0,5))]))
bb_dataloader = Chargeur de données (bb_dataset, batch_size = 16, shuffle =Vrai)

Ici, nous pouvons concaténer les deux classes d'ensembles de données afin que, pour chaque image, les classes et les cadres de sélection soient renvoyés.

classe ConcatLblDataset (Jeu de données):
def __init __ (self, ds, y2):
self.ds, self.y2 = ds, y2
self.sz = ds.sz
def __len __ (auto): revenir len (self.ds)

def __getitem __ (moi-même, i):
self.y2[i] = np.concatenate ((np.zeros (189 - len (self.y2[i])), self.y2[i]), axe = Aucun)
x, y = self.ds[i]
revenir (x, (y, self.y2[i]))

trn_ds2 = ConcatLblDataset (bb_dataset, mcs)

mcs est un tableau numpy de tableaux avec les classes de chaque image de formation.

PATH_pascal = Path ('data / pascal')
trn_j = json.load ((PATH_pascal / 'pascal_train2007.json'). open ())
chats = dict ((o['id']o['name']) pour o dans trn_j['categories'])
mc =[[chats[p[[cats[p[1]] pour p dans trn_anno[o]] pour o dans trn_ids]
id2cat = list (cats.values ​​())
cat2id = {v: k pour k, v dans énumérer (id2cat)}
mcs = np.array ([np.array([cat2id[p] pour p dans o]) pour o dans mc])

Nous pouvons maintenant tester notre perte personnalisée.

sz = 224
x, y = suivant(iter(bb_dataloader2))
batch = modelss.model (x)ssd_loss (batch, y, Vrai)tenseur ([0.6254])
tenseur ([0.6821, 0.7257, 0.4922])
tenseur ([0.9563])
tenseur ([0.6522, 0.5276, 0.6226])
tenseur ([0.6811, 0.3338])
tenseur ([0.7008])
tenseur ([0.5316, 0.2926])
tenseur ([0.9422])
tenseur ([0.5487, 0.7187, 0.3620, 0.1578])
tenseur ([0.6546, 0.3753, 0.4231, 0.4663, 0.2125, 0.0729])
tenseur ([0.3756, 0.5085])
tenseur ([0.2304, 0.1390, 0.0853])
tenseur ([0.2484])
tenseur ([0.6419])
tenseur ([0.5954, 0.5375, 0.5552])
tenseur ([0.2383])
loc: 1.844399333000183, clas: 79.79206085205078

En dehors[1024]:

tenseur (81,6365, grad_fn =)

Maintenant, pour former le modèle SSD.

beta1 = 0.5
optimiseur = optim.Adam (modelss.model.parameters (), lr = 1e-3, betas = (beta1, 0.99))
# Decay LR par un facteur de * gamma * à chaque * taille de pas *
exp_lr_scheduler = lr_scheduler.StepLR (optimiseur, step_size = 7, gamma = 0,1)

Nous pouvons utiliser essentiellement le même train_modèle () fonctionne comme auparavant, mais cette fois nous passons une liste des boîtes englobantes et des classes à la fonction de perte ssd_loss () .

Nous avons à présent formé nos deux modèles sur nos nouveaux jeux de données de formation et nous sommes prêts à les utiliser pour l'inférence dans notre jeu de simulateur de camion.

Je vous encourage à consulter ce dépôt Github pour connaître la mise en œuvre complète, dans laquelle vous pouvez former les modèles, enregistrer les données de formation et tester la mise en œuvre sur le jeu vidéo.

S'amuser!

Afficher plus

SupportIvy

SupportIvy.com : Un lieu pour partager le savoir et mieux comprendre le monde. Meilleure plate-forme de support gratuit pour vous, Documentation &Tutoriels par les experts.

Articles similaires

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *

Bouton retour en haut de la page
Fermer