8.2. JMS(Java Message Service)

8.2.1. Overview

本節では、JMS APIとSpring FrameworkのJMS連携用コンポーネントを使用したメッセージの送受信方法について説明する。


8.2.1.1. JMSとは

JMSはJavaでMOM(Message Oriented Middleware)を利用するための標準APIである。
JMSのアーキテクチャは、JMSプロバイダを経由してクライアントからクライアントへメッセージを交換する。
JMSは非同期メッセージングをサポートしているため、クライアント間を疎結合にすることができる。
また、後述するPoint-to-Pointモデルを採用することでメッセージをQueueに格納できるため、クライアントの性能に応じたメッセージ受信が可能となる。
その反面、クライアントからクライアントへのメッセージングにはタイムラグが発生しうるので、リアルタイムな応答が求められる処理に向かない傾向がある。
JMSの詳細については、Java Message Service (JMS)を参照されたい。

JMSを使用することで、同期または非同期でのメッセージングが可能となる。

Note

本ガイドラインではJMS1.1を使用することを前提としている。


利用する際には、下記に説明する配信モデルとメッセージ送受信方式を要件に合わせて選択する必要がある。
  • 配信モデル
配信モデルは、Point-to-Point(PTP)と Publisher-Subscriber(Pub/Sub)の2つのモデルが存在する。
2つのモデルの大きな違いは送信者と受信者が1対1であるか、1対多であるかであり、用途によって選択する必要がある。
  1. Point-To-Point(PTP)モデル
JMS Queue
PTPモデルとは、2つのクライアント間において、一方のクライアント(Producer)からメッセージを送信し、もう一方のクライアント(Consumer)のみがそのメッセージを受信するモデルである。
PTPモデルにおけるメッセージのあて先(Destination)をQueueと呼ぶ。
ProducerはQueueにメッセージを送信し、ConsumerはQueueからメッセージを取得し、処理を行う。
Consumerからメッセージが取得されるか、メッセージが有効期限に達するとQueueからメッセージが削除される。

  1. Publisher-Subscriber(Pub/Sub)モデル
JMS Topic
Pub/Subモデルとは、一方のクライアント(Publisher)からメッセージを発行(Publishes)し、他方の複数クライアント(Subscriber)にそのメッセージを配信(Delivers)するモデルである。
Pub/Subモデルにおけるメッセージのあて先(Destination)をTopicと呼ぶ。
SubscriberはTopicに対し購読依頼(Subscribes)を行い、PublisherはTopicにメッセージを発行する。
Topicに購読依頼している全てのSubscriberにメッセージが配信される。
本ガイドラインでは、一般的に利用されることが多いPTPモデルの実装方法について説明する。
  • メッセージ送信方式
QueueまたはTopicへのメッセージ送信方式には、同期送信方式と非同期送信方式の2通りの処理方式が考えられるが、JMS1.1では同期送信方式のみがサポートされる。
  1. 同期送信方式
明示的にメッセージを送信する機能を呼び出すことで、メッセージに対する処理と送信が開始される。
JMSプロバイダからの応答があるまで待機するため、後続処理がブロックされる。

  1. 非同期送信方式
明示的にメッセージを送信する機能を呼び出すことで、メッセージに対する処理と送信が開始される。
JMSプロバイダからの応答を待たないため、後続処理を続けて実行する。
非同期送信方式の詳細については、Java Message Service(Version 2.0)の”7.3. Asynchronous send”を参照されたい。
  • メッセージ受信方式
QueueまたはTopicに受信したメッセージに対する処理を実装する際には、同期受信方式と非同期受信方式の2通りの処理方式を選択することができる。
後述するように、同期受信方式の利用ケースは限定的であるため、一般的には非同期受信方式が利用されることが多い。
  1. 非同期受信方式
QueueまたはTopicがメッセージを受信すると、受信したメッセージに対する処理が開始される。
1つのメッセージに対する処理が終了しなくても別のメッセージの処理が開始されるため、並列処理に向いている。

  1. 同期受信方式
明示的にメッセージを受信する機能を呼び出すことで、受信とメッセージに対する処理が開始される。
メッセージを受信する機能は、QueueまたはTopicにメッセージが存在しない場合、受信するまで待機する。
そのため、タイムアウト値を設定することで、メッセージの待ち時間を指定する必要がある。
メッセージの同期受信を使用する一例として、WebアプリケーションにおいてQueueに溜まったメッセージを、画面操作時など任意のタイミングで取得・処理したい場合や、 バッチで定期的にメッセージの処理を行いたい場合に使用することができる。

JMSではメッセージは以下のパートで構成される。
詳細はJava Message Service(Version 1.1)の”3. JMS Message Model”を参照されたい。
構成 説明
ヘッダ
JMSプロバイダやアプリケーションに対して、メッセージのDestinationや識別子などの制御情報やJMSの拡張ヘッダ(JMSX)、JMSプロバイダ独自のヘッダ、アプリケーション独自のヘッダを格納する。
プロパティ
ヘッダに追加する制御情報を格納する。
ペイロード
メッセージ本体を格納する。
データ種別によって、javax.jms.BytesMessagejavax.jms.MapMessagejavax.jms.ObjectMessagejavax.jms.StreamMessagejavax.jms.TextMessageの5つのメッセージタイプを提供している。
JavaBeanを送信したい場合は、ObjectMessageを利用する。
その場合は、JavaBeanをクライアント間で共有する必要がある。

Warning

デシリアライズ時の注意点

QueueにObjectMessageが入るとメッセージを取り出す際にデシリアライズが行われる。

デシリアライズ処理は、不正なデータや予期しないデータを使用して業務ロジックの乱用、サービスの拒否、任意のコードの実行が行われる危険があるため、 信頼できない送信元から受信しうるものをデシリアライズしてはならない。 そのため、Queueも(信頼できない送信元を含み得る)不特定多数からのメッセージを受け付ける構成であってはならない。

詳細についてはDeserialization of untrusted dataを参照されたい。

8.2.1.2. JMSの利用

JMSを用いた処理を実装する場合、Java EEで定義されたJMS API(以下、JMS API)を使用することで、処理を実現できる。
ただし、本ガイドラインでは、JMS APIをそのまま使用する場合に比べてメリット(記述が容易など)が多い、Spring FrameworkのJMS連携用コンポーネントを利用する前提としている。
そのため、JMS APIの詳細については説明しない。
詳細についてはJava APIを参照されたい。

Note

JMSはJava APIの標準化はしているが、メッセージの物理的なプロトコルの標準化はしていない。

Note

Java EEサーバではJMS実装が標準で組み込まれているためデフォルトで利用可能(Java EEサーバに組み込まれているJMSプロバイダを使う場合に限られる)だが、Apache TomcatなどのようにJMS実装が組み込まれていないJava EEサーバでは、別途JMS実装が必要になる。



8.2.1.3. Spring Frameworkのコンポーネントを使用したJMSの利用

Spring Frameworkでは、メッセージ送受信を行うためのライブラリとして以下を提供している。
  • spring-jms
    JMSを利用したメッセージングを行うためのコンポーネントを提供する。
    このライブラリに含まれるコンポーネントを利用することで、低レベルのJMS API呼び出しが不要となり、実装を簡素化できる。
    spring-messagingを利用することが可能である。
  • spring-messaging
    メッセージングベースのアプリケーションを作成する際に必要となる基盤機能を抽象化するためのコンポーネントを提供する。
    メッセージとそれを処理するメソッドを対応付けるためのアノテーションのセットが含まれている。
    このライブラリに含まれるコンポーネントを利用することで、メッセージングの実装スタイルを合わせることができる。
spring-jmsのみでも実装可能であるが、spring-messagingを利用することで実装方式を合わせることが可能である。
本ガイドラインでは、spring-messagingも利用することを推奨している。
ここでは、具体的な実装方法の説明を行う前に、Spring Frameworkが提供するJMS連携用のコンポーネントがどのようにメッセージを送受信しているかを説明する。
まずは、説明に登場するコンポーネントを紹介する。
Spring Frameworkは、以下にあげるインタフェースやクラスなどを利用してJMS API経由でメッセージ送受信を行う。
  • javax.jms.ConnectionFactory
    JMSプロバイダへのコネクション作成用インタフェース。
    アプリケーションからJMSプロバイダへの接続を作成する機能を提供する。
  • javax.jms.Destination
    あて先(QueueやTopic)であることを示すインタフェース。
  • javax.jms.MessageProducer
    メッセージの送信用インタフェース。
  • javax.jms.MessageConsumer
    メッセージの受信用インタフェース。
  • javax.jms.Message
    ヘッダとボディを保持するメッセージであることを示すインタフェース。
    送受信はこのインタフェースの実装クラスがやり取りされる。
  • org.springframework.messaging.Message
    さまざまなメッセージングで扱うメッセージを抽象化したインタフェース。
    JMSでも利用可能である。
    前述したとおり、メッセージングの実装方式を合わせるため、基本的にはspring-messagingで提供されているorg.springframework.messaging.Messageを使用する。
    ただし、org.springframework.jms.core.JmsTemplateを使用したほうがよい場合が存在するので、その場合にはjavax.jms.Messageを使用する。
  • org.springframework.jms.core.JmsMessagingTemplateおよびorg.springframework.jms.core.JmsTemplate
    JMS APIを利用するためのリソースの生成や解放などをテンプレート化したクラス。
    メッセージの送信及びメッセージの同期受信機能を行う際に使用することで実装を簡素化できる。
    基本的には、org.springframework.messaging.Messageを扱うことができる JmsMessagingTemplateを使用する。
    JmsMessagingTemplateJmsTemplateをラップしているため、JmsTemplateのプロパティを利用することで設定を行うことができる。
    ただし、一部JmsTemplateをそのまま使用したほうがよい場合が存在する。具体的な使用例については後ほど説明する。
  • org.springframework.jms.listener.DefaultMessageListenerContainer
    DefaultMessageListenerContainerはQueueからメッセージを受け取り、受け取ったメッセージを処理するMessageListenerを起動させる。
  • @org.springframework.jms.annotation.JmsListener
    JMSのMessageListenerとして扱うメソッドであることを示すマーカアノテーション。
    メッセージを受け取った際に処理を行うメソッドに対して@JmsListenerアノテーションを付与する。
  • org.springframework.jms.connection.JmsTransactionManager
    JMS(javax.jms.Connection/ javax.jms.Session)のAPIを呼び出して、トランザクションを管理するための実装クラス。

