読者です 読者をやめる 読者になる 読者になる

Google Cloud Pub/Sub 使ってみた

Google Google Cloud Pub/Sub Java Google AppEngine


Cloud Pub/Sub - Reliable Enterprise Messaging in the Cloud — Google Cloud Platform

Google Cloud Pub/Sub が公開ベータになったので、早速使ってみました。
一見、Amazon SNS と競合するサービスなのかと思いきや、単なるメッセージ送信サービスであり、競合は ZeroMQ などのような MQ ライブラリとなっています。
ZeroMQ と違うのは、push だけでなく pull にも対応しており、メールのように未読のメッセージを取りに行って取得するようなこともできるようになっている点です。

メッセージを配信するサーバー、受信するサーバーは GCE や GAE 以外のクラウドサービス・オンプレのどちらもいいのですが、GCE, GAE 以外から使用する場合従来の Google の他の API 同様サーバーキーを用いて認証したり、ウェブマスター ツールによるサイトの登録が必要になります。

面倒くさいので、今回は配信元、配信先共に GAE とします。
多分これが一番早いと思います(嘘)。

プロジェクトの作成

Google Developers Console
よりプロジェクトを作成します。作成したプロジェクトIDを「test-pub-sub」とします。

Google Cloud Pub/Sub API の有効化

プロジェクトを作成したら、[API と認証] - [API] 画面より [Google Cloud Pub/Sub] を [オン] にします。

GAE プロジェクトの作成(Javaでオナシャス!)

test-pub-sub の GAE アプリケーションを作成します。

Google Cloud Pub/Sub v1beta2 Client Library for Java の配置

https://cloud.google.com/pubsub/libraries#libraries より Java 用のライブラリをダウンロードします。

pubsub/google-api-services-pubsub-v1beta2-rev1-1.19.1.jar を GAE の WEB-INF/lib に配置します。
pubsub/libs 配下にある以下の jar を、GAE の WEB-INF/lib に配置します。

  • commons-logging-1.1.1.jar
  • google-api-client-1.19.1.jar
  • google-api-client-appengine-1.19.1.jar
  • google-http-client-1.19.0.jar
  • google-http-client-appengine-1.19.0.jar
  • google-http-client-jackson2-1.19.0.jar
  • google-oauth-client-1.19.0.jar
  • google-oauth-client-appengine-1.19.0.jar
  • httpclient-4.0.1.jar
  • httpcore-4.0.1.jar
  • jackson-core-2.1.3.jar

更に、以下の jar を、GAE の WEB-INF/lib に配置します。

Java ファイルの配置

https://github.com/GoogleCloudPlatform/cloud-pubsub-samples-java/tree/master/cmdline-pull/src/main/java/com/google/cloud/pubsub/client/demos/cli
にある Java ファイルを GAE に配置します。
私は以下のようにパッケージを変更して配置しました(demos/cli を除去)

  • com.google.cloud.pubsub.client.RetryHttpInitializerWrapper
  • com.google.cloud.pubsub.client.SubscriptionMethods
  • com.google.cloud.pubsub.client.TopicMethods
  • com.google.cloud.pubsub.client.Utils

Pubsub 生成クラス (AppEngineConfiguration) の実装

Google Cloud Pub/Sub ではすべての API は Pubsub インスタンスを経由します。
Pubsub インスタンスは、サーバーを GAE, GCE 等のどこで運用するかで生成方法が異なります。
Setting Up Your Application - Google Cloud Pub/Sub — Google Cloud Platform
今回は GAE 上で実行するため、以下で OK です。

import java.io.IOException;

import com.google.api.client.extensions.appengine.http.UrlFetchTransport;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.extensions.appengine.auth.oauth2.AppIdentityCredential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;
import com.google.cloud.pubsub.client.RetryHttpInitializerWrapper;

/**
 * Create a Pubsub client on App Engine.
 */
public class AppEngineConfiguration {
    private static final HttpTransport TRANSPORT = UrlFetchTransport.getDefaultInstance();

