Java や curl 等 API から気軽に通話、SMS 送受信を行えるサービス - Twilio


Twilio for KDDI Web Communications | クラウド電話API

なんと、電話番号一個が月額100円程度で維持できる画期的なサービスです。
しかも、以下のように超簡単に使えます(SMS 送信の場合)。

curl
curl -X POST 'https://jp.twilio.com/2010-04-01/Accounts/XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX/Messages.json' \
--data-urlencode 'To=宛先番号'  \
--data-urlencode 'From=Twilioで保持している番号(発信番号)' \
--data-urlencode '送信するテキスト' \
-u XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
java
TwilioRestClient client = new TwilioRestClient(ACCOUNT_SID, AUTH_TOKEN); 

List<NameValuePair> params = new ArrayList<NameValuePair>(); 
params.add(new BasicNameValuePair("To", "Twilioで保持している番号(発信番号)")); 
params.add(new BasicNameValuePair("From", "宛先番号"));    
params.add(new BasicNameValuePair("Body", "送信するテキスト"));  
MessageFactory messageFactory = client.getAccount().getMessageFactory(); 
Message message = messageFactory.create(params); 
System.out.println(message.getSid());

他には Ruby, PHP, Python, Node.js, C# のすぐに使えるサンプルが管理画面に用意されています。
このサンプルの多さはとてもいいですね。

取得できる電話番号にも、SMS 送受信ができるかどうか等色々な電話番号があります。
私は一月 150 円の SMS 送受信が可能な番号を一つ所持しています。

注意点としては、残額がマイナスになるということです。
一つの電話番号の維持に月額100円程度かかります。
残高がなくなっても引かれ続けてしまい、アカウント的には借金になってしまう場合があります。
例えば、2000円チャージし一個の電話番号(例えば100円の電話番号)を2年運用すると、残高が-400円になってしまいます。
そこから更に2000円チャージしても借金分が引かれてしまい、残高は1600円になってしまいます。
新しくアカウントを作成した方がマシですね!
残高が負になるくらいであれば、電話番号を手放した方がよいです、気を付けましょう。

これと bttn を連携して SECOM への緊急通話用に、あるいは二段階認証を要求するサービスの捨てアアカウント用等にどうぞ。

Zimbra と Twilio を所持することで、メールアドレスによる認証、SMS による二段階認証を突破できるため捨てアカウントがはかどります。

維持費がとても安いためいざというときのために取っておいて損はありません!

AWS Lambda を利用して Nexus 6 を (σ´∀`)σゲッツ!! できないんだよ・・・


AWS Lambda を利用して Nexus 6 を (σ´∀`)σゲッツ!! - yanoの日記

そうか・・・Google Play 君・・・君は、アクセス元のIPアドレスから国を自動割り出しその国の在庫を出してくれるんだね・・・・
そして AWS Lambda 君・・・君は cron トリガーがないばかりか東京リージョンもないんだね・・・・お手本のようなプレビュー機能じゃあないか・・・