8.2.1.3.1. メッセージを同期送信する場合

メッセージを同期送信する処理の流れについて図を用いて説明する。
Send of Spring JMS
項番 説明
(1)
Service内で、JmsMessagingTemplateに対して「送信対象のDestination名」と「送信するメッセージのペイロード」を渡して処理を実行する。
JmsMessagingTemplateJmsTemplateに処理を委譲する。
(2)
JmsTemplateはJNDI経由で取得されたConnectionFactoryからjavax.jms.Connectionを取得する。
(3)
JmsTemplateMessageProducerDestinationとメッセージを渡す。
MessageProducerjavax.jms.Sessionから生成される。(Sessionは(2)で取得したConnectionから生成される。)
また、Destinationは(1)で渡された「送信対象のDestination名」をもとにJNDI経由で取得される。
(4)
MessageProducerは送信対象のDestinationへメッセージを送信する。

8.2.1.3.2. メッセージを非同期受信する場合

メッセージを非同期受信する処理の流れについて図を用いて説明する。
ASync of Spring JMS
項番 説明
(1)
JNDI経由で取得されたConnectionFactoryからConnectionを取得する。
(2)
DefaultMessageListenerContainerMessageConsumerDestinationを渡す。
MessageConsumerSessionから生成される。(Sessionは(1)で取得したConnectionから生成される。)
また、Destination@JmsListenerアノテーションで指定された「受信対象のDestination名」をもとにJNDI経由で取得される。
(3)
MessageConsumerDestinationからメッセージを受信する。
(4)
受信したメッセージを引数として、MessageListener内の@JmsListenerアノテーションが設定されたメソッド(リスナーメソッド)が呼び出される。リスナーメソッドはDefaultMessageListenerContainerで管理される。

8.2.1.3.3. メッセージを同期受信する場合

メッセージを同期受信する処理の流れについて図を用いて説明する。
Sync of Spring JMS
項番 説明
(1)
Service内で、JmsMessagingTemplateに対して、「受信対象のDestination名」を渡す。
JmsMessagingTemplateJmsTemplateに処理を委譲する。
(2)
JmsTemplateはJNDI経由で取得されたConnectionFactoryからConnectionを取得する。
(3)
JmsTemplateMessageConsumerDestinationとメッセージを渡す。
MessageConsumerSessionから生成される。(Sessionは(2)で取得したConnectionから生成される。)
また、Destinationは(1)で渡された「受信対象のDestination名」をもとにJNDI経由で取得される。
(4)
MessageConsumerDestinationからメッセージを受信する。
メッセージはJmsTemplateJmsMessagingTemplateを経由してServiceに返却される。

8.2.1.4. プロジェクト構成について

JMSを利用する場合のプロジェクトの推奨構成について説明する。
シリアライズしたJavaBeanをObjectMessage経由で送受信する場合、このJavaBeanを送信側と受信側で共有する必要がある。
この場合、既存のブランクプロジェクトとは別にmodelプロジェクトを追加することを推奨する。
  • modelの共有
  • 送信または受信側のクライアントがmodelを提供していない場合

    modelプロジェクトを追加して、通信先のクライアントにJarファイルを配布する。

  • 送信または受信側のクライアントがmodelを提供している場合

    提供されたmodelをライブラリに追加する。

modelプロジェクト、または、配布されたアーカイブファイルと既存のプロジェクトとの関係は以下のようになる。
Projects
項番 プロジェクト名 説明
(1)
webプロジェクト
非同期受信を行うためのリスナークラスを配置する。
(2)
domainプロジェクト
非同期受信を行うためのリスナークラスから実行されるServiceを配置する。
その他、Repositoryなどは従来と同じである。
(3)
modelプロジェクトもしくはJarファイル
ドメイン層に属するクラスのうち、クライアント間で共有するクラスを使用する。

modelプロジェクトを追加するためには、以下を実施する。
  • modelプロジェクトの作成
  • domainプロジェクトからmodelプロジェクトへの依存関係の追加
詳細な追加方法については、同じようにJavaBeanの共有を行っている SOAP Web Service(サーバ/クライアント)SOAPサーバ用にプロジェクトの設定を変更する を参照されたい。

8.2.2. How to use

8.2.2.1. メッセージの送受信に共通する設定

本節では、メッセージの送受信に必要となる共通的な設定について説明する。

8.2.2.1.1. 依存ライブラリの設定

Spring FrameworkのJMS連携用コンポーネントを利用するために、domainプロジェクトのpom.xmlにSpring Frameworkのspring-jmsを追加する。
  • [projectName]-domain/pom.xml
<dependencies>

     <!-- (1) -->
     <dependency>
         <groupId>org.springframework</groupId>
         <artifactId>spring-jms</artifactId>
     </dependency>

 </dependencies>
項番 説明
(1)
spring-jmsをdependenciesに追加する。
spring-jmsspring-messagingに依存するため、spring-messagingも推移的に依存ライブラリとして追加される。
spring-jmsの他に、pom.xmlにJMSプロバイダのライブラリを追加する。
pom.xmlへのライブラリの追加例については、JMSプロバイダに依存する設定 を参照されたい。

Note

上記設定例は、依存ライブラリのバージョンを親プロジェクトである terasoluna-gfw-parent で管理する前提であるため、pom.xmlでのバージョンの指定は不要である。 上記の依存ライブラリはterasoluna-gfw-parentが依存しているSpring Bootで管理されている。


8.2.2.1.2. ConnectionFactoryの設定

ConnectionFactoryの定義の方法には、アプリケーションサーバで定義する方法と、Bean定義ファイルで定義する方法がある。
特別な理由がない場合、Bean定義ファイルをJMSプロバイダ非依存とするため、アプリケーションサーバで定義する方法を選択する。
本節では、アプリケーションサーバで定義する方法についてのみ説明する。
アプリケーションサーバで定義したConnectionFactoryを使用するためには、Bean定義ファイルにJNDI経由で取得したJavaBeanを利用するための設定を行う必要がある。
  • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
<!-- (1) -->
<jee:jndi-lookup id="connectionFactory" jndi-name="jms/ConnectionFactory"/>
項番 説明
(1)
jndi-name属性に、アプリケーションサーバ提供のConnectionFactoryのJNDI名を指定する。
resource-ref属性がデフォルトでtrueのため、JNDI名にプレフィックス(java:comp/env/)がない場合は、自動的に付与される。

Note

Bean定義したConnectionFactoryを使用する場合

JNDIを利用しない場合、ConnectionFactoryの実装クラスをBean定義することでもConnectionFactoryを利用することが可能である。 この場合、ConnectionFactoryの実装クラスはJMSプロバイダ依存となる。詳細については、JMSプロバイダに依存する設定 の”JNDIを使用しない場合の設定”を参照されたい。

8.2.2.1.3. DestinationResolverの設定

Destinationの名前解決には、JNDIによる解決とJMSプロバイダでの解決の二通りの方法がある。
デフォルトではJMSプロバイダでの解決が行われるが、ポータビリティや管理の観点から、特別な理由がない場合はJNDIによる解決を推奨する。
org.springframework.jms.support.destination.JndiDestinationResolverを使用することで、JNDI名によりDestinationの名前解決を行うことができる。
以下にJndiDestinationResolverの定義例を示す。
  • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
<!-- (1) -->
<bean id="destinationResolver"
   class="org.springframework.jms.support.destination.JndiDestinationResolver">
   <property name="resourceRef" value="true" /> <!-- (2) -->
</bean>
項番 説明
(1)
JndiDestinationResolverをBean定義する。
(2)
JNDI名にプレフィックス(java:comp/env/)がないときに、自動的に付与させる場合はtrueを設定する。デフォルトはfalseである。

Warning

<jee:jndi-lookup/>resource-ref属性とはデフォルト値が異なることに注意されたい。

Note

DynamicDestinationResolverを使用する場合

JNDIを利用せずにJMSプロバイダでDestinationの名前解決する場合、DynamicDestinationResolverを利用する。 DynamicDestinationResolverの設定については、JMSプロバイダに依存する設定 の”JNDIを使用しない場合の設定”を参照されたい。

8.2.2.2. メッセージを同期送信する方法

PTPモデルにて、クライアント(Producer)からJMSプロバイダへメッセージを同期送信する方法を説明する。

8.2.2.2.1. 基本的な同期送信

JmsMessagingTemplateを利用して、JMSプロバイダへの同期送信処理を実現する。
Todoクラスのオブジェクトをメッセージ同期送信する場合の実装例を示す。
最初にJmsMessagingTemplateの設定方法を示す。
  • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
<bean id="cachingConnectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- (1) -->
   <property name="targetConnectionFactory" ref="connectionFactory" /> <!-- (2) -->
   <property name="sessionCacheSize" value="1" />  <!-- (3) -->
</bean>
項番 説明
(1)
SessionMessageProducer/Consumerのキャッシュを行うorg.springframework.jms.connection.CachingConnectionFactoryをBean定義する。
Bean定義もしくはJNDI名でルックアップしたJMSプロバイダ固有のConnectionFactoryをそのまま使うのではなく、 CachingConnectionFactoryにラップして使用することで、キャッシュ機能を使用することができる。
(2)
Bean定義もしくはJNDI名でルックアップしたJMSプロバイダ固有のConnectionFactoryを指定する。
(3)
Sessionのキャッシュ数を設定する。(デフォルト値は1)
この例では1を指定しているが、性能要件に応じて適宜キャッシュ数を変更すること。
このキャッシュ数を超えてセッションが必要になるとキャッシュを使用せず、新しいセッションの作成と破棄を繰り返すことになる。
すると処理効率が下がり、性能劣化の原因になるので注意すること。
  • [projectName]-domain/src/main/resources/META-INF/spring/[projectName]-infra.xml
