モデルの書き方
edward2
Edwardとpyroのモデルの書き方はそれ程差はありません。 EdwardではRandomVariableという型を準備しており、確率変数の計算を直接記述できます。
def LinearModel(X): beta = ed.Normal(loc=0., scale=5., name="beta") a = ed.Normal(loc=0., scale=100., name="a") sigma = ed.HalfNormal(5., name='sigma') y_hat = X*beta + a return ed.Normal(loc=y_hat, scale=sigma, name='observed')
pyro
Pyroにおいてはpyro.distributions
をpyro.sample
でラッピングすることで同様の記述ができます。
def LinearModel(X): beta = pyro.sample("beta", dist.Normal(0., 5.)) a = pyro.sample("a", dist.Normal(0., 100.)) sigma = pyro.sample("sigma", dist.HalfNormal(5.)) y_hat = X*beta + a return pyro.sample("observed", dist.Normal(y_hat, sigma), obs=y)
tfp
TFPの生のAPIで記述が最も似ているのは、tfd.JointDistributionCoroutine
でしょう。同じように書いてみます。
root = tfd.JointDistributionCoroutine.Root def LinearModel(X): beta = yield root(tfd.Normal(loc=0., scale=5., name="beta")) a = yield root(tfd.Normal(loc=0., scale=100., name="a")) sigma = yield root(tfd.HalfNormal(loc=5., name="simga")) y_hat = X*beta + a obs = yield tfd.Normal(loc=y_hat, scale=sigma, name="observed") return obs
しかし、見ての通りジェネレータを返しており、PyroやEdwardのようにTensorLikeなものが値として返ってくるわけではない点に注意です。
実際、上記を書いてインスタンス化をすることはできても、model = LinearModel(X_tensor)
などとした時に怒られます。
したがって下記のように、ジェネレータをラッピングする関数がデータ X
を受け取る形で実装する必要があります。
def LinearModel(X): root = tfd.JointDistributionCoroutine.Root def LinearModel_(): beta = yield root(tfd.Normal(loc=0., scale=5., name="beta")) a = yield root(tfd.Normal(loc=0., scale=100., name="a")) sigma = yield root(tfd.HalfNormal(5., name="simga")) y_hat = X*beta + a obs = yield tfd.Normal(loc=y_hat, scale=sigma, name="observed") return obs LinearModel_dist = tfd.JointDistributionCoroutine(LinearModel_) return LinearModel_dist.sample()
また、LinearModel_dist
自体は分布であるので、sample()
メソッドを呼んで初めてTensorが返ってきます。
よって、PyroやEdwardのように振る舞う関数 LinearModel(X)
が欲しければ、戻り値をLinearModel_dist.sample()
とする必要があるのです。
ただ、諸々の事情によって上記の書き方もオススメできません。なぜなら同時確率を計算したい場合には distributions
モジュールの prob
メソッド、あるいは log_prob
メソッドを必要とするからです。したがって
def LinearModel(X): root = tfd.JointDistributionCoroutine.Root def LinearModel_(): beta = yield root(tfd.Normal(loc=0., scale=5., name="beta")) a = yield root(tfd.Normal(loc=0., scale=100., name="a")) sigma = yield root(tfd.HalfNormal(5., name="simga")) y_hat = X*beta + a obs = yield tfd.Normal(loc=y_hat, scale=sigma, name="observed") return obs return tfd.JointDistributionCoroutine(LinearModel_)
と分布を返してしまうのが筋になります。必要なときに sample
メソッドを呼べばいいというわけですね。
対数同時確率の得方
MCMCにしてもVIにしても対数同時確率の計算をする必要があります。
そのため、作ったモデルからデータ X
を受け取った時の対数同時確率を得る方法を下記に記します。
edward2
RandomVariable
は log_prob
メソッドを有しているので、それぞれの変数のメソッドに実現値を渡して、それらの総和を計算すればいいという事になります。
Edwardでは、名前 name
にアクセスすることでコレを内部で実行してくれる関数を作成する ed.make_log_joint_fn
という関数があります。
log_joint = ed.make_log_joint_fn(LinearModel) beta = tf.constant(2.) a = tf.constant(1.) sigma = tf.constant(6.) observed = tf.constant([[-4.], [-4.], [-1.], [-3.], [-7]]) log_prob = log_joint(X, # LinearModel の引数 beta=beta, a=a, sigma=sigma, observed=observed)
この関数の中身は下記のようになっています。
def make_log_joint_fn(model): def log_joint_fn(*model_args, **model_kwargs): def tracer(rv_call, *args, **kwargs): name = kwargs.get("name") kwargs["value"] = model_kwargs.get(name) rv = rv_call(*args, **kwargs) log_probs.append(tf.reduce_sum(rv.distribution.log_prob(rv))) return rv log_probs = [] with ed.trace(tracer): model(*model_args) return sum(log_probs) return log_joint_fn
pyro
pyroにも確率変数に名前がついているため、その対応を取る仕組みが備わっています。
pyro.poutine
モジュールが確率分布の非常に柔軟な操作を可能にしており、その中の trace
関数がモデルの内部の確率変数全てを名前で保持しています。
trace = poutine.trace(LinearModel).get_trace(X)
trace.nodes.keys()
# odict_keys(['_INPUT', 'beta', 'a', 'sigma', 'observed', '_RETURN'])
また、condtion
関数によって、モデルの中の確率変数に確定値を与えることができます。一部の変数だけが確定した場合の条件付きモデルもこれを使えば簡単に同時分布から作成できてしまうというわけです(これはMAP推定などをしたあとの予測モデルを作りたい時にも便利である)。今回は同時確率が欲しいので、全ての確率変数に確定値を割り当ててしまえば良いということになります。
conditioned_model = poutine.condition( LinearModel, data=dict(beta=torch.tensor(0., dtype=torch.float32), a=torch.tensor(1., dtype=torch.float32), sigma=torch.tensor(1., dtype=torch.float32)))
あとは出来上がった条件付き分布が p(X \mid y, a, \beta)
になっているので、この新しい分布を trace
して、元々引数だった X
を下記のように与えることで同時確率を取り出すことができます。
trace = poutine.trace(conditioned_model).get_trace(X) trace.log_prob_sum()
少し遠回りをしているような気もしますが、この柔軟さが色々と効いて来ると思われます。(例えば beta
だけはMAP推定でほかはMCMCにするとか、そういう試行錯誤も簡単にできるわけです)。
tfp
TFPはモデルの作成時に同時分布を獲得しているのでした。そして同時分布はyield
で生成される全ての変数に関して、sample
も実施できますし log_prob
の計算も可能です。
def LinearModel(X): root = tfd.JointDistributionCoroutine.Root def LinearModel_(): beta = yield root(tfd.Normal(loc=0., scale=5., name="beta")) a = yield root(tfd.Normal(loc=0., scale=100., name="a")) sigma = yield root(tfd.HalfNormal(5., name="simga")) y_hat = X*beta + a obs = yield tfd.Normal(loc=y_hat, scale=sigma, name="observed") return obs return tfd.JointDistributionCoroutine(LinearModel_) sample = LinearModel(tf.random.normal([5, 1])).sample() # (<tf.Tensor: id=1854, shape=(), dtype=float32, numpy=-5.990119>, # <tf.Tensor: id=1879, shape=(), dtype=float32, numpy=-42.965343>, # <tf.Tensor: id=1901, shape=(), dtype=float32, numpy=9.698466>, # <tf.Tensor: id=1926, shape=(5, 1), dtype=float32, numpy= # array([[-68.70192 ], # [-47.147198], # [-26.439589], # [-46.889202], # [-66.65064 ]], dtype=float32)>)
これはyield
が書かれた順番に格納されていることに注意して、log_prob
にもこの順序で渡せばよいです。
beta = tf.constant(2.) a = tf.constant(1.) sigma = tf.constant(6.) observed = tf.constant([[-4.], [-4.], [-1.], [-3.], [-7]]) log_prob = LinearModel(tf.random.normal([5, 1])).log_prob(( beta, a, sigma, observed ))
また、ここでは各確率変数に対する対数確率が返ってくるので、tf.reduce_sum
等で和を取ってやることで目的の同時対数確率が得られます。