Fluentd+BigQuery+Elasticsearch+Kibanaで迷惑メールを解析
僕のメールアドレスには、去年辺りから、どういうわけか毎日ほぼ決まった時間帯に、決まったフォーマットの subject をもつ迷惑メールが一日平均 5 通くらい届きます。
普通であれば削除するのですが、「ほぼ決まった時間帯」「決まったフォーマットの subject を持つ」「複数通送られてくる」という特異性からか、無意識に削除せず別のフォルダに切り分けていました。
数えてみると 1500 通くらいあったので、大して Big でもないしこれ以上 Big になって欲しくもないのですが、BigQuery に流し込んで解析してみたいと思います。
Fluentd(td-agent) を使うので、ついでに Elasticsearch と Kibana も使って可視化してしまいましょう。
入力プラグインさえ作ってしまえば、後は Fluentd の出力プラグインが Elasticsearch と BigQuery にデータを投げてくれます(プラグインの作者様に感謝!)。
以下は Ubuntu 14.04 を利用している前提で書きます。
JDK のインストール
Elasticsearch の実行に Java が必要であるため、まずは Java をインストールします。
curl -L -C - -b "oraclelicense=accept-securebackup-cookie" -O http://download.oracle.com/otn-pub/java/jdk/8u25-b17/jdk-8u25-linux-x64.tar.gz tar zxvf jdk-8u25-linux-x64.tar.gz rm jdk-8u25-linux-x64.tar.gz mkdir -p /usr/local/java mv jdk1.8.0_25 /usr/local/java/jdk1.8.0_25 ln -s /usr/local/java/jdk1.8.0_25 /usr/local/java/jdk
Fluentd のインストール
Installation Guide を参考にインストールします。
Fluentd インストール前の作業として、ulimit, sysctl パラメータを書き換えるのを忘れないようにしましょう(上記リンク内に説明が書かれています)
Ubuntu 14.04 の場合、以下のようにするようです。
curl -L http://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
fluent-plugin-bigquery のインストール
このプラグインを使用すると、BigQuery に簡単にデータを保存することができます。
gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。
/opt/td-agent/embedded/bin/gem install fluent-plugin-bigquery
fluent-plugin-elasticsearch のインストール
このプラグインを使用すると、Elasticsearch に簡単にデータを保存することができます。
こちらも同様に gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。
libcurl が必要なことに注意してください。
apt-get install -y libcurl4-openssl-dev /opt/td-agent/embedded/bin/gem install fluent-plugin-elasticsearch
libcurl の 4 という数字に過敏に反応される方は
apt search libcurl | grep openssl
などとしてインストール可能なバージョンを検索してください。
Elasticsearch のインストール
setup-repositories apt こちらの公式サイトを参考にインストールを行います。
Ubuntu 14.04 の場合以下のようにするようです。
wget -qO - https://packages.elasticsearch.org/GPG-KEY-elasticsearch | sudo apt-key add - sudo add-apt-repository "deb http://packages.elasticsearch.org/elasticsearch/1.4/debian stable main" sudo apt-get update && sudo apt-get install elasticsearch sudo update-rc.d elasticsearch defaults 95 10 # 自動起動させたい場合
/etc/init.d/elasticsearch に以下を追加します。
JAVA_HOME=/usr/local/java/jdk
これにより、ファイルの内容は以下のようになります。
... ### END INIT INFO JAVA_HOME=/usr/local/java/jdk PATH=/bin:/usr/bin:/sbin:/usr/sbin NAME=elasticsearch DESC="Elasticsearch Server" DEFAULT=/etc/default/$NAME ...
Kibana のインストール
Kibana は 3 でも 4 でも構いません。
4 の方が集計等の操作が扱いやすくなっています。
ここでは、kibana 4 beta 3 をインストールします。
kibana 4 beta 3 installation を参考にインストールします。
Kibana 3 では Apache や nginx に配置すればよかったですが、Kibana 4 beta 3 の段階では実行ファイルを実行するようです。
Google BigQuery のインストール
インストール作業と言うよりは、プロジェクトの作成ですね。
Google Developers Console より適当にプロジェクトを作成します(名前やIDは何でもいいですが、IDは後で使うので覚えておいてください)
[API と認証] - [API] 画面より [BigQuery API] を ON にするのを忘れないように注意です。
これ以降では、作成したプロジェクトIDを spam, プロジェクト番号を 1234567891234 と仮定します。
Google Cloud SDK のインストールは省略します。手前味噌で申し訳ないですがコチラの記事を参考にするか、公式のドキュメント等を参考にセットアップしてください。
Google BigQuery テーブルの作成
Google BigQuery のスキーマファイルを作成します。
/root/spam-schema.json
[ { "name": "sent_time_f", "type": "STRING", "mode": "required" }, { "name": "sent_time_i", "type": "INTEGER", "mode": "required" }, { "name": "subject", "type": "STRING", "mode": "required" }, { "name": "value_a", "type": "STRING", "mode": "required" }, { "name": "value_b", "type": "STRING", "mode": "required" }, { "name": "value_c", "type": "STRING", "mode": "required" } ]
Google BigQuery は現時点で空のテーブルを GUI から作成できないため、上記のスキーマが定義された空テーブルを、bq コマンドを使用して作成します。
ここでは、データセット名を spam, テーブル名も spam としています。
bq mk 1234567891234:spam bq mk -t 1234567891234:spam.spam /root/spam-schema.json
Fluentd(td-agent) の設定
/etc/td-agent/td-agent.conf ファイルを開き、末尾に次を追記します。
FIXME になっている箇所はご利用の環境に応じて変更してください。
ファイルのエンコーディングは UTF-8N(BOM 無し)で保存してください。
<source> type spam tag spam.spam host mail.example.com # FIXME 接続先ホスト port 993 # FIXME 接続先ポート user user@example.com # FIXME 接続ユーザ password 123456789abc # FIXME 接続ユーザのパスワードの Base64 examine とある迷惑メールボックス名 # FIXME 対象となるメールボックス名 since 1-Jan-2014 # FIXME この日付以降のメールを調査します limit 30 # FIXME examine 対象のメールボックスから最新のメール以降何件を調査するか、-1 の場合すべてのメールを調査 </source> <match spam.spam> type copy <store> type stdout </store> <store> type bigquery method insert # default auth_method private_key # default email xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com # FIXME private_key_path /home/username/.keys/00000000000000000000000000000000-privatekey.p12 # FIXME # private_key_passphrase notasecret # default project spam # FIXME dataset spam # FIXME table spam # FIXME schema_path /root/spam-schema.json </store> <store> type elasticsearch host localhost port 9200 index_name spam type_name spam include_tag_key true tag_key key </store> </match>
/etc/td-agent/plugin/in_spam.rb の作成
Fluentd(td-agent) の入力プラグインを作成します。
このプラグインは、td-agent.conf の source ディレクティブに設定されたメールサーバー、メールボックス、ユーザ情報を用い IMAP 経由でメールを走査し、特定のパターンの subject を持つメールを出力します。
#! ruby -Ku require 'base64' require 'time' require 'openssl' require 'mail' require 'net/imap' OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE def each_mail(host, port, user, password, examine, since, limit) imap = Net::IMAP.new(host, port, true) imap.login(user, password) imap.examine(Net::IMAP.encode_utf7(examine).force_encoding('ASCII-8BIT')) rule = ['SINCE', since] sorted = imap.sort(['DATE'], rule, 'UTF-8') if limit >= 0 sorted = sorted.slice(0, limit) end sorted.each do |message_id| begin env = imap.fetch(message_id, 'ENVELOPE')[0].attr['ENVELOPE'] rescue => e p e.backtrace next end from = env.from time = Time.parse(env.date) subject = Mail::Encodings.unquote_and_convert_to(env.subject, 'UTF-8') yield(imap, message_id, env, from, time, subject) end imap.logout imap.disconnect end class SpamInput < Fluent::Input Fluent::Plugin.register_input('spam', self) def configure(conf) super @tag = conf['tag'] @host = conf['host'] @port = conf['port'] @user = conf['user'] @password = conf['password'] @examine = conf['examine'] @since = conf['since'] @limit = conf['limit'] if @tag.blank? || @host.blank? || @port.blank? || @user.blank? || @password.blank? || @examine.blank? || @since.blank? || @limit.blank? raise Fluent::ConfigError end @password = Base64.decode64(@password) @limit = @limit.to_i end def start super @thread = Thread.new(&method(:run)) end def shutdown @thread.kill end def run begin each_mail(@host, @port, @user, @password, @examine, @since, @limit) {|imap, message_id, env, from, time, subject| begin if subject =~ /^\s?(.+?)_(.+?)_(.+)$/ # こんなパターンのメールがいっぱい来る! Fluent::Engine.emit(@tag, time.to_i, { 'sent_time_f' => time.strftime('%Y-%m-%d'), 'sent_time_i' => time.to_i, 'subject' => subject, 'value_a' => $1, 'value_b' => $2, 'value_c' => $3 }) end rescue => e Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s }) end } rescue => e Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s }) return end Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'status' => 'success' }) end end
Kibana インデックス作成
Kibana 4 からはデフォルトのインデックスを作成しなければ何もできません。
http://localhost:5601/ にアクセスし、[Index name] に spam* [Time-field name] に sent_time_f を設定し、インデックスを作成してください。
あとは、Google BigQuery で自由に SQL を投げて解析するもよし、Kibana で集計してダッシュボードをピン留めしておくもよし。
╭( ・ㅂ・)و ̑̑ グッ !
(オプション)Kibana で日本語の Aggregation(集計)がうまく動かない(´;ω;`)
Kibana+Elasticsearchで文字列の完全一致と部分一致検索の両方を実現する
偉大な先人の知恵をお借りしましょう(ありがとうございます、大変助かりました!)。
今回の例だと、以下のようにすれば OK です。
curl -XPUT http://localhost:9200/spam/ -d "`cat /root/es_spam_dynamic_template.json`"
/root/es_spam_dynamic_template.json
{ "template": "*", "mappings": { "spam": { "_source": { "compress": true }, "dynamic_templates": [ { "string_template" : { "match" : "*", "mapping": { "type": "multi_field", "fields": { "{name}": { "type": "string", "index" : "analyzed" }, "full": { "type": "string", "index" : "not_analyzed" } } }, "match_mapping_type" : "string" } } ], "properties" : { "sent_time_f" : { "type" : "date", "index" : "not_analyzed" } } } } }
これで、名前が .full になっている term で集計すれば日本語でもいい感じに動いてくれます。