Keras 공식 홈페이지의의 코드를 기반으로 작성되었습니다. Real NVP 논문: DENSITY ESTIMATION USING REAL NVP
Real NVP
Normalizing Flow를 이용한 분포 추정에서, 널리 사용되는 방법 중 하나인 Real NVP를 tensorflow와 keras로 구현해보겠다. 해당 논문에 관한 이론적인 내용은 기회가 된다면 추후 포스팅 할 예정입니다!
0. import
import tensorflow as tf
import tensorflow.keras as K
from tensorflow.keras import layers
from tensorflow.keras.utils import to_categorical
print('TensorFlow version:', tf.__version__)
print('Eager Execution Mode:', tf.executing_eagerly())
print('available GPU:', tf.config.list_physical_devices('GPU'))
from tensorflow.python.client import device_lib
TensorFlow version: 2.4.0
Eager Execution Mode: True
available GPU: []
[name: "/device:CPU:0"
device_type: "CPU"
memory_limit: 268435456
locality { }
incarnation: 10055035958512198800
1. dataset
dataset은 scikit-learn에서 제공하는 moons dataset을 이용하였다.
from sklearn.datasets import make_moons
import numpy as np
import matplotlib.pyplot as plt
data = make_moons(3000, noise=0.05)[0].astype("float32")
(3000, 2)
2. 모형 정의와 학습에 필요한 parameter 정의
'data_dim': 2,
'embedding_dim': 256,
'reg': 0.01,
'coupling_MLP_num': 4,
'K': 8,
'latent_dim': 2,
'nf_dim': 1,
3. Coupling layer
coupling layer는 relu
activation을 갖는 hidden layer로 구성된 MLP이다. hidden layer의 개수나 units의 수는 적합하고자 하는 데이터의 분포에 맞게 조절하면 된다!
class CouplingLayer(K.models.Model):
def __init__(self, params, embedding_dim, output_dim, activation, name='CouplingLayer', **kwargs):
super(CouplingLayer, self).__init__(name=name, **kwargs)
self.params = params
self.embedding_dim = embedding_dim
self.output_dim = output_dim
self.activation = activation
self.dense = [
layers.Dense(self.embedding_dim, activation='relu', kernel_regularizer=K.regularizers.l2(self.params['reg']))
for _ in range(self.params['coupling_MLP_num'])
] + [
layers.Dense(self.output_dim, activation=self.activation, kernel_regularizer=K.regularizers.l2(self.params['reg']))
def call(self, x):
for d in self.dense:
x = d(x)
return x
4. Normalizing Flow
모형의 loss를 log_likelihood
, train과 test에 필요한 step을 각각 train_step
, test_step
그리고 역변환은 inverse
라고 정의하였다.
이때, 중요한 점은 @property
와 self.loss_tracker
를 이용해 모형에서 사용되는 metric을 loss들의 평균으로 사용하였다.
그리고 mask를 이용한 일반적인 coupling layer를 구현하는 과정에서 tf.repeat
이 필요하다. 이 연산은 정확한 차원이 사전에 정의되지 않게 만들어
을 사용할 수 없게 만든다. 따라서, build_graph(self)
라는 함수를 이용해 dummy 값을 넣어주어 모형 내부의 차원이 모두 정의될 수 있도록 하였다.
class NormalizingFlow(K.models.Model):
def __init__(self, params, name='NormalizingFlow', **kwargs):
super(NormalizingFlow, self).__init__(name=name, **kwargs)
self.params = params
self.mask = np.array(
[[0, 1], [1, 0]] * (self.params['K'] // 2), dtype="float32"
self.s = [CouplingLayer(self.params, self.params['embedding_dim'], self.params['nf_dim'], activation='tanh')
for _ in range(self.params['K'])]
self.t = [CouplingLayer(self.params, self.params['embedding_dim'], self.params['nf_dim'], activation='linear')
for _ in range(self.params['K'])]
self.loss_tracker = K.metrics.Mean(name="loss")
def metrics(self):
"""List of the model's metrics.
We make sure the loss tracker is listed as part of `model.metrics`
so that `fit()` and `evaluate()` are able to `reset()` the loss tracker
at the start of each epoch and at the start of an `evaluate()` call.
return [self.loss_tracker]
def call(self, x, sum_log_abs_det_jacobians=None):
if sum_log_abs_det_jacobians is None:
sum_log_abs_det_jacobians = 0
log_abs_det_jacobian = 0
for i in range(self.params['K']):
x_masked = tf.boolean_mask(x, self.mask[i], axis=1)
x = (
x * self.mask[i]
+ (1 - self.mask[i])
* (x * tf.repeat(tf.math.exp(self.s[i](x_masked)), 2, axis=1)
tf.repeat(self.t[i](x_masked), 2, axis=1))
log_abs_det_jacobian += tf.reduce_sum(self.s[i](x_masked), axis=-1)
sum_log_abs_det_jacobians += log_abs_det_jacobian
return x, sum_log_abs_det_jacobians
def inverse(self, x):
for i in reversed(range(self.params['K'])):
x_masked = tf.boolean_mask(x, self.mask[i], axis=1)
x = (
x * self.mask[i]
+ (1 - self.mask[i])
* ((x - tf.repeat(self.t[i](x_masked), 2, axis=1))
tf.repeat(tf.math.exp(- self.s[i](x_masked)), 2, axis=1))
return x
def build_graph(self):
dummy = tf.random.normal((1, self.params['data_dim']))
_ = self(dummy)
return print('Graph is built!')
def log_likelihood(self, x):
x, sum_log_abs_det_jacobians = self(x)
loss = tf.reduce_mean(tf.reduce_sum(tf.square(x), axis=1) / 2 - sum_log_abs_det_jacobians)
return loss
def train_step(self, data):
with tf.GradientTape() as tape:
loss = self.log_likelihood(data)
grad = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(grad, self.trainable_variables))
return {"loss": self.loss_tracker.result()}
def test_step(self, data):
loss = self.log_likelihood(data)
return {"loss": self.loss_tracker.result()}
5. 모형 선언 및 적합
이전 section에서 언급한 것처럼,
이전에 model.build_graph()
를 이용해 모형 내부의 차원을 먼저 정확히 정의되도록 한다.
model = NormalizingFlow(PARAMS)
lr_schedule = K.optimizers.schedules.ExponentialDecay(
0.0001, 500, 0.95, staircase=False
history =
data, batch_size=256, epochs=200, verbose=1, validation_split=0.2
Epoch 200/200
10/10 [==============================] - 0s 35ms/step - loss: -1.4918 - val_loss: -1.4159
6. 결과 확인
plt.figure(figsize=(10, 5))
plt.title("model loss")
plt.legend(["train", "validation"], loc="upper right")
# From data to latent space.
z, _ = model(data)
# From latent space to data.
samples = tf.random.normal((3000, 2))
x = model.inverse(samples)
f, axes = plt.subplots(2, 2)
f.set_size_inches(15, 10)
axes[0, 0].scatter(data[:, 0], data[:, 1], color="r")
axes[0, 0].set(title="Inference data space X", xlabel="x", ylabel="y")
axes[0, 1].scatter(z[:, 0], z[:, 1], color="r")
axes[0, 1].set(title="Inference latent space Z", xlabel="x", ylabel="y")
axes[1, 0].scatter(samples[:, 0], samples[:, 1], color="g")
axes[1, 0].set(title="Generated latent space Z", xlabel="x", ylabel="y")
axes[1, 1].scatter(x[:, 0], x[:, 1], color="g")
axes[1, 1].set(title="Generated data space X", label="x", ylabel="y")
f, axes = plt.subplots(1, 2)
f.set_size_inches(10, 5)
z_interpolation = np.linspace([-1, 1], [0.5, 1], 11)
x_interpolation = model.inverse(z_interpolation)
axes[0].scatter(samples[:, 0], samples[:, 1], color="g", alpha=0.05)
axes[0].set(title="Generated latent space Z", xlabel="x", ylabel="y")
axes[0].scatter(z_interpolation[:, 0], z_interpolation[:, 1], color='r', s=50)
axes[1].scatter(x[:, 0], x[:, 1], color="g", alpha=0.05)
axes[1].scatter(x_interpolation[:, 0], x_interpolation[:, 1], color='r', s=50)
axes[1].set(title="Generated data space X", label="x", ylabel="y")
plot을 보면 latent space위의 linear interpolation이 data space에서 data의 분포에 따라 smooth한 interpolation을 구성한다는 것을 보여주고 있다.
- Dinh, L., Sohl-Dickstein, J., & Bengio, S. (2016). Density estimation using real nvp. arXiv preprint arXiv:1605.08803.