Neste tutorial prático, forneceremos a você uma reimplementação do método de aprendizado auto-supervisionado Simclr para extratores de recursos robustos pré-treinantes. Este método é bastante geral e pode ser aplicado a qualquer conjunto de dados de visão, além de diferentes tarefas a jusante.
Em um tutorial anterior, Escrevi um pouco de experiência na arena de aprendizado auto-supervisionada. Hora de entrar em seu primeiro projeto executando o SimClr em um pequeno conjunto de dados com 100k imagens não marcadas chamadas STL10.
Código é Disponível no GitHub.
O método SIMCLR: aprendizado contrastante
Deixar Observe o produto DOT entre 2 normalizado e vetores (ou seja, similaridade de cosseno).
Em seguida, a função de perda para um par positivo de exemplos (i, j) é definido como:
onde é uma função indicadora avaliando para 1 sef . Para obter mais informações sobre essa verificação de como vamos indexar a matriz de similaridade para obter os pontos positivos e os negativos.
indica um parâmetro de temperatura. A perda final é calculada resumindo todos os pares positivos e dividir por
Existem diferentes maneiras de desenvolver perda contrastiva. Aqui fornecemos algumas informações importantes.
L2 Normalização e cálculo da matriz de similaridade de cosseno
Primeiro, é preciso aplicar uma normalização de L2 aos recursos; caso contrário, esse método não funciona. A normalização de L2 significa que os vetores são normalizados de modo que todos fiquem na superfície da esfera da unidade (hiper), onde a norma L2 é 1.
z_i = F.normalize(proj_1, p=2, dim=1)
z_j = F.normalize(proj_2, p=2, dim=1)
Concatenar as 2 vistas de saída na dimensão do lote. A forma deles será . Em seguida, calcule a similaridade/logits de todos os pares. Isso pode ser implementado por uma multiplicação da matriz da seguinte maneira. A forma de saída é igual a
def calc_similarity_batch(self, a, b):
representations = torch.cat((a, b), dim=0)
return F.cosine_similarity(representations.unsqueeze(1), representations.unsqueeze(0), dim=2)
Indexando a matriz de similaridade para a função de perda SIMCLR
Agora precisamos indexar a matriz resultante de tamanho apropriadamente.
Uma ilustração visual de SimClr. Imagem do autor
Ok, como diabos fazemos isso? Eu tive a mesma pergunta. Aqui, o tamanho do lote são 2 imagens, mas queremos implementar uma solução para qualquer tamanho de lote. Se você olhar de perto, verá que os pares positivos são deslocados da diagonal principal por 2, esse é o tamanho do lote. Uma maneira de fazer isso é torch.diag()
. É necessário a diagonal escolhida de uma matriz. O primeiro parâmetro é a matriz e a segunda especifica a diagonal, onde zero representa os principais elementos diagonais. Tomamos as diagonais que são deslocadas pelo tamanho do lote.
sim_ij = torch.diag(similarity_matrix, batch_size)
sim_ji = torch.diag(similarity_matrix, -batch_size)
positives = torch.cat((sim_ij, sim_ji), dim=0)
Há pares positivos. Outro exemplo para (6,6) Matrix (Batch_size = 3, Views = 2) é ter uma máscara que se parece exatamente com o seguinte:
(0., 0., 0., 1., 0., 0.),
(0., 0., 0., 0., 1., 0.),
(0., 0., 0., 0., 0., 1.),
(1., 0., 0., 0., 0., 0.),
(0., 1., 0., 0., 0., 0.),
(0., 0., 1., 0., 0., 0.)
Para o denominador, precisamos dos pares positivos e negativos. Portanto, a máscara binária será o elemento exato inverso da matriz de identidade.
self.mask = (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float()
pos_and_negatives = self.mask * similarity_matrix
Novamente, eles são os positivos e os negativos no denominador.
Você pode entender o resto (escala de temperatura e somando os negativos do denominador etc.):
Implementação de perda do SIMCLR
import torch
import torch.nn as nn
import torch.nn.functional as F
def device_as(t1, t2):
"""
Moves t1 to the device of t2
"""
return t1.to(t2.device)
class ContrastiveLoss(nn.Module):
"""
Vanilla Contrastive loss, also called InfoNceLoss as in SimCLR paper
"""
def __init__(self, batch_size, temperature=0.5):
super().__init__()
self.batch_size = batch_size
self.temperature = temperature
self.mask = (~torch.eye(batch_size * 2, batch_size * 2, dtype=bool)).float()
def calc_similarity_batch(self, a, b):
representations = torch.cat((a, b), dim=0)
return F.cosine_similarity(representations.unsqueeze(1), representations.unsqueeze(0), dim=2)
def forward(self, proj_1, proj_2):
"""
proj_1 and proj_2 are batched embeddings (batch, embedding_dim)
where corresponding indices are pairs
z_i, z_j in the SimCLR paper
"""
batch_size = proj_1.shape(0)
z_i = F.normalize(proj_1, p=2, dim=1)
z_j = F.normalize(proj_2, p=2, dim=1)
similarity_matrix = self.calc_similarity_batch(z_i, z_j)
sim_ij = torch.diag(similarity_matrix, batch_size)
sim_ji = torch.diag(similarity_matrix, -batch_size)
positives = torch.cat((sim_ij, sim_ji), dim=0)
nominator = torch.exp(positives / self.temperature)
denominator = device_as(self.mask, similarity_matrix) * torch.exp(similarity_matrix / self.temperature)
all_losses = -torch.log(nominator / torch.sum(denominator, dim=1))
loss = torch.sum(all_losses) / (2 * self.batch_size)
return loss
Aumentos
A chave para o aprendizado de representação auto-supervisionado são os aumentos de dados. Um pipeline de transformação comumente usado é o seguinte:
-
Corte em uma escala aleatória de 7% a 100% da imagem
-
Redimensione todas as imagens para 224 ou outras dimensões espaciais.
-
Aplique o lançamento horizontal com 50% de probabilidade
-
Aplique estofaturas de cores pesadas com 80% de probabilidade
-
Aplique o desfoque gaussiano com 50% de probabilidade. O tamanho do kernel geralmente é de cerca de 10% da imagem ou menos.
-
Converta imagens RGB em escala de cinza com 20% de probabilidade.
-
Normalizar com base nos meios e variações do imagenet
Este pipeline será aplicado independentemente a cada imagem duas vezes e produzirá duas vistas diferentes que serão alimentadas no modelo de backbone. Neste caderno, usaremos um resnet18 padrão.
import torch
import torchvision.transforms as T
class Augment:
"""
A stochastic data augmentation module
Transforms any given data example randomly
resulting in two correlated views of the same example,
denoted x ̃i and x ̃j, which we consider as a positive pair.
"""
def __init__(self, img_size, s=1):
color_jitter = T.ColorJitter(
0.8 * s, 0.8 * s, 0.8 * s, 0.2 * s
)
blur = T.GaussianBlur((3, 3), (0.1, 2.0))
self.train_transform = torch.nn.Sequential(
T.RandomResizedCrop(size=img_size),
T.RandomHorizontalFlip(p=0.5),
T.RandomApply((color_jitter), p=0.8),
T.RandomApply((blur), p=0.5),
T.RandomGrayscale(p=0.2),
T.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
)
def __call__(self, x):
return self.train_transform(x), self.train_transform(x)
Abaixo estão 4 vistas diferentes da mesma imagem aplicando o mesmo pipeline estocástico:
4 Aumentação diferente do mesmo com o mesmo pipeline. Imagem por autor
Para visualizá-los, você precisa desfazer a normalização do STD e colocar os canais de cores na última dimensão:
def imshow(img):
"""
shows an imagenet-normalized image on the screen
"""
mean = torch.tensor((0.485, 0.456, 0.406), dtype=torch.float32)
std = torch.tensor((0.229, 0.224, 0.225), dtype=torch.float32)
unnormalize = T.Normalize((-mean / std).tolist(), (1.0 / std).tolist())
npimg = unnormalize(img).numpy()
plt.imshow(np.transpose(npimg, (1, 2, 0)))
plt.show()
dataset = STL10("./", split='train', transform=Augment(96), download=True)
imshow(dataset(99)(0)(0))
imshow(dataset(99)(0)(0))
imshow(dataset(99)(0)(0))
imshow(dataset(99)(0)(0))
Modifique o resnet18 e defina grupos de parâmetros
Uma etapa importante para executar o SIMCLR é remover a última camada totalmente conectada. Vamos substituí -lo por uma função de identidade. Em seguida, precisamos adicionar a cabeça de projeção (outro MLP) que será usado apenas para o estágio de pré-treinamento auto-supervisionado. Para fazer isso, precisamos estar cientes da dimensão dos recursos do nosso modelo. Em particular, o Resnet18 produz um vetor de 512-DIM, enquanto o RESNET50 produz um vetor 2048-DIM. O MLP de projeção o transformaria no tamanho do vetor de incorporação, que é 128, com base no artigo oficial.
Para otimizar os modelos SSL, usamos pesados Técnicas de regularizaçãocomo decaimento de peso. Para evitar deterioração do desempenho, precisamos excluir a decaimento do peso das camadas de normalização do lote.
import pytorch_lightning as pl
import torch
import torch.nn.functional as F
from pl_bolts.optimizers.lr_scheduler import LinearWarmupCosineAnnealingLR
from torch.optim import SGD, Adam
class AddProjection(nn.Module):
def __init__(self, config, model=None, mlp_dim=512):
super(AddProjection, self).__init__()
embedding_size = config.embedding_size
self.backbone = default(model, models.resnet18(pretrained=False, num_classes=config.embedding_size))
mlp_dim = default(mlp_dim, self.backbone.fc.in_features)
print('Dim MLP input:',mlp_dim)
self.backbone.fc = nn.Identity()
self.projection = nn.Sequential(
nn.Linear(in_features=mlp_dim, out_features=mlp_dim),
nn.BatchNorm1d(mlp_dim),
nn.ReLU(),
nn.Linear(in_features=mlp_dim, out_features=embedding_size),
nn.BatchNorm1d(embedding_size),
)
def forward(self, x, return_embedding=False):
embedding = self.backbone(x)
if return_embedding:
return embedding
return self.projection(embedding)
O próximo passo é separar os parâmetros dos modelos em 2 grupos.
O objetivo do segundo grupo é remover a decaimento do peso das camadas de normalização do lote. No caso de usar o otimizador LARS, você também precisa remover a decaimento do peso dos vieses. Uma maneira de conseguir isso é a seguinte função:
def define_param_groups(model, weight_decay, optimizer_name):
def exclude_from_wd_and_adaptation(name):
if 'bn' in name:
return True
if optimizer_name == 'lars' and 'bias' in name:
return True
param_groups = (
{
'params': (p for name, p in model.named_parameters() if not exclude_from_wd_and_adaptation(name)),
'weight_decay': weight_decay,
'layer_adaptation': True,
},
{
'params': (p for name, p in model.named_parameters() if exclude_from_wd_and_adaptation(name)),
'weight_decay': 0.,
'layer_adaptation': False,
},
)
return param_groups
Não estou usando o otimizador LARS neste tutorial, mas se você planeja usá -lo aqui é uma implementação que eu uso como referência.
Lógica de treinamento SIMCLR
Aqui, implementaremos toda a lógica de treinamento do SIMCLR. Veja 2 visualizações, encaminhe -as para obter as projeções de incorporação e calcule a perda SIMCLR.
Podemos encerrar o treinamento SIMCLR com uma classe usando Pytorch Lightning Isso encapsula toda a lógica de treinamento. Na sua forma mais simples, precisamos implementar o training_step
Método que obtém como entrada um lote do Dataloader. Você pode pensar nisso como chamando batch = next(iter(dataloader))
em cada etapa. Em seguida vem o configure_optimizers
Método que vincula o modelo ao otimizador e o agendador de treinamento. Eu usei um agendador já implementado da Pytorch Lightning parafusos (Outro pequeno pacote no ecossistema de raios). Essencialmente, aumentamos gradualmente a taxa de aprendizado em seu valor base e, em seguida, fazemos o recozimento cosseno.
class SimCLR_pl(pl.LightningModule):
def __init__(self, config, model=None, feat_dim=512):
super().__init__()
self.config = config
self.augment = Augment(config.img_size)
self.model = AddProjection(config, model=model, mlp_dim=feat_dim)
self.loss = ContrastiveLoss(config.batch_size, temperature=self.config.temperature)
def forward(self, X):
return self.model(X)
def training_step(self, batch, batch_idx):
x, labels = batch
x1, x2 = self.augment(x)
z1 = self.model(x1)
z2 = self.model(x2)
loss = self.loss(z1, z2)
self.log('Contrastive loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
return loss
def configure_optimizers(self):
max_epochs = int(self.config.epochs)
param_groups = define_param_groups(self.model, self.config.weight_decay, 'adam')
lr = self.config.lr
optimizer = Adam(param_groups, lr=lr, weight_decay=self.config.weight_decay)
print(f'Optimizer Adam, '
f'Learning Rate {lr}, '
f'Effective batch size {self.config.batch_size * self.config.gradient_accumulation_steps}')
scheduler_warmup = LinearWarmupCosineAnnealingLR(optimizer, warmup_epochs=10, max_epochs=max_epochs,
warmup_start_lr=0.0)
return (optimizer), (scheduler_warmup)
Acumulação de gradiente e tamanho eficaz do lote
Aqui é crucial destacar a importância de usar um grande tamanho em lote. Este método depende fortemente de um grande tamanho de lote para afastar -se das duas visualizações da mesma imagem (positivas). Para fazer isso com um orçamento restrito, podemos usar acumulação de gradiente. Nós calculamos a média dos gradientes de etapa e atualize o modelo, em vez de atualizar após cada passe para a frente.
Assim, agora deve fazer total sentido que o lote eficaz seja: . Isso é super fácil de fazer no Pytorch Lightning usando uma função de retorno de chamada.
“Na programação de computadores, um ligar de volta é uma referência ao código executável ou a um código executável que é passado como um argumento para outro código. Isso permite que uma camada de software de nível inferior chame uma sub-rotina (ou função) definida em uma camada de nível superior. ” ~ Stackoverflow
from pytorch_lightning.callbacks import GradientAccumulationScheduler
accumulator = GradientAccumulationScheduler(scheduling={0: train_config.gradient_accumulation_steps})
Script principal de pré -treinamento do SIMCLR
O script principal apenas coleta tudo e inicializa o Trainer
Classe de Pytorch Lightning. Você pode executá -lo em um único ou múltiplo GPUs. Observe que no snippet abaixo, estou lendo todas as GPUs disponíveis do sistema.
import torch
from pytorch_lightning import Trainer
import os
from pytorch_lightning.callbacks import GradientAccumulationScheduler
from pytorch_lightning.callbacks import ModelCheckpoint
from torchvision.models import resnet18
available_gpus = len((torch.cuda.device(i) for i in range(torch.cuda.device_count())))
save_model_path = os.path.join(os.getcwd(), "saved_models/")
print('available_gpus:',available_gpus)
filename='SimCLR_ResNet18_adam_'
resume_from_checkpoint = False
train_config = Hparams()
reproducibility(train_config)
save_name = filename + '.ckpt'
model = SimCLR_pl(train_config, model=resnet18(pretrained=False), feat_dim=512)
data_loader = get_stl_dataloader(train_config.batch_size)
accumulator = GradientAccumulationScheduler(scheduling={0: train_config.gradient_accumulation_steps})
checkpoint_callback = ModelCheckpoint(filename=filename, dirpath=save_model_path,every_n_val_epochs=2,
save_last=True, save_top_k=2,monitor='Contrastive loss_epoch',mode='min')
if resume_from_checkpoint:
trainer = Trainer(callbacks=(accumulator, checkpoint_callback),
gpus=available_gpus,
max_epochs=train_config.epochs,
resume_from_checkpoint=train_config.checkpoint_path)
else:
trainer = Trainer(callbacks=(accumulator, checkpoint_callback),
gpus=available_gpus,
max_epochs=train_config.epochs)
trainer.fit(model, data_loader)
trainer.save_checkpoint(save_name)
from google.colab import files
files.download(save_name)
Afinação
Ok, treinamos um modelo. Agora é hora de ajustar fino. Usaremos a classe Pytorch Lightning Module para encapsular a lógica. Estou pegando a espinha dorsal do resnet18 pré -terenciada, sem a cabeça da projeção, e estou adicionando apenas uma camada linear por cima. Estou bem ajustando toda a rede. Nenhum aumento é aplicado aqui. Eles só atrasariam o treinamento. Em vez disso, gostaríamos de quantificar o desempenho em relação a pesos pré -terenciados no ImageNet e na inicialização aleatória.
import pytorch_lightning as pl
import torch
from torch.optim import SGD
class SimCLR_eval(pl.LightningModule):
def __init__(self, lr, model=None, linear_eval=False):
super().__init__()
self.lr = lr
self.linear_eval = linear_eval
if self.linear_eval:
model.eval()
self.mlp = torch.nn.Sequential(
torch.nn.Linear(512,10),
)
self.model = torch.nn.Sequential(
model, self.mlp
)
self.loss = torch.nn.CrossEntropyLoss()
def forward(self, X):
return self.model(X)
def training_step(self, batch, batch_idx):
x, y = batch
z = self.forward(x)
loss = self.loss(z, y)
self.log('Cross Entropy loss', loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
predicted = z.argmax(1)
acc = (predicted == y).sum().item() / y.size(0)
self.log('Train Acc', acc, on_step=False, on_epoch=True, prog_bar=True, logger=True)
return loss
def validation_step(self, batch, batch_idx):
x, y = batch
z = self.forward(x)
loss = self.loss(z, y)
self.log('Val CE loss', loss, on_step=True, on_epoch=True, prog_bar=False, logger=True)
predicted = z.argmax(1)
acc = (predicted == y).sum().item() / y.size(0)
self.log('Val Accuracy', acc, on_step=True, on_epoch=True, prog_bar=True, logger=True)
return loss
def configure_optimizers(self):
if self.linear_eval:
print(f"\n\n Attention! Linear evaluation \n")
optimizer = SGD(self.mlp.parameters(), lr=self.lr, momentum=0.9)
else:
optimizer = SGD(self.model.parameters(), lr=self.lr, momentum=0.9)
return (optimizer)
É importante ressaltar que o STL10 é um subconjunto O ImageNet, portanto, transfere o aprendizado do ImageNet, deverá funcionar muito bem.
Método | Finetunning toda a rede, precisão de validação | Avaliação linear. Precisão de validação |
SIMCLR pré -treinamento no STL10 Split não marcado | 75,1% | 73,2 % |
Imagenet pré -treinamento (1m) | 87,9% | 78,6 % |
Inicialização aleatória | 50,6 % | – |
Em todos os casos, o modelo excessivo durante o Finetuning. Lembre -se de que nenhum aumento foi aplicado.
Conclusão
Mesmo com uma avaliação injusta em comparação com os pesos pré-terenciados da ImageNet, o aprendizado auto-supervisionado contrastivo demonstra alguns resultados super promissores. Existem muitos outros métodos auto-supervisionados para brincar, mas o SIMCLR é a linha de base.
Para encerrar, exploramos como construir passo a passo a função de perda SIMCLR e iniciamos um script de treinamento sem muito código de caldeira com pytorch-Lightning. Embora exista uma lacuna entre as representações aprendidas do SIMCLR, os mais recentes métodos de ponta estão alcançando e até superam os recursos aprendidos do ImageNet em muitos domínios.
Obrigado pelo seu interesse na IA e permaneça positivo!
* Divulgação: Observe que alguns dos links acima podem ser links de afiliados e, sem custo adicional, ganharemos uma comissão se você decidir fazer uma compra depois de clicar.

Luis es un experto en Ciberseguridad, Computación en la Nube, Criptomonedas e Inteligencia Artificial. Con amplia experiencia en tecnología, su objetivo es compartir conocimientos prácticos para ayudar a los lectores a entender y aprovechar estas áreas digitales clave.