というわけで、上記の記事では (σ´∀`)σゲッツ!! できないことが分かりました(日本の在庫確認ができない)
じゃあ Proxy 通せば?とか、もうやめましょう。普通に作ろうじゃあないか。
ポテンシャルは Azure のモバイルサービス、ジョブサービスを超えていると思うが、現時点では Lambda は Azure をたたきのめせていない、棲み分けになっている。
規模の経済により Amazon の方が Azure より有利ではあるが、Azure, Google も追い上げてきているので(特に Azure)危機感を持って今後に期待したいところだ。

ということで国内サーバーより Google Play にアクセスし在庫状況を取得し、AWS SDK for JavaAmazon SNS に Push 通知するようにした。
技術者でない普通のユーザは Android 限定になってしまうが以下のアプリを使うとよいと思う。

Play Store Stock Checker : Google Playストアの端末販売状況を確認できるアプリ、在庫や価格の変動をプッシュ通知する機能も搭載 | juggly.cn

投げやり感満載ではあるが、書いたコードの断片を残しておこうと思う。
一応、GCM(Google Cloud Messaging) と APNS, default で iOS, Android, Eメール に通知を送ることが可能だ。
スタンドアロンJava ではなく Jetty で動かして cron で head 送るようにしたが、その辺は好きにするといいだろう。

public class NotifyNexus6StockServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    private static final int MAX_PUSH_DEFAULT_LENGTH = 70; // iOS は 8 以降ペイロードの許容サイズが飛躍的に増えたから、もっと多くしても大丈夫

    private static final AtomicBoolean blue32GBHasNotified = new AtomicBoolean(false);

    private static final AtomicBoolean blue64GBHasNotified = new AtomicBoolean(false);

    private static final AtomicBoolean white32GBHasNotified = new AtomicBoolean(false);

    private static final AtomicBoolean white64GBHasNotified = new AtomicBoolean(false);

    private static final Map<String, Object> shortenData(final Map<String, Object> data) {
        // default を短縮したバージョンの data クローンを作る
        final Map<String, Object> shortData = new LinkedHashMap<String, Object>();

        shortData.putAll(data);
        if (shortData.containsKey("default")) {
            final Object defaultMsg = shortData.get("default");

            if (defaultMsg instanceof CharSequence) {
                shortData.put("default", Strings.shorten(defaultMsg.toString(), MAX_PUSH_DEFAULT_LENGTH)); // 長すぎる文字列を切り詰める
            }
        }

        return shortData;
    }

    public static final String makeASNSJSON(final Map<String, Object> data, final boolean wantContentAvailable, final boolean wantBadge, final boolean wantSound) {
        final Map<String, Object> shortData = shortenData(data);
        final Map<String, Object> payload = new LinkedHashMap<String, Object>();

        { // iPhone
            final Map<String, Object> apns = new LinkedHashMap<String, Object>();
            final Map<String, Object> aps = new LinkedHashMap<String, Object>();

            if (wantContentAvailable) {
                aps.put("content-available", Integer.valueOf(1));
            }
            aps.put("alert", shortData.get("default")); // THE・流用

            if (wantBadge) {
                if (shortData.containsKey("badge")) {
                    final Object badge = shortData.get("badge");

                    if (badge instanceof Number) {
                        aps.put("badge", Integer.valueOf(((Number) badge).intValue()));
                    }
                }
            }
            if (wantSound) {
                if (shortData.containsKey("sound")) {
                    final Object sound = shortData.get("sound");

                    if (sound instanceof CharSequence) {
                        aps.put("sound", sound.toString());
                    } else {
                        aps.put("sound", "default");
                    }
                } else {
                    aps.put("sound", "default");
                }
            }

            apns.putAll(shortData);
            apns.remove("badge");
            apns.remove("sound");
            apns.put("aps", aps);
            payload.put("APNS", JSON.encode(apns));
        }

        { // Android(GCM)
            final Map<String, Object> gcm = new LinkedHashMap<String, Object>();

            gcm.put("data", shortData);
            payload.put("GCM", JSON.encode(gcm));
        }

        { // default
            if (shortData.containsKey("default")) {
                final Object defaultMsg = shortData.get("default");

                if (defaultMsg instanceof CharSequence) {
                    payload.put("default", defaultMsg.toString());
                }
            }
        }

        return JSON.encode(payload);
    }

    protected static boolean isInStock(final String url) throws IOException {
        final DefaultHttpClient client = new DefaultHttpClient();

        try {
            final HttpGet request = new HttpGet(url + "&_=" + new Date().getTime()); // get なのでクエリパラメータ付けてキャッシュ抑止しましょう
            final HttpResponse response = client.execute(request);
            final String content = BaseClient.getResponse(response);

            return StringUtils.contains(content, "詳しくはお近くの携帯電話ショップまでお問い合わせください。") && !StringUtils.contains(content, "現在在庫切れです。しばらくしてからもう一度ご確認ください。");
        } catch (final HttpHostConnectException e) {
            return false;
        } finally {
            client.getConnectionManager().shutdown();
        }
    }

    protected static void publishTopic(final String defaultMsg) {
        final ClientConfiguration clientConfiguration = new ClientConfiguration();
        final AmazonSNSClient client = new AmazonSNSClient(new BasicAWSCredentials("XXXXXXXXXXXXXXXXXXXX", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"), clientConfiguration);

        client.setEndpoint("sns.us-east-1.amazonaws.com"); // FIXME: リージョンは適宜変更

        try {
            try {
                final String targetArn = "arn:aws:sns:us-east-1:XXXXXXXXXXXX:NotifyMe";
                final Map<String, Object> data = new LinkedHashMap<String, Object>();

                data.put("default", defaultMsg);
                data.put("badge", Integer.valueOf(1)); // バッジは好きにすればいいと思う

                final String message = makeASNSJSON(data, true, true, true);

                if (StringUtils.isBlank(message)) {
                    return;
                }

                final PublishRequest publishRequest = new PublishRequest().withTargetArn(targetArn).withMessage(message).withMessageStructure("json");

                client.publish(publishRequest);
            } catch (final Exception e) {
                e.printStackTrace();
            }
        } finally {
            client.shutdown();
        }
    }

    @Override
    final protected void service(final HttpServletRequest req, final HttpServletResponse resp) throws IOException {
        if (!blue32GBHasNotified.get()) {
            if (isInStock("https://play.google.com/store/devices/details/Nexus_6_32_GB_%E3%83%80%E3%83%BC%E3%82%AF%E3%83%96%E3%83%AB%E3%83%BC?id=nexus_6_blue_32gb")) {
                publishTopic("Nexus 6(blue/32gb) is now available!");
                blue32GBHasNotified.set(true);
            }
        }

        if (!blue64GBHasNotified.get()) {
            if (isInStock("https://play.google.com/store/devices/details/Nexus_6_64_GB_%E3%83%80%E3%83%BC%E3%82%AF%E3%83%96%E3%83%AB%E3%83%BC?id=nexus_6_blue_64gb")) {
                publishTopic("Nexus 6(blue/64gb) is now available!);
                blue64GBHasNotified.set(true);
            }
        }

        if (!white32GBHasNotified.get()) {
            if (isInStock("https://play.google.com/store/devices/details/Nexus_6_32_GB_%E3%82%AF%E3%83%A9%E3%82%A6%E3%83%89_%E3%83%9B%E3%83%AF%E3%82%A4%E3%83%88?id=nexus_6_white_32gb")) {
                publishTopic("Nexus 6(white/32gb) is now available!");
                white32GBHasNotified.set(true);
            }
        }

        if (!white64GBHasNotified.get()) {
            if (isInStock("https://play.google.com/store/devices/details/Nexus_6_64_GB_%E3%82%AF%E3%83%A9%E3%82%A6%E3%83%89_%E3%83%9B%E3%83%AF%E3%82%A4%E3%83%88?id=nexus_6_white_64gb")) {
                publishTopic("Nexus 6(white/64gb) is now available!");
                white64GBHasNotified.set(true);
            }
        }
    }
}

え?状態の永続化?大丈夫大丈夫!!(そこまで必要ないでしょ・・・)
(しかも static に状態持たせちゃうと分散環境かつ非スティッキーなやけに凝った運用をしていると複数通知されてしまいますがその辺はご了承)

AWS Lambda を利用して Nexus 6 を (σ´∀`)σゲッツ!!

