Fluentd+BigQuery+Elasticsearch+Kibanaで迷惑メールを解析

僕のメールアドレスには、去年辺りから、どういうわけか毎日ほぼ決まった時間帯に、決まったフォーマットの subject をもつ迷惑メールが一日平均 5 通くらい届きます。
普通であれば削除するのですが、「ほぼ決まった時間帯」「決まったフォーマットの subject を持つ」「複数通送られてくる」という特異性からか、無意識に削除せず別のフォルダに切り分けていました。
数えてみると 1500 通くらいあったので、大して Big でもないしこれ以上 Big になって欲しくもないのですが、BigQuery に流し込んで解析してみたいと思います。
Fluentd(td-agent) を使うので、ついでに Elasticsearch と Kibana も使って可視化してしまいましょう。

入力プラグインさえ作ってしまえば、後は Fluentd の出力プラグインが Elasticsearch と BigQuery にデータを投げてくれます(プラグインの作者様に感謝!)。


以下は Ubuntu 14.04 を利用している前提で書きます。

JDK のインストール

Elasticsearch の実行に Java が必要であるため、まずは Java をインストールします。

curl -L -C - -b "oraclelicense=accept-securebackup-cookie" -O http://download.oracle.com/otn-pub/java/jdk/8u25-b17/jdk-8u25-linux-x64.tar.gz
tar zxvf jdk-8u25-linux-x64.tar.gz
rm jdk-8u25-linux-x64.tar.gz

mkdir -p /usr/local/java
mv jdk1.8.0_25 /usr/local/java/jdk1.8.0_25
ln -s /usr/local/java/jdk1.8.0_25 /usr/local/java/jdk

Fluentd のインストール

Installation Guide を参考にインストールします。
Fluentd インストール前の作業として、ulimit, sysctl パラメータを書き換えるのを忘れないようにしましょう(上記リンク内に説明が書かれています)

Ubuntu 14.04 の場合、以下のようにするようです。

curl -L http://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

fluent-plugin-bigquery のインストール

このプラグインを使用すると、BigQuery に簡単にデータを保存することができます。
gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。

/opt/td-agent/embedded/bin/gem install fluent-plugin-bigquery

fluent-plugin-elasticsearch のインストール

このプラグインを使用すると、Elasticsearch に簡単にデータを保存することができます。
こちらも同様に gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。
libcurl が必要なことに注意してください。

apt-get install -y libcurl4-openssl-dev
/opt/td-agent/embedded/bin/gem install fluent-plugin-elasticsearch

libcurl の 4 という数字に過敏に反応される方は

apt search libcurl | grep openssl

などとしてインストール可能なバージョンを検索してください。

Elasticsearch のインストール

setup-repositories apt こちらの公式サイトを参考にインストールを行います。
Ubuntu 14.04 の場合以下のようにするようです。

wget -qO - https://packages.elasticsearch.org/GPG-KEY-elasticsearch | sudo apt-key add -
sudo add-apt-repository "deb http://packages.elasticsearch.org/elasticsearch/1.4/debian stable main"
sudo apt-get update && sudo apt-get install elasticsearch
sudo update-rc.d elasticsearch defaults 95 10 # 自動起動させたい場合

/etc/init.d/elasticsearch に以下を追加します。

JAVA_HOME=/usr/local/java/jdk

これにより、ファイルの内容は以下のようになります。

...
### END INIT INFO

JAVA_HOME=/usr/local/java/jdk
PATH=/bin:/usr/bin:/sbin:/usr/sbin
NAME=elasticsearch
DESC="Elasticsearch Server"
DEFAULT=/etc/default/$NAME
...

Kibana のインストール

Kibana は 3 でも 4 でも構いません。
4 の方が集計等の操作が扱いやすくなっています。
ここでは、kibana 4 beta 3 をインストールします。
kibana 4 beta 3 installation を参考にインストールします。
Kibana 3 では Apache や nginx に配置すればよかったですが、Kibana 4 beta 3 の段階では実行ファイルを実行するようです。

お使いのシェルの環境変数に、JAVA_HOME=/usr/local/java/jdk を設定しておいてください。

Google BigQuery のインストール

インストール作業と言うよりは、プロジェクトの作成ですね。
Google Developers Console より適当にプロジェクトを作成します(名前やIDは何でもいいですが、IDは後で使うので覚えておいてください)
[API と認証] - [API] 画面より [BigQuery API] を ON にするのを忘れないように注意です。
これ以降では、作成したプロジェクトIDを spam, プロジェクト番号を 1234567891234 と仮定します。
Google Cloud SDK のインストールは省略します。手前味噌で申し訳ないですがコチラの記事を参考にするか、公式のドキュメント等を参考にセットアップしてください。

Google BigQuery テーブルの作成

Google BigQuery のスキーマファイルを作成します。

/root/spam-schema.json

[
  {
    "name": "sent_time_f",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "sent_time_i",
    "type": "INTEGER",
    "mode": "required"
  },
  {
    "name": "subject",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_a",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_b",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_c",
    "type": "STRING",
    "mode": "required"
  }
]

Google BigQuery は現時点で空のテーブルを GUI から作成できないため、上記のスキーマが定義された空テーブルを、bq コマンドを使用して作成します。
ここでは、データセット名を spam, テーブル名も spam としています。

bq mk 1234567891234:spam
bq mk -t 1234567891234:spam.spam /root/spam-schema.json

Fluentd(td-agent) の設定

/etc/td-agent/td-agent.conf ファイルを開き、末尾に次を追記します。
FIXME になっている箇所はご利用の環境に応じて変更してください。
ファイルのエンコーディングは UTF-8N(BOM 無し)で保存してください。