<!-- (1) -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory" ref="cachingConnectionFactory" />
   <property name="destinationResolver" ref="destinationResolver" />
</bean>

<!-- (2) -->
<bean id="jmsMessagingTemplate" class="org.springframework.jms.core.JmsMessagingTemplate">
    <property name="jmsTemplate" ref="jmsTemplate"/>
</bean>
項番 説明
(1)
JmsTemplateをBean定義する。
JmsTemplateは低レベルのAPIハンドリング(JMS API呼び出し)を代行する。
設定可能な属性に関しては、下記のJmsTemplateの属性一覧を参照されたい。
(2)
JmsMessagingTemplateをBean定義する。同期送信処理を代行するJmsTemplateをインジェクションする。
同期送信に関連するJmsTemplateの属性は以下が存在する。
必要に応じて設定を行う必要がある。
項番 設定項目 内容 必須 デフォルト値
connectionFactory
使用するConnectionFactoryを設定する。
なし(必須であるため)
pubSubDomain
メッセージングモデルについて設定する。
PTP(Queue)モデルはfalse、Pub/Sub(Topic)はtrueに設定する。
- false
sessionTransacted
セッションでのトランザクション管理をするかどうか設定する。
本ガイドラインでは、後述するトランザクション管理を使用するため、デフォルトのままのfalseを推奨する。
- false
messageConverter
メッセージコンバータを設定する。
本ガイドラインで紹介している範囲では、デフォルトのままで問題ない。
- SimpleMessageConverter(*1)が使用される。
destinationResolver
DestinationResolverを設定する。
本ガイドラインでは、JNDIで名前解決を行う、JndiDestinationResolverを設定することを推奨する。
-
DynamicDestinationResolver(*2)が使用される。
(DynamicDestinationResolverを利用するとJMSプロバイダでDestinationの名前解決が行われる。)
defaultDestination
既定のDestinationを設定する。
Destinationを明示的に指定しない場合、このDestinationが使用される。
- null(既定のDestinationなし)
deliveryMode
配信モードを1(NON_PERSISTENT)、2(PERSISTENT)から設定する。
2(PERSISTENT)は、メッセージの永続化を行う。
1(NON_PERSISTENT)は、メッセージの永続化を行わない。
そのため、性能は上がるが、JMSプロバイダの再起動などが起こるとメッセージが消失する可能性がある。
本ガイドラインでは、メッセージの消失を避けるため、 2(PERSISTENT)を使用することを推奨する。
この設定を使用する場合、後述するexplicitQosEnabledtrueを設定する必要があるので注意すること。
- 2(PERSISTENT)
priority
メッセージの優先度を設定する。優先度は0から9まで設定できる。
数値が大きいほど優先度が高くなる。
同期送信時にメッセージがQueueに格納される時点で優先度が評価され、優先度が高いメッセージは低いメッセージより先に取り出されるように格納される。
優先度が同じメッセージはFIFO(First-In First-Out)で扱われる。
この設定を使用する場合、後述するexplicitQosEnabledtrueを設定する必要があるので注意すること。
- 4
timeToLive
メッセージの有効期限をミリ秒で設定する。
メッセージが有効期限に達すると、JMSプロバイダはQueueからメッセージを削除する。
この設定を使用する場合、後述するexplicitQosEnabledtrueを設定する必要があるので注意すること。
- 0(無制限)
explicitQosEnabled
deliveryModeprioritytimeToLiveを有効にする場合はtrueを設定する。
- false

(*1)org.springframework.jms.support.converter.SimpleMessageConverter

(*2)org.springframework.jms.support.destination.DynamicDestinationResolver


次に送信対象のJavaBeanを作成する。
  • [projectName]-domain/src/main/java/com/example/domain/model/Todo.java
package com.example.domain.model;

import java.io.Serializable;

public class Todo implements Serializable { // (1)

    private static final long serialVersionUID = -1L;

    // omitted

    private String description;

    // omitted

    private boolean finished;

    // omitted

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public boolean isFinished() {
        return finished;
    }

    public void setFinished(boolean finished) {
        this.finished = finished;
    }

}
項番 説明
(1)
基本的には通常のJavaBeanで問題ないが、シリアライズして送信するため、java.io.Serializableインタフェース を実装する必要がある。
最後に実際に同期送信を行う処理を記述する。
以下では、指定したテキストをもつTodoオブジェクトをQueueに同期送信する実装例を示す。
  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import javax.inject.Inject;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import com.example.domain.model.Todo;

@Service
public class TodoServiceImpl implements TodoService {

    @Inject
    JmsMessagingTemplate jmsMessagingTemplate;    // (1)

    @Override
    public void sendMessage(String message) {

       Todo todo = new Todo();
       // omitted

       jmsMessagingTemplate.convertAndSend("jms/queue/TodoMessageQueue", todo);  // (2)

    }
}
項番 説明
(1)
JmsMessagingTemplateをインジェクションする。
(2)
JmsMessagingTemplateconvertAndSendメソッドを使用して、引数のJavaBeanをorg.springframework.messaging.Messageインタフェースの実装クラスに変換し、指定したDestinationに対しメッセージを同期送信する。
デフォルトで変換には、org.springframework.jms.support.converter.SimpleMessageConverterが使用される。
SimpleMessageConverterを使用すると、javax.jms.Messagejava.lang.Stringbyte配列java.util.Mapjava.io.Serializableインタフェースを実装したクラスを送信可能である。

Note

業務ロジック内でJMSの例外ハンドリング

JMS (Java Message Service)のIntroductionで触れられているように、Spring Frameworkでは検査例外を非検査例外に変換している。 そのため、業務ロジック内でJMSの例外をハンドリングする場合は、非検査例外を扱う必要がある。

Templateクラス 例外の変換を行うメソッド 変換後の例外
JmsMessagingTemplate
JmsMessagingTemplateconvertJmsExceptionメソッド
MessagingException(*1)及びそのサブ例外
JmsTemplate
JmsAccessorconvertJmsAccessExceptionメソッド
JmsException(*2)及びそのサブ例外

(*1) org.springframework.messaging.MessagingException

(*2) org.springframework.jms.JmsException


8.2.2.2.2. メッセージヘッダを編集して同期送信する場合

JmsMessagingTemplateconvertAndSendメソッドの引数にKey-Value形式のヘッダ属性と値を指定することで、ヘッダ属性を編集して同期送信することが可能である。 ヘッダの詳細については、javax.jms.Messageを参照されたい。 送信、応答メッセージなどを紐づける役割のJMSCorrelationIDを同期送信時に指定する場合の実装例を示す。

  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import java.util.Map;
import javax.inject.Inject;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.jms.support.JmsHeaders;
import com.example.domain.model.Todo;

@Service
public class TodoServiceImpl implements TodoService {

@Inject
JmsMessagingTemplate jmsMessagingTemplate;

  public void sendMessageWithCorrelationId(String correlationId) {

    Todo todo = new Todo();
    // omitted

    Map<String, Object> headers = new HashMap<>();
    headers.put(JmsHeaders.CORRELATION_ID, correlationId);// (1)

    jmsMessagingTemplate.convertAndSend("jms/queue/TodoMessageQueue",
            todo, headers); // (2)

  }
}
項番 説明
(1)
Mapの実装クラスに対し、ヘッダ属性名とその値を設定してヘッダ情報を作成する。
(2)
JmsMessagingTemplateconvertAndSendメソッドを使用することで、(2)で作成したヘッダ情報を付与したメッセージを同期送信する。

Warning

編集可能なヘッダ属性について

Spring FrameworkのSimpleMessageConverterによるメッセージ変換時には、ヘッダ属性の一部(JMSDestinationJMSDeliveryModeJMSExpirationJMSMessageIDJMSPriorityJMSRedeliveredJMSTimestamp)をread-onlyとして扱っている。 そのため、上記の実装例のようにread-onlyのヘッダ属性を設定しても、送信したメッセージのヘッダには格納されない。(メッセージのプロパティとして保持される。) read-onlyのヘッダ属性うち、JMSDeliveryModeJMSPriorityについては、JmsTemplate単位での設定が可能である。 詳細については、基本的な同期送信JmsTemplateの属性一覧を参照されたい。

8.2.2.2.3. トランザクション管理

データの一貫性を保証する必要がある場合は、トランザクション管理機能を使用する。
本ガイドラインで推奨する「宣言型トランザクション管理」を利用した実装例を以下に示す。
「宣言型トランザクション管理」の詳細は、トランザクション管理について を参照されたい。

トランザクション管理を実現するためには、org.springframework.jms.connection.JmsTransactionManagerを利用する。
最初に設定例を示す。
  • [projectName]-domain/src/main/resources/META-INF/spring/[projectName]-infra.xml
<!-- (1) -->
<bean id="sendJmsTransactionManager"
   class="org.springframework.jms.connection.JmsTransactionManager">
   <!-- (2) -->
   <property name="connectionFactory" ref="cachingConnectionFactory" />
</bean>
項番 説明
(1)
JmsTransactionManagerをBean定義する。

Note

TransactionManagerのbean名について

@Transactionalアノテーションを付与した場合、デフォルトではBean名transactionManagerで登録されているBeanが使用される。 (詳細は、トランザクション管理を使うための設定について を参照されたい)

Blankプロジェクトには、transactionManagerというBean名でDataSourceTransactionManagerが定義されているため、上記の設定では別名でBeanを定義している。

そのため、アプリケーション内で、TransactionManagerを1つしか使用しない場合は、bean名をtransactionManagerにすることで@TransactionalアノテーションでのtransactionManager属性の指定を省略することができる。

(2)
トランザクションを管理するCachingConnectionFactoryを指定する。

トランザクション管理を行い、TodoオブジェクトをQueueに同期送信する実装例を以下に示す。

  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import javax.inject.Inject;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.example.domain.model.Todo;

@Service
@Transactional("sendJmsTransactionManager")  // (1)
public class TodoServiceImpl implements TodoService {
   @Inject
   JmsMessagingTemplate jmsMessagingTemplate;

