5.6.1. 非同期処理の実装(共通編)

5.6.1.1. Overview

本節では、Amazon Simple Queue Service(以下、Amazon SQS)を使用した非同期処理の実装方法について、非同期実行(共通編) に則って説明する。
非同期処理実装の全体イメージを以下に示す。
Screen image of asynchronous processing.

Note

リクエストを受け付け、キューにメッセージを送信するフロントサーバと、キューからメッセージを受信し非同期に処理を行うバックサーバの2つのサーバが存在する前提で説明を進める。

必要に応じて、バックサーバのプロジェクトを作成すること。プロジェクトの作成については、オンライン版クラウド拡張開発プロジェクトの作成 を参照されたい。

5.6.1.1.1. Amazon SQSとは

Amazon SQSは、Amazon Web Serviceが提供するメッセージキューサービスである。
非同期通信をサポートしており、クライアント間で疎結合にデータの移動を行える。
Amazon SQSの詳細については、Amazon Simple Queue Service (SQS)を参照されたい。

5.6.1.1.2. Amazon SQSが提供するキューの種類

Amazon SQSは、標準キューとFIFOキューという2タイプのキューを提供している。
なお、本ガイドラインでは標準キューの利用法について紹介する。
2017/02時点での両者の主な違いは以下の通り。
  標準キュー FIFOキュー
1秒あたりのトランザクション数 無制限 300件
メッセージの2重配信 稀に発生する 発生しない
メッセージの到着順 保証されない 保証される
キューの詳細についてはAmazon SQS 製品の詳細を参照されたい。
  • 標準キュー利用時の注意点
Amazon SQSでは、冗長性と高可用性を確保する為、メッセージのコピーを複数台のサーバに保持している。
その為、メッセージの削除をサーバ間で同期できない事象が発生した場合、処理済みのメッセージを再度受信してしまうケースが存在する。
対処法として、二重受信チェック処理を実装するなど、アプリケーション側でべき等性を担保する実装とする必要がある。
本ガイドラインでは、メッセージ受信時にメッセージIDをDBに登録する事で2重受信チェックを実現している。詳細については2重受信チェックを参照されたい。

Warning

標準キューを利用する場合、稀にメッセージの2重配信が発生する為、べき等性を担保するようアプリケーションを実装すること。

2重配信は標準キューの重要な特徴であるため、少なくとも 1 回の配信は必ず参照して頂きたい。

  • キューの採用基準
標準キューとFIFOキューはメリット・デメリットが一長一短な為、システムの性能目標や特性に合わせて採用を検討されたい。

例えば、連続したコマンドをメッセージに乗せて送信する場合など、厳密なメッセージ順序を求めるシステムには、性能面を考慮した上で、FIFOキューの採用を検討すると良い。
一方、メッセージ到着順の変動を許容できるシステムの場合は、性能面で優位性のある標準キューの採用を検討する良い。
本ガイドラインで紹介するモデルでは、チケット予約システムのように大量のリクエストが集中するシステムを想定しており、リクエストの順序は問わない為、標準キューを採用している。

5.6.1.1.3. JavaアプリケーションからのAmazon SQSの利用

Amazon Web Serviceでは、JavaアプリケーションからAmazon SQSを利用する為のAPIを提供している。

  • AmazonSQSインタフェース(AWS SDK for Java)
com.amazonaws.services.sqs.AmazonSQSをはじめとする、AWSが提供するSDKに含まれるAmazon SQS用のAPI。
AmazonSQSインタフェースを利用する場合、アプリケーションはAWS依存となる。
詳細についてはAWS SDK for Javaおよびスタンダード キューを参照されたい。
  • JMSインタフェース(Amazon SQS Java Messaging Library)
Amazon SQSをJMSプロバイダとして使用する為のインタフェース。
本ガイドラインでは、このJMSインタフェースと、後述するSpring JMSを組み合わせた実装例を紹介する。
JMSインタフェースの詳細についてはAmazon SQSでJMSを使用するを参照されたい。

Note

JMSインタフェースは、SQSのメッセージ送受信をJMSライクに行えるインタフェースであり、実態はJMSとは異なる為、注意が必要である。 例えば、非同期受信時のトランザクション管理や、Pub-Subモデルとしての送受信はサポートされていない。