    private static final JsonFactory JSON_FACTORY = JacksonFactory.getDefaultInstance();

    public static Pubsub createPubsubClient() throws IOException {
        final GoogleCredential credential = new AppIdentityCredential.AppEngineCredentialWrapper(TRANSPORT, JSON_FACTORY).createScoped(PubsubScopes.all());
        // Please use custom HttpRequestInitializer for automatic
        // retry upon failures. We provide a simple reference
        // implementation in the "Retry Handling" section.
        final HttpRequestInitializer initializer = new RetryHttpInitializerWrapper(credential);

        return new Pubsub.Builder(TRANSPORT, JSON_FACTORY, initializer).setApplicationName("test-pub-sub").build();
    }
}

アプリケーション名を設定しないと実行時に警告が出るため、適当に設定します(setApplicationName("test-pub-sub") の部分)

Topic 管理サーブレット

Pub/Sub の Topic を取得・作成・削除するサーブレットです。

import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Empty;
import com.google.api.services.pubsub.model.ListTopicsResponse;
import com.google.api.services.pubsub.model.Topic;

public class TopicsServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile("/topics/(.+)");

    @Override
    final protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();
        final Pubsub.Projects.Topics.List listMethod = pubsub.projects().topics().list("projects/test-pub-sub");
        String nextPageToken = null;
        ListTopicsResponse listTopicsresponse;

        do {
            if (nextPageToken != null) {
                listMethod.setPageToken(nextPageToken);
            }
            listTopicsresponse = listMethod.execute();

            final List<Topic> topics = listTopicsresponse.getTopics();

            if (topics != null) {
                final Writer writer = response.getWriter();

                for (final Topic topic : topics) {
                    writer.write(topic.getName());
                    writer.write("\n");
                }
            }

            nextPageToken = listTopicsresponse.getNextPageToken();
        } while (nextPageToken != null);
    }

    @Override
    final protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        request.setCharacterEncoding("UTF-8");
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain");

        final String name = request.getParameter("name");

        if (StringUtils.isBlank(name)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(name);

        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();
        final Topic newTopic = pubsub.projects().topics().create(name, new Topic()).execute();
        final Writer writer = response.getWriter();

        writer.write(newTopic.getName());
    }

    @Override
    final protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        final String requestURI = request.getRequestURI();

        Logger.getLogger(getClass().getName()).info(requestURI);

        String name = null;
        final Matcher matcher = TOPIC_NAME_PATTERN.matcher(requestURI);

        if (matcher.find()) {
            name = matcher.group(1);
        }

        if (StringUtils.isBlank(name)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(name);

        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();

        pubsub.projects().topics().delete(name).execute();
    }
}

後に web.xml で /topics に URL マッピングします。
/topics に GET することでトピックの一覧を表示、パラメータ名=name としてトピック名を指定することで、トピックの作成、/topics/... に DELETE リクエストを投げることで、トピックを削除します。
作成するトピック名はここでは
projects/test-pub-sub/topics/自由な名前
になります。使用可能なリソース名については
What is Google Cloud Pub/Sub? - Google Cloud Pub/Sub — Google Cloud Platform
を参照してください。
Chrome をご使用でしたら Postman などを利用し、name=projects/test-pub-sub/topics/test-topic な POST をリクエストすることでトピック名=projects/test-pub-sub/topics/test-topic を作成できます。

Subscription 管理サーブレット

Pub/Sub の Subscription を取得・作成・削除するサーブレットです。

import java.io.IOException;
import java.io.Writer;
import java.util.List;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.Empty;
import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
import com.google.api.services.pubsub.model.PushConfig;
import com.google.api.services.pubsub.model.Subscription;

