Tensorflow

triplet loss를 활용한 이미지 유사도 측정

카카오그래놀라 2021. 10. 7. 16:05

원문: https://keras.io/examples/vision/siamese_network/

요약: triplet loss 함수를 사용하여 이미지간 유사도를 비교하는 Siamese Network에 대해 알아봅시다.

View in Colab   

GitHub source

 

개요

Siamese Network 는 두 개 이상의 동일한 하위 네트워크를 포함하는 네트워크 아키텍쳐로서, 하위 네트워크들은 각 입력에 대한 특징 벡터를 생성하고 비교하는 역할을 합니다. 

 

이 예제에서는 3개의 동일한 하위 네트워크가 있는 Siamese Network를 사용합니다. 우리는 모델에 3개의 이미지를 제공할 것입니다. 그 중 2개는 유사하고 (Anchor-Positive 이미지) 나머지 1개(Negative 이미지)는 관련이 없습니다. 우리의 목표는 모델이 이미지간 유사성을 추정하는 방법을 배우게 하는 것입니다.

 

NN의 학습에 우리는 triplet loss 함수를 사용할 것입니다. 위 loss는 FaceNet Paper by Schroff et al,. 2015. 를 통해 자세히 살펴볼 수 있습니다. triplet loss는 다음과 같이 정의됩니다.

L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)

 

Dataset으로는  Totally Looks Like dataset by Rosenfeld et al., 2018. 를 활용하였습니다.

 

라이브러리 로드

import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet


target_shape = (200, 200)

 

데이터셋 준비하기

위의 Dataset을 ~/.keras 경로에 압축을 해제합니다.

데이터 셋은 2개의 개별 파일로 구성되어 있습니다.

  • left.zip 은 anchor로서 활용되는 이미지입니다.
  • right.zip Positive(anchor와 닮은 이미지)로서 활용되는 이미지 입니다.
cache_dir = Path(Path.home()) / ".keras"
anchor_images_path = cache_dir / "left"
positive_images_path = cache_dir / "right"
!gdown --id 1jvkbTr_giSP3Ru8OwGNCg6B4PvVbcO34
!gdown --id 1EzBZUb_mh_Dp_FKD0P4XiYYSd0QBH5zW
!unzip -oq left.zip -d $cache_dir
!unzip -oq right.zip -d $cache_dir

 

 

데이터 불러오기

우리는 tf.data 파이프라인을 사용하여 데이터를 로드하고 Siamese Network 훈련하는 데 필요한 triplets(삼중항, 쉽게 말하자면 이미지를 3개씩 불러온다는 의미)를 생성할 것입니다.

Anchor, Positive 및 Negative 이미지들의 파일명을 통해 파이프라인을 설정합니다. 파이프라인은 해당 이미지를 로드하고 전처리합니다.

def preprocess_image(filename):
    image_string = tf.io.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.float32)
    image = tf.image.resize(image, target_shape)
    return image


def preprocess_triplets(anchor, positive, negative):
    return (
        preprocess_image(anchor),
        preprocess_image(positive),
        preprocess_image(negative),
    )

 

tf.data.Dataset을 통해 파이프라인을 구축합시다.

anchor 이미지들의 경로를 담은 anchor_images,

positive 이미지들의 경로를 담은 positive_images 가 있습니다.

그리고 negative 이미지들은 따로 모으는 방식이 아닌 anchor와 positive 이미지들을 합친 다음 랜덤으로 섞어서 활용하겠습니다. (우연히 같은 이미지가 positive와 negative에 활용될 수 있으나, 랜덤하게 섞기 때문에 대부분은 다른 이미지가 negative에 적용됩니다.)

anchor_images = sorted(
    [str(anchor_images_path / f) for f in os.listdir(anchor_images_path)]
)

positive_images = sorted(
    [str(positive_images_path / f) for f in os.listdir(positive_images_path)]
)

image_count = len(anchor_images)

anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)

rng = np.random.RandomState(seed=42)
rng.shuffle(anchor_images)
rng.shuffle(positive_images)

# anchor_images와 positive_images를 합친 다음,
# 다시 한 번 섞어서 negative_images로 사용합니다.
negative_images = anchor_images + positive_images
np.random.RandomState(seed=32).shuffle(negative_images)

negative_dataset = tf.data.Dataset.from_tensor_slices(negative_images)
negative_dataset = negative_dataset.shuffle(buffer_size=4096)

dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.map(preprocess_triplets)

train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))

train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(8)

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(8)

 

예시 사진을 보면 1, 2번째 이미지는 비슷하고, 3번째 이미지는 다른 모습을 볼 수 있습니다.

 

def visualize(anchor, positive, negative):
    def show(ax, image):
        ax.imshow(image)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

    fig = plt.figure(figsize=(9, 9))

    axs = fig.subplots(3, 3)
    for i in range(3):
        show(axs[i, 0], anchor[i])
        show(axs[i, 1], positive[i])
        show(axs[i, 2], negative[i])


visualize(*list(train_dataset.take(1).as_numpy_iterator())[0])

 