Amazon SQS提供のJMSインタフェースがサポートしているJMS実装については、サポートされている JMS 1.1 実装を参照されたい。

Note

本ガイドライン執筆時点では、JMSインタフェースはFIFOキューをサポートしていない。利用するキューを選定する際は、使用するAPIに対応しているかを確認されたい。

5.6.1.1.4. Spring JMSを使用したAmazon SQSの利用

Spring Frameworkが提供するSpring JMSライブラリ経由でSQSを利用する方法について説明する。
Spring JMSについては、Macchinetta Server Framework for Java (1.x) のガイドラインSpring Frameworkのコンポーネントを使用したJMSの利用に詳しい利用法が記されている為、参照されたい。

Note

Spring Cloudが提供する Spring Cloud for Amazon Web Services(以下、Spring Cloud AWS)を用いても、JavaアプリケーションからAmazon SQSを利用することができる。 ただし、本ガイドライン執筆時点のバージョン(1.2.1.RELEASE)では、メッセージ受信を行うSimpleMessageListenerContainerの仕様により、メッセージ受信後の処理がパラレルに実行できないという制約がある。 詳細については、Spring Cloud AWSのissues#166を参照されたい。

以上の理由から、本ガイドラインではSpring Cloud AWSを使用せず、Spring JMSを用いて説明する。

5.6.1.1.4.1. メッセージを同期送信する場合

メッセージを同期送信する処理の流れについて図を用いて説明する。

Screen image of synchronous sending.
項番 説明
(1)
Service内で、JmsMessagingTemplateに対して「送信先のAmazon SQSキュー名」と「送信するメッセージのペイロード」を渡して処理を実行する。
JmsMessagingTemplateJmsTemplateに処理を委譲する。
(2)
JmsTemplateはDIコンテナから取得したConnectionFactoryからjavax.jms.Connectionを取得する。
(3)
JmsTemplateMessageProducerDestinationとメッセージを渡す。
MessageProducerjavax.jms.Sessionから生成される。(Sessionは(2)で取得したConnectionから生成される。)
(4)
SQSMessageProducerは送信対象のDestinationへメッセージを送信する。

5.6.1.1.4.2. メッセージを非同期受信する場合

メッセージを非同期受信する処理の流れについて図を用いて説明する。

Screen image of asynchronous receiving.
項番 説明
(1)
DIコンテナから取得したConnectionFactoryからConnectionを取得する。
(2)
DefaultMessageListenerContainerMessageConsumerDestinationを渡す。
MessageConsumerSessionから生成される。(Sessionは(1)で取得したConnectionから生成される。)
また、Destination@JmsListenerアノテーションで指定された「受信対象のAmazon SQSキュー名」をもとにAmazonSQS経由で取得される。
(3)
MessageConsumerDestinationからメッセージを受信する。
(4)
受信したメッセージを引数として、MessageListener内の@JmsListenerアノテーションが設定されたメソッド(リスナメソッド)が呼び出される。リスナメソッドはDefaultMessageListenerContainerで管理される。

5.6.1.2. How to use

5.6.1.2.1. Amazon SQSの設定

Amazon SQSを使用する為に必要な設定について説明する。

5.6.1.2.1.1. キューの作成

Amazon Web Serviceのコンソール、またはクエリAPIから、アプリケーションで使用するキューを作成する。 キューの作成については、チュートリアル: Amazon SQS キューの作成を参照されたい。

特筆すべき設定項目について、以下に紹介する。 設定の詳細やその他の設定項目についてはAmazon SQS キューの操作を参照されたい。

  • キューの種類

    標準キュー、FIFOキューから選択する。
    なお、本ガイドラインは標準キューを使用した場合について紹介している。
  • キュー名

    キューの名称を決定する。
    ここで決めた名称を、後述する@JmsListenerアノテーションに属性値として指定する。
  • 可視性タイムアウト

    キューの可視性タイムアウトを設定する。
    可視性タイムアウトとは、キューから受信されて処理中となっているメッセージが、他の受信コンポーネントから不可視となる時間の長さである。
    可視性タイムアウト時間を超えて処理が継続した場合、処理中に別の受信コンポーネント(別スレッドを含む)にて再度受信されてしまう。その為、可視性タイムアウトは処理時間を勘案した上で、充分な値を設定すること。
    詳細については、可視性タイムアウトおよび 全般的な推奨事項 メッセージの処理を参照されたい。
  • メッセージ受信待機時間