   @Override
   public void sendMessage(String message) {

      Todo todo = new Todo();
      // omitted

      jmsMessagingTemplate.convertAndSend("jms/queue/TodoMessageQueue", todo);  // (2)
   }

}
項番 説明
(1)
@Transactionalアノテーションを利用してトランザクション境界を宣言する。
これにより、クラス内の各メソッドの開始時にトランザクションが開始され、メソッドの終了時にトランザクションがコミットされる。
(2)
Queueにメッセージを同期送信する。
ただし、実際にメッセージがQueueに送信されるのはトランザクションがコミットされるタイミングとなるので注意すること。

DBのトランザクション管理を行う必要があるアプリケーションでは、業務の要件をもとにJMSとDBのトランザクションの関連を精査した上でトランザクションの管理方針を決定すること。

JMSとDBのトランザクションの連携にはJTAによるグローバルトランザクションを使用する方法があるが、プロトコルの特性上、性能面のオーバーヘッドがかかるため、”Best Effort 1 Phase Commit”の使用を推奨する。詳細は以下を参照されたい。

Warning

メッセージ受信後にJMSプロバイダとの接続が切れるなどでJMSプロバイダにトランザクションの処理結果が返らない場合

メッセージ受信後にJMSプロバイダとの接続が切れるなどで、JMSプロバイダにトランザクションの処理結果が返らない場合、トランザクションの扱いはJMSプロバイダに依存する。 そのため、受信したメッセージの消失などを考慮した設計を行うこと。 特に、メッセージの消失が絶対に許されないような場合には、メッセージの消失を補う仕組みを用意するか、グローバルトランザクションなどの利用を検討する必要がある。

“Best Effort 1 Phase Commit”はorg.springframework.data.transaction.ChainedTransactionManagerを利用することで実現する。
以下に、JMSのトランザクション管理にトランザクション管理sendJmsTransactionManagerを使用し、DBのトランザクション管理にBlankプロジェクトのデフォルトの設定で定義されているtransactionManagerを使用する設定例を示す。
  • [projectName]-domain/src/main/resources/META-INF/spring/[projectName]-infra.xml
<!-- (1) -->
<bean id="sendChainedTransactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
    <constructor-arg>
        <list>
            <!-- (2) -->
            <ref bean="sendJmsTransactionManager" />
            <ref bean="transactionManager" />
        </list>
    </constructor-arg>
</bean>
項番 説明
(1)
ChainedTransactionManagerをBean定義する。
(2)
JMSとDBのトランザクションマネージャを指定する。
登録した順にトランザクションが開始され、登録した逆順にトランザクションがコミットされる。

上記の設定を利用した実装例を以下に示す。

  • [projectName]-domain/src/main/java/com/example/domain/service/todo/ChainedTransactionalTodoServiceImpl.java

    package com.example.domain.service.todo;
    
    import javax.inject.Inject;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    import com.example.domain.model.Todo;
    
    @Service
    @Transactional("sendChainedTransactionManager")  // (1)
    public class ChainedTransactionalTodoServiceImpl implements ChainedTransactionalTodoService {
       @Inject
       JmsMessagingTemplate jmsMessagingTemplate;
    
       @Inject
       TodoSharedService todoSharedService;
    
       @Override
       public void sendMessage(String message) {
           Todo todo = new Todo();
           // omitted
    
           jmsMessagingTemplate.convertAndSend("jms/queue/TodoMessageQueue", todo); // (2)
    
           // omitted
           todoSharedService.insert(todo); // (3)
       }
    
    }
    
    項番 説明
    (1)
    @TransactionalアノテーションにsendChainedTransactionManagerを指定することで、JMSとDBのトランザクション管理を行う。
    @Transactionalアノテーションの詳細については、ドメイン層の実装トランザクション管理についてを参照されたい。
    (2)
    メッセージの同期送信を行う。
    (3)
    DBアクセスを伴う処理を実行する。この例では、DBの更新を伴うSharedServiceを実行している。

    Note

    業務上、JMSとDBなど複数のトランザクションをまとめて管理する必要がある場合、グローバルトランザクションを検討する。 グローバルトランザクションについては、トランザクション管理を使うための設定についての”複数DB(複数リソース)に対するトランザクション管理(グローバルトランザクションの管理)が必要な場合”を参照されたい。


8.2.2.3. メッセージを非同期受信する方法

JMSとはの”メッセージ受信方式”で述べたように、一般的に受信処理を行う場合には非同期受信を利用する。
非同期受信機能を司るDefaultMessageListenerContainerに対し、@JmsListenerアノテーションが付与されたリスナーメソッドを登録することで非同期受信処理を実現する。
非同期受信時の処理を行うリスナーメソッドの役割として、以下が存在する。
  1. メッセージを受け取るためのメソッドを提供する。
    @JmsListenerアノテーションが付与されたメソッドを実装することで、メッセージを受け取ることができる。
  2. 業務処理の呼び出しを行う。
    リスナーメソッドでは業務処理の実装は行わず、Serviceのメソッドに処理を委譲する。
  3. 業務ロジックで発生した例外のハンドリングを行う。
    ビジネス例外や正常稼働時に発生するライブラリ例外のハンドリングを行う。
  4. 処理結果をメッセージ送信する。
    応答メッセージなどの送信が必要なメソッドでは、org.springframework.jms.listener.adapter.JmsResponseを利用することで、指定したDestinationに対してリスナーメソッドや業務ロジックの処理結果をメッセージ送信することができる。

8.2.2.3.1. 基本的な非同期受信

@JmsListenerアノテーションを利用した非同期受信の方法について説明をする。
非同期受信の実装には下記の設定が必要となる。
  • JMS Namespaceを定義する。
  • @JmsListenerアノテーションを有効化する。
  • DIコンテナで管理しているコンポーネントのメソッドに@JmsListenerアノテーションを指定する。
それぞれの詳細な実装方法について、以下に記述する。
  • [projectName]-web/src/main/resources/META-INF/spring/applicationContext.xml
<!-- (1) -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- (2) -->
    <jms:annotation-driven />


    <!-- (3) -->
    <jms:listener-container
        factory-id="jmsListenerContainerFactory"
        destination-resolver="destinationResolver"
        concurrency="1"/>
項番 属性名 内容
(1)
xmlns:jms
JMS Namespaceを定義する。
値としてhttp://www.springframework.org/schema/jmsを指定する。
JMS Namespaceの詳細については、JMS Namespace Supportを参照されたい。
  xsi:schemaLocation
スキーマのURLを指定する。
値にhttp://www.springframework.org/schema/jmshttp://www.springframework.org/schema/jms/spring-jms.xsdを追加する。
(2)
-
<jms:annotation-driven />を利用して、@JmsListenerアノテーションや@SendToアノテーション等のJMS関連のアノテーション機能を有効化する。
(3)
-
<jms:listener-container/>を利用してDefaultMessageListenerContainerを生成するファクトリへパラメータを与えることで、DefaultMessageListenerContainerの設定を行う。
<jms:listener-container/>の属性には、利用したいConnectionFactoryのBeanを指定できるconnection-factory属性が存在する。connection-factory属性のデフォルト値はconnectionFactoryである。
この例では、ConnectionFactoryの設定で示したConnectionFactoryのBean(Bean名はconnectionFactory)を利用するため、connection-factory属性を省略している。
<jms:listener-container/>には、ここで紹介した以外の属性も存在する。
詳細については、Attributes of the JMS <listener-container> elementを参照されたい。

Warning

DefaultMessageListenerContainer内部には独自のキャッシュ機能が備わっている。一方で、APサーバ製品やMOM製品によって関連リソースをキャッシュする場合もある。両者の管理に不整合が生じないようにcache属性でキャッシュレベルを指定すること。 詳細については、DefaultMessageListenerContainerのJavadocを参照されたい。 本ガイドラインでは、<jms:listener-container/>connection-factory属性には、ConnectionFactoryの設定で定義したConnectionFactoryを指定する。

  concurrency
DefaultMessageListenerContainerが管理するリスナーメソッドごとの並列数に対する上限を指定する。
concurrency属性のデフォルトは1である。
並列数の下限と上限を指定することも可能である。例えば、下限を5、上限を10とする場合は”5-10”と指定する。
リスナーメソッドの並列数が設定した上限値に達した場合は、並列に処理されず待ち状態となる。
必要に応じて値を設定すること。

Note

リスナーメソッド単位で並列数を指定したい場合は、@JmsListenerアノテーションのconcurrency属性を利用することができる。

  destination-resolver
非同期受信時のDestination名解決で使用するDestinationResolverのBean名を設定する。
DestinationResolverのBean定義については、DestinationResolverの設定を参照されたい。
destination-resolver属性を指定していない場合はDefaultMessageListenerContainer内で生成されたDynamicDestinationResolverが利用される。
  factory-id
Bean定義を行うDefaultJmsListenerContainerFactoryの名前を設定する。
@JmsListenerアノテーションがデフォルトでBean名jmsListenerContainerFactoryを参照するため、<jms:listener-container/>が一つの場合はBean名をjmsListenerContainerFactoryとすることを推奨する。
  cache
ConnectionSessionMessageConsumerなどのキャッシュ対象を決定するために、キャッシュレベルを指定する。
connection, session, consumer, none(キャッシュしない), auto(自動的に選択)のいずれかより選択する。
ここではデフォルトのautoを指定するため、cache属性を省略している。

Note

autoを指定した場合、transaction-manager属性の指定有無によって、挙動が変わる。 指定した場合は none指定時と同様となり、指定しない場合は consumer指定時と同様となる。 これは、transaction-manager属性が、JTAトランザクションを使用する場合にのみ指定することに起因している。 アプリケーションサーバ内でConnectionSessionなどをプールしない場合は、性能向上のため consumerを指定することを検討すること。

DIコンテナで管理しているコンポーネントのメソッドに@JmsListenerアノテーションを指定することで、指定したDestinationより非同期でメッセージを受信する。 実装方法を以下に示す。

  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
package com.example.listener.todo;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import com.example.domain.model.Todo;
@Component
public class TodoMessageListener {

   @JmsListener(destination = "jms/queue/TodoMessageQueue")   // (1)
   public void receive(Todo todo) {
      // omitted
   }

}
項番 説明
(1)
非同期受信用のメソッドに対し@JmsListenerアノテーションを設定する。destination属性には、受信先のDestination名を指定する。

