
DenseNet Code


Last time, I read the DenseNet paper and summarized the concepts. This time, let's look at the code.

https://github.com/andreasveit/densenet-pytorch

 


The walkthrough below is based on the code from this GitHub repository.

 

import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class BasicBlock(nn.Module):
    # Plain dense layer (concatenates input and output via torch.cat)
    # BatchNorm -> ReLU -> 3x3 Conv2d
    def __init__(self, in_planes, out_planes, dropRate=0.0):
        super(BasicBlock, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=1,
                               padding=1, bias=False)
        self.droprate = dropRate
    def forward(self, x):
        out = self.conv1(self.relu(self.bn1(x)))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, training=self.training)
        # Concatenate along the channel dimension: the new feature maps
        # are appended to everything this layer received as input
        return torch.cat([x, out], 1)

 

The code above builds the basic DenseNet block. The input is passed through the block, and the result is returned concatenated with the input. The block applies BatchNorm -> ReLU -> 3x3 Conv2d.
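
As a quick sanity check (a minimal sketch; the tensor sizes are just illustrative), we can verify that the output channel count is in_planes + out_planes because of the concatenation:

block = BasicBlock(in_planes=24, out_planes=12)  # growth rate k = 12
x = torch.randn(1, 24, 32, 32)                   # a CIFAR-sized input
print(block(x).shape)  # torch.Size([1, 36, 32, 32]) -> 24 input channels + 12 new ones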

 

class BottleneckBlock(nn.Module):
    # Dense layer with a bottleneck
    # BatchNorm -> ReLU -> 1x1 Conv2d -> BatchNorm -> ReLU -> 3x3 Conv2d
    def __init__(self, in_planes, out_planes, dropRate=0.0):
        super(BottleneckBlock, self).__init__()
        # The 1x1 conv expands to 4*k channels before the 3x3 conv
        inter_planes = out_planes * 4
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_planes, inter_planes, kernel_size=1, stride=1,
                               padding=0, bias=False)
        self.bn2 = nn.BatchNorm2d(inter_planes)
        self.conv2 = nn.Conv2d(inter_planes, out_planes, kernel_size=3, stride=1,
                               padding=1, bias=False)
        self.droprate = dropRate
    def forward(self, x):
        out = self.conv1(self.relu(self.bn1(x)))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, inplace=False, training=self.training)
        out = self.conv2(self.relu(self.bn2(out)))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, inplace=False, training=self.training)
        return torch.cat([x, out], 1)

 

The code above is a DenseNet block with the bottleneck structure applied; in the paper's terms, this is DenseNet-B. It applies BatchNorm -> ReLU -> 1x1 Conv2d -> BatchNorm -> ReLU -> 3x3 Conv2d.
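
The 1x1 convolution keeps the 3x3 convolution cheap even after the input has accumulated many channels. A quick shape check (illustrative values only):

block = BottleneckBlock(in_planes=48, out_planes=12)  # 1x1 conv: 48 -> 48 (4*12), 3x3 conv: 48 -> 12
x = torch.randn(1, 48, 32, 32)
print(block(x).shape)  # torch.Size([1, 60, 32, 32]) -> output still grows by exactly k = 12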

 

class TransitionBlock(nn.Module):
    # Transition block placed between dense blocks
    # BatchNorm -> ReLU -> 1x1 Conv2d -> average pool2d
    def __init__(self, in_planes, out_planes, dropRate=0.0):
        super(TransitionBlock, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=1,
                               padding=0, bias=False)
        self.droprate = dropRate
    def forward(self, x):
        out = self.conv1(self.relu(self.bn1(x)))
        if self.droprate > 0:
            out = F.dropout(out, p=self.droprate, inplace=False, training=self.training)
        # 2x2 average pooling halves the spatial resolution
        return F.avg_pool2d(out, 2)

 

The code above implements the transition layer that sits between dense blocks. It applies BatchNorm -> ReLU -> 1x1 Conv2d -> average pool2d; the avg_pool2d downsamples the feature map by a factor of 2 in each spatial dimension.
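
A quick shape check (values chosen to match the first transition of the depth-100 model built below):

trans = TransitionBlock(in_planes=216, out_planes=108)  # reduction = 0.5 compresses channels
x = torch.randn(1, 216, 32, 32)
print(trans(x).shape)  # torch.Size([1, 108, 16, 16]) -> channels halved, spatial dims halved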

 

class DenseBlock(nn.Module):
    # Builds one dense block out of nb_layers stacked layers
    def __init__(self, nb_layers, in_planes, growth_rate, block, dropRate=0.0):
        super(DenseBlock, self).__init__()
        self.layer = self._make_layer(block, in_planes, growth_rate, nb_layers, dropRate)
    def _make_layer(self, block, in_planes, growth_rate, nb_layers, dropRate):
        layers = []
        for i in range(nb_layers):
            # Each layer sees all previous layers' outputs,
            # so its input grows by growth_rate per layer
            layers.append(block(in_planes+i*growth_rate, growth_rate, dropRate))
        return nn.Sequential(*layers)
    def forward(self, x):
        return self.layer(x)

class DenseNet3(nn.Module):
    # Three dense blocks with two transition blocks between them
    def __init__(self, depth, num_classes, growth_rate=12,
                 reduction=0.5, bottleneck=True, dropRate=0.0):
        super(DenseNet3, self).__init__()
        in_planes = 2 * growth_rate
        n = (depth - 4) / 3
        if bottleneck:
            # A bottleneck layer counts as two conv layers (1x1 and 3x3)
            n = n/2
            block = BottleneckBlock
        else:
            block = BasicBlock
        n = int(n)
        # 1st conv before any dense block
        self.conv1 = nn.Conv2d(3, in_planes, kernel_size=3, stride=1,
                               padding=1, bias=False)

        # 1st block, built from n layers according to the parameters
        self.block1 = DenseBlock(n, in_planes, growth_rate, block, dropRate)
        in_planes = int(in_planes+n*growth_rate)
        # Compute the channel count entering the transition block,
        # then compress it by the reduction factor
        self.trans1 = TransitionBlock(in_planes, int(math.floor(in_planes*reduction)), dropRate=dropRate)
        in_planes = int(math.floor(in_planes*reduction))

        # 2nd block
        self.block2 = DenseBlock(n, in_planes, growth_rate, block, dropRate)
        in_planes = int(in_planes+n*growth_rate)
        self.trans2 = TransitionBlock(in_planes, int(math.floor(in_planes*reduction)), dropRate=dropRate)
        in_planes = int(math.floor(in_planes*reduction))

        # 3rd block
        self.block3 = DenseBlock(n, in_planes, growth_rate, block, dropRate)
        in_planes = int(in_planes+n*growth_rate)

        # global average pooling and classifier
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.relu = nn.ReLU(inplace=True)
        self.fc = nn.Linear(in_planes, num_classes)
        self.in_planes = in_planes

        # He initialization for conv layers, standard init for BN/Linear
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.bias.data.zero_()
    def forward(self, x):
        out = self.conv1(x)
        out = self.trans1(self.block1(out))
        out = self.trans2(self.block2(out))
        out = self.block3(out)
        out = self.relu(self.bn1(out))
        # 8x8 global average pooling (a 32x32 input is 8x8 after two transitions)
        out = F.avg_pool2d(out, 8)
        out = out.view(-1, self.in_planes)
        return self.fc(out)

 

This class creates the DenseBlocks according to the given parameters and assembles the full architecture containing three dense blocks.
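
For the settings used below (depth=100, growth_rate=12, reduction=0.5, bottleneck=True), the channel bookkeeping can be traced by hand; this small sketch just mirrors the arithmetic in __init__:

depth, k, reduction = 100, 12, 0.5
n = int((depth - 4) / 3 / 2)   # 16 bottleneck layers per dense block
in_planes = 2 * k              # 24 channels after the first conv
for b in range(3):
    in_planes += n * k         # each dense block adds n*k = 192 channels
    if b < 2:                  # transitions follow blocks 1 and 2
        in_planes = int(in_planes * reduction)
print(in_planes)               # 342 channels entering the final BN/ReLU/FC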

With this code, we can train a classifier on CIFAR-10 and its 10 image classes.

 

import torch
import torch.nn as nn
from torchvision import datasets, transforms
import densenet  # assuming the blocks and DenseNet3 above are saved as densenet.py

if __name__ == '__main__':
    batch_size = 64
    learning_rate = 0.1
    depth = 100
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
    ])

    transform_test = transforms.Compose([
        transforms.ToTensor(),
    ])

    # Load the data
    train_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10('./data', train=True, download=True, transform=transform_train),
        batch_size=batch_size, shuffle=True
    )

    test_loader = torch.utils.data.DataLoader(
        datasets.CIFAR10('./data', train=False, transform=transform_test),
        batch_size=batch_size, shuffle=False
    )

    model = densenet.DenseNet3(depth, 10, growth_rate=12, reduction=0.5, bottleneck=True, dropRate=0.0)

    print('Number of model parameters: {}'.format(
        sum([p.data.nelement() for p in model.parameters()])))

    model = model.to(device)

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate,
                                momentum=0.9, nesterov=True, weight_decay=1e-4)

    train(train_loader, model, criterion, optimizer, 30)
    # test(test_loader, model, criterion, 30)

 

As shown above, this code loads the data and creates the DenseNet model. Training uses CrossEntropyLoss and SGD with Nesterov momentum and weight decay.
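
Before launching training, a quick forward pass on a dummy batch (an illustrative check, not part of the original script) confirms the model produces one logit per CIFAR-10 class:

dummy = torch.randn(2, 3, 32, 32).to(device)
print(model(dummy).shape)  # torch.Size([2, 10])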

 

import os

def train(train_loader, model, criterion, optimizer, epochs):
    PATH = './weights/'
    os.makedirs(PATH, exist_ok=True)  # make sure the save directory exists
    model.train()
    for epoch in range(epochs):
        for i, (input, target) in enumerate(train_loader):
            target = target.to(device)
            input = input.to(device)

            output = model(input)
            loss = criterion(output, target)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if i % 20 == 0:
                print("epoch %d , step %d loss : %f" % (epoch, i, loss.item()))
    # Save the trained model once training is done
    torch.save(model, PATH + 'model.pt')

 

The train function is defined as above; once training finishes, the model is saved to disk.
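
One caveat: torch.save(model, ...) pickles the whole model object, so loading it later requires the same class definitions on the import path. A more portable alternative (a sketch, not what the code above does) is to save only the parameters:

torch.save(model.state_dict(), PATH + 'model_state.pt')
# ...later, rebuild the model and load the weights:
model = densenet.DenseNet3(depth, 10, growth_rate=12, reduction=0.5, bottleneck=True)
model.load_state_dict(torch.load(PATH + 'model_state.pt'))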

 

def test(test_loader, model, criterion, epoch):
    PATH = './weights/'
    model = torch.load(PATH + 'model.pt')
    model.eval()
    correct = 0
    with torch.no_grad():  # no gradients needed during evaluation
        for i, (input, target) in enumerate(test_loader):
            target = target.to(device)
            input = input.to(device)

            output = model(input)
            loss = criterion(output, target)
            # Prediction = class with the highest logit
            _, pred = torch.max(output.data, 1)
            correct += (pred == target).sum().item()
    print("epoch %d Accuracy : %f" % (epoch, 100.0 * correct / len(test_loader.dataset)))

 

As shown above, we can load the saved model and run evaluation on the test set.