メッセージの受信待機時間を設定する。
受信待機時間とは、受信コンポーネントが空のキューに対してReceiveMessageリクエストを行った際に、キューにメッセージが到着するまで待機する時間である。
値は0-20秒の間で設定でき、0秒をショートポーリング、1秒以上をロングポーリングと呼ぶ。
ロングポーリングを使用することでAmazon SQSへのReceiveMessageリクエスト回数を削減できる為、20秒のロングポーリングを使用する事を推奨する。
ロングポーリングについてはAmazon SQS ロングポーリングを参照されたい。
  • デッドレターキュー設定
正常に処理できないメッセージの送達先として、デッドレターキューを設定できる。
デッドレターキューについては、Amazon SQS デッドレターキューの使用を参照されたい。

Note

メッセージ受信待機時間は、キュー側と受信コンポーネント側の両方に設定することができ、受信コンポーネント側の設定が優先される。

AWS SDK(JMSインタフェースを含む)は、デフォルトで20秒のロングポーリング設定となっている為、本ガイドラインで紹介する構成の場合、キューのメッセージ受信待機時間設定に関わらず、20秒のロングポーリングとなる。

5.6.1.2.2. メッセージの送受信に共通する設定

本節では、Amazon SQSを使用したメッセージの送受信に必要となる共通的な設定について説明する。

5.6.1.2.2.1. 依存ライブラリの設定

フロントサーバ、バックサーバそれぞれのdomainプロジェクトのpom.xmlに、Amazon SQSを利用する為に必要となる依存ライブラリを追加する。

記述例を以下に示す。

  • xxx-domain/pom.xml
<dependencies>

     <!-- (1) -->
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-jms</artifactId>
         <version>4.3.21.RELEASE</version>
     </dependency>

     <!-- (2) -->
     <dependency>
         <groupId>com.amazonaws</groupId>
         <artifactId>amazon-sqs-java-messaging-lib</artifactId>
         <version>1.0.4</version>
     </dependency>

 </dependencies>
項番 説明
(1)
Spring Frameworkが提供するspring-jmsをdependenciesに追加する。
Amazon SQSはJMS1.1をサポートしているため、バージョンはJMS1.1に対応する4.3.21.RELEASEを指定する。
(2)
JMSプロバイダとして、Amazon SQSのJMSインタフェース amazon-sqs-java-messaging-libをdependenciesに追加する。

5.6.1.2.2.2. ConnectionFactoryの設定

フロントサーバ、バックサーバそれぞれのdomainプロジェクトのinfra.xmlにConnectionFactoryの定義を追加する。

記述例を以下に示す。

  • xxx-infra.xml
<!-- (1) -->
<bean id="connectionFactory" class="com.amazon.sqs.javamessaging.SQSConnectionFactory"
    factory-bean="connectionFactoryBuilder" factory-method="build" />

<!-- (2) -->
<bean id="connectionFactoryBuilder"
    class="com.amazon.sqs.javamessaging.SQSConnectionFactory$Builder">
    <property name="regionName" value="us-east-1" />
</bean>
項番 説明
(1)
SQSConnectionFactoryをBean定義する。
SQSConnectionFactorySQSConnectionFactory$Builderのファクトリメソッドにてインスタンス生成する為、
factory-method属性に buildを指定する。
(2)
SQSConnectionFactoryのインスタンス生成に用いるビルダクラス SQSConnectionFactory$BuilderをBean定義する。
regionName属性に、接続先キューが存在するリージョンを指定する。

Note

ここではリージョンを固定文字列として記述しているが、実際の開発では外部管理とすることが望ましい。

環境依存値の外部管理については、環境依存値の外部管理 を参照されたい。

Note

ConnectionFactoryの定義方法について

