HELLO CYBERNETICS

深層学習、機械学習、強化学習、信号処理、制御工学、量子計算などをテーマに扱っていきます

TensorFlow eager と edward と PyTorchでDCGAN【ただのコードの羅列】

 

 

follow us in feedly

https://cdn.blog.st-hatena.com/images/theme/og-image-1500.png

はじめに

各ライブラリを利用した時のDCGANのコードを羅列しただけの記事です。 使い方の雰囲気だけでも比較したいときに見てください(基本的にただのメモです)。

それぞれについて要点だけを説明しておきます。

PyTorch

generator

PyTorchにはtorch.nn.Sequentialというクラスがあります。 この中にはtorch.nnモジュールで実装されているクラスを複数個渡すことができます。

 

nn.Sequentialは渡した順番で演算を実行することができるため、あたかもこれで1つの層かのように扱うことができ、 大きなモデルを作りたい場合に非常に便利です。

class Generator(nn.Module):
    
    def __init__(self):
        super(Generator, self).__init__()
        
        self.fc = nn.Sequential(
            nn.Linear(62, 1024),
            nn.BatchNorm1d(1024),
            nn.ReLU(),
            nn.Linear(1024, 128 * 7 * 7),
            nn.BatchNorm1d(128 * 7 * 7),
            nn.ReLU(),
        )
        
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 1, kernel_size=4, stride=2, padding=1),
            nn.Sigmoid(),
        )
        

    def forward(self, input):
        x = self.fc(input)
        x = x.view(-1, 128, 7, 7)
        x = self.deconv(x)
        return x

nn.Sequentialを使うことで、forwardではたった2つの層の順伝播をを書くだけで済みます。

ただし、Linear層からConvTranspose2d層に渡すときにデータの形を変更しなければなりません。viewメソッドはnumpyのreshapeメソッドに相当します。

discriminator

こちらも同様。

class Discriminator(nn.Module):
    
    def __init__(self):
        super(Discriminator, self).__init__()
        
        self.conv = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.LeakyReLU(0.2),
        )
        
        self.fc = nn.Sequential(
            nn.Linear(128 * 7 * 7, 1024),
            nn.BatchNorm1d(1024),
            nn.LeakyReLU(0.2),
            nn.Linear(1024, 1),
            nn.Sigmoid(),
        )
        
    
    def forward(self, input):
        x = self.conv(input)
        x = x.view(-1, 128 * 7 * 7)
        x = self.fc(x)
        return x

学習部分

PyTorchでは、モデル毎にgpuへの転送を明示しなければなりません。 しかし、単にcudaメソッドを用いればよく、特に難しいことはありません。

 

optimizerには、そのoptimizerに更新させたいパラメータを予め渡しておきます。 これによってGANのような交互に更新を行いたい場合や、あるいはそれよりも多くのモデルを学習させたい場合に非常に便利です。

G = Generator()
D = Discriminator()
if cuda:
    G.cuda()
    D.cuda()

# optimizer
G_optimizer = optim.Adam(G.parameters(), lr=lr, betas=(0.5, 0.999))
D_optimizer = optim.Adam(D.parameters(), lr=lr, betas=(0.5, 0.999))

# loss
criterion = nn.BCELoss()

data_loader = ## mnistなりFmnistなり

def train(D, G, criterion, D_optimizer, G_optimizer, data_loader):
   
    D.train()
    G.train()

    
    y_real = Variable(torch.ones(batch_size, 1))
    y_fake = Variable(torch.zeros(batch_size, 1))
    if cuda:
        y_real = y_real.cuda()
        y_fake = y_fake.cuda()

    D_running_loss = 0
    G_running_loss = 0
    for batch_idx, (real_images, _) in enumerate(data_loader):
      
        if real_images.size()[0] != batch_size:
            break

        z = torch.rand((batch_size, z_dim))
        if cuda:
            real_images, z = real_images.cuda(), z.cuda()
        real_images, z = Variable(real_images), Variable(z)

        
        D_optimizer.zero_grad()

        
    
        D_real = D(real_images)
        D_real_loss = criterion(D_real, y_real)

  
        z = torch.rand((batch_size, z_dim))
        if cuda:
            z = z.cuda()
        z = Variable(z)        

        fake_images = G(z)
        D_fake = D(fake_images.detach())
        D_fake_loss = criterion(D_fake, y_fake)


        D_loss = D_real_loss + D_fake_loss
        D_loss.backward()
        D_optimizer.step()  
        D_running_loss += D_loss.data[0]


        G_optimizer.zero_grad()

        z = torch.rand((batch_size, z_dim))
        if cuda:
            z = z.cuda()
        z = Variable(z)        

        fake_images = G(z)
        D_fake = D(fake_images)
        G_loss = criterion(D_fake, y_real)
        G_loss.backward()
        G_optimizer.step()
        G_running_loss += G_loss.data[0]
    
    D_running_loss /= len(data_loader)
    G_running_loss /= len(data_loader)
    
    return D_running_loss, G_running_loss