public class SubscriptionsServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    private static final Pattern SUBSCRIPTION_NAME_PATTERN = Pattern.compile("/subscriptions/(.+)");

    @Override
    final protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();
        final Pubsub.Projects.Subscriptions.List listMethod = pubsub.projects().subscriptions().list("projects/test-pub-sub");
        String nextPageToken = null;
        ListSubscriptionsResponse listSubscriptionsResponse;

        do {
            if (nextPageToken != null) {
                listMethod.setPageToken(nextPageToken);
            }
            listSubscriptionsResponse = listMethod.execute();

            final List<Subscription> subscriptions = listSubscriptionsResponse.getSubscriptions();

            if (subscriptions != null) {
                final Writer writer = response.getWriter();

                for (final Subscription subscription : subscriptions) {
                    writer.write(subscription.getName());
                    writer.write("\n");
                }
            }

            nextPageToken = listSubscriptionsResponse.getNextPageToken();
        } while (nextPageToken != null);
    }

    @Override
    final protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        request.setCharacterEncoding("UTF-8");
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain");

        final String endpointName = request.getParameter("endpoint");

        if (StringUtils.isBlank(endpointName)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(endpointName);

        final String topicName = request.getParameter("topic");

        if (StringUtils.isBlank(topicName)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(topicName);

        final String subscriptionName = request.getParameter("subscription");

        if (StringUtils.isBlank(subscriptionName)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(subscriptionName);

        final PushConfig pushConfig = new PushConfig().setPushEndpoint(endpointName);
        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();
        final Subscription subscription = new Subscription()
        // The name of the topic from which this subscription
        // receives messages
                .setTopic(topicName)
                // Ackowledgement deadline in second
                .setAckDeadlineSeconds(60)
                // Only needed if you are using push delivery
                .setPushConfig(pushConfig);
        final Subscription newSubscription = pubsub.projects().subscriptions().create(subscriptionName, subscription).execute();
        final Writer writer = response.getWriter();

        writer.write(newSubscription.getName());
    }

    @Override
    final protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        final String requestURI = request.getRequestURI();

        Logger.getLogger(getClass().getName()).info(requestURI);

        String name = null;
        final Matcher matcher = SUBSCRIPTION_NAME_PATTERN.matcher(requestURI);

        if (matcher.find()) {
            name = matcher.group(1);
        }

        if (StringUtils.isBlank(name)) {
            return;
        }

        Logger.getLogger(getClass().getName()).info(name);

        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();

        pubsub.projects().subscriptions().delete(name).execute();
    }
}

後に web.xml で /subscriptions に URL マッピングします。
/subscriptions に GET することでサブスクリプションの一覧を表示、パラメータ名=topic, subscription, endpoint として POST することで、指定したトピックに指定したサブスクリプション名で指定したエンドポイントへのサブスクリプションの作成、/subscriptions/... に DELETE リクエストを投げることで、サブスクリプションを削除します。
作成するサブスクリプション名はここでは
projects/test-pub-sub/subscriptions/自由な名前
になります。使用可能なリソース名については
What is Google Cloud Pub/Sub? - Google Cloud Pub/Sub — Google Cloud Platform
を参照してください。
Chrome をご使用でしたら Postman などを利用し、topic=projects/test-pub-sub/topics/test-topic, subscription=projects/test-pub-sub/subscriptions/test-subscription, endpoint=https://test-pub-sub.appspot.com/endpoint な POST をリクエストすることでサブスクリプションを作成できます。
エンドポイントは自分自身以外を指定する場合、ウェブマスター ツールに登録されていないといけません。今回は自分自身を指定しているので、登録は不要です。
また、現時点(2015/03/06)でエンドポイントは https である必要があります。

メッセージ配信サーブレット

Pub/Sub のメッセージを配信するサーブレットです。

import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PublishResponse;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.common.collect.ImmutableList;