Macchinetta Server Framework for Java (1.x) のガイドラインConnectionFactoryの設定では、 Bean定義ファイルがアプリケーションサーバ提供のJMSプロバイダ依存となることを防ぐため、ConnectionFactoryをアプリケーションサーバ側にて定義することを推奨しているが、 本ガイドラインで紹介しているケースにおいては、JMSプロバイダはクラウドベンダ提供のライブラリを使用する為、アプリケーションサーバ側に定義する必要性は低い。

以上の理由から、本ガイドラインではBean定義ファイルで定義する方法を採っている。

5.6.1.2.2.3. DestinationResolverの設定

Amazon SQSキューの名前解決は、AWS SDK for Javaが提供するJMSプロバイダによって行われる。 Spring JMSは、JMSプロバイダによる解決を行う DynamicDestinationResolverをデフォルトで使用する為、 DestinationResolverについての設定は不要である。

Note

Spring BootのAuto-configurationの仕組みを採用し、デフォルトのまま使用した場合、 JmsAutoConfigurationの設定により、JNDIによる名前解決を行うJndiDestinationResolverがBean定義され、宛先キューの解決に使用される。 ただし、Amazon SQSのキューはJNDIでルックアップできない為、最終的にはDynamicDestinationResolverでの解決が行われる。

5.6.1.2.3. メッセージを同期送信する方法

クライアントからAmazon SQSキューへメッセージを同期送信する方法を説明する。

本ガイドラインでは、Macchinetta Server Framework for Java (1.x) のガイドライン メッセージを同期送信する方法との差分について重点的に紹介している為、 本ガイドラインと併せて、Macchinetta Server Framework for Java (1.x) のガイドラインも参照されたい。

5.6.1.2.3.1. 基本的な同期送信

JmsMessagingTemplateを利用して、Amazon SQSへの同期送信処理を実現する。
ここでは、Reservationクラスのオブジェクトをメッセージ同期送信する場合の実装例を紹介する。
  • 同期送信に必要となるBean定義

    メッセージの送信側アプリケーションに必要となるBean定義例を以下に示す。
    • xxx-infra.xml

      <bean id="cachingConnectionFactory"
         class="org.springframework.jms.connection.CachingConnectionFactory" primary="true"> <!-- (1) -->
         <property name="targetConnectionFactory" ref="connectionFactory" /> <!-- (2) -->
         <property name="sessionCacheSize" value="10" />  <!-- (3) -->
      </bean>
      
      項番 説明
      (1)
      SessionMessageProducerのキャッシュを行うorg.springframework.jms.connection.CachingConnectionFactoryをBean定義する。
      SQSConnectionFactoryをそのまま使うのではなく、 CachingConnectionFactoryにラップして使用することで、キャッシュ機能を使用することができる。
      DIコンテナにConnectionFactory実装クラスが複数登録されることになる為、primary属性にtrueを指定する。
      (2)
      Bean定義されたSQSConnectionFactoryを指定する。
      (3)
      Sessionのキャッシュ数を設定する。(デフォルト値は1)
      この例では10を指定しているが、性能要件に応じて適宜キャッシュ数を変更すること。
      このキャッシュ数を超えてセッションが必要になるとキャッシュを使用せず、新しいセッションの作成と破棄を繰り返すことになる。
      すると処理効率が下がり、性能劣化の原因になるので注意すること。

Note

本ガイドラインでは、Spring BootのAuto-configurationの使用を前提としており、 メッセージ送信に使用するJmsTemplateJmsMessagingTemplateJmsAutoConfigurationにてBean定義される為、 デフォルト設定のまま使用する場合は、Bean定義は不要である。