for epoch in range(100):
    D_loss, G_loss = train(D, G, criterion, D_optimizer, G_optimizer, data_loader)
    print('epoch %d, D_loss: %.4f G_loss: %.4f' % (epoch + 1, D_loss, G_loss))

PyTorchでのDCGANの実装に関しては

aidiary.hatenablog.com

に詳しい情報があります。

TensorFlow eager

TensorFlowのDefine by Runモードです。

generator

PyTorchにはnn.Moduleクラスにtrainメソッドとevalメソッドがあり、これらによってドロップアウトやバッチ正規化などの 検証時と訓練時で振る舞いの変わる層の制御が可能です。

 

いまのところこの機能はTFEagerモードには備わっていないため、そのようなベースとなるクラスを予め作っておきます。

 

これを継承したクラスを作って、self.trainingを使ってバッチ正規化の振る舞いを制御するようにします。 Sequentialクラスも存在しないので、ベタに順伝播を書かなければなりませんが、まあ読みづらいくらいでそれほど面倒ではありません。

 

もう1つポイントとしては、pytorchのTensor型には多くのメソッドが備わっていますが、 TensorFlowのTensor型は基本的に関数でゴリゴリ処理を施していくことになります。 ここが意外と()が入れ子になって書きづらい印象です。

class Base_model(tfe.Network):
    '''
    今のところtrainとevalを切り替える良い仕様は無いので
    手動で切り替えるクラスを作っておきます。
    '''
    def __init__(self, *args):
        super(Base_model, self).__init__()
        self.training = True
        
    def train_mode(self):
        self.training = True
        
    def eval_mode(self):
        self.training = False


class Generator(Base_model):
    '''
    潜在変数(ノイズ)を受け取って28*28の白黒画像へ
    バッチ正規化などはfunctionとして実装し、self.trainingを引数にする
    gpu利用時にはPytorchなどのように(batch, channel, h, w)の形式が良い
    '''
    def __init__(self):
        super(Generator, self).__init__()
        
        self.fc1 = self.track_layer(tf.layers.Dense(1024))
        self.fc2 = self.track_layer(tf.layers.Dense(128*7*7))
        
        self.deconv1 = self.track_layer(
            tf.layers.Conv2DTranspose(128, [4, 4], [2, 2], 
                "same", data_format='channels_first'))
        
        self.deconv2 = self.track_layer(
            tf.layers.Conv2DTranspose(1, [4, 4], [2, 2],
                "same", data_format='channels_first'))
    
    def call(self, x):
        x = tf.nn.relu(tf.layers.batch_normalization(
                self.fc1(x), training=self.training))
        x = tf.nn.relu(tf.layers.batch_normalization(
                self.fc2(x), training=self.training))
        x = tf.reshape(x, [-1, 128, 7, 7])
        x = tf.nn.relu(tf.layers.batch_normalization(
                self.deconv1(x), training=self.training, axis=1))
        x = tf.nn.sigmoid(self.deconv2(x))
        return x

discriminator

class Discriminator(Base_model):
    '''
    画像を受け取ったら、それがfake画像がreal画像かを判別する
    注意事項はGenerator同様
    '''
    def __init__(self):
        super(Discriminator, self).__init__()
        self.conv1 = self.track_layer(
            tf.layers.Conv2D(64, [4,4], [2,2],
                "same", data_format='channels_first'))
        self.conv2 = self.track_layer(
            tf.layers.Conv2D(128, [4,4], [2,2],
                "same", data_format='channels_first'))
        
        self.fc1 = self.track_layer(tf.layers.Dense(1024))
        self.fc2 = self.track_layer(tf.layers.Dense(1))
        
    def call(self, x):
        x = tf.nn.leaky_relu(tf.layers.batch_normalization(
                self.conv1(x), training=self.training, axis=1))
        x = tf.nn.leaky_relu(tf.layers.batch_normalization(
                self.conv2(x), training=self.training, axis=1))
        x = tf.layers.flatten(x)
        x = tf.nn.leaky_relu(tf.layers.batch_normalization(
                self.fc1(x), training=self.training))
        x = self.fc2(x)
        return x

