본문 바로가기

연구실 공부

[Keras] Dogs vs Cats classification

728x90

https://www.kaggle.com/competitions/dogs-vs-cats/overview

 

Dogs vs. Cats | Kaggle

 

www.kaggle.com

해당 대회에서 데이터를 얻고 분류하는 Resnet18을 사용한 모델을 만들어보겠습니다.

 

https://github.com/JeongWooYeol/Keras_Dog-vs-Cat-classification-

 

GitHub - JeongWooYeol/Keras_Dog-vs-Cat-classification-

Contribute to JeongWooYeol/Keras_Dog-vs-Cat-classification- development by creating an account on GitHub.

github.com

모든 코드는 해당 주소에 존재합니다.

 

먼저 데이터를 전부 다운로드합니다. 상당히 많은 이미지 파일이 존재합니다.

 

from PIL import Image
from torch.utils.data import Dataset
import os

class CatDogDataset(Dataset):
    def __init__(self, file_list, dir, mode='train', transform=None):
        self.file_list = file_list
        self.dir = dir
        self.mode = mode
        self.transform = transform
        if self.mode == 'train':
            if 'dog' in self.file_list[0]:
                self.label = 1
            else:
                self.label = 0

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img = Image.open(os.path.join(self.dir, self.file_list[idx]))
        if self.transform:
            img = self.transform(img)
        if self.mode == 'train':
            img = img.numpy()
            return img.astype('float32'), self.label
        else:
            img = img.numpy()
            return img.astype('float32'), self.file_list[idx]

먼저 데이터를 정리합니다. train image의 경우 파일 이름이 강아지인 경우 "dog"으로 시작하고 고양이의 경우 "cat"으로 시작합니다. 그러므로 이미지의 라벨링을 위와 같이 진행해줍니다. dog의 경우 1로, cat의 경우 0으로 지정해줍니다.

 

위 코드에서 작성한 class를 이용해 train 이미지를 불러와 dataloader를 만들어줍니다.

train_dir = './train/train/'
test_dir = './test1/test1/'
train_files = os.listdir(train_dir)
test_files = os.listdir(test_dir)
batch_size = 128

data_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.ColorJitter(),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.Resize(128),
    transforms.ToTensor()
])

cat_files = [tf for tf in train_files if 'cat' in tf]
dog_files = [tf for tf in train_files if 'dog' in tf]

cats = data.CatDogDataset(cat_files, train_dir, transform=data_transform)
dogs = data.CatDogDataset(dog_files, train_dir, transform=data_transform)

catdogs = ConcatDataset([cats, dogs])

dataloader = DataLoader(catdogs, batch_size=32, shuffle=True, num_workers=4)

이번에는 학습할 model을 만들어보겠습니다. torchvision.models를 이용해 모델을 불러와도 되지만 Resnet18 모델을 직접 작성해봤습니다.

 

import torch
import torch.nn as nn


class BasicBlock(nn.Module):
    def __init__(self, in_channels: int,
                 out_channels: int,
                 stride: int = 1):
        super(BasicBlock, self).__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size= 3, stride= stride, padding = 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace = True),
            nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size = 3, stride = 1, padding = 1, bias = False),
            nn.BatchNorm2d(out_channels * BasicBlock.expansion)
        )
        self.shortcut = nn.Sequential()
        self.relu = nn.ReLU(inplace = True)
        if stride != 1 or in_channels != BasicBlock.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels = in_channels, out_channels = out_channels * BasicBlock.expansion, kernel_size= 1, stride = stride, bias = False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion)
            )
    expansion = 1

    def forward(self, x):
        result = self.residual_function(x) + self.shortcut(x)
        result = self.relu(result)
        return result


class BottleNeck(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1):
        super(BottleNeck, self).__init__()
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels = in_channels, out_channels = out_channels, kernel_size = 1, stride = 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace = True),
            nn.Conv2d(in_channels = out_channels, out_channels = out_channels, kernel_size = 3, stride = stride, padding = 1, bias = False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace = True),
            nn.Conv2d(in_channels = out_channels, out_channels = out_channels * BottleNeck.expansion, kernel_size = 1, stride = 1, bias = False),
            nn.BatchNorm2d(out_channels * BottleNeck.expansion)
        )
        self.shortcut = nn.Sequential()
        self.relu = nn.ReLU(inplace = True)

        if stride != 1 or in_channels != out_channels * BottleNeck.expansion:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels = in_channels, out_channels = out_channels * BottleNeck.expansion, kernel_size = 1, stride = stride, bias = False),
                nn.BatchNorm2d(out_channels * BottleNeck.expansion)
            )
    expansion = 4

    def forward(self, x):
        result = self.residual_function(x) + self.shortcut(x)
        result = self.relu(result)
        return result