embedding generator model 정의하기

 Siamese 네트워크는 triplet의 각 이미지에 대한 임베딩을 생성합니다. 이를 위해 ImageNet에서 사전 훈련된 ResNet50 모델을 사용하고 몇 가지 Dense 레이어를 연결하여 이러한 임베딩을 분리하는 방법을 배울 것입니다.

conv5_block1_out 레이어까지 모델의 모든 레이어의 가중치를 고정합니다. 이는 모델이 이미 학습한 가중치에 영향을 미치지 않도록 하는 데 중요합니다. 우리는 훈련하는 동안 가중치를 미세 조정할 수 있도록 맨 아래 몇 개의 레이어를 훈련 가능한 상태로 둘 것입니다.

base_cnn = resnet.ResNet50(
    weights="imagenet", input_shape=target_shape + (3,), include_top=False
)

flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(512, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
dense2 = layers.Dense(256, activation="relu")(dense1)
dense2 = layers.BatchNormalization()(dense2)
output = layers.Dense(256)(dense2)

embedding = Model(base_cnn.input, output, name="Embedding")

trainable = False
for layer in base_cnn.layers:
    if layer.name == "conv5_block1_out":
        trainable = True
    layer.trainable = trainable

 

Siamese Network model 정의하기

Siamense Network는 각각의 Triplet 이미지를 입력으로 받아 임베딩을 생성하고 Anchor와 Positive 임베딩 사이의 거리와 Anchor와 Negative 임베딩 사이의 거리를 출력합니다.

거리를 계산하기 위해 두 값을 튜플로 반환하는 사용자 지정 레이어 DistanceLayer를 사용할 수 있습니다.

사용자 지정 레이어에 대한 설명은 https://www.tensorflow.org/guide/keras/custom_layers_and_models?hl=ko 를 참조하세요.

class DistanceLayer(layers.Layer):
    """
    이 레이어는 Anchor 임베딩과 Positive 임베딩 그리고 Anchor 임베딩과 Negative 임베딩의 거리를
    계산하는 역할을 합니다.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

distances = DistanceLayer()(
    embedding(resnet.preprocess_input(anchor_input)),
    embedding(resnet.preprocess_input(positive_input)),
    embedding(resnet.preprocess_input(negative_input)),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

 

Model.fit() 사용자 정의하기

 이제 Siamese 네트워크에서 생성된 3개의 임베딩을 사용하여 Triple Loss를 계산할 수 있도록 사용자 지정 training loop가 있는 모델을 구현해야 합니다. train loss를 추적하기 위해 Mean metric 인스턴스를 생성해 보겠습니다.

 아래를 이해하기 위해서는 사용자 정의 모델에 대한 이해가 필요합니다. 

https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit?hl=ko 를 참조하세요.

class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.5):
        super(SiameseModel, self).__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape는 내부에서 수행하는 모든 작업을 기록하는 컨텍스트 관리자입니다.
        # 여기서 손실을 계산하는 데 사용하므로 그래디언트를 가져올 수 있고,
        # `compile()`을 통해 그래디언트를 적용할 수 있습니다.
        
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # 가중치에 대한 손실 함수의 그래디언트를 저장합니다.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # 모델에 지정된 옵티마이저를 통해 그래디언트를 적용합니다.
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # trainig loss를 갱신해줍니다.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # loss를 갱신해줍니다.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def _compute_loss(self, data):
        # loss 계산하기
        # 우선 계산된 anchor-positive 거리와 anchor-negative 거리를 가져옵니다.
        ap_distance, an_distance = self.siamese_network(data)

        # triplet loss 정의에 따라
        # 두 거리를 빼고, 음수가 나오지 않도록 max(loss+margin, 0)을 적용합니다.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss

    @property
    def metrics(self):
        #`reset_states()`가 자동으로 호출될 수 있도록 여기에 메트릭을 나열해야 합니다.
        return [self.loss_tracker]

 

훈련하기

siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0001))
siamese_model.fit(train_dataset, epochs=10, validation_data=val_dataset)

 

결과 확인하기

우리는 어떻게 네트워크가 유사한 이미지에 따라 임베딩을 분리하는 방법을 학습하는지 확인할 수 있습니다.

우리는 코사인 유사도를 통해 두 임베딩간 유사도를 측정할 수 있습니다.

각 이미지에 대해 생성된 임베딩 간의 유사성을 확인하기 위해 샘플을 가져옵시다.

sample = next(iter(train_dataset))
visualize(*sample)

anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(resnet.preprocess_input(anchor)),
    embedding(resnet.preprocess_input(positive)),
    embedding(resnet.preprocess_input(negative)),
)

 

마지막으로 Anchor와 Positive 사이의 코사인 유사성을 계산하고 Anchor와 Negative 이미지 사이의 유사성과 비교할 수 있습니다.

Anchor와 Positive 사이의 코사인 유사성이 Anchor와 Negative 이미지 사이의 유사성보다 클 것으로 예상해야 합니다.

cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())


# Positive similarity: 0.9940324
# Negative similarity 0.9918252