@JmsListenerアノテーションの主な属性の一覧を以下に示す。 詳細やその他の属性については、@JmsListenerアノテーションのJavadocを参照されたい。

項番 項目 内容
destination
受信するDestinationを指定する。
containerFactory
リスナーメソッドの管理を行うDefaultJmsListenerContainerFactoryのBean名を指定する。
デフォルトはjmsListenerContainerFactoryである。
selector
受信するメッセージを限定するための条件であるメッセージセレクタを指定する。
明示的に値を指定しない場合、デフォルトは”“(空文字)であり、すべてのメッセージが受信対象となる。
利用方法については、非同期受信するメッセージを限定する場合を参照されたい。
concurrency
リスナーメソッドの並列数の上限を指定する。
concurrency属性のデフォルトは1である。
並列数の下限と上限を指定することも可能である。例えば、下限を5、上限を10とする場合は”5-10”と指定する。
リスナーメソッドの並列数が設定した上限値に達した場合は、並列に処理されず待ち状態となる。
必要に応じて値を設定すること。

8.2.2.3.2. メッセージのヘッダ情報を取得する

非同期受信の処理結果をProducer側で指定したDestination(ヘッダ属性JMSReplyToの値)に送信する場合など、メッセージのヘッダ情報をリスナーメソッド内で利用する場合には、@org.springframework.messaging.handler.annotation.Headerアノテーションを利用する。
実装例を以下に示す。
  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
@JmsListener(destination = "jms/queue/TodoMessageQueue")
public JmsResponse<Todo> receiveAndResponse(
        Todo todo, @Header("jms_replyTo") Destination storeResponseMessageQueue) { // (1)

    // omitted

     return JmsResponse.forDestination(todo, storeResponseMessageQueue);
}
項番 説明
(1)
受信メッセージのヘッダ属性JMSReplyToの値を取得するために、@Headerアノテーションを指定する。
JMSの標準ヘッダ属性を取得する場合に指定するキーの値については、JmsHeadersの定数の定義を参照されたい。

8.2.2.3.3. 非同期受信後の処理結果をメッセージ送信

@JmsListenerアノテーションを定義したメソッドの処理結果を、応答メッセージとしてDestinationに送信する方法が用意されている。
処理結果の送信先を指定する方法として、以下の2つが存在する。
  • 処理結果の送信先を静的に指定する場合
  • 処理結果の送信先を動的に指定する場合

それぞれについて、以下に説明する。

  • 処理結果の送信先を静的に指定する場合
    @JmsListenerアノテーションが定義されているメソッドに対し、Destinationを指定した@SendToアノテーションを定義することで、固定のDestinationへの処理結果のメッセージ送信を実現する。
    実装例を以下に示す。
  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
@JmsListener(destination = "jms/queue/TodoMessageQueue")
@SendTo("jms/queue/ResponseMessageQueue") // (1)
public Todo receiveMessageAndSendTo(Todo todo) {

    // omitted
    return todo; // (2)
}
項番 説明
(1)
@SendToアノテーションを定義することで、処理結果の送信先に対するデフォルトのDestinationを指定できる。
(2)
@SendToアノテーションに定義したDestinationに送信するデータを返却する。
許可されている返却値の型はorg.springframework.messaging.Messagejavax.jms.MessageStringbyte配列、MapSerializableインタフェースを実装したクラス である。
  • 処理結果の送信先を動的に変更する場合
動的に送信先のDestinationを変更する場合はJmsResponseクラスのforDestinationforQueueメソッドを用いる。
送信先のDestinationやDestination名を動的に変更することで、任意のDestinationに処理結果を送信することができる。実装例を以下に示す。
  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
@JmsListener(destination = "jms/queue/TodoMessageQueue")
public JmsResponse<Todo> receiveMessageJmsResponse(Todo todo) {

    // omitted

    String resQueue = null;

    if (todo.isFinished()) {
        resQueue = "jms/queue/FinihedTodoMessageQueue";
    } else {
        resQueue = "jms/queue/ActiveTodoMessageQueue";
    }

    return JmsResponse.forQueue(todo, resQueue); // (1)
}
項番 説明
(1)
処理内容に応じて送信先のQueueを変更する場合はJmsResponseクラスのforDestinationforQueueメソッドを使用する。
この例では、forQueueメソッドを利用して、Destination名から送信を行っている。

Note

JmsResponseクラスのforQueueメソッドを利用する場合は、文字列であるDestination名を利用する。 Destination名の解決には、DefaultMessageListenerContainerに指定したDestinationResolverが利用される。

Note

処理結果の送信先をProducer側で指定する場合

以下のように実装することで、Producer側で指定した任意のDestinationに処理結果のメッセージを送信することができる。

実装箇所 実装内容
Producer側
JMS標準に則りメッセージのヘッダ属性JMSReplyToにDestinationを指定する。
ヘッダ属性の編集については、メッセージヘッダを編集して同期送信する場合を参照されたい。
Consumer側
メッセージ送信するオブジェクトを返却する。

ヘッダ属性JMSReplyToはConsumer側で指定したデフォルトのDestinationよりも優先される。 詳細については、Response managementを参照されたい。

8.2.2.3.4. 非同期受信するメッセージを限定する場合

受信時にメッセージセレクタを指定することで受信するメッセージを限定することができる。

  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
@JmsListener(destination = "jms/queue/MessageQueue" , selector = "TodoStatus = 'deleted'")    // (1)
public void receive(Todo todo) {
    // omitted
}
項番 説明
(1)
selector属性を利用することで受信対象の条件を設定することができる。
ヘッダ属性のTodoStatusdeletedのメッセージのみ受信する。
メッセージセレクタはSQL92条件式構文のサブセットに基づいている。
詳細はMessage Selectorsを参照されたい。

8.2.2.3.5. 非同期受信したメッセージの入力チェック

セキュリティなどの観点から、不正なデータを保持したメッセージを業務ロジック内で処理しないよう、入力チェックを行うべきである。
Method Validationを利用してServiceのメソッドで入力チェックを実装し、入力チェックエラー時の例外をリスナーメソッドでハンドリングする。
これは、トランザクション管理を行う場合に、入力チェックエラー時の例外によって無用なロールバック処理が起こることを回避するためである。トランザクション管理については、トランザクション管理を参照されたい。
Method Validationの設定や実装方法の詳細は、入力チェックMethod Validationを参照されたい。
基本的な同期送信で示したTodoのオブジェクトに対して入力チェックを行う実装例を以下に示す。
  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import javax.validation.Valid;
import org.springframework.validation.annotation.Validated;
import com.example.domain.model.Todo;

@Validated // (1)
public interface TodoService {

    void updateTodo(@Valid Todo todo); // (2)

}
項番 説明
(1)
@Validatedアノテーションを付けることで、このインタフェースが入力チェック対象であることを宣言する。
(2)
Bean Validationの制約アノテーションをメソッドの引数として指定する。
  • [projectName]-domain/src/main/java/com/example/domain/model/Todo.java
package com.example.domain.model;

import java.io.Serializable;
import javax.validation.constraints.Null;

// (1)
public class Todo implements Serializable {

    private static final long serialVersionUID = -1L;

    // omitted

    @Null
    private String description;

    // omitted

    private boolean finished;

    // omitted

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public boolean isFinished() {
        return finished;
    }

    public void setFinished(boolean finished) {
        this.finished = finished;
    }

}
項番 説明
(1)
Bean ValidationでJavaBeanの入力チェックを定義する。
この例では一例として@Nullアノテーションを設定している。
詳細は「入力チェック」を参照されたい。
  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
@Inject
TodoService todoService;

@JmsListener(destination = "jms/queue/MessageQueue")
public void receive(Todo todo) {
    try {
        todoService.updateTodo(todo); // (1)
    } catch (ConstraintViolationException e) { // (2)
        // omitted
    }
}
項番 説明
(1)
入力チェックを行うServiceのメソッドを実行する。
(2)
制約違反時に発生するConstraintViolationExceptionを捕捉する。
捕捉後には任意の処理を実行可能である。
論理的なエラーメッセージを格納するためのQueueを利用する場合など、別のQueueにメッセージ送信する例については、非同期受信時の例外ハンドリングを参照されたい。

8.2.2.3.6. トランザクション管理

データの一貫性を保証する必要がある場合は、トランザクション管理機能を使用する。
非同期受信で使用するSpring JMS の DefaultMessageListenerContainerには、JMSのトランザクション管理の仕組みが組み込まれている。listener-containeracknowledge属性でその機能を切り替えられる。それを利用した場合の実装例を以下に示す。

Note

メッセージがQueueに戻されると、そのメッセージが再度非同期受信されるため、エラーの原因が解決していない場合は、ロールバック、非同期受信を繰り返すこととなる。 JMSプロバイダによっては、ロールバック後の再送信回数に閾値を設定でき、再送信された回数が閾値を超えた場合、Dead Letter Queueにメッセージを格納する。

  • [projectName]-web/src/main/resources/META-INF/spring/applicationContext.xml
<!-- (1) -->
<jms:listener-container
    factory-id="jmsListenerContainerFactory"
    destination-resolver="destinationResolver"
    concurrency="1"
    error-handler="jmsErrorHandler"
    acknowledge="transacted"/>
項番 属性名 内容
(1)
cache
ConnectionSessionMessageConsumerなどのキャッシュ対象を決定するために、キャッシュレベルを指定する。
ここではデフォルトのautoを指定するため、cache属性を省略している。
基本的な非同期受信の説明を合わせて参照されたい。
  acknowledge
トランザクションを有効にするため、確認応答モードにtransactedを指定する。デフォルトはautoである。

Warning

アプリケーションサーバによっては、アプリケーション内でのConnectionSessionのキャッシュを禁止している場合があるため、使用するアプリケーションサーバの仕様に応じてキャッシュの有効化、無効化を決定すること。

Warning

