DENSITY ESTIMATION USING REAL NVP

3 minute read

Keras 공식 홈페이지의 https://keras.io/examples/generative/real_nvp/의 코드를 기반으로 작성되었습니다. Real NVP 논문: DENSITY ESTIMATION USING REAL NVP

Real NVP

Normalizing Flow를 이용한 분포 추정에서, 널리 사용되는 방법 중 하나인 Real NVP를 tensorflow와 keras로 구현해보겠다. 해당 논문에 관한 이론적인 내용은 기회가 된다면 추후 포스팅 할 예정입니다!

0. import

import tensorflow as tf
import tensorflow.keras as K
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
print('TensorFlow version:', tf.__version__)
print('Eager Execution Mode:', tf.executing_eagerly())
print('available GPU:', tf.config.list_physical_devices('GPU'))
from tensorflow.python.client import device_lib
print('==========================================')
print(device_lib.list_local_devices())
TensorFlow version: 2.4.0 
Eager Execution Mode: True 
available GPU: [] 
========================================== 
[name: "/device:CPU:0" 
device_type: "CPU" 
memory_limit: 268435456 
locality { } 
incarnation: 10055035958512198800 
]

1. dataset

dataset은 scikit-learn에서 제공하는 moons dataset을 이용하였다.

from sklearn.datasets import make_moons
import numpy as np
import matplotlib.pyplot as plt

data = make_moons(3000, noise=0.05)[0].astype("float32")
data.shape
(3000, 2)

2. 모형 정의와 학습에 필요한 parameter 정의

PARAMS = {
    'data_dim': 2,
    'embedding_dim': 256,
    'reg': 0.01,
    'coupling_MLP_num': 4,
    'K': 8,
    'latent_dim': 2,
    'nf_dim': 1,
}

3. Coupling layer

coupling layer는 relu activation을 갖는 hidden layer로 구성된 MLP이다. hidden layer의 개수나 units의 수는 적합하고자 하는 데이터의 분포에 맞게 조절하면 된다!

#%%
class CouplingLayer(K.models.Model):
    def __init__(self, params, embedding_dim, output_dim, activation, name='CouplingLayer', **kwargs):
        super(CouplingLayer, self).__init__(name=name, **kwargs)
        
        self.params = params
        self.embedding_dim = embedding_dim
        self.output_dim = output_dim
        self.activation = activation
        self.dense = [
            layers.Dense(self.embedding_dim, activation='relu', kernel_regularizer=K.regularizers.l2(self.params['reg'])) 
            for _ in range(self.params['coupling_MLP_num'])
            ] + [
            layers.Dense(self.output_dim, activation=self.activation, kernel_regularizer=K.regularizers.l2(self.params['reg']))
            ]
        
    def call(self, x):
        for d in self.dense:
            x = d(x)
        return x

4. Normalizing Flow

모형의 loss를 log_likelihood, train과 test에 필요한 step을 각각 train_step, test_step 그리고 역변환은 inverse라고 정의하였다.

이때, 중요한 점은 @propertyself.loss_tracker를 이용해 모형에서 사용되는 metric을 loss들의 평균으로 사용하였다.

그리고 mask를 이용한 일반적인 coupling layer를 구현하는 과정에서 tf.repeat이 필요하다. 이 연산은 정확한 차원이 사전에 정의되지 않게 만들어 model.fit()을 사용할 수 없게 만든다. 따라서, build_graph(self)라는 함수를 이용해 dummy 값을 넣어주어 모형 내부의 차원이 모두 정의될 수 있도록 하였다.