class ResNet(nn.Module):
    def __init__(self, block, num_block,  init_weights = True):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size = 7, stride = 2, padding = 3, bias = False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace = True),
            nn.MaxPool2d(kernel_size = 3, stride = 2, padding = 1)
        )
        self.conv2 = self.make_layer(block, 64, num_block[0], 1)
        self.conv3 = self.make_layer(block, 64 * 2, num_block[1], stride = 2)
        self.conv4 = self.make_layer(block, 64 * 4, num_block[2], stride = 2)
        self.conv5 = self.make_layer(block, 64 * 8, num_block[3], stride = 2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Sequential(
            nn.Linear(in_features = 512 * block.expansion, out_features= 500),
            nn.Linear(500, 2)
        )

        if init_weights:
            self.initialize_weights()

    def make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        output = self.conv1(x)
        output = self.conv2(output)
        x = self.conv3(output)
        x = self.conv4(x)
        x = self.conv5(x)
        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

    def initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode = 'fan_out', nonlinearity = 'relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
def resnet18():
    return ResNet(BottleNeck, [2, 2, 2, 2])

 

이제 학습을 진행해보겠습니다.

import numpy as np
import torch, torchvision
from torchvision import transforms
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, ConcatDataset
import torch.nn as nn
import os
import data
import model

def train(dataloader, model):
    device = 'cuda'
    print(model)

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.002, amsgrad=True)
    scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[500, 1000, 1500], gamma=0.5)

    epochs = 50
    itr = 1
    p_itr = 200
    model.train()
    total_loss = 0
    loss_list = []
    acc_list = []
    for epoch in range(epochs):
        for samples, labels in dataloader:
            samples, labels = samples.to(device), labels.to(device)
            optimizer.zero_grad()
            output = model(samples)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            scheduler.step()

            if itr % p_itr == 0:
                pred = torch.argmax(output, dim=1)
                correct = pred.eq(labels)
                acc = torch.mean(correct.float())
                print(
                    '[Epoch {}/{}] Iteration {} -> Train Loss: {:.4f}, Accuracy: {:.3f}'.format(epoch + 1, epochs, itr,
                                                                                                total_loss / p_itr,
                                                                                                acc))
                loss_list.append(total_loss / p_itr)
                acc_list.append(acc)
                total_loss = 0

            itr += 1

    return model


samples, labels = iter(dataloader).next()
plt.figure(figsize=(16, 24))
grid_imgs = torchvision.utils.make_grid(samples[:24])
np_grid_imgs = grid_imgs.numpy()
# in tensor, image is (batch, width, height), so you have to transpose it to (width, height, batch) in numpy to show it.
plt.imshow(np.transpose(np_grid_imgs, (1, 2, 0)))

best_model = train(dataloader, model.resnet18())
torch.save(best_model.state_dict(), "ResNet18_CatDog.pth")

학습이 완료된 모델을 "ResNet18_CatDog.pth"라는 이름으로 저장합니다. 이제 test를 통해 정확히 결과가 나오는지 확인해보겠습니다.

 

import pandas as pd
import numpy as np
import torch, torchvision
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import os
from data import CatDogDataset
import model


if __name__ == '__main__':
    filename_pth = 'ResNet18_CatDog.pth'
    test_transform = torchvision.transforms.Compose([
        torchvision.transforms.Resize((128, 128)),
        torchvision.transforms.ToTensor()
    ])
    test_dir = './test1/test1/'
    test_files = os.listdir(test_dir)

    testset = CatDogDataset(test_files, test_dir, mode = 'test', transform = test_transform)
    test_dl = DataLoader(testset, batch_size=32, shuffle=False, num_workers=4)

    model = model.resnet18().cuda()
    model.load_state_dict((torch.load(filename_pth, map_location = torch.device('cuda'))), strict = False)
    print(model)

    device = 'cuda'
    model.eval()

    fn_list = []
    pred_list = []
    for x, fn in test_dl:
        with torch.no_grad():
            x = x.to(device)
            output = model(x)
            pred = torch.argmax(output, dim=1)
            fn_list += [n[:-4] for n in fn]
            pred_list += [p.item() for p in pred]

    submission = pd.DataFrame({"id": fn_list, "label": pred_list})
    submission.to_csv('preds_ResNet18.csv', index=False)

    samples, _ = iter(test_dl).next()
    samples = samples.to(device)
    fig = plt.figure(figsize=(24, 16))
    fig.tight_layout()
    output = model(samples[:24])
    pred = torch.argmax(output, dim=1)
    pred = [p.item() for p in pred]
    ad = {0: 'cat', 1: 'dog'}
    for num, sample in enumerate(samples[:24]):
        plt.subplot(4, 6, num + 1)
        plt.title(ad[pred[num]])
        plt.axis('off')
        sample = sample.cpu().numpy()
        plt.imshow(np.transpose(sample, (1, 2, 0)))
    plt.show()

 

잘 분류하는 모습을 볼 수 있습니다.