非同期受信と同期送信・受信を併用し、かつ、単一のトランザクションで管理したい場合、jms:listener-containerconnection-factory属性とJmsTemplateconnectionFactoryプロパティで指定するConnectionFactoryのインスタンスを同一にすること。これによって、Springは非同期受信と同期送受信で利用するSessionを共有するため、単一のトランザクションとなる。 このとき、jms:listener-containerおよび JmsTemplateの両方でキャッシュを有効にするには、以下のような手段が候補となる。

  • JMS関連リソースのキャッシュをAPサーバ製品に任せ、JNDIルックアップ経由で取得したオブジェクトを非同期受信と同期送信・受信の両方でそのまま使用する。
  • MOM製品がconnectionfactoryのcache機能を持っている場合、それを非同期受信と同期送信・受信の両方でそのまま使用する。
  • org.springframework.jms.connection.CachingConnectionFactoryを非同期受信と同期送信・受信の両方でそのまま使用する。

いずれの場合もlistener-containercacheにはnoneを指定すること。


Note

特定の例外の場合にロールバック以外の例外ハンドリングを行う方法

トランザクション管理を有効にした場合、入力チェックなどで発生した例外を捕捉せずにthrowすると、ロールバックによってメッセージがQueueに戻される。 リスナーメソッドはQueueに戻されたメッセージを再度非同期受信するため、非同期受信→エラー発生→ロールバックがJMSプロバイダの設定回数分繰り返されることになる。 リトライによってエラーの原因が解消されないような例外の場合は、上記のような無駄な処理を抑えるため、例外を補足してリスナーメソッドからthrowしないようにハンドリングを行う。 詳細については、非同期受信時の例外ハンドリングを参照されたい。

DBのトランザクション管理を行う必要があるアプリケーションでは、業務の要件をもとにJMSとDBのトランザクションの関連を精査した上でトランザクションの管理方針を決定すること。 非同期受信でJMSとDBのトランザクションを連携させるには以下のような方法が考えられる。

  1. JTAによるグローバルトランザクションを使用する方法
  2. ”Best Effort 1 Phase Commit”を使用する方法
  3. JMSとDBのトランザクションを個別に指定する方法

このうち、以下を理由に「JMSとDBのトランザクションを個別に指定する方法」の利用を検討されたい。 同期送信のトランザクション管理(トランザクション管理)でも紹介したように、 JTAによるグローバルトランザクションはプロトコルの特性上、性能面のオーバヘッドがかかる。 これを解消するため、同期送信では”Best Effort 1 Phase Commit”を使用するトランザクション管理方法を紹介したが、非同期受信ではトランザクションが不適切な構成になるため推奨されない。 一般的にリカバリの観点からDBトランザクション境界よりJMSトランザクション境界を外側に置く構成をとるが、 SpringのDefaultMessageListenerContainerは独自のトランザクション管理機構を持つために、 JTA用の設定である jms:listener-containertransaction-manager属性を活用し”Best Effort 1 Phase Commit”を実現しようとすると、 DBトランザクション境界がJMSトランザクション境界の外側になってしまう。 結果、非同期で受信したメッセージが正常に処理されたにもかかわらずDBトランザクションがロールバックされる可能性が生じる。

Warning

メッセージ受信後にJMSプロバイダとの接続が切れた場合などでJMSプロバイダにトランザクションの処理結果が返らない場合

メッセージ受信後にJMSプロバイダとの接続が切れた場合などで、JMSプロバイダにトランザクションの処理結果が返らない場合、トランザクションの扱いはJMSプロバイダに依存する。 そのため、受信したメッセージの消失や、ロールバックによるメッセージの再処理などを考慮した設計を行うこと。 特に、メッセージの消失が許されないような場合には、メッセージの消失を補う仕組みを用意するか、グローバルトランザクションなどの利用を検討する必要がある。

本ガイドラインではグローバルトランザクションは使わずに、上記の通りJMSのトランザクションはSpring JMSが内部で実装しているトランザクション管理に委ね、DBのトランザクションをブランクプロジェクトのデフォルトの設定で定義されているtransactionManagerで管理する方法を推奨する。その実装例を以下に示す。

  • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
package com.example.listener.todo;

import javax.inject.Inject;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import com.example.domain.service.todo.TodoService;
import com.example.domain.model.Todo;
@Component
public class TodoMessageListener {
    @Inject
    TodoService todoService;

    @JmsListener(destination = "TransactedQueue") // (1)
    public void receiveTransactedMessage(Todo todo) {

        todoService.update(todo);

    }
}
  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.example.domain.model.Todo;

@Transactional // (2)
@Service
public class TodoServiceImpl implements TodoService {

    @Override
    public void update(Todo todo) {
        // omitted
    }
}
項番 説明
(1)
@JmsListenerアノテーションを定義し、JMSのトランザクション管理を有効にしたDefaultJmsListenerContainerFactoryを指定する。
@JmsListenerアノテーションはデフォルトでBean名jmsListenerContainerFactoryを参照するため、containerFactory属性を省略している。
(2)
DBのトランザクション境界を定義する。
valueを省略しているため、デフォルトで、Bean名transactionManagerを参照する。同期送信ではJmsTransactionManagerChainedTransactionManagerのBean名を指定したが、非同期受信ではJMSのトランザクションはSpringに委ねるためDBのトランザクションマネージャを参照させる。
@Transactionalアノテーションの詳細については、ドメイン層の実装トランザクション管理についてを参照されたい。

Note

トランザクション境界のネストの順序は業務要件によるが、JMSプロバイダは外部システムとの連携に使用される場合が多い。 その場合はJMSトランザクション境界をDBトランザクション境界の外側に置き、内向きのDBトランザクションを先に完結する方がリカバリは容易である。 DBのトランザクションをコミットし、JMSのトランザクションがロールバックした場合、メッセージがQueueに戻されるため、同じメッセージを再度処理することになる。 設計上の考慮点として、業務処理の再実行時にDB更新処理を再試行しても問題ないように設計する必要がある。


上記の設定、実装例に従ってアプリケーションを作成した場合の挙動について説明する。

  • リスナーメソッドの処理が正常に終了した場合
DefaultMessageListenerContainerがJMSトランザクションを開始・コミットし、DBのトランザクションマネージャがDBのトランザクションを開始・コミットする。
JMS/DB Transaction
項番 説明
(1)
JMSのトランザクションを開始する。
(2)
DBのトランザクションを開始する。
(3)
DBのトランザクションをコミットし、DBのトランザクションを終了する。
(4)
リスナーメソッドが正常終了する。
(5)
JMSのトランザクションをコミットし、JMSのトランザクションを終了する。
  • 業務ロジックで予期せぬ例外が発生した場合

サービスで例外が発生した場合JMSトランザクションとDBトランザクションの両方をロールバックする。

JMS/DB Transaction
項番 説明
(1)
JMSのトランザクションを開始する。
(2)
DBのトランザクションを開始する。
(3)
業務ロジックで予期しない例外が発生する。
(4)
DBのトランザクションをロールバックし、DBのトランザクションを終了する。
(5)
JMSのトランザクションをロールバックし、JMSのトランザクションを終了する。
JMSのトランザクションがロールバックするため、メッセージがQueueに戻される。
  • メッセージ受信後にJMSプロバイダとの接続が切れた場合などで、DBのトランザクションのみコミットしてしまう場合

非同期受信を伴う処理をグローバルトランザクションで管理しない場合は、DBトランザクションとJMSトランザクションは別々にコミットすることになるため、 JMSとDBの状態に不整合が生じる可能性がある。具体的には以下の様な場合が該当する。

  • JMSコネクションの切断を検知できずにDBの更新処理を続け、コミットしてしまう場合
  • DBトランザクションのコミット後でJMSトランザクションをコミットする前に例外が発生した場合

そのような場合に、JMSのトランザクションをロールバックした後に再度同じメッセージを処理することもあれば、送信側によって同一内容のメッセージを複数回送信してしまうことがある。そのような背景で同じメッセージを複数受信した場合でもデータの完全性を保障する必要がある。 その対策として、JMSMessageID、または、PropertyBodyに含まれる、リクエストを一意に特定するための情報を記録する方法がある。 これは、メッセージの受信ごとに過去に記録した情報と比較し、処理の状況に応じて処理し分けることを意味する。 なお、以下のとおり、利用する情報によって対応できる事象に差がある。

  • JMSMessageIDを記録する場合、メッセージがロールバックされた際の二重処理にのみ対応できる。
  • PropertyBodyの一部を記録する場合、メッセージがロールバックされた際に加えて、異常時などに業務上同一の意味をもつメッセージが複数回送信された際の二重処理にも対応できる。
JMS/DB Transaction
項番 説明
(1)
JMSのトランザクションを開始する。
(2)
DBのトランザクションを開始する。
(3)
DBのトランザクションをコミットし、DBのトランザクションを終了する。
(4)
JMSのトランザクションのコミット前にJMSプロバイダとの接続が切れるなどの予期せぬエラーが発生する。
(5)
JMSのトランザクションのコミットに失敗する。
そのため、メッセージ消失などに備え、整合性を担保するための仕組みを用意する必要がある。

Note

上記のような事象を避け、JMSとDBなど複数のトランザクションを厳密に管理する必要がある場合には、グローバルトランザクションの利用を検討する。 グローバルトランザクションについては、各種製品マニュアルを参照されたい。


8.2.2.3.7. 非同期受信時の例外ハンドリング