<source>
  type spam

  tag spam.spam
  host mail.example.com # FIXME 接続先ホスト
  port 993 # FIXME 接続先ポート
  user user@example.com # FIXME 接続ユーザ
  password 123456789abc # FIXME 接続ユーザのパスワードの Base64
  examine とある迷惑メールボックス名 # FIXME 対象となるメールボックス名
  since 1-Jan-2014 # FIXME この日付以降のメールを調査します
  limit 30 # FIXME examine 対象のメールボックスから最新のメール以降何件を調査するか、-1 の場合すべてのメールを調査
</source>

<match spam.spam>
  type copy

  <store>
    type stdout
  </store>

  <store>
    type bigquery

    method insert    # default

    auth_method private_key   # default
    email xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com # FIXME
    private_key_path /home/username/.keys/00000000000000000000000000000000-privatekey.p12 # FIXME
    # private_key_passphrase notasecret # default

    project spam # FIXME
    dataset spam # FIXME
    table   spam # FIXME

    schema_path /root/spam-schema.json
  </store>

  <store>
    type elasticsearch
    host localhost
    port 9200
    index_name spam
    type_name spam
    include_tag_key true
    tag_key key
  </store>
</match>

/etc/td-agent/plugin/in_spam.rb の作成

Fluentd(td-agent) の入力プラグインを作成します。
このプラグインは、td-agent.conf の source ディレクティブに設定されたメールサーバー、メールボックス、ユーザ情報を用い IMAP 経由でメールを走査し、特定のパターンの subject を持つメールを出力します。

#! ruby -Ku

require 'base64'
require 'time'
require 'openssl'
require 'mail'
require 'net/imap'

OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE

def each_mail(host, port, user, password, examine, since, limit)
    imap = Net::IMAP.new(host, port, true)
    imap.login(user, password)
    imap.examine(Net::IMAP.encode_utf7(examine).force_encoding('ASCII-8BIT'))

    rule = ['SINCE', since]
    sorted = imap.sort(['DATE'], rule, 'UTF-8')

    if limit >= 0
        sorted = sorted.slice(0, limit)
    end

    sorted.each do |message_id|
        begin
            env = imap.fetch(message_id, 'ENVELOPE')[0].attr['ENVELOPE']
        rescue => e
            p e.backtrace

            next
        end

        from = env.from
        time = Time.parse(env.date)
        subject = Mail::Encodings.unquote_and_convert_to(env.subject, 'UTF-8')

        yield(imap, message_id, env, from, time, subject)
    end

    imap.logout
    imap.disconnect
end

class SpamInput < Fluent::Input
    Fluent::Plugin.register_input('spam', self)

    def configure(conf)
        super
        
        @tag = conf['tag']
        @host = conf['host']
        @port = conf['port']
        @user = conf['user']
        @password = conf['password']
        @examine = conf['examine']
        @since = conf['since']
        @limit = conf['limit']

        if @tag.blank? || @host.blank? || @port.blank? || @user.blank? || @password.blank? || @examine.blank? || @since.blank? || @limit.blank?
            raise Fluent::ConfigError
        end

        @password = Base64.decode64(@password)
        @limit = @limit.to_i
    end

    def start
        super

        @thread = Thread.new(&method(:run))
    end

    def shutdown
        @thread.kill
    end

    def run
        begin
            each_mail(@host, @port, @user, @password, @examine, @since, @limit) {|imap, message_id, env, from, time, subject|
                begin
                    if subject =~ /^\s?(.+?)_(.+?)_(.+)$/ # こんなパターンのメールがいっぱい来る!
                        Fluent::Engine.emit(@tag, time.to_i, {
                            'sent_time_f' => time.strftime('%Y-%m-%d'),
                            'sent_time_i' => time.to_i,
                            'subject' => subject,
                            'value_a' => $1,
                            'value_b' => $2,
                            'value_c' => $3
                        })
                    end
                rescue => e
                    Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s })
                end
            }
        rescue => e
            Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s })

            return
        end

        Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'status' => 'success' })
    end
end

Kibana インデックス作成

Kibana 4 からはデフォルトのインデックスを作成しなければ何もできません。
http://localhost:5601/ にアクセスし、[Index name] に spam* [Time-field name] に sent_time_f を設定し、インデックスを作成してください。


あとは、Google BigQuery で自由に SQL を投げて解析するもよし、Kibana で集計してダッシュボードをピン留めしておくもよし。
╭( ・ㅂ・)و ̑̑ グッ !


(オプション)Kibana で日本語の Aggregation(集計)がうまく動かない(´;ω;`)

Kibana+Elasticsearchで文字列の完全一致と部分一致検索の両方を実現する
偉大な先人の知恵をお借りしましょう(ありがとうございます、大変助かりました!)。

今回の例だと、以下のようにすれば OK です。

curl -XPUT http://localhost:9200/spam/ -d "`cat /root/es_spam_dynamic_template.json`"

/root/es_spam_dynamic_template.json

{
    "template": "*",
    "mappings": {
        "spam": {
            "_source": { "compress": true },
            "dynamic_templates": [
                {
                    "string_template" : {
                        "match" : "*",
                        "mapping": {
                            "type": "multi_field",
                            "fields": {
                                "{name}": {
                                    "type": "string",
                                    "index" : "analyzed"
                                },
                                "full": {
                                    "type": "string",
                                    "index" : "not_analyzed"
                                }
                            }
                        },
                        "match_mapping_type" : "string"
                    }
                }
            ],
            "properties" : {
                "sent_time_f" : { "type" : "date", "index" : "not_analyzed" }
            }
        }
    }
}

これで、名前が .full になっている term で集計すれば日本語でもいい感じに動いてくれます。