loss

def D_loss(real_outputs, fake_outputs):
    '''
    DiscriminatorのlossはDiscriminatorが
    real画像に対してどのような判定を行ったかのlogitsと
    fake画像に対してどのような判定を行ったかのlogitsが
    あれば計算できる。
    '''
    loss_on_real = tf.losses.sigmoid_cross_entropy(
        multi_class_labels=tf.ones_like(real_outputs),
        logits=real_outputs,
        label_smoothing=0.25)
    loss_on_fake = tf.losses.sigmoid_cross_entropy(
        multi_class_labels=tf.zeros_like(fake_outputs),
        logits=fake_outputs)
    
    return loss_on_real + loss_on_fake

def G_loss(fake_outputs):
    '''
    GeneratorのlossはDiscriminatorが
    fake画像に対してどのような判定を行ったかのlogitsが
    あれば計算できる
    '''
    loss = tf.losses.sigmoid_cross_entropy(
        multi_class_labels=tf.ones_like(fake_outputs),
        logits=fake_outputs)

    return loss

学習部分

1回のパラメータ更新をpythonの関数で書いておきます。

 

TFeagerではwith tf.deviceで利用するハードを指定するのが良いと思われます。 Tensorに関してはgpu()メソッドやcpu()メソッドがありますが、tfe.Networkクラスにはこのようなメソッドがないため、 with tf.deviceで一挙に処理したほうがやり忘れが無いように思います。

 

ここでは特にデバイスは指定せずに実装しておき、関数を実行するときに、関数をwith tf.deviceで包むことにします。 正しい、大きいサイズの行列計算を必要としないような値を計算したい場合は、cpuに任せるようにしてもいいでしょう。

def train(G, D, G_optimizer, D_optimizer,
          G_loss, D_loss, data_loader, z_dim):
    # 訓練モードへ
    D.train_mode()
    G.train_mode()

    D_running_loss = 0
    G_running_loss = 0
    
    with tf.device("/cpu:0"):
        count = 0

    for batch_idx, (real_images, _) in enumerate(tfe.Iterator(data_loader)):
        
        current_batchsize = real_images.shape[0]

        z = tf.random_uniform(
            shape=[current_batchsize, z_dim],
            minval=-1.,
            maxval=1.,
            seed=batch_idx)
        
        with tfe.GradientTape(persistent=True) as g:
            '''
            tfe.GradientTapeクラスは dloss/dwの自動微分計算をしたい場合に
            g = tfe.GradientTape()として
            g(loss, w)とすればいい。
            そのために必要なlossの計算を行う。
            '''
            
            fake_images = G(z)
            D_fake = D(fake_images)
            D_real = D(real_images)
            D_loss_value = D_loss(D_real, D_fake)
            D_running_loss += D_loss_value
            
            G_loss_value = G_loss(D_fake)
            G_running_loss += G_loss_value
            
        G_grad = g.gradient(G_loss_value, G.variables)
        D_grad = g.gradient(D_loss_value, D.variables)
        
        with tf.variable_scope('generator'):
            '''
            optimizer.apply_gradientsには
            パラメータによる勾配と現在のパラメータの値をタプルで渡す。
            '''
            G_optimizer.apply_gradients(
                zip(G_grad, G.variables))
            
        with tf.variable_scope('discriminator'):
            D_optimizer.apply_gradients(
                zip(D_grad, D.variables))

        with tf.device("/cpu:0"):
            count += 1

    D_running_loss /= count
    G_running_loss /= count

    return D_running_loss, G_running_loss

こちらの方で、利用するデバイスを指定します。

data_loader = ##

with tf.device("/gpu:0"):
    D_loss_list = []
    G_loss_list = []
    
    for epoch in range(1, num_epochs+1):

        D_loss_value, G_loss_value = train(D=D, G=G, 
                                           G_loss=G_loss,
                                           D_loss=D_loss,
                                           D_optimizer=D_optimizer,
                                           G_optimizer=G_optimizer,
                                           data_loader=ds_train,
                                           z_dim=z_dim)

        D_loss_list.append(D_loss_value.numpy())
        G_loss_list.append(G_loss_value.numpy())
        print('G_loss {},  D_loss {}'.format(G_loss_value.numpy(), D_loss_value.numpy()))