トランザクション管理を行う場合には、ロールバック処理を考慮した例外のハンドリングを行う必要がある。
トランザクション管理の詳細については、トランザクション管理を参照されたい。
JMSの例外ハンドリングは、目的に応じて以下の2種類のパターンに分類される。
表-例外ハンドリングのパターン
項番 ハンドリングの目的 ハンドリング対象となり得る例外の例 ハンドリング方法 ハンドリング単位
(1)
ビジネス層で発生した例外を個別にハンドリングする場合
入力チェックエラーなどのビジネス例外
リスナーメソッド
(try-catch)
リスナーメソッド単位
(2)
リスナーメソッドからthrowされた例外を統一的にハンドリングする場合
入出力エラーなどのシステム例外
ErrorHandler
JMSListenerContainer単位
  • ビジネス層で発生した例外を個別にハンドリングする場合

    メッセージの内容が不正である場合など、ビジネス層で発生した例外をリスナーメソッドで捕捉(try-catch)し、リスナーメソッド単位でハンドリングを行う。
    トランザクション管理を行う場合、ロールバックが必要なケースは例外をDefaultMessageListenerContainerにthrowする必要があるため、補足した例外をthrowし直すこと。
    実装例を以下に示す。
    • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
    @Inject
    TodoService todoService;
    
    @JmsListener(destination = "jms/queue/TodoMessageQueue")
    public JmsResponse<Todo> receiveTodo(Todo todo) {
        try {
            todoService.insertTodo(todo);
        } catch (BusinessException e) {
            return JmsResponse.forQueue(todo, "jms/queue/ErrorMessageQueue"); // (1)
        }
        return null; // (2)
    }
    
    項番 説明
    (1)
    JmsResponseクラスのforQueueメソッドを利用し、任意のオブジェクトを論理的なエラーメッセージを格納するためのQueueに送信することができる。
    この例では、AOPでログ出力が行われるBusinessExceptionを捕捉しているため、明示的にログ出力処理などを記述していないが、例外の原因を消失させないように例外をハンドリングする必要がある。
    トランザクション管理を行い、ロールバックしてメッセージの再処理を行いたい場合には、捕捉した例外をthrowする必要がある。
    (2)
    メッセージを送信しない場合は、返り値をnullにする。
  • リスナーメソッドからthrowされた例外を統一的にハンドリングする場合

    例外ごとに共通的なハンドリングを行う場合には、<jms:listener-container/>error-handler属性に定義したErrorHandlerの実装クラスを利用する。
    設定方法を以下に示す。
    • [projectName]-web/src/main/resources/META-INF/spring/applicationContext.xml
    <!-- (1) -->
    <jms:listener-container
        factory-id="jmsListenerContainerFactory"
        destination-resolver="destinationResolver"
        concurrency="1"
        error-handler="jmsErrorHandler"
        acknowledge="transacted"/>
    
    <!-- (2) -->
    <bean id="jmsErrorHandler"
        class="com.example.domain.service.todo.JmsErrorHandler">
    </bean>
    
    項番 説明
    (1)
    <jms:listener-container/>error-handler属性にエラーハンドリングクラスのBean名を定義する。
    (2)
    エラーハンドリングクラスをBean定義する。

    実装方法を以下に示す。

    • [projectName]-web/src/main/java/com/example/listener/todo/JmsErrorHandler.java
    package com.example.listener.todo;
    
    import org.springframework.util.ErrorHandler;
    import org.terasoluna.gfw.common.exception.SystemException;
    
    public class JmsErrorHandler implements ErrorHandler {  // (1)
    
       @Override
        public void handleError(Throwable t) { // (2)
            // omitted
            if (t.getCause() instanceof SystemException) {  // (3)
    
                // omitted system error handling
    
            } else {
                // omitted error handling
            }
        }
    }
    
    項番 説明
    (1)
    ErrorHandlerインタフェースを実装したエラーハンドリングクラスを作成する。
    (2)
    リスナーメソッド内で発生した例外はorg.springframework.jms.listener.adapter.ListenerExecutionFailedExceptionにラップされ、引数として渡される。
    (3)
    任意の例外クラスを判定し、例外に沿ったエラーハンドリングを実施する。
    アプリケーション内で発生した例外を取得するにはt.getCause()を実行する必要がある。

8.2.2.4. メッセージを同期受信する方法

JmsMessagingTemplateを利用して、JMSプロバイダへの同期受信処理を実現する。
同期受信を利用することで、任意のタイミングでメッセージの受信が可能である。
同期受信を利用しない実現方法を十分に検討した上で、アーキテクチャを決定すること。
同期受信のBean定義ファイルの設定を以下に示す。
  • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
<bean id="cachingConnectionFactory"
   class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- (1) -->
   <property name="targetConnectionFactory" ref="connectionFactory" /> <!-- (2) -->
   <property name="sessionCacheSize" value="1" />  <!-- (3) -->
</bean>
項番 説明
(1)
SessionMessageProducer, MessageConsumerのキャッシュを行うorg.springframework.jms.connection.CachingConnectionFactoryをBean定義する。
Bean定義もしくはJNDI名でルックアップしたJMSプロバイダ固有のConnectionFactoryをそのまま使うのではなく、 CachingConnectionFactoryにラップして使用することで、キャッシュ機能を使用することができる。
(2)
Bean定義もしくはJNDI名でルックアップしたConnectionFactoryを指定する。
(3)
Sessionのキャッシュ数を設定する。(デフォルト値は1)
この例では1を指定しているが、性能要件に応じて適宜キャッシュ数を変更すること。
このキャッシュ数を超えてセッションが必要になるとキャッシュを使用せず、新しいセッションの作成と破棄を繰り返すことになる。
すると処理効率が下がり、性能劣化の原因になるので注意すること。
  • [projectName]-domain/src/main/resources/META-INF/spring/[projectName]-infra.xml
<!-- (1) -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory" ref="cachingConnectionFactory" />
   <property name="destinationResolver" ref="destinationResolver" />
</bean>

<!-- (2) -->
<bean id="jmsMessagingTemplate" class="org.springframework.jms.core.JmsMessagingTemplate">
    <property name="jmsTemplate" ref="jmsTemplate"/>
</bean>
項番 説明
(1)
JmsTemplateをBean定義する。
JmsTemplateは低レベルのAPIハンドリング(JMS API呼び出し)を代行する。
設定可能な属性に関しては、下記のJmsTemplateの属性一覧を参照されたい。
(2)
JmsMessagingTemplateをBean定義する。同期受信処理を代行するJmsTemplateをインジェクションする。
同期受信に関連するJmsTemplateの属性一覧を以下に示す。
必要に応じて設定を行う必要がある。
項番 設定項目 内容 必須 デフォルト値
connectionFactory
使用するConnectionFactoryを設定する。
なし(必須であるため)
pubSubDomain
メッセージングモデルについて設定する。
PTP(Queue)モデルはfalse、Pub/Sub(Topic)はtrueに設定する。
- false
sessionTransacted
セッションでのトランザクション管理をするかどうか設定する。
本ガイドラインでは、後述するトランザクション管理を使用するため、デフォルトのままのfalseを推奨する。
- false
sessionAcknowledgeMode
sessionAcknowledgeModeはセッションの確認応答モードを設定する。
詳細についてはJmsTemplateのJavaDocを参照されたい。
-
1
receiveTimeout
同期受信時のタイムアウト時間(ミリ秒)を設定する。未設定の場合、メッセージを受信するまで待機する。
未設定の状態だと、後続の処理に影響が出てしまうため、必ず適切なタイムアウト時間を設定すること。
-
0
messageConverter
メッセージコンバータを設定する。
本ガイドラインで紹介している範囲では、デフォルトのままで問題ない。
- SimpleMessageConverter(*1)が使用される。
destinationResolver
DestinationResolverを設定する。
本ガイドラインでは、JNDIで名前解決を行う、JndiDestinationResolverを設定することを推奨する。
-
DynamicDestinationResolver(*2)が使用される。
(DynamicDestinationResolverを利用するとJMSプロバイダでDestinationの名前解決が行われる。)
defaultDestination
既定のDestinationを設定する。
Destinationを明示的に指定しない場合、このDestinationが使用される。
- null(既定のDestinationなし)

(*1)org.springframework.jms.support.converter.SimpleMessageConverter

(*2)org.springframework.jms.support.destination.DynamicDestinationResolver

JmsMessagingTemplateクラスのreceiveAndConvertメソッドにより、メッセージの同期受信を行う。実装例を以下に示す。

  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
package com.example.domain.service.todo;

import javax.inject.Inject;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import com.example.domain.model.Todo;

@Service
public class TodoServiceImpl implements TodoService {
    @Inject
    JmsMessagingTemplate jmsMessagingTemplate;

    @Override
    public String receiveTodo() {

       // omitted
       Todo retTodo = jmsMessagingTemplate.receiveAndConvert("jms/queue/TodoMessageQueue", Todo.class);   // (1)

    }
}
項番 説明
(1)
JmsMessagingTemplatereceiveAndConvertメソッドにより、指定したDestinationからメッセージを受信する。
receiveAndConvertメソッドは、第2引数に変換先のクラスを指定することで型変換したクラスが取得できる。
ヘッダ項目を参照する場合はreceiveメソッドを使用することにより、Spring FrameworkのMessageオブジェクトで取得することができる。

8.2.3. Appendix

8.2.3.1. JMSプロバイダに依存する設定

JMSプロバイダごとに設定が異なる場合がある。 以下にJMSプロバイダごとの設定について説明する。

8.2.3.1.1. Apache ActiveMQを利用する場合

Apache ActiveMQを利用する場合の設定について説明する。

  • アプリケーションサーバに対するJMSプロバイダ固有の設定

    JMSプロバイダによっては、固有の設定が必要な場合がある。
    Apache ActiveMQでは、受信するメッセージのペイロードが許可されたオブジェクトで構成されていることを保障するために、環境変数をアプリケーションサーバの起動引数に追加する必要がある。
    詳細については、ObjectMessageを参照されたい。
    環境変数をApache Tomcatの起動引数に追加する例を以下に示す。JBoss Enterprise Application Platform 7.2の場合はConfiguring JBoss EAP to Run as a Serviceを、JBoss Enterprise Application Platform 6.4の場合はService Configurationを、Weblogicの場合はStarting Managed Servers with a Startup Scriptを参照されたい。
    • $CATALINA_HOME/bin/setenv.sh
    # omitted
    # (1)
    -Dorg.apache.activemq.SERIALIZABLE_PACKAGES=java.lang,java.util,org.apache.activemq,org.fusesource.hawtbuf,com.thoughtworks.xstream.mapper,com.example.domain.model
    # omitted
    
    項番 説明
    (1)
    許可する任意のオブジェクトのパッケージを追加する。java.lang, java.util, org.apache.activemq, org.fusesource.hawtbuf, com.thoughtworks.xstream.mapperはApache ActiveMQを使用する場合に必要な設定である。
    このサンプルで必要な設定値として、”com.example.domain.model”を追加している。
  • ライブラリの追加

    spring-jmsライブラリにはJMS APIが含まれない。
    JMSプロバイダのライブラリにはJMS APIを含むことが多いが、JMSプロバイダのライブラリにJMS APIが含まれない場合は、pom.xmlにJMS APIを追加する。
    domainプロジェクトとwebプロジェクトのpom.xmlにactivemq-clientをビルド用のライブラリとして追加する。
    また、アプリケーションサーバにactivemq-clientとその依存ライブラリを追加する。
    • [projectName]-domain/pom.xml
    • [projectName]-web/pom.xml
    <dependencies>
    
        <!-- (1) -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <scope>provided</scope>
        </dependency>
    
    </dependencies>
    
    項番 説明
    (1)
    Apache ActiveMQのクライアントライブラリをビルド用としてdependenciesに追加する。activemq-clientのライブラリにはJMS APIも含まれているため、JMS APIをライブラリとして追加する必要はない。