なお、JmsTemplateの設定については、Macchinetta Server Framework for Java (1.x) のガイドラインメッセージを同期送信する方法に詳しく紹介されている為、必要に応じて参照されたい。

  • 送信対象のJavaBeanの実装

    フロントサーバ、バックサーバの両アプリケーションで共用するオブジェクトの為、modelプロジェクトに作成する。
    modelの共有についての詳細は、Macchinetta Server Framework for Java (1.x) のガイドライン プロジェクト構成についてを参照されたい。

    実装例を以下に示す。

    • Reservation.java
    package com.example.domain.model;
    
    import java.io.Serializable;
    
    public class Reservation implements Serializable { // (1)
    
        private static final long serialVersionUID = -1L;
    
        private String reserveNo;
    
        // omitted
    
        public String getReserveNo() {
            return reserveNo;
        }
    
        public void setReserveNo(String reserveNo) {
            this.reserveNo = reserveNo;
        }
    
        // omitted
    
    }
    
    項番 説明
    (1)
    シリアライズして送信するため、java.io.Serializableインタフェース を実装する必要がある。
  • 同期送信を行うサービスクラスの実装

    予約情報を持つReservationオブジェクトをAmazon SQSキューに同期送信する。

    実装例を以下に示す。

    • ReservationServiceImpl.java
    package com.example.domain.service.reservation;
    
    import javax.inject.Inject;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    import com.example.domain.model.Reservation;
    
    @Service
    public class ReservationServiceImpl implements ReservationService {
    
        @Inject
        JmsMessagingTemplate jmsMessagingTemplate;    // (1)
    
        @Override
        public void sendMessage(Reservation reservation) {
    
           // omitted
    
           jmsMessagingTemplate.convertAndSend("reservation-queue", reservation);  // (2)
    
        }
    }
    
    項番 説明
    (1)
    JmsMessagingTemplateをインジェクションする。
    (2)
    JmsMessagingTemplateconvertAndSendメソッドを使用して、引数のJavaBeanをorg.springframework.messaging.Messageインタフェースの実装クラスに変換し、指定したDestinationに対しメッセージを同期送信する。
    本実装例では、Amazon SQSキューに送信する為、com.amazon.sqs.javamessaging.message.SQSObjectMessageに変換され、送信される。

5.6.1.2.4. メッセージを非同期受信する方法

Amazon SQSキューからメッセージを非同期受信する方法を説明する。

本ガイドラインでは、Macchinetta Server Framework for Java (1.x) のガイドライン メッセージを非同期受信する方法との差分について重点的に紹介している為、 本ガイドラインと併せて、Macchinetta Server Framework for Java (1.x) のガイドラインも参照されたい。

Warning

Amazon SQSはトランザクションをサポートしていない為、Macchinetta Server Framework for Java (1.x) のガイドライン トランザクション管理で紹介されているような、 メッセージングとDBのトランザクションを組み合わせる設計は行えない為、注意が必要である。

5.6.1.2.4.1. 基本的な非同期受信

@JmsListenerアノテーションを利用した非同期受信の方法について説明する。
  • Bean定義ファイルの設定

    非同期受信の実装には下記の設定が必要となる。

    • JMS Namespaceを定義する。
    • @JmsListenerアノテーションを有効化する。
    • DIコンテナで管理しているコンポーネントのメソッドに@JmsListenerアノテーションを指定する。

    記述例を以下に示す。

    • applicationContext.xml
    <!-- (1) -->
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
    
        <!-- (2) -->
        <jms:annotation-driven />
    
        <!-- (3) -->
        <jms:listener-container
            factory-id="jmsListenerContainerFactory"
            acknowledge="client" />
    
    項番 属性名 内容
    (1)
    xmlns:jms
    JMS Namespaceを定義する。
    値としてhttp://www.springframework.org/schema/jmsを指定する。
    JMS Namespaceの詳細については、JMS Namespace Supportを参照されたい。
      xsi:schemaLocation
    スキーマのURLを指定する。
    値にhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms.xsdを追加する。
    (2)
    -
    <jms:annotation-driven />を利用して、@JmsListenerアノテーションや@SendToアノテーション等のJMS関連のアノテーション機能を有効化する。
    (3)
    -
    <jms:listener-container/>を利用してDefaultMessageListenerContainerを生成するファクトリへパラメータを与えることで、DefaultMessageListenerContainerの設定を行う。
    <jms:listener-container/>の属性には、利用したいConnectionFactoryのBeanを指定できるconnection-factory属性が存在する。connection-factory属性のデフォルト値はconnectionFactoryである。
    この例では、ConnectionFactoryの設定で示したConnectionFactoryのBean(Bean名はconnectionFactory)を利用するため、connection-factory属性を省略している。
    <jms:listener-container/>には、ここで紹介した以外の属性も存在する。
    詳細については、Attributes of the JMS <listener-container> elementを参照されたい。

    Warning

    非同期受信の場合、DefaultMessageListenerContainerの内部に独自のキャッシュ機能が備わっているため、CachingConnectionFactoryは使用してはいけない。 詳細については、DefaultMessageListenerContainerのJavadocを参照されたい。

      factory-id
    Bean定義を行うDefaultJmsListenerContainerFactoryの名前を設定する。
    @JmsListenerアノテーションがデフォルトでBean名jmsListenerContainerFactoryを参照するため、<jms:listener-container/>が一つの場合はBean名をjmsListenerContainerFactoryとすることを推奨する。
      acknowledge
    メッセージ受信時の、Amazon SQSへの確認応答の返し方を設定する。Amazon SQSキューは、確認応答を受け取った際にキュー内のメッセージを削除する。
    acknowledge属性のデフォルトはautoである。ここでは、clientを設定している。
    clientを設定した場合は、リスナメソッドが正常終了した際に確認応答を返し、例外発生時にはメッセージがキューに戻される。

    Note

    acknowledgeモードはAmazon SQS、Spring JMS で意味合いが異なる為、注意が必要である。 ここでは、DefaultMessageListenerContainerを使用する為、Spring JMSのacknowledgeモードに従って設定している。

    Note

    Amazon SQSはトランザクションをサポートしていない為、transaction-manager属性は設定できない。

  • リスナクラスの実装

