モジュール
使うモジュールはTensorFlow2.0とTensorFlow Probability0.8.0です。 matplotlibとnumpyも可視化のために準備します。
import tensorflow as tf import tensorflow_probability as tfp import numpy as np import matplotlib.pyplot as plt plt.style.use("seaborn") tfd = tfp.distributions
データとモデル
データ
データは適当な一次関数上にノイズを発生させて作ります。
$$ y = 5 x - 3 + \epsilon $$
ここで $\epsilon \sim N(0, 2)$ を既知とします。(既知として良いかは問題による。本来は直線なんだと仮定しているなら、標本分散からざっくり仮定してしまう場合もある)
def toy_data(): x = np.random.uniform(0, 5, 30) y = 5 * x - 3 + np.random.normal(0, 2, 30) return x, y x_np, y_np = toy_data() plt.plot(x_np, y_np, "o")
モデル
モデルは下記のように仮定します。
$$ \begin{align} a \sim N(0, 10) \\ b \sim N(0, 10) \\ y \sim N(ax+b, 2) \\ \end{align} $$
ここで $a \sim N(0, 10)$ は平均 $0$ で標準偏差 $10$ の正規分布をパラメータの事前分布にするということになります。 $b$ も同様です。標準偏差を大きめにとっておけば、強い仮定を入れない事前分布になります。
このようなモデルは下記のように書くことができます。
x = tf.convert_to_tensor( x_np, dtype=tf.float32 ) y = tf.convert_to_tensor( y_np, dtype=tf.float32 ) Root = tfd.JointDistributionCoroutine.Root def model(): a = yield Root(tfd.Normal(loc=0, scale=10.)) b = yield Root(tfd.Normal(loc=0, scale=10.)) y = yield tfd.Normal(loc=a * x + b, scale=tf.constant(2.0))
ここで Root
という変数が事前分布の役割を担ってくれます。書きやすい。
本当は x
を引数で渡したかったのですが、とりあえず方法がわからなかったので断念…。外から固定のグローバル変数で与えてしまいます。
こいつを
joint = tfd.JointDistributionCoroutine(model)
とラッピングしてやることで同時分布
$$ p(y, a, b \mid x) $$
を作ることができます。$x$ は外から与えてしまっていることに注意しましょう。 この分布から $y, a, b$ をサンプルするのは簡単です。
sample = joint.sample() # sample #(<tf.Tensor: id=26, shape=(), dtype=float32, numpy=5.6546383>, # <tf.Tensor: id=51, shape=(), dtype=float32, numpy=0.31189847>, # <tf.Tensor: id=77, shape=(30,), dtype=float32, numpy= # array([18.153383 , 24.010094 , 20.69832 , 25.581663 , 2.8224666, # 25.486622 , 22.960232 , 11.860916 , 10.4699335, 14.64207 , # 5.95372 , 12.057662 , 18.14566 , 0.6437745, 25.23413 , # 28.587183 , 3.336884 , 23.129398 , 5.641927 , 20.396828 , # 14.366775 , 8.47799 , 20.790955 , 15.584529 , 25.958817 , # 16.917774 , 9.393593 , 13.510647 , 14.067269 , 22.415134 ], # dtype=float32)>)
と得られます。これは model
で yield
が呼ばれる順番にテンソルが格納されていることに注意しましょう。 (a, b, y)
の順番です。
学習前の生成モデルからのデータ
さて、この同時分布は
$$ p(y, a, b \mid x) = p(y \mid a, b, x)p(a)p(b) $$
とモデル化されています(model
で自分でそのように書いたのです!)。
従って、こいつから得られたサンプルは
$$ \begin{align} a \sim N(0, 10) \\ b \sim N(0, 10) \\ y \sim N(ax+b, 2) \\ \end{align} $$
という順序で発生しているということになります。こうして得られた $(x, y)$の散布図は
plt.plot(x.numpy(), sample[-1].numpy(), "o")
となっています。
今は $a, b$ が単に事前分布から発生しているため、完全にでたらめなものになっています。
対数同時確率の計算
さて、同時分布を書き下しており、適当なサンプルも得られているので、対数同時確率を計算できます。
joint.log_prob(sample) #<tf.Tensor: id=115, shape=(30,), dtype=float32, numpy= #array([ -8.277493 , -8.221734 , -9.475068 , -8.234814 , -8.262238 , # -8.547427 , -8.965708 , -9.013991 , -9.039732 , -8.551022 , # -9.443696 , -8.62281 , -10.0297575, -9.994436 , -8.507306 , # -9.7459 , -8.516853 , -8.572927 , -10.799183 , -8.218836 , # -8.29117 , -8.223557 , -8.871852 , -8.91931 , -8.539169 , # -8.812513 , -8.222011 , -8.44175 , -8.872479 , -8.223422 ], # dtype=float32)>
これは30個の値が格納されていますが、各データ1つ1つに対する対数確率が入っているということです。これの総和、あるいは平均を取れば、i.i.d と仮定した場合の全てのデータの対数確率を考慮できます。
事後分布
さて、これから計算したいのは$a, b$ の事後分布 $p(a, b \mid x, y)$ です。$x, y$ はデータとして手元にあるので対数確率を計算するモジュールに、$x, y$ を所与として与えてしまえば良いのです。
def unnormalized_log_posterior(a, b): return tf.reduce_mean(joint.log_prob([a, b, y]))
ここで $x$ はモデルの外から与えてしまっているのでここには現れていませんが、既に given な状態です。上記の関数によってある固定された $x, y $ の下での $a, b$ の対数同時確率を計算できるようになりました(正確には確率ではない。こいつは正規化されていないから。だけど、MCMCでは相対的にどこが大きいのかさえ分かれば良いので正規化されていなくても良い)。
MCMCを回す
確率遷移核
さて、MCMCを回す場合にはその手法を決める必要があります。今回は勾配ベースで計算をするメトロポリスヘイスティング法の発展版NUTSを利用します。勾配ベースの手法は自動微分ライブラリと相性が良いです。
こいつに先ほど作った(正規化されていない)対数確率関数を渡します。
kernel = tfp.mcmc.NoUTurnSampler(
target_log_prob_fn=unnormalized_log_posterior,
step_size=0.1,
)
余談ですが、MCMCはある条件を満たす(あるいはそれに近い)マルコフ連鎖を利用して、事後分布のサンプリングを実施します。このときマルコフ連鎖での状態遷移の仕方を決める確率を「確率遷移核」と呼びます。核は英語でカーネルなので、こいつのことも「kernel
」 と呼び、そういう変数名が付けられることがあるということを覚えておくと良いかもしれません(カーネル法のカーネルではない)。
MCMC の設定
MCMCは下記のように関数で書くのが良いでしょう。
理由は単に、@tf.function
でグラフに変換して高速化できるからです。
これをするとしないとでは、速度が全然違います。(ありだと24秒、なしだと全然終わらないので洗濯物を干してました。)
@tf.function() def run_chain(): init_state = list(joint.sample()[:-1]) # a, b chains_states, kernels_results = tfp.mcmc.sample_chain( num_results=1000, num_burnin_steps=300, current_state=init_state, kernel=kernel ) return chains_states, kernels_results # Sample from posterior distribution and get diagnostic chain_states, kernel_results = run_chain()
サンプリングの結果
MCMCでは事後分布を数式で得られるわけではなく、パラメータの対数同時確率から、確率の高いところほど多くサンプリングをするという手続きによって、ヒストグラムが事後分布を形作るようなサンプルが得られます。
a, b = ( chain_states[0], chain_states[1], ) plt.hist(a) plt.hist(b)
$a = 5, b = -3$ が真値です。MCMCは変分推論よりもバラつきを多めにとった事後分布を形成します(というより変分推論が強気の結果を出す)。
EAP推定
さて事後分布が得られたので $a, b$ を事後分布(サンプル)の期待値を取ることで点推定してみましょう。これは通常EAP推定と呼ばれる方法です。一方で事後分布の最大値(サンプルの最頻値)を取るのはMAP推定と呼ばれます。MAP推定はそもそも事後分布を求めることなく、最適化問題で得られる方法なので、ここではわざわざ計算しません。
パラメータ $\cdot$ のEAP推定値は
$$ \mathbb E _ {p(\cdot \mid x, y)}[\cdot] $$
と得られます。もちろん事後分布での期待値計算は事後分布の式を知らないので実施できませんが、サンプルが得られているので今回は単に平均を取る操作で代替しましょう。
a_mean = tf.reduce_mean(a) b_mean = tf.reduce_mean(b)
で得ることができます。この点推定の結果を使って直線を引いてみましょう。
x_new = tf.convert_to_tensor(np.linspace(0, 5, 100), dtype=tf.float32) y_new = a_mean * x_new + b_mean plt.plot(x_new, y_new) plt.plot(x, y, "o")
ベイズ予測分布
さて、事後分布があるのだからベイズ予測分布を計算したくなります。
$$ p(y _ {new} \mid x _ {new}, x, y) = \int _ {a, b} p(y _ {new}\mid x _ {new}, a, b)p(a, b \mid x, y)dadb $$
と言っても我々が持っているのは事後分布からサンプリングをされているであろうサンプルだけですから、下記のように既に持っているサンプル $a, b$ で期待値計算を近似してしまいます。
$$ p(y _ {new} \mid x _ {new}, x, y) \sim \frac{1}{N}\sum _ {a _ i, b _ i} ^ N p(y _ {new}\mid x _ {new}, a _ i, b _ i) $$
これはとある統計モデル $p(y _ {new}\mid x _ {new}, a _ i, b _ i)$ をいろいろなパラメータで試してみて混ぜてしまおうと言う操作を行っていることになります。とは言っても色々なパラメータは事後分布から発生させている妥当なものであるので、いわば点推定のように情報を1つに縮約するのではなく、持っているサンプルの情報を全て使おうと言うことです。
TFPに上記のモンテカルロサンプリングの関数がきっとあるのですが、よくわからなかったので、ブロードキャストを使って全ての事後分布からのサンプルを同時に計算してしまいます。
def statistical_model(a, b): return tfd.Normal( loc=tf.expand_dims(a, axis=0) * tf.expand_dims(x_new, axis=1) + tf.expand_dims(b, axis=0), scale=2.0 ).sample() samples = statistical_model(a, b)
このとき得られるベイズ予測分布からのサンプルは
samples.shape
# TensorShape([100, 1000])
となっています。0番目の軸が一連のデータ、1番目の軸が事後分布からのサンプリングされたパラメータの個数になります。
ベイズ予測分布の期待値は、事後分布からのサンプリングの個数について平均を取ればいいので
y_expected = tf.reduce_mean(samples, axis=1)
となります。
下記で描画します。
plt.plot(x_new, y_expected, "r") plt.plot(x_new, samples, "b", alpha=0.003 ) plt.plot(x, y, "og")
ノイズ項を無しにした、回帰曲線のベイズ予測分布
ところで上記では下のモデルに忠実に、生成モデルの中に $\epsilon$ も含んでいました。今知りたいのが回帰曲線なのだとすれば、こいつは無視しておいて(所詮本質的ではないノイズのモデル化部分だから)、回帰曲線の部分だけ分布として表示したいと思うかもしれません。
その場合は下記のようにノイズ項でのラッピングをなくしてやればいいです。
def statistical_model(a, b): return tf.expand_dims(a, axis=0) * tf.expand_dims(x_new, axis=1) + tf.expand_dims(b, axis=0) samples = statistical_model(a, b) y_expected = tf.reduce_mean(samples, axis=1) plt.plot(x_new, y_expected, "r") plt.plot(x_new, samples, "b", alpha=0.003 ) plt.plot(x, y, "og")
いくつかの回帰曲線が引かれていることが見えますね。