決定木 (離散データ用)

概要

  • エントロピーを最も下げる、データ集合の分割方法を (再帰的に) 得たい
  • 特徴値が連続値の場合は大小比較 (気温が25℃以上とか) 、離散値の場合はそのままカウント
  • が、連続値の場合は前処理によって離散値に変換し直すことができる
  • (離散値) の場合、値の種類が増えると情報獲得量が増える傾向にある (例..名前にダブりのない10人の人間を名前で分割すると、完全に分割できる (=全員が葉になる) が、意味がない)

イメージ

小説やビジネス書というカテゴリで、書店の売り場が区分けされているイメージ。

小説もビジネス書も漫画も全てひっくるめて、著者名がア行、カ行、...と分類されている書店が存在しないのは、(本の趣旨が)似たものを近くに集めておくメリットが損なわれるから (ビジネス書コーナーにビジネス書を並べておけば、お目当ての書籍が品切れでも、他の似た何かを購入してくれる可能性は高くなる)。

決定木では、より意味のある分類 (割) 基準を順に選択して適用していく。つまるところ、カテゴリ別or著者名別分類のどちらでも、分類された書籍はそれぞれに違った意味で似るのだが、究極的にはどのように本が並んでいて欲しいかは顧客が決める (=教師あり学習)。

セールスフォース本ってそれ営業本やない、IT本や(実話)

ややこしい点

基本的に知りたいのは、分割前&後における「クラス値のエントロピー (乱雑さ) 」。ただし、エントロピーは自己情報量の期待値であるから、分割前から見た、分割後のサンプル数の割合も計算する必要がある。似たような計算が多く出てくるが、それぞれが別物である。

また、全ての子ノードを共通の特徴で分割する必要はない (子ノードごとにばらばら、再帰なのである意味当然) 。そのため、どの特徴値が使用済みなのかどうかは子ノード側で記憶させておく必要がある。結果として、同じ階層のノードであっても以降に分割に使用できる特徴は違ってくることになる。

疑問点

離散値データで作った決定木において、交差検定は存在するか。テスト時に見たことのない値が出てきた場合はどうするのか。高年収 & 背が高い & 自分にだけ優しいイケメンが登場してきたとして、「そんな人今まで見たことないよ」と言われたらどうするのか。連続値ならこういうことは起こらない。

実装してみた (離散データ限定)

Nodeクラス。自作BaseNodeを継承する。

# decision_tree_node.py
import numpy as np
from dot_writer import BaseNode


class Node(BaseNode):
    
    def __init__(self, X, y, fi, fv, label=""):
        super(Node, self).__init__(label)
        self.X = X
        self.y = y
        self.fi = fi
        self.fv = fv
        self.used_fi = []    # 分割に使用済みの特徴インデックス
        self.ent = None    # エントロピー
        self.n = len(self)    # ノードのサンプル数
        self.ny = np.sum(self.y)    # クラス値が正のものの数

        # クラス値が全て等しい場合は葉 (これ以上分割できない)
        if self.ny == 0 or (self.ny / self.n == 1.0):
            self.is_leaf = True
        else:
            self.is_leaf = False    

    def __len__(self):
        """ノードに含まれるサンプル数を返す"""
        return self.X.shape[0]

決定木クラス。離散データ限定。

# decision_tree.py
import numpy as np
from decision_tree_node import Node