DIコンテナで管理しているコンポーネントのメソッドに@JmsListenerアノテーションを指定することで、指定したDestinationより非同期でメッセージを受信する。
また、JavaアプリケーションからのAmazon SQSの利用にて紹介した、標準キューの2重受信の検出についてもここで行う。

実装例を以下に示す。

  • ReservationMessageListener.java
package com.example.listener.reservation;

import javax.inject.Inject;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.JmsHeaders;
import org.springframework.stereotype.Component;
import com.example.domain.common.exception.DuplicateReceivingException;
import com.example.domain.model.Reservation;
import com.example.domain.service.reservation.ReservationInspectionService;

@Component
public class ReservationMessageListener {

   @Inject
   ReservationInspectionService reservationInspectionService;

   @JmsListener(destination = "reservation-queue", concurrency = "5-10")   // (1)
   public void receive(Reservation reservation,
           @Header(JmsHeaders.MESSAGE_ID) String messageId) { // (2)

       try{
           reservationInspectionService.inspectAndNotify(reservation, messageId); // (3)
       }catch(DuplicateReceivingException e){ // (4)
           return;
       }
      // omitted
   }

}
項番 説明
(1)
非同期受信用のメソッドに対し@JmsListenerアノテーションを設定する。
destination属性には、受信先のキュー名を指定する。
concurrency属性には、リスナメソッドの並列数の上限を指定する。記述例のように、下限と上限を設定することも可能である。

Note

concurrency属性は、<jms:listener-container/>にて設定することも可能だが、 記述例ではリスナメソッドごとに並列数を設定する設計を想定している為、@JmsListenerアノテーションに設定している。

(2)
後述する2重受信チェックに使用するJMSMessageIDを、Headerアノテーションを使用してメソッド引数として受け取る。
(3)
メッセージ受信後に実行するサービスクラスのメソッドを呼び出す。
サービスクラス内にて2重受信チェックを行うため、受信メッセージのJMSMessageIDを引数として渡す。
(4)
2重受信チェックの結果、受信したメッセージが処理済であった場合は、returnしてリスナメソッドを正常終了させる。

5.6.1.2.4.2. 2重受信チェック

Amazon SQSの標準キューを利用する際に必要となる、2重受信チェックについて説明する。

Amazon SQSのドキュメント少なくとも 1 回の配信で示されている通り、標準キューを使用する場合は、アプリケーションがべき等性を持つように設計する必要がある。
べき等性を持たせる方法はいくつか存在するが、本ガイドラインではRDBの一意性制約を利用して実現する方法を紹介する。

処理済のメッセージを再度受信した場合のイメージを以下に示す。
Screen image of unique message check.
項番 説明
(1)
バックサーバのアプリケーションは、SQSメッセージ(ID:AAAAAAAAAAAZ)を受信する。
(2)
バックサーバのアプリケーションは、2重受信チェック部品を使用してメッセージID:AAAAAAAAAAAZをRDBのメッセージID管理テーブルに登録した後、業務処理を実行する。
メッセージID管理テーブルは、メッセージIDカラムがユニークキーに設定されている前提。