#%%
class NormalizingFlow(K.models.Model):
    def __init__(self, params, name='NormalizingFlow', **kwargs):
        super(NormalizingFlow, self).__init__(name=name, **kwargs)
        
        self.params = params
        self.mask = np.array(
            [[0, 1], [1, 0]] * (self.params['K'] // 2), dtype="float32"
        )
        self.s = [CouplingLayer(self.params, self.params['embedding_dim'], self.params['nf_dim'], activation='tanh')
                    for _ in range(self.params['K'])]
        self.t = [CouplingLayer(self.params, self.params['embedding_dim'], self.params['nf_dim'], activation='linear')
                    for _ in range(self.params['K'])]
        
        self.loss_tracker = K.metrics.Mean(name="loss")
        
    @property
    def metrics(self):
        """List of the model's metrics.
        We make sure the loss tracker is listed as part of `model.metrics`
        so that `fit()` and `evaluate()` are able to `reset()` the loss tracker
        at the start of each epoch and at the start of an `evaluate()` call.
        """
        return [self.loss_tracker]
        
    def call(self, x, sum_log_abs_det_jacobians=None):
        if sum_log_abs_det_jacobians is None:
            sum_log_abs_det_jacobians = 0
        log_abs_det_jacobian = 0
        
        for i in range(self.params['K']):
            x_masked = tf.boolean_mask(x, self.mask[i], axis=1)
            x = (
                x * self.mask[i]
                + (1 - self.mask[i])
                * (x * tf.repeat(tf.math.exp(self.s[i](x_masked)), 2, axis=1) 
                   + 
                   tf.repeat(self.t[i](x_masked), 2, axis=1))
            )
            
            log_abs_det_jacobian += tf.reduce_sum(self.s[i](x_masked), axis=-1)
        sum_log_abs_det_jacobians += log_abs_det_jacobian
        return x, sum_log_abs_det_jacobians
    
    def inverse(self, x):
        for i in reversed(range(self.params['K'])):
            x_masked = tf.boolean_mask(x, self.mask[i], axis=1)
            x = (
                x * self.mask[i]
                + (1 - self.mask[i])
                * ((x - tf.repeat(self.t[i](x_masked), 2, axis=1))
                   *
                   tf.repeat(tf.math.exp(- self.s[i](x_masked)), 2, axis=1))
            )
        return x
    
    def build_graph(self):
        dummy = tf.random.normal((1, self.params['data_dim']))
        _ = self(dummy)
        return print('Graph is built!')
    
    def log_likelihood(self, x):
        x, sum_log_abs_det_jacobians = self(x)
        loss = tf.reduce_mean(tf.reduce_sum(tf.square(x), axis=1) / 2 - sum_log_abs_det_jacobians)
        return loss
    
    def train_step(self, data):
        with tf.GradientTape() as tape:
            loss = self.log_likelihood(data)

        grad = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(grad, self.trainable_variables))
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}    
    
    def test_step(self, data):
        loss = self.log_likelihood(data)
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

5. 모형 선언 및 적합

이전 section에서 언급한 것처럼, model.fit()이전에 model.build_graph()를 이용해 모형 내부의 차원을 먼저 정확히 정의되도록 한다.

model = NormalizingFlow(PARAMS)
model.build_graph()
model.summary()

lr_schedule = K.optimizers.schedules.ExponentialDecay(
    0.0001, 500, 0.95, staircase=False
)
model.compile(optimizer=K.optimizers.Adam(learning_rate=lr_schedule))

history = model.fit(
    data, batch_size=256, epochs=200, verbose=1, validation_split=0.2
)
Epoch 200/200 
10/10 [==============================] - 0s 35ms/step - loss: -1.4918 - val_loss: -1.4159

6. 결과 확인

plt.figure(figsize=(10, 5))
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.title("model loss")
plt.legend(["train", "validation"], loc="upper right")
plt.ylabel("loss")
plt.xlabel("epoch")

# From data to latent space.
z, _ = model(data)

# From latent space to data.
samples = tf.random.normal((3000, 2))
x = model.inverse(samples)

f, axes = plt.subplots(2, 2)
f.set_size_inches(15, 10)

axes[0, 0].scatter(data[:, 0], data[:, 1], color="r")
axes[0, 0].set(title="Inference data space X", xlabel="x", ylabel="y")
axes[0, 1].scatter(z[:, 0], z[:, 1], color="r")
axes[0, 1].set(title="Inference latent space Z", xlabel="x", ylabel="y")
axes[1, 0].scatter(samples[:, 0], samples[:, 1], color="g")
axes[1, 0].set(title="Generated latent space Z", xlabel="x", ylabel="y")
axes[1, 1].scatter(x[:, 0], x[:, 1], color="g")
axes[1, 1].set(title="Generated data space X", label="x", ylabel="y")
plt.show()
f, axes = plt.subplots(1, 2)
f.set_size_inches(10, 5)

z_interpolation = np.linspace([-1, 1], [0.5, 1], 11)
x_interpolation = model.inverse(z_interpolation)

axes[0].scatter(samples[:, 0], samples[:, 1], color="g", alpha=0.05)
axes[0].set(title="Generated latent space Z", xlabel="x", ylabel="y")
axes[0].scatter(z_interpolation[:, 0], z_interpolation[:, 1], color='r', s=50)
axes[1].scatter(x[:, 0], x[:, 1], color="g", alpha=0.05)
axes[1].scatter(x_interpolation[:, 0], x_interpolation[:, 1], color='r', s=50)
axes[1].set(title="Generated data space X", label="x", ylabel="y")
plt.show()

plot을 보면 latent space위의 linear interpolation이 data space에서 data의 분포에 따라 smooth한 interpolation을 구성한다는 것을 보여주고 있다.

Reference

  • Dinh, L., Sohl-Dickstein, J., & Bengio, S. (2016). Density estimation using real nvp. arXiv preprint arXiv:1605.08803.

Comments