Nexus 6 が欲しいのになかなか買えない!!!!!!
SIM カードが不要なので Google Play で買おうと思ってるのですが、いつ見ても

現在在庫切れです。しばらくしてからもう一度ご確認ください。

色や容量にはこだわらないのに、年末から折を見て確認しているのに、いつ見ても

現在在庫切れです。しばらくしてからもう一度ご確認ください。

(#`Д´)ノノ┻┻;:'、・゙


こうなってしまうと本当に在庫が復活するのか疑わしい物です。
どれ、ちょっと自動で在庫をチェックして、在庫が復活した際に通知を送りましょう。

Azure に tie-in してよいのであれば、すぐに作れます。
なぜなら Azure には Node.js をバックエンドに GUI からスケジューラを作成できるお手軽便利機能があるからです。

今回はそれではつまらないのと(Azure が優秀過ぎる)、Amazon SNS のトピックが便利(モバイルデバイス以外にもメールにも通知を行いたい)なため最近ホットな AWS Lambda を使ってみます。
ところが、AWS Lambda にはスケジューラ機能がありません。そのうち実装されると思いますが、とにかく今はないので、代替方法を探します。

皆さん、S3 を利用したり Lambda API で自分自身を呼び出したりとなんとか頑張ってループしているようです。
今回の例ですと、後者の Lambda API で 45 秒間隔で Nexus 6 のサイトをチェックし、在庫がある場合に通知を行う Lambda を作ってみましょう。
また、通知を連続で送ってしまわないよう、通知済みを記録するファイルを作成します。このファイルがある場合には、通知済みとして通知を行わず、ファイルが存在しない場合には通知を行うようにします。

var AWS = require('aws-sdk');
var HTTPS = require('https'); // http な場合は var HTTP = require('http'); とします

exports.handler = function(event, context) {
  console.log('Starting nexus 6 stock checker...');
  console.log(event);

  var lambda = new AWS.Lambda({
    accessKeyId: 'XXXXXXXXXXXXXXXXXXX', // FIXME: AWS Lambda の IAM アクセスキー
    secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', // FIXME: AWS Lambda の IAM シークレットキー
    region: 'us-east-1' // FIXME: Lambda のリージョン
  });

  setTimeout(function() {
    var params = {
      FunctionName: 'checkNuxus6Stock', // Lambda 関数名
      InvokeArgs: '{}'
    };

    lambda.invokeAsync(params, function(err, data) {
      if (err) {
        console.log('Unexpected error encountered while invoking lambda function.');
        console.log(err, err.stack);

        return;
      } else {
        console.log(data);
      }

      try {
        // get なのでキャッシュされないよう URL に _ パラメータ(名前は何でもいい、値が一意であれば)をくっつけておきます
        check('https://play.google.com/store/devices/details/Nexus_6_32_GB_%E3%83%80%E3%83%BC%E3%82%AF%E3%83%96%E3%83%AB%E3%83%BC?id=nexus_6_blue_32gb&_=' + new Date().getTime(), context); // 青/32GB
        check('https://play.google.com/store/devices/details/Nexus_6_64_GB_%E3%83%80%E3%83%BC%E3%82%AF%E3%83%96%E3%83%AB%E3%83%BC?id=nexus_6_blue_64gb&_=' + new Date().getTime(), context); // 青/64GB
        check('https://play.google.com/store/devices/details/Nexus_6_32_GB_%E3%82%AF%E3%83%A9%E3%82%A6%E3%83%89_%E3%83%9B%E3%83%AF%E3%82%A4%E3%83%88?id=nexus_6_white_32gb&_=' + new Date().getTime(), context); // 白/32GB
        check('https://play.google.com/store/devices/details/Nexus_6_64_GB_%E3%82%AF%E3%83%A9%E3%82%A6%E3%83%89_%E3%83%9B%E3%83%AF%E3%82%A4%E3%83%88?id=nexus_6_white_64gb&_=' + new Date().getTime(), context); // 白/64GB

        context.done();
      } catch (e) {
        console.log(e);
      }
    });
  }, 45000); // 45 秒後に自身を実行


  function check(url, context) {
    HTTPS.get(url, function (res) { // http な場合は HTTP.get とします
      console.log(url + ' response:' + res.statusCode);

      if (res.statusCode < 200 || res.statusCode >= 300) {
        return;
      }

      var body = '';

      res.on('data', function(chunk) { // レスポンスをドラゴンボールのようにかき集めます
          body += chunk;
      });

      res.on('end', function() { // レスポンスから在庫がないときに存在する HTML コードの検索を行い、在庫状況をチェックします
        console.log(body);

        if (body != null && body.indexOf('We are out of inventory. Please check back soon.') == -1) {
          console.log('Notifying me...');
          notifyMe();
        }
      });
    }).on('error', function(e){
      console.log('error', e);
    });
  }

  function notifyMe() {
    var s3 = new AWS.S3({ apiVersion: '2006-03-01' });

    s3.getObject({ Bucket: 'XXXXXXXXXX', Key: 'nexus6_has_notified'}, function(err, data) { // FIXME: 通知状況を確認するためのバケットとキー(パス)
      if (err) { // ファイルが存在しない場合には未通知とみなし通知を行います
        console.log('Object not found.');

        var sns = new AWS.SNS({
          apiVersion: '2010-03-31',
          region: 'us-east-1' // FIXME: AWS SNS のリージョン
        });
        var payload = {
          aps: {
            alert: 'Nexus 6 is now available!',
            sound: 'default',
            customProperty: 'We are Nexsus!' // デバイスに独自にデータを渡したい場合この辺に突っ込んどきます(iPhone の場合)
          }
        };
        var message = {
          default: 'Nexus 6 is now available!',
          APNS: JSON.stringify(payload) // iPhone の場合(GCM, Baidu 等を用いて通知したい場合それぞれのプラットフォームに応じた形式の JSON を格納します)
        };
        var params = {
          TopicArn: 'arn:aws:sns:us-east-1:XXXXXXXXXXX:XXXXXXX', // FIXME: AWS SNS の通知先トピックARN
          MessageStructure: 'json',
          Message: JSON.stringify(message)
        };

        sns.publish(params, function(err, data) {
          if (err) {
            console.log('Unexpected error encountered while Notifying me.');
            console.log(err);
          }
        });

        s3.putObject({ // 通知済みであることを S3 に保存します
          Bucket: 'XXXXXXXXXX', // FIXME: 通知状況を保存するバケット
          Key: 'nexus6_has_notified',
          Body: new Buffer('1', 'binary'), // 今回の例だとファイルが「存在する/しない」のみの制御なので何が書いてあっても問題なし
          ContentType: 'application/octet-stream'
        }, function(err, res) {
          if (err) {
            console.log('Unexpected error encountered while putting object.');
            console.error(err);
          }
        });
      } else {
        console.log('Object found.');
      }
    });
  }
};

Lambda 作成時の名前を「checkNuxus6Stock」、タイムアウトを「60」秒にして作成すれば OK です。

Google BigQuery と Elasticsearch に timestamp を入れる

BigQuery の TIMESTAMP 型は以下のフォーマットを受け付けます。
Data types

Elasticsearch の date は以下のフォーマットを受け付けます。
date format

う〜ん、タイムゾーンを絡めると Elasticsearch には BigQuery が要求する TIMESTAMP 型の組み込み型がないですね・・・というわけでカスタムフォーマットを使いましょう。
そのために、入力するタイムスタンプは

2015-01-03 00:40:35.220 +09:00

とします。で、

curl -XPUT http://localhost:9200/FIXME/ -d "`cat es_timestamp.json`"

es_timestamp.json

{
    "template": "*",
    "mappings": {
        "FIXME": {
            "_source": { "compress": true },
            "properties": {
                "timestamp": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss.SSS ZZ", "index": "not_analyzed" }
            }
        }
    }
}

timestamp フィールドの型をカスタムフォーマットで指定します。
FIXME はご利用の環境に応じて適切に変更してくださいませ。

Fluentd+BigQuery+Elasticsearch+Kibanaで迷惑メールを解析

僕のメールアドレスには、去年辺りから、どういうわけか毎日ほぼ決まった時間帯に、決まったフォーマットの subject をもつ迷惑メールが一日平均 5 通くらい届きます。
普通であれば削除するのですが、「ほぼ決まった時間帯」「決まったフォーマットの subject を持つ」「複数通送られてくる」という特異性からか、無意識に削除せず別のフォルダに切り分けていました。
数えてみると 1500 通くらいあったので、大して Big でもないしこれ以上 Big になって欲しくもないのですが、BigQuery に流し込んで解析してみたいと思います。
Fluentd(td-agent) を使うので、ついでに Elasticsearch と Kibana も使って可視化してしまいましょう。

入力プラグインさえ作ってしまえば、後は Fluentd の出力プラグインが Elasticsearch と BigQuery にデータを投げてくれます(プラグインの作者様に感謝!)。


以下は Ubuntu 14.04 を利用している前提で書きます。

JDK のインストール

Elasticsearch の実行に Java が必要であるため、まずは Java をインストールします。

curl -L -C - -b "oraclelicense=accept-securebackup-cookie" -O http://download.oracle.com/otn-pub/java/jdk/8u25-b17/jdk-8u25-linux-x64.tar.gz
tar zxvf jdk-8u25-linux-x64.tar.gz
rm jdk-8u25-linux-x64.tar.gz

mkdir -p /usr/local/java
mv jdk1.8.0_25 /usr/local/java/jdk1.8.0_25
ln -s /usr/local/java/jdk1.8.0_25 /usr/local/java/jdk

Fluentd のインストール

Installation Guide を参考にインストールします。
Fluentd インストール前の作業として、ulimit, sysctl パラメータを書き換えるのを忘れないようにしましょう(上記リンク内に説明が書かれています)

Ubuntu 14.04 の場合、以下のようにするようです。

curl -L http://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh

fluent-plugin-bigquery のインストール

このプラグインを使用すると、BigQuery に簡単にデータを保存することができます。
gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。

/opt/td-agent/embedded/bin/gem install fluent-plugin-bigquery

fluent-plugin-elasticsearch のインストール

このプラグインを使用すると、Elasticsearch に簡単にデータを保存することができます。
こちらも同様に gem install で簡単にインストールできますが、Fluentd(td-agent) 組み込みの gem でインストールする必要があります。
libcurl が必要なことに注意してください。

apt-get install -y libcurl4-openssl-dev
/opt/td-agent/embedded/bin/gem install fluent-plugin-elasticsearch

libcurl の 4 という数字に過敏に反応される方は

apt search libcurl | grep openssl

などとしてインストール可能なバージョンを検索してください。

Elasticsearch のインストール

setup-repositories apt こちらの公式サイトを参考にインストールを行います。
Ubuntu 14.04 の場合以下のようにするようです。

wget -qO - https://packages.elasticsearch.org/GPG-KEY-elasticsearch | sudo apt-key add -
sudo add-apt-repository "deb http://packages.elasticsearch.org/elasticsearch/1.4/debian stable main"
sudo apt-get update && sudo apt-get install elasticsearch
sudo update-rc.d elasticsearch defaults 95 10 # 自動起動させたい場合

/etc/init.d/elasticsearch に以下を追加します。

JAVA_HOME=/usr/local/java/jdk

これにより、ファイルの内容は以下のようになります。

...
### END INIT INFO

JAVA_HOME=/usr/local/java/jdk
PATH=/bin:/usr/bin:/sbin:/usr/sbin
NAME=elasticsearch
DESC="Elasticsearch Server"
DEFAULT=/etc/default/$NAME
...

Kibana のインストール

Kibana は 3 でも 4 でも構いません。
4 の方が集計等の操作が扱いやすくなっています。
ここでは、kibana 4 beta 3 をインストールします。
kibana 4 beta 3 installation を参考にインストールします。
Kibana 3 では Apache や nginx に配置すればよかったですが、Kibana 4 beta 3 の段階では実行ファイルを実行するようです。

お使いのシェルの環境変数に、JAVA_HOME=/usr/local/java/jdk を設定しておいてください。

Google BigQuery のインストール

インストール作業と言うよりは、プロジェクトの作成ですね。
Google Developers Console より適当にプロジェクトを作成します(名前やIDは何でもいいですが、IDは後で使うので覚えておいてください)
[API と認証] - [API] 画面より [BigQuery API] を ON にするのを忘れないように注意です。
これ以降では、作成したプロジェクトIDを spam, プロジェクト番号を 1234567891234 と仮定します。
Google Cloud SDK のインストールは省略します。手前味噌で申し訳ないですがコチラの記事を参考にするか、公式のドキュメント等を参考にセットアップしてください。

Google BigQuery テーブルの作成

Google BigQuery のスキーマファイルを作成します。

/root/spam-schema.json

[
  {
    "name": "sent_time_f",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "sent_time_i",
    "type": "INTEGER",
    "mode": "required"
  },
  {
    "name": "subject",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_a",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_b",
    "type": "STRING",
    "mode": "required"
  },
  {
    "name": "value_c",
    "type": "STRING",
    "mode": "required"
  }
]

Google BigQuery は現時点で空のテーブルを GUI から作成できないため、上記のスキーマが定義された空テーブルを、bq コマンドを使用して作成します。
ここでは、データセット名を spam, テーブル名も spam としています。

bq mk 1234567891234:spam
bq mk -t 1234567891234:spam.spam /root/spam-schema.json

Fluentd(td-agent) の設定

/etc/td-agent/td-agent.conf ファイルを開き、末尾に次を追記します。
FIXME になっている箇所はご利用の環境に応じて変更してください。
ファイルのエンコーディングは UTF-8N(BOM 無し)で保存してください。

<source>
  type spam

  tag spam.spam
  host mail.example.com # FIXME 接続先ホスト
  port 993 # FIXME 接続先ポート
  user user@example.com # FIXME 接続ユーザ
  password 123456789abc # FIXME 接続ユーザのパスワードの Base64
  examine とある迷惑メールボックス名 # FIXME 対象となるメールボックス名
  since 1-Jan-2014 # FIXME この日付以降のメールを調査します
  limit 30 # FIXME examine 対象のメールボックスから最新のメール以降何件を調査するか、-1 の場合すべてのメールを調査
</source>

<match spam.spam>
  type copy

  <store>
    type stdout
  </store>

  <store>
    type bigquery

    method insert    # default

    auth_method private_key   # default
    email xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com # FIXME
    private_key_path /home/username/.keys/00000000000000000000000000000000-privatekey.p12 # FIXME
    # private_key_passphrase notasecret # default

    project spam # FIXME
    dataset spam # FIXME
    table   spam # FIXME

    schema_path /root/spam-schema.json
  </store>

  <store>
    type elasticsearch
    host localhost
    port 9200
    index_name spam
    type_name spam
    include_tag_key true
    tag_key key
  </store>
</match>

/etc/td-agent/plugin/in_spam.rb の作成

Fluentd(td-agent) の入力プラグインを作成します。
このプラグインは、td-agent.conf の source ディレクティブに設定されたメールサーバー、メールボックス、ユーザ情報を用い IMAP 経由でメールを走査し、特定のパターンの subject を持つメールを出力します。

#! ruby -Ku

require 'base64'
require 'time'
require 'openssl'
require 'mail'
require 'net/imap'

OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE

def each_mail(host, port, user, password, examine, since, limit)
    imap = Net::IMAP.new(host, port, true)
    imap.login(user, password)
    imap.examine(Net::IMAP.encode_utf7(examine).force_encoding('ASCII-8BIT'))

    rule = ['SINCE', since]
    sorted = imap.sort(['DATE'], rule, 'UTF-8')

    if limit >= 0
        sorted = sorted.slice(0, limit)
    end

    sorted.each do |message_id|
        begin
            env = imap.fetch(message_id, 'ENVELOPE')[0].attr['ENVELOPE']
        rescue => e
            p e.backtrace

            next
        end

        from = env.from
        time = Time.parse(env.date)
        subject = Mail::Encodings.unquote_and_convert_to(env.subject, 'UTF-8')

        yield(imap, message_id, env, from, time, subject)
    end

    imap.logout
    imap.disconnect
end

class SpamInput < Fluent::Input
    Fluent::Plugin.register_input('spam', self)

    def configure(conf)
        super
        
        @tag = conf['tag']
        @host = conf['host']
        @port = conf['port']
        @user = conf['user']
        @password = conf['password']
        @examine = conf['examine']
        @since = conf['since']
        @limit = conf['limit']

        if @tag.blank? || @host.blank? || @port.blank? || @user.blank? || @password.blank? || @examine.blank? || @since.blank? || @limit.blank?
            raise Fluent::ConfigError
        end

        @password = Base64.decode64(@password)
        @limit = @limit.to_i
    end

    def start
        super

        @thread = Thread.new(&method(:run))
    end

    def shutdown
        @thread.kill
    end

    def run
        begin
            each_mail(@host, @port, @user, @password, @examine, @since, @limit) {|imap, message_id, env, from, time, subject|
                begin
                    if subject =~ /^\s?(.+?)_(.+?)_(.+)$/ # こんなパターンのメールがいっぱい来る!
                        Fluent::Engine.emit(@tag, time.to_i, {
                            'sent_time_f' => time.strftime('%Y-%m-%d'),
                            'sent_time_i' => time.to_i,
                            'subject' => subject,
                            'value_a' => $1,
                            'value_b' => $2,
                            'value_c' => $3
                        })
                    end
                rescue => e
                    Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s })
                end
            }
        rescue => e
            Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'backtrace' => e.backtrace.to_s })

            return
        end

        Fluent::Engine.emit('debug.' + @tag, Time.now.to_i, { 'status' => 'success' })
    end
end

Kibana インデックス作成

Kibana 4 からはデフォルトのインデックスを作成しなければ何もできません。
http://localhost:5601/ にアクセスし、[Index name] に spam* [Time-field name] に sent_time_f を設定し、インデックスを作成してください。


あとは、Google BigQuery で自由に SQL を投げて解析するもよし、Kibana で集計してダッシュボードをピン留めしておくもよし。
╭( ・ㅂ・)و ̑̑ グッ !


(オプション)Kibana で日本語の Aggregation(集計)がうまく動かない(´;ω;`)

Kibana+Elasticsearchで文字列の完全一致と部分一致検索の両方を実現する
偉大な先人の知恵をお借りしましょう(ありがとうございます、大変助かりました!)。

今回の例だと、以下のようにすれば OK です。

curl -XPUT http://localhost:9200/spam/ -d "`cat /root/es_spam_dynamic_template.json`"

/root/es_spam_dynamic_template.json

{
    "template": "*",
    "mappings": {
        "spam": {
            "_source": { "compress": true },
            "dynamic_templates": [
                {
                    "string_template" : {
                        "match" : "*",
                        "mapping": {
                            "type": "multi_field",
                            "fields": {
                                "{name}": {
                                    "type": "string",
                                    "index" : "analyzed"
                                },
                                "full": {
                                    "type": "string",
                                    "index" : "not_analyzed"
                                }
                            }
                        },
                        "match_mapping_type" : "string"
                    }
                }
            ],
            "properties" : {
                "sent_time_f" : { "type" : "date", "index" : "not_analyzed" }
            }
        }
    }
}

これで、名前が .full になっている term で集計すれば日本語でもいい感じに動いてくれます。