Note

2重受信チェック部品は、@Transactionalが付与されたサービスクラスのメソッド内にて呼び出され、サービスクラスのトランザクションに参加する。その為、メッセージID挿入は、サービスクラスのメソッドが正常終了した時点でコミットされる。

(3)
バックサーバのアプリケーションは、何らかの原因により、(2)で受信したSQSメッセージ(ID:AAAAAAAAAAAZ)を再度受信する。
(4)
2重受信チェック部品は、メッセージID:AAAAAAAAAAAZをRDBのメッセージID管理テーブルに登録しようとするが、既に同じIDが登録されている為に挿入できず、2重受信と判断する。
(5)
バックサーバのアプリケーションは、以降の業務処理を行わずにリスナメソッドを正常終了させる。

Note

リスナメソッドを例外終了させた場合、Spring JMSのacknowledgeモードclientの仕様により、メッセージが削除されずキューに戻されてしまう。 2重受信した処理済メッセージをキューから削除する為、ここではリスナメソッドを正常終了させている。

2重受信チェックの実装例を以下に示す。
  • DuplicateMessageChecker.java
package com.example.domain.messaging;

import javax.inject.Inject;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.transaction.annotation.Transactional;
import com.example.domain.repository.messaging.MessageIdRepository;
import com.example.domain.common.exception.DuplicateReceivingException;

public class DuplicateMessageChecker {

    @Inject
    MessageIdRepository repository; // (1)

    @Transactional  // (2)
    public void checkDuplicateMessage(String messageId) {

        try {

            repository.register(messageId); // (3)

        } catch (DuplicateKeyException e) { // (4)

            throw new DuplicateReceivingException(messageId);
        }
    }
}
項番 説明
(1)
メッセージIDをINSERTするリポジトリ MessageIdRepositoryをインジェクションする。

Note

MessageIdRepositoryは単項目のINSERTを行うリポジトリの為、マッピングファイル等の記述例は割愛する。

(2)
メッセージIDをINSERTするトランザクションを業務処理のトランザクションに含める為、@Transactionalを付与する。
propagation属性には、デフォルト値のREQUIREDが使用される。
業務処理にて例外が発生した場合は、メッセージIDのINSERTもロールバックされる。
(3)
MessageIdRepositoryregisterメソッドを実行し、メッセージIDをメッセージID管理テーブルにINSERTする。
(4)
一意性制約違反によって発生するDuplicateKeyExceptionをcatchする。
2重受信発生を示す例外をthrowする。
  • ReservationInspectionServiceImpl.java
package com.example.domain.service.reservation;

import javax.inject.Inject;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.example.domain.common.messaging.DuplicateMessageChecker;
import com.example.domain.model.Reservation;

@Service
public class ReservationInspectionServiceImpl implements
                                             ReservationInspectionService {

    @Inject
    DuplicateMessageChecker duplicateMessageChecker; // (1)

    @Transactional
    public void inspectAndNotify(Reservation reservation, String messageId) {

        duplicateMessageChecker.checkDuplicateMessage(messageId); // (2)

        // omitted

    }
}
項番 説明
(1)
2重受信チェックユーティリティDuplicateMessageCheckerをインジェクションする。
(2)
DuplicateMessageCheckerを使用して、2重受信チェックを行う。
2重受信が発生していた場合には例外がthrowされる。

5.6.1.2.4.3. メッセージのトレース

メッセージのトレーサビリティ向上の為、各ログにメッセージIDを出力させる方法を説明する。