Note

上記設定例は、依存ライブラリのバージョンを親プロジェクトである terasoluna-gfw-parent で管理する前提であるため、pom.xmlでのバージョンの指定は不要である。 上記の依存ライブラリはterasoluna-gfw-parentが依存しているSpring Bootで管理されている。


Warning

Macchinetta Server Framework で使用しているSpring Bootでは、Apache ActiveMQと接続などを行う際に使用するライブラリのバージョンを定義している。 そのため、Apache ActiveMQのバージョンを決定する際には注意すること。 また、Macchinetta Server Framework のバージョンアップの際には、ライブラリとミドルウェアのバージョンの整合性が取れなくなる可能性があるので注意すること。


  • アプリケーションサーバへのJNDI登録

    アプリケーションサーバへのJNDI登録については、Manually integrating Tomcat and ActiveMQを参照されたい。
  • JNDIを使用しない場合の設定

    本ガイドラインではJNDIによる名前解決する方法を推奨しているが、
    アプリケーションサーバ上で動かせない単体テストの実施において、JMSプロバイダと接続する場合などには、JNDIを利用しないケースがある。
    その場合、ConnectionFactoryの実装クラスのBeanの生成と、JMSプロバイダでDestinationの名前解決を行うためにDynamicDestinationResolverを設定する必要がある。
    ただし、JmsTemplatedestinationResolver属性やDefaultMessageListenerContainerdestination-resolver属性を省略した場合は、内部的に生成されたDynamicDestinationResolverが使用されるため、DynamicDestinationResolverのBean定義を省略可能である。
    また、QueueについてもJNDIを用いて指定していたが、JMSプロバイダーの機能を用いてDestinationに指定したQueueが存在しない場合に、指定した名前のQueueを動的に生成させることができる。
    アプリケーションサーバを介さずに接続を行うにはApache ActiveMQの内部Brokerを用いる必要がある。
    Apache ActiveMQの内部Brokerの設定についてはHow do I embed a Broker inside a Connectionを参照されたい。
    テスト用のコンテキストに下記の設定を追加すること。
    • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
    <!-- (1) -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg value="tcp://localhost:61616"/>  <!-- (2) -->
    </bean>
    
    <!-- (3) -->
    <bean id="destinationResolver" class="org.springframework.jms.support.destination.DynamicDestinationResolver">
    </bean>
    
    項番 説明
    (1)
    Apache ActiveMQのConnectionFactoryをBean定義する。
    (2)
    Apache ActiveMQの起動URLを指定する。起動URLは各環境に沿った値を設定する。
    (3)
    DynamicDestinationResolverをBean定義する。
    Destinationが指定されていない場合には、省略可能である。

8.2.3.2. 同一メッセージの大量送信

同一メッセージの大量送信の実装においてJmsMessageTemplateを使用する場合、メモリ使用量が増えてしまう可能性がある。
そのため、JmsTemplateクラスのsendメソッドを使用して実装を行うことを検討する必要がある。
理由としては、JmsMessageTemplateではメッセージ送信処理を行うたびにorg.springframework.jms.core.MessageCreatorというクラスのインスタンスが生成されてしまう。
無駄なインスタンスの生成を防ぐために、送信処理時にMessageCreatorのインスタンスが生成されないJmsTemplateクラスのsendメソッドで送信を行うことでメモリの使用量を削減するようにする。
以下に、ある文字列を同一Destinationに100件送信を行う場合コード例を示す。
  • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
 package com.example.domain.service.todo;

 import java.io.IOException;
 import javax.inject.Inject;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
 import org.springframework.stereotype.Service;

 @Service
 public class TodoServiceImpl implements TodoService {

    @Inject
    JmsTemplate jmsTemplate; // (1)

    @Override
    public void sendManyMessage(final String messageStr) throws IOException {
        MessageCreator mc = new MessageCreator() { // (2)
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage(); // (3)
                message.setText(messageStr);

                // omitted
                return message;
            }
        };
        for (int i = 0; i < 100; i++) {
            jmsTemplate.send("jms/queue/TodoMessageQueue", mc); // (4)
        }
    }
}
項番 説明
(1)
JmsMessagingTemplateを使用すると、送信のたびにMessageCreaterの生成が行われてしまうため、MessageCreaterの生成を送信と分離して定義できるJmsTemplateを利用する。
(2)
JMSのMessageを作成するためにMessageCreatorのインスタンスを生成する。
(3)
JmsTemplateクラスのsendメソッドでメッセージを送信することで、ループごとにMessageCreatorのインスタンスを生成が行われなくなり、 メモリの使用量を削減させることができるようになる。

8.2.3.3. サイズの大きなデータの送受信

画像データなどサイズの大きなデータ(目安として1 MB以上)を扱う場合、同時トランザクション数やヒープサイズによってはOutOfMemoryErrorが発生する可能性がある。 JMSの標準APIではサイズの大きなデータをストリームとして扱うことができるのはプリミティブ型のデータの送信を行うStreamMessageと未解釈のバイトストリームの送信を行えるByteMessageのみである。 そのため、JMS APIではなく、JMSプロバイダベンダ毎に用意している固有のAPIを使用するケースがある。

8.2.3.3.1. Apache ActiveMQを利用する場合

Blob Messageを使用することでサイズの大きなメッセージを送受信することができる。実装例を以下に示す。

Note

org.apache.activemq.BlobMessageを使用する場合、Apache ActiveMQ独自のAPIを使用することになるため、 Spring Frameworkが提供しているMessageCachingConnectionFactoryを使用することはできない。 性能影響を考慮し、BlobMessageを使用する場合はBlobMessage用のJmsTemplateを別途定義することを推奨する。

  • 設定

    BlobMessageを用いたメッセージの送信では、メッセージはヒープ領域ではなく、一時的にApache ActiveMQが起動しているサーバに格納される。 メッセージの格納先の定義例を以下に示す。

    • [projectName]-env/src/main/resources/META-INF/spring/[projectName]-env.xml
    <bean id="connectionFactory"
       class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
          <!-- (1) -->
          <value>tcp://localhost:61616?jms.blobTransferPolicy.uploadUrl=/tmp</value>
        </property>
    </bean>
    
    項番 説明
    (1)
    一時的にメッセージを格納するApache ActiveMQのサーバのディレクトリを定義する。
    jms.blobTransferPolicy.uploadUrlにはデフォルトでhttp://localhost:8080/uploads/が設定されており、デフォルトかbrokerURLをオーバーロードすることで一時ファイルの置き場を指定できる。
    例では/tmpに一時的にファイルを格納している。
  • 送信

    Blob Messageを利用した送信クラスの実装例を以下に示す。

    • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
    package com.example.domain.service.todo;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import javax.inject.Inject;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.apache.activemq.ActiveMQSession;
    import org.apache.activemq.BlobMessage;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    @Service
    public class TodoServiceImpl implements TodoService {
        @Inject
        JmsTemplate jmsTemplate;
    
        @Override
        public void sendBlobMessage(String inputFilePath) throws IOException {
    
            Path path = Paths.get(inputFilePath);
            try (final InputStream inputStream = Files.newInputStream(path)) {
    
                jmsTemplate.send("jms/queue/TodoMessageQueue", new MessageCreator() {
                    public Message createMessage(Session session) throws JMSException {
    
                        ActiveMQSession activeMQSession = (ActiveMQSession) session;  // (1)
    
                        BlobMessage blobMessage = activeMQSession.createBlobMessage(inputStream);  // (2)
                        return blobMessage;
                    }
                });
            }
        }
    }
    
    項番 説明
    (1)
    BlobMessageを使用するにはApache ActiveMQ独自APIであるorg.apache.activemq.ActiveMQSessionを使用する。
    (2)
    ActiveMQSessionより、送信データを指定してBlobMessageを生成する。
    createBlobMessageメソッドの引数はFileInputStreamURLクラスが指定可能である。
  • 受信

    受信クラスの実装例を以下に示す。

    • [projectName]-web/src/main/java/com/example/listener/todo/TodoMessageListener.java
    package com.example.listener.todo;
    
    import java.io.IOException;
    import javax.inject.Inject;
    import javax.jms.JMSException;
    import org.apache.activemq.BlobMessage;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    import com.example.domain.service.todo.TodoService;
    @Component
    public class TodoMessageListener {
        @Inject
        TodoService todoService;
        @JmsListener(destination = "jms/queue/TodoMessageQueue")
        public void receiveBlobMessage(BlobMessage message) throws IOException, JMSException {
         todoService.fileInputBlobMessage(message);
            // omitted
        }
    }
    
    • [projectName]-domain/src/main/java/com/example/domain/service/todo/TodoServiceImpl.java
    package com.example.domain.service.todo;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import org.apache.activemq.BlobMessage;
    import org.springframework.stereotype.Service;
    
    @Service
    public class TodoServiceImpl implements TodoService {
    
        @Override
        public void fileInputBlobMessage(BlobMessage message) throws IOException {
            try(InputStream is =  message.getInputStream()){   // (1)
                Path path = Paths.get("outputFilePath");
                Files.copy(is, path);
                // omitted
            }
        }
    }
    
    項番 説明
    (1)
    受信したBlobMessageより、InputStreamとしてデータを取得する。