class DecisionTree(object):
    
    def __init__(self, feature_names=None):
        self.feature_names = feature_names    # 特徴名リスト

        self.X = None
        self.y = None
        self.root = None

    def _split(self, X, y, fi):
        """指定された特徴の値でサンプル集合を分割する"""

        # y値で分割する場合もあるので連結しておく
        data = np.hstack((X, y.reshape(-1, 1)))

        # 親データ集合を指定された特徴 & 条件で分割
        nodes = []
        uniq_vals = np.unique(data[:, fi])
        for uv in uniq_vals:
            mask = data[:, fi] == uv
            node = Node(X[mask, :], y[mask], fi, uv)
            node.used_fi.append(fi)
            if self.feature_names:
                node._label = "%s = %s (%d/%d)" % (
                    self.feature_names[fi], uv, node.ny, node.n - node.ny,
                )
            nodes.append(node)

        return nodes

    def _probs(self, nodes, by_class=False):
        """
        分割後のそれぞれの事象の発生確率を算出する

        初回のクラス値での分割の場合は、単にサンプル数の割合を求めればよい
        以降では、分割後のサンプル数における、正例 (クラス値が1 or Trueのもの)
        の割合を求める必要がある
        """

        # クラス値で分割した場合
        # (=単なる分割割合)
        if by_class:
            n = np.sum([len(node) for node in nodes])
            return [len(node) / n for node in nodes]

        # クラス値以外で分割した場合
        else:
            probs = []
            for node in nodes:
                n = len(node)    # 分割後のサンプル数
                ny = node.ny    # 分割後のサンプルのうち、Trueのものの数
                probs.append(ny / n)
            return probs

    @staticmethod
    def _self_info(px):
        """自己情報量を算出する"""
        if px == 0.0 or px == 1.0: return 0.0
        return -1 * np.log2(px)

    @staticmethod
    def _entropy(probs, debug=False):
        """事象Xについてのエントロピーを算出する (自己情報量の期待値)"""
        ent = 0.0
        for px in probs:
            px_hx = px * DecisionTree._self_info(px)
            ent += px_hx
            if debug:
                print("P(X) = %.3f -> %.5f" % (px, px_hx))
        if debug:
            print("entropy: %.3f" % ent)
        return ent

    def _info_gain(self, parent, fi, debug=False):
        """情報利得を算出する"""
        
        # 条件でサンプルを分割
        nodes = self._split(parent.X, parent.y, fi)

        # それぞれの場合の事象の発生確率を算出
        probs = self._probs(nodes)
        if debug: print("probs: %s" % probs)

        # 分割前における、分割後のサンプルの割合を算出する
        props = self._probs(nodes, by_class=True)
        if debug: print("props: %s" % props)

        # 条件付きエントロピーを算出
        ent = 0.0
        for i, prob in enumerate(probs):
            not_prob = 1.0 - prob
            ent_after = DecisionTree._entropy([prob, not_prob], 
                debug=debug)
            nodes[i].ent = ent_after
            prop = props[i]     # サンプルの割合
            ent_after *= prop
            ent += ent_after

        return parent.ent - ent, nodes

    def split_by_max_ig(self, parent, debug=False):
        """情報利得を最大とする条件でサンプルを分割する"""
        n_features = parent.X.shape[1]
        best_ig = -np.inf
        best_fi = None
        best_nodes = None
        for fi in range(n_features):
            if fi in parent.used_fi:
                continue
            ig, nodes = self._info_gain(parent, fi)
            if debug: print("f%d -> %.3f" % (fi, ig))
            if best_ig < ig:
                best_ig = ig
                best_fi = fi
                best_nodes = nodes
        if debug: print("best is f%d -> %.3f" % (best_fi, best_ig))
        for bn in best_nodes:
            bn.used_fi += parent.used_fi    # 以前に分割に使用した特徴を追加
        return best_nodes, best_fi

    def traverse_split(self, parent, debug=False):
        """再帰的に走査 & 分割を行う"""

        # 葉なら終了
        if parent.is_leaf: return

        # 特徴を使い切った時点で終了
        n_features = parent.X.shape[1]
        n_used = len(parent.used_fi)
        if n_features == n_used: return

        # 情報利得を最大にする分割方法を見つける
        parent._children, best_fi = \
            self.split_by_max_ig(parent, debug=debug)

        # 子に対して同様の操作(走査?)を行う
        for c in parent._children:
            if c.is_leaf: continue
            self.traverse_split(c, debug=debug)

    def fit(self, X, y, png_name=None):
        """学習させる"""
        self.X = X
        self.y = y

        # サンプル全体のエントロピーを算出
        # クラス値によりサンプルを分割する
        n_samples, n_features = X.shape
        ci = n_features    # クラス値のインデックス
        nodes = self._split(X, y, fi=ci)

        # ルートノードを生成する
        root = Node(X, y, ci, "play")

        # 分割後のそれぞれの場合の事象の発生確率を算出
        probs = self._probs(nodes, by_class=True)

        # クラス値のエントロピーを算出
        entp = DecisionTree._entropy(probs)
        root.ent = entp
        root._label = "root"

        # 再帰的に走査 & 分割
        self.traverse_split(root)

        self.root = root

        # ツリー情報を.dotファイルに出力
        if png_name:

            # ノードのクラスオブジェクトを取得
            _BaseNode = root.__class__

            # ファイル名を設定
            dot_name = "./%s.dot" % png_name
            img_name = "./%s.png" % png_name

            # ツリー情報を.dotファイルに出力
            dot_data = _BaseNode.write_dot(root, dot_name)
            _BaseNode.write_png(dot_name, img_name, remove_dot=True)

    def _predict(self, x):
        """単一のサンプルから値を予測する"""

        parent = self.root

        # 葉なら終了
        # (※ただし、交差検定のような場合だと葉が存在しない場合が出てくる)
        while not parent.is_leaf:
            for c in parent._children:
                v = x[c.fi]    # サンプルの特徴fiの値
                if v == c.fv:
                    parent = c

        if parent.ny == 0:
            return False
        else:
            return True

    def predict(self, X):
        """サンプル全体から値を予測する"""
        y_pred = np.array([self._predict(x) for x in X])
        return y_pred