edward

エドワードではGANを学習させるためのAPIが提供されています。 なので学習コードを明示的に書く必要はありません。

placeholderとノイズ入力

まず、バッチデータのgeneratorを作ります。 これは、他のライブラリに対しても使えます。練習のため。

EdwardはTensorFlowのGraphモードのみ対応なので、プレースホルダーを使ってバッチデータとtrainingモードか否かをboolで送るようにします。

def generator(array, batch_size):
    start = 0
    while True:
        stop = start + batch_size
        diff = stop - array.shape[0]
        if diff <= 0:
            batch = array[start:stop]
            start += batch_size
        else:
            batch = np.concatenate((array[start:], array[:diff]))
            start = diff
        batch = batch.astype(np.float32) / 255.0
#         batch = np.random.binomial(1, batch)
        yield batch

        
batch_size = 128
x_train_generator = generator(X_train, batch_size)
x_ph = tf.placeholder(tf.float32, [batch_size, 1, 28, 28])
training = tf.placeholder(tf.bool)

generetor

ここは普通にTensorFlowのtf.layersを使います。 generatorの方はインスタンスを生成しておくことに注意。

def g_net(eps):

    net = tf.layers.dense(eps, 1024, activation=tf.nn.relu)
    net = tf.layers.batch_normalization(net, axis=1, training=training)
    net = tf.layers.dense(net, 128*7*7, activation=tf.nn.relu)
    net = tf.layers.batch_normalization(net, axis=1, training=training)
    net = tf.reshape(net, [-1, 128, 7, 7])

    net = tf.layers.conv2d_transpose(net, 128, [4, 4], [2, 2],
                                    "same", activation=None,
                                     data_format="channels_first")
    net = tf.layers.batch_normalization(net, axis=1, training=training)
    net = tf.nn.relu(net)
    
    net = tf.layers.conv2d_transpose(net, 1, [4, 4], [2, 2],
                                    "same", activation=tf.nn.sigmoid,
                                     data_format="channels_first")
    return net

latent_dim = 100
with tf.variable_scope("g_net"):
    eps = Uniform(tf.zeros([batch_size, latent_dim]))
    x = g_net(eps)

discriminator

こちらはPython関数で書いておくだけで、インスタンスを生成しないようです。 このような作りになっている理由は謎…。

def d_net(x):
    net = tf.layers.conv2d(x, 64, [4, 4], [2, 2], "same",
                          data_format="channels_first",
                          activation=None)
    net = tf.layers.batch_normalization(net, axis=1, training=training)
    net = tf.nn.leaky_relu(net)
    
    net = tf.layers.conv2d(net, 128, [4, 4], [2, 2], "same",
                          data_format="channels_first",
                          activation=None)
    net = tf.layers.batch_normalization(net, axis=1, training=training)
    net = tf.nn.leaky_relu(net)
    
    net = tf.layers.flatten(net)
    
    net = tf.layers.dense(net, 256, activation=tf.nn.leaky_relu)
    net = tf.layers.dense(net, 1, activation=None)
    return net

学習部分

ed.GANInferenceにdiscriminatorのpython関数を渡します。

inference = ed.GANInference(
    data={x: x_ph}, discriminator=d_net)

optimizer = tf.train.AdamOptimizer(2e-4)
optimizer_d = tf.train.AdamOptimizer(2e-4)

inference.initialize(
    optimizer=optimizer, optimizer_d=optimizer_d,
    n_iter=45000, n_print=1000)

sess = ed.get_session()
tf.global_variables_initializer().run()

idx = np.random.randint(batch_size, size=16)
i = 0

for t in range(inference.n_iter):
## 画像保存用コード 
#    if t % inference.n_print == 0:
#        samples = sess.run(x, {training: False})
#        samples = samples[idx, ]

#        fig = plot(samples)
#        plt.savefig(os.path.join(out_dir, '{}.png').format(
#            str(i).zfill(3)), bbox_inches='tight')
#        plt.close(fig)
#        i += 1

    x_batch = next(x_train_generator)
    info_dict = inference.update(feed_dict={x_ph: x_batch, training: True})
    inference.print_progress(info_dict)

参考

aidiary.hatenablog.com