メッセージIDを含めたログの例を、以下に示す。
date:2017-02-08 09:38:42     messageId:cad212f8-4e35-4d03-924f-5d5fe339a282  message:[START SERVICE] (omitted)
date:2017-02-08 09:38:43     messageId:cad212f8-4e35-4d03-924f-5d5fe339a282  message:[END SERVICE  ] (omitted)
date:2017-02-08 09:38:44     messageId:32b00a02-a851-4900-b5b8-72a44d42bedb  message:[START SERVICE] (omitted)
date:2017-02-08 09:38:45     messageId:92c76511-3564-4332-892b-6dadae2bc090  message:[START SERVICE] (omitted)
date:2017-02-08 09:38:45     messageId:92c76511-3564-4332-892b-6dadae2bc090  message:[END SERVICE  ] (omitted)
date:2017-02-08 09:38:45     messageId:32b00a02-a851-4900-b5b8-72a44d42bedb  message:[END SERVICE  ] (omitted)
メッセージIDを出力させることで、不規則に出力された場合でも、ログを結びつけることができる。
上記の例だと、3行目と6行目は4,5行目を跨いでいるが、同じリクエストに関するログであることがわかる。

このような横断的なログ出力は、MDCを利用することで可能となる。MDCについては、Macchinetta Server Framework for Java (1.x) のガイドラインMDCの使用に詳しい利用法が記されている為、参照されたい。

MDCを用いてメッセージIDをログに埋め込む例を以下に示す。

  • MessageIdLoggingInterceptor.java
package com.example.app.common.logging;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.slf4j.MDC;
import com.example.domain.model.Reservation

public class MessageIdLoggingInterceptor implements MethodInterceptor {  // (1)

     @Override
     public Object invoke(MethodInvocation invocation) throws Throwable {

         String key = "messageId";

         Object[] arguments = invocation.getArguments();
         Parameter[] parameters = invocation.getMethod().getParameters();

         for (int i = 0; i < parameters.length; i++) {
             Header header = parameters[i].getAnnotation(Header.class);

             if (header != null && JmsHeaders.MESSAGE_ID.equals(header.value()) // (2)
                     && arguments[i] instanceof String) {
                 MDC.put(key, ((String) arguments[i])); // (3)
                 break;
             }
         }

         Object ret = invocation.proceed();

         MDC.remove(key); // (4)

         return ret;
     }

}
項番 説明
(1)
Spring AOPにてメッセージ埋め込み処理を差し込む為、MethodInterceptorインタフェースを実装する。
(2)
MethodInvocationgetArgumentsメソッドを呼び出し、リスナメソッドの引数リストを取得する。
リスナメソッドの引数のうち、HeaderアノテーションにJMSMessageIDが指定されているものを取得する。
(3)
MDCputメソッドを使用して、メッセージIDをmessageIdというキーで登録する。
(4)
MDCremoveメソッドを使用して、登録したメッセージIDを削除する。

Note

removeメソッドをfinally句で呼び出さない理由について

finally句でMDCのremoveメソッドを呼び出す作りにすると、例外発生時にMDCからメッセージIDがremoveされてしまい、 Spring提供の例外ハンドラErrorHandler内でのログにメッセージIDが出力されなくなる。例外の起因となったメッセージが特定し辛くなる為、 例外時にMDCの情報を削除する処理は、ErrorHandlerにて行うのが望ましい。

作成したMessageIdLoggingInterceptorクラスを、Bean定義ファイルに設定する。

記述例を以下に示す。

  • applicationContext.xml
<!-- (1) -->
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xmlns:aop="http://www.springframework.org/schema/aop"
   xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

<!-- (2) -->
<bean id="messageIdLoggingInterceptor" class="com.example.app.common.logging.MessageIdLoggingInterceptor" />

<!-- (3) -->
<aop:config>
    <aop:advisor advice-ref="messageIdLoggingInterceptor"
        pointcut="@annotation(org.springframework.jms.annotation.JmsListener)" />
</aop:config>
項番 属性名 内容
(1)
xmlns:aop
AOP Namespaceを定義する。
値としてhttp://www.springframework.org/schema/aopを指定する。
AOP Namespaceの詳細については、Schema-based AOP supportを参照されたい。
  xsi:schemaLocation
スキーマのURLを指定する。
値にhttp://www.springframework.org/schema/aophttp://www.springframework.org/schema/aop/spring-aop.xsdを追加する。
(2)
-
MessageIdLoggingInterceptorをBean定義する。
(3)
-
(2)でBean定義した MessageIdLoggingInterceptorをアドバイスとして登録する。
ポイントカットには@annotation指示子を使用し、@JmsListenerアノテーションが付与されたメソッドを指定する。