使用法

ゴルフのプレー状況に関するデータを使用する (離散値バージョン)。
天候から、プレーするかしないかを予測したい。
以下のデータを参考にした。
決定木 - Wikipedia

import numpy as np
from decision_tree import DecisionTree


def load_golf_label():
    """ゴルフのプレー状況に関するデータ (離散値バージョン)"""
    data = np.array([
        ["sunny", "hot", "high", "False", "no"],
        ["sunny", "hot", "high", "True", "no"],
        ["overcast", "hot", "high", "False", "yes"],
        ["rainy", "mild", "high", "False", "yes"],
        ["rainy", "cool", "normal", "False", "yes"],
        ["rainy", "cool", "normal", "True", "no"],
        ["overcast", "cool", "normal", "True", "yes"],
        ["sunny", "mild", "high", "False", "no"],
        ["sunny", "cool", "normal", "False", "yes"],
        ["rainy", "mild", "normal", "False", "yes"],
        ["sunny", "mild", "normal", "True", "yes"],
        ["overcast", "mild", "high", "True", "yes"],
        ["overcast", "hot", "normal", "False", "yes"],
        ["rainy", "mild", "high", "True", "no"],
    ])
    return data[:, :4], data[:, -1]


def main():
    
    # ゴルフのプレー状況に関するデータを読み込む (ラベルデータ)
    X, y = load_golf_label()
    
    # 特徴名のリスト
    fnames = ["outl", "temp", "hum", "wind", "play"]

    # 決定木で学習させる
    dt = DecisionTree(fnames)
    dt.fit(X, y == "yes", png_name="dec_tree")    # True, Falseに変換する


if __name__ == "__main__":
    main()

動作結果

以下の画像を得る。

f:id:zdassen:20170620155404p:plain

体感すべき部分

バリアンスが大きい (基本的に過学習気味)。データをある特徴で再帰的に分割しているだけなので、元のデータ集合が別のものに変われば、出来上がる木の構造も変わりやすくなる。交差検定的に訓練データを変えて学習させると、f:id:zdassen:20170620161146p:plain
のような構造や、
f:id:zdassen:20170620161212p:plain
のような形に簡単に変化する。アンサンブル学習 (ランダムフォレスト) においては、バリアンスが大きいことを逆に利用する。簡単に言うと、素人を100人集めて何かを予測させても、素人の無知さを100倍しただけで終わる。一方、過学習気味の専門家が1人いても同様にうれしくはないのだが、100人集めて専門家に投票させると、それぞれの予測は1票に変わる。それを集計して得票数に変換する限りで過学習分を軽減することができる、というアイデア。逆に言えば、同じような予測値を返すバリアンスの小さい学習器をそれなりの数だけ集めても、全員の精度が高いので100人集める意味がないか、全員間違っているので間違っていることに気づけない、ということになりやすい。
以下はコード。

from sklearn.model_selection import KFold


def main():
    """交差検定的に学習データを変えて学習させる"""

    # ゴルフのプレー状況に関するデータを読み込む (ラベルデータ)
    X, y = load_golf_label()

    # 特徴名リスト
    fnames = ["outl", "temp", "hum", "wind", "play"]

    # 交差検定的に訓練データを変えて学習させてみる (交差数: 4)
    n_split = 4
    kf = KFold(n_split)
    for i, (is_train, is_test) in enumerate(kf.split(X)):

        # 訓練用データを取得
        X_train, y_train = X[is_train, :], y[is_train]

        # "yes" or "no"をTrue or Falseに変換しておく
        y_train = y_train == "yes"

        # ツリー画像を保存するかどうか
        export_png = True
        if export_png:
            
            # 画像ファイル名
            png_name = "dec_tree_4_%d" % (i + 1,)

        else:
            png_name = None

        # 学習の実行部分
        dt = DecisionTree(fnames)
        dt.fit(X_train, y_train, png_name=png_name)


if __name__ == '__main__':
    main()

※イメージ2

クイズ私は誰でしょう?。初めからマニアックな質問をしても答えに近づけない。初期段階では、情報利得の大きい (より答えを絞り込める) おおざっぱな質問から始めるのがセオリー。