public class MessagesServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    @Override
    final protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        request.setCharacterEncoding("UTF-8");
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain");

        final String topicName = request.getParameter("topic");

        if (StringUtils.isBlank(topicName)) {
            return;
        }

        final String message = request.getParameter("message");

        if (StringUtils.isBlank(message)) {
            return;
        }

        final Pubsub pubsub = AppEngineConfiguration.createPubsubClient();
        final PubsubMessage pubsubMessage = new PubsubMessage();

        // You need to base64-encode your message with
        // PubsubMessage#encodeData() method.
        pubsubMessage.encodeData(message.getBytes(StandardCharsets.UTF_8));

        final List<PubsubMessage> messages = ImmutableList.of(pubsubMessage);
        final PublishRequest publishRequest = new PublishRequest().setMessages(messages);
        final PublishResponse publishResponse = pubsub.projects().topics().publish(topicName, publishRequest).execute();
        final List<String> messageIds = publishResponse.getMessageIds();

        if (messageIds != null) {
            final Writer writer = response.getWriter();

            for (final String messageId : messageIds) {
                writer.write(messageId);
                writer.write("\n");
            }
        }
    }
}

後に web.xml で /messages に URL マッピングします。
/messages にパラメータ名=topic, message として POST することで、指定したトピックに指定したメッセージを配信します。
Chrome をご使用でしたら Postman などを利用し、topic=projects/test-pub-sub/topics/test-topic, message=テストメッセージ な POST をリクエストすることでメッセージを配信できます。

メッセージ受信サーブレット

Pub/Sub のメッセージを受信するサーブレットです。

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.api.client.json.JsonParser;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.model.PubsubMessage;

public class EndpointServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    @Override
    final protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException {
        request.setCharacterEncoding("UTF-8");
        response.setCharacterEncoding("UTF-8");
        response.setContentType("text/plain");

        final ServletInputStream reader = request.getInputStream();

        try {
            // Parse the JSON message to the POJO model class.
            final JsonParser parser = JacksonFactory.getDefaultInstance().createJsonParser(reader);

            parser.skipToKey("message");

            final PubsubMessage message = parser.parseAndClose(PubsubMessage.class);
            // Base64-decode the data and work with it.
            final String data = new String(message.decodeData(), StandardCharsets.UTF_8);

            // Work with your message
            // Respond with a 20X to acknowledge receipt of the message.
            response.getWriter().write(data);
            response.setStatus(HttpServletResponse.SC_OK);
        } finally {
            reader.close();
        }
    }
}

後に web.xml で /endpoint に URL マッピングします。
配信されたメッセージをパースし、そのままレスポンスとして出力するだけとしています。
配信されるメッセージのフォーマットは以下を参照してください。
Subscriber Guide - Google Cloud Pub/Sub — Google Cloud Platform

web.xml

<?xml version="1.0" encoding="utf-8"?><web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" version="2.5">
  <servlet>
    <servlet-name>TopicsServlet</servlet-name>
    <servlet-class>TopicsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet>
    <servlet-name>SubscriptionsServlet</servlet-name>
    <servlet-class>SubscriptionsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet>
    <servlet-name>EndpointServlet</servlet-name>
    <servlet-class>EndpointServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet>
    <servlet-name>MessagesServlet</servlet-name>
    <servlet-class>MessagesServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>

  <servlet-mapping>
    <servlet-name>TopicsServlet</servlet-name>
    <url-pattern>/topics</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>TopicsServlet</servlet-name>
    <url-pattern>/topics/*</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>SubscriptionsServlet</servlet-name>
    <url-pattern>/subscriptions</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>SubscriptionsServlet</servlet-name>
    <url-pattern>/subscriptions/*</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>EndpointServlet</servlet-name>
    <url-pattern>/endpoint</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>MessagesServlet</servlet-name>
    <url-pattern>/messages</url-pattern>
  </servlet-mapping>

  <security-constraint>
    <web-resource-collection>
      <web-resource-name>admin</web-resource-name>
      <url-pattern>/*</url-pattern>
    </web-resource-collection>
    <auth-constraint>
      <role-name>admin</role-name>
    </auth-constraint>
  </security-constraint>
</web-app>

ここでは一応、security-constraint で自分自身以外がトピックやサブスクリプションを変更できないようにしておきます。

というわけでササッと触ってみました。IRC やメールの様な機能を作成することができそうです。
MQ のような Pub/Sub 機能を作成する際の選択肢の一つとしてどうぞ。