graphvizを使ってツリー構造をpng画像化できる基本ノードクラス

概要

  • ツリーを構築でき、構造をpng画像として出力できるノードクラスが欲しい
  • 存在するかもしれないが、作ってみる
  • graphviz、pydotplusを使用した
  • DOMへの敬意を込めてappend_child()というメソッド名をjsから拝借した

良かった点

print_labels()、print_edges()をベタに実装してみて、ほとんど構造が同じであることに気が付いた(ともに再帰処理を行う)。print_labels()はツリーを辿りながら(その時点での)親のみを使用する。print_edges()は子も利用する。従って、ノード処理用の関数を呼び出すタイミングだけが異なる。この違いを抽象化できた。

処理タイミングのイメージ (一応)

列車の分岐ポイント。走行中の列車が分岐ポイントに差し掛かった時、そのポイントが(現在のスコープにおける)である。分岐ポイントを通過し切っていない段階では、子ノードはまだ存在しない(辿り着いていない)。親と子を同時に処理に利用したい場合は、次の分岐ポイント(現在の親からみた子)まで待つ必要がある。このことがそのまま処理タイミングの違いにつながる。

実装 (Python)

# dot_writer.py
import sys
import os
import pydotplus


class BaseNode(object):
    """ツリーのノードを表現するクラス"""

    # ノードIDの発行に使用するカウンター
    __counter__ = 0
    
    def __init__(self, label):
        BaseNode.__counter__ += 1
        self._id = BaseNode.__counter__    # ノードID
        self._label = str(label)    # ノードの表現
        self._children = []    # 子ノード

    def __str__(self):
        s = "<%s %d[%s]>" % (
            self.__class__.__name__, self._id, self._label,
        )
        return s

    def append_child(self, node):
        """子ノードを追加する"""
        if isinstance(node, BaseNode):
            self._children.append(node)

    @staticmethod
    def traverse(parent, func, rtype="a", depth=0):
        """処理関数を受け取って再帰処理を行う"""
        
        # 関数の呼び出しタイミングで切り分ける
        if rtype == "a":
            func(parent, depth)    # ここで呼び出す
            if len(parent._children) > 0:
                depth += 1
                for child in parent._children:
                    BaseNode.traverse(child, func, depth=depth)
        elif rtype == "b":
            if len(parent._children) > 0:
                depth += 1
                for child in parent._children:
                    func(parent, child, depth)    # 引数が異なる
                    BaseNode.traverse(child, func, rtype=rtype, depth=depth)

    @staticmethod
    def print_labels(parent):
        """ノードの定義部分を生成する"""

        def p(parent, depth):
            """ノードの定義部分を表示する"""
            template = "%s\"%s\" [label=\"%s\"]"
            indent = " " * 4
            print(template % (indent, parent._id, parent._label))

        # ノードの定義部分を出力する
        BaseNode.traverse(parent, p)

    @staticmethod
    def print_edges(parent):
        """ノードの接続情報を書き出す"""

        def p(parent, child, depth):
            """ノード間の接続情報を表示する"""
            template = "%s\"%s\" -> \"%s\""
            indent = " " * 8
            print(template % (indent, parent._id, child._id))

        # ノードの接続情報を出力する
        BaseNode.traverse(parent, p, rtype="b")

    @staticmethod
    def write_dot(root, path, shape="box", redirect=True):
        """グラフ情報を.dotファイルに書き込む"""

        # ファイルを作成して標準出力をリダイレクトさせる
        # redirect=Trueの場合、以降のすべてのprint()の実行結果が
        # 指定ファイルに書き込まれる
        if redirect:
            f_out = open(path, "w")
            sys.stdout = f_out

        # インデント部分
        indent = " " * 4

        # グラフ定義の開始部分を書き出す
        def_start_lines = [
            "digraph {",
            "%snode [shape=%s]" % (indent, shape),
        ]
        def_start = "\n".join(def_start_lines)
        print(def_start)

        # ノードの定義部分を書き出す
        BaseNode.print_labels(root)

        # ノードの接続情報を書き出す
        BaseNode.print_edges(root)

        # グラフ定義の終了部分を書き出す
        def_end = "}"
        print(def_end)

        # 標準出力を元に戻す
        if redirect:
            sys.stdout = sys.__stdout__

    @staticmethod
    def write_png(dot_path, output_name, remove_dot=False):
        """
        png画像を生成する
        
        .dotファイルはグラフ情報のパースの際に利用されるだけなので必要により削除可
        """

        if not os.path.exists(dot_path):
            emsg = "%s not found" % dot_path
            raise FileNotFoundError(emsg)
        else:
            with open(dot_path, "r") as f:
                dot_data = f.read()

                # グラフを生成する
                graph = pydotplus.graph_from_dot_data(dot_data)

                # png画像を生成する
                graph.write_png(output_name)

        # dotファイルを削除する
        if remove_dot:
            os.remove(dot_path)

使用例

def main():
    """BaseNodeの使用例"""

    # ノード名からノードを生成
    labels = ["root", "c1-1", "c1-2", "c2-1", "c2-2"]
    nodes = [BaseNode(label) for label in labels]
    
    # ルートノードに子を追加 (root → c1-1, c1-2)
    root = nodes[0]
    for node in nodes[1:3]: root.append_child(node)

    # 子に孫を追加 (c1-2 → c2-1, c2-2)
    for node in nodes[3:]: nodes[2].append_child(node)

    # .dotファイルを出力
    name = "tree_sample"
    dot = "./%s.dot" % name
    BaseNode.write_dot(root, dot)

    # ツリー構造をpng画像に出力
    png = "./%s.png" % name
    BaseNode.write_png(dot, png, remove_dot=True)


if __name__ == '__main__':
    main()

以下の画像が生成される。
f:id:zdassen:20170609223025p:plain

Fuzzy C-means法

概要

  • クラスタ中心との距離の逆数に応じた所属確率を用いる
  • K-means法では最近傍のクラスタのみに所属させていた

イメージ

自宅が吉祥寺駅三鷹駅の間にあるとき、吉祥寺駅との距離をk、三鷹駅との距離をmとすると k < m のとき、吉祥寺駅の近くに住んでいると自慢してよい度合いが増す。逆に k = m に近づけば近づくほど、(住所というものを考慮しなければ) 同時に三鷹の住人でもある度合いが増す (自慢しにくくなる)。

K-means法の場合は、単に最寄り駅がどこかだけを考慮する (どれだけ同じような距離だけ、二つの駅から自宅が離れていたとしても)。

一方、C-meansでは7割吉祥寺の住人で、かつ3割程度三鷹の住人である、というような表現をすることになる。(あくまで駅からの距離で考える場合、) 全員が吉祥寺に住んでいるし三鷹の住人でもあるのだが、その割合が人により異なる

ところで、その駅の住人度というのは駅からの相対的な距離で求めたのであるから、当然その割合が大きい住人の自宅近くが駅である可能性が高い (吉祥寺な度合いが9割なら吉祥寺駅に近いはず)。逆に度合いの低い住人の住所は、(度合いが低い程度に応じて) 考慮に入れなければよい (※実際にはクラスタ中心である駅が、更新毎に位置を変えることになるが..)。

はっきりしないはfuzzyだが (恥をかかない程度に) 役に立つby 市民s

実装してみた (Python、※sse()の実装だけ自信がない)

# fcm.py
import numpy as np
from numpy.linalg import norm


class FCMeans(object):
    
    def __init__(self, n_clusters, n_iter=5, m=2, save_hist=False):
        self.n_clusters = n_clusters    # クラスタ数
        self.n_iter = n_iter    # 中心点の更新回数
        self.m = m    # ファジー係数
        self.fuz = 2.0 / (m - 1.0)    # ファジー性 (曖昧さ)
        self.save_hist = save_hist    # 中心点の移動履歴を保存するかどうか

        self.X = None
        self._cents = None    # 中心点
        self._cents_history = []    # 中心点の移動履歴
        self._sse_history = []    # クラスタ内誤差平方和の履歴
        self._memsp = None    # 各サンプルの、クラスタへの所属確率

    @staticmethod
    def _fspan(xi):
        """特徴値の値の範囲を求める"""
        fmin, fmax = np.min(xi), np.max(xi)
        return fmax - fmin

    def _rand_cent(self):
        """特徴値の範囲に応じて、ランダムな中心点を決定する"""
        ndim = self.X.shape[1]
        cents = np.random.rand(self.n_clusters, ndim)
        for di in range(ndim):
            fspan = FCMeans._fspan(self.X[:, di])
            cents[:, di] *= fspan
        self._cents = cents
        return cents

    @staticmethod
    def _dist_ratio(x, cent_a, cent_b):
        """サンプル点の、それぞれのクラスタ中心との距離の比を求める"""
        da = norm(x - cent_a)
        db = norm(x - cent_b)
        return da / db

    def _update_memsp(self):
        """各サンプルの所属確率を更新する"""
        memsp = []
        for x in self.X:
            probs = []

            # ある中心点と、他の中心点との距離の比を算出
            for target_cent in self._cents:
                dist_ratios = [
                    FCMeans._dist_ratio(x, target_cent, cent)
                        for cent in self._cents
                ]
                dist_ratios_fuz = np.array(dist_ratios) ** self.fuz
                ttl = np.sum(dist_ratios_fuz)
                ttl_inv = 1.0 / ttl
                probs.append(ttl_inv)

            memsp.append(probs)
        memsp = np.array(memsp)
        self._memsp = memsp

    def _update_cents(self):
        """クラスタ中心を更新する"""
        ujs = []
        for j in range(self.n_clusters):
            wj = self._memsp[:, j].reshape(-1, 1)
            wjm = wj ** self.m
            X_wjm = self.X * wjm
            X_wjm_sum = np.sum(X_wjm, axis=0)
            wjm_sum = np.sum(wjm, axis=0)
            uj = X_wjm_sum / wjm_sum
            ujs.append(uj)
        new_cents = np.array(ujs)

        # 中心点の更新履歴を保存
        if self.save_hist:
            self._cents_history.append(new_cents.copy())

        self._cents = new_cents

    def _iter(self):
        """繰り返し部分を実行"""

        # 各クラスタへの所属確率を求める
        self._update_memsp()

        # クラスタ中心を更新
        self._update_cents()

    def fit(self, X):
        """学習させる"""
        self.X = X

        # ランダムな中心点を決定
        self._rand_cent()

        # 繰り返し部分を実行する (所属確率の算出→中心点の更新)
        for i in range(self.n_iter):
            self._iter()
            if self.save_hist:
                new_sse = self.sse()
                self._sse_history.append(new_sse)

    def predict(self, X):
        """サンプルの所属先クラスタを予測する"""

        # 所属先クラスタを更新
        self._update_memsp()

        # 所属確率が高いほうを返す
        return np.argmax(self._memsp, axis=1)

    def sse(self):
        """クラスタ内誤差平方和 (SSE) を算出"""

        # 各クラスタ中心との距離を算出
        d = []
        for x in self.X:
            ds = [norm(x - cent) ** 2 for cent in self._cents]
            d.append(ds)
        d = np.array(d)
        
        return np.sum(self._memsp * d)

使用例

import numpy as np
from matplotlib import pyplot as plt


def make_two_clusters(n_samples, mu1, std1, mu2, std2):
    """
    指定された平均、標準偏差に基づく正規分布から
    クラスターを2つ生成する(次元数=2)
    """
    ndim = 2    # 次元数
    a = np.random.randn(n_samples, ndim) * std1 + mu1
    b = np.random.randn(n_samples, ndim) * std2 + mu2
    X = np.vstack((a, b))
    return X, a, b


def main():
    np.random.seed(123)

    # サンプルデータを生成
    n_samples = 500
    X, a, b = make_two_clusters(n_samples,
        mu1=3.8, std1=0.5,
        mu2=6.0, std2=1.05)

    # 学習させる
    n_clusters = 2
    n_iter = 7
    fcm = FCMeans(n_clusters, n_iter=n_iter, save_hist=True)
    fcm.fit(X)
    y_pred = fcm.predict(X)

    # 分類結果をプロットする
    plot_result = True
    if plot_result:
        colors = ("darkcyan", "orange", "crimson")
        for i, label in enumerate(np.unique(y_pred)):
            mask = y_pred == label
            plt.scatter(X[mask, 0], X[mask, 1],
                color=colors[i], edgecolor="black", alpha=0.7,
                label="cluster %d" % (i + 1,))

        # 中心点の移動履歴をプロット
        cents_hist = np.array(fcm._cents_history)
        for i in range(n_clusters):
            plt.plot(cents_hist[:, i, 0], cents_hist[:, i, 1],
                color="red", label="centroid %d" % (i + 1,))

        plt.legend(loc="best")
        plt.tight_layout()
        plt.show()

    # クラスタ内誤差平方和 (SSE) の推移をプロット
    plot_sse = True
    if plot_sse:
        plt.title("Transition of sum squared error")
        plt.plot(fcm._sse_history, color="orange")
        plt.xlabel("number of iteration")

        ax = plt.gca()
        labels = [str(i) for i in range(n_iter + 1)]
        ax.set_xticklabels(labels)

        plt.grid(True, alpha=0.3)
        plt.show()

以下のようになる。
f:id:zdassen:20170616155216p:plain
走査回数がK-meansより少なくなる性質がある*1
f:id:zdassen:20170616155357p:plain

体感すべき部分

以下のように、サンプルxとクラスタ中心 (現時点での候補) の位置は変化しない状況でファジー係数を大きくしていくと、サンプルxの所属確率はそれぞれ50%に近づく (曖昧な状態)。プロットすると以下のようになる。パッと見た感じではサンプルxは左下のクラスタ中心に所属しているように見える。
f:id:zdassen:20170616174433p:plain
f:id:zdassen:20170616174030p:plain

以下はプロット用のコード。

def dist_ratio(x, cent_a, cent_b):
    """中心点との距離の比を求める"""
    da = norm(x - cent_a)
    db = norm(x - cent_b)
    return da / db


def main():
    """ファジー係数による所属確率の変化の度合いを調べる"""

    # 座標 (0, 0)、(1, 1) に中心点の候補が存在する場合
    cents = np.array([[0., 0.], [1., 1.]])

    # あるサンプル点 ((0, 0) のほうが近そう)
    x = np.array([0.2, 0.2]).reshape(1, -1)
    
    # 現在の状況をプロット
    plot_situation = False
    if plot_situation:
        colors = ("darkcyan", "red")
        labels = ("sample x", "centroids")
        for i, p in enumerate((x, cents)):
            plt.scatter(p[:, 0], p[:, 1],
                color=colors[i], edgecolor="black",
                alpha=0.7, label=labels[i])
        plt.legend(loc="upper left")
        plt.grid(True, alpha=0.3)
        plt.show()

    # ファジー係数を変化させながら
    # サンプルxの所属確率の変化度合いを調べる
    m_max = 10
    memsp = []
    for m in range(2, m_max + 1):
        
        # クラスタへの所属確率
        probs = []

        for target_cent in cents:

            # ファジー性 (曖昧さ)
            fuz = 2.0 / (m - 1.0)

            # (サンプルxの)ある中心点との距離 &
            # (サンプルXの)他の中心点との距離、の比
            dist_ratios = np.array([
                dist_ratio(x, target_cent, cent)
                    for cent in cents
            ])

            # ファジー性 (曖昧さ) を考慮する
            dist_ratios_fuz = dist_ratios ** fuz

            # クラスタへの所属確率
            drf_sum = np.sum(dist_ratios_fuz)
            drf_sum_inv = 1.0 / drf_sum
            probs.append(drf_sum_inv)

        memsp.append(probs)
    memsp = np.array(memsp)

    # 所属確率の変化度合いをプロット
    plot_memsp = True
    if plot_memsp:
        n_clusters = cents.shape[0]
        colors = ("darkcyan", "orange")
        labels = tuple(["ownership of cluster %d" % (i + 1)
            for i in range(n_clusters)])
        for i in range(n_clusters):
            plt.plot(memsp[:, i], color=colors[i], label=labels[i])

        x_labels = [str(i + 1) for i in range(m_max)]
        ax = plt.gca()
        ax.set_xticklabels(x_labels)
        
        plt.xlabel("m")
        plt.ylabel("ownership")
        plt.title("Transition of ownership")
        plt.grid(True, alpha=0.3)
        plt.legend(loc="best")
        plt.show()


if __name__ == '__main__':
    main()

*1:Python機械学習プログラミング 達人データサイエンティストによる理論と実践』 p305