Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

本節では、ジョブにおけるトランザクション制御について以下の順序で説明する。

本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。

一般的なバッチ処理におけるトランザクション制御のパターンについて

一般的に、バッチ処理は大量件数を処理するため、処理の終盤で何かしらのエラーが発生した場合に全件処理しなおしとなってしまうと バッチシステムのスケジュールに悪影響を与えてしまう。
これを避けるために、1ジョブの処理内で一定件数ごとにトランザクションを確定しながら処理を進めていくことで、 エラー発生時の影響を局所化することが多い。
(以降、一定件数ごとにトランザクションを確定する方式を「中間コミット方式」、コミット単位にデータをひとまとめにしたものを「チャンク」と呼ぶ。)

中間コミット方式のポイントを以下にまとめる。

  1. エラー発生時の影響を局所化する。

    • 更新時にエラーが発生しても、エラー箇所直前のチャンクまでの処理が確定している。

  2. リソースを一定量しか使わない。

    • 処理対象データの大小問わず、チャンク分のリソースしか使用しないため安定する。

ただし、中間コミット方式があらゆる場面で有効な方法というわけではない。
システム内に一時的とはいえ処理済みデータと未処理データが混在することになる。 その結果、リカバリ処理時に未処理データを識別することが必要となるため、リカバリが複雑になる可能性がある。 これを避けるには、中間コミット方式ではなく、全件を1トランザクションで確定させるしかない。
(以降、全件を1トランザクションで確定する方式を「一括コミット方式」と呼ぶ。)

とはいえ、何千万件というような大量件数を一括コミット方式で処理してしまうと、 コミットを行った際に全件をデータベース反映しようとして高負荷をかけてしまうような事態が発生する。 そのため、一括コミット方式は小規模なバッチ処理には向いているが、大規模バッチで採用するには注意が必要となる。 よって、この方法も万能な方法というわけではない。

つまり、「影響の局所化」と「リカバリの容易さ」はトレードオフの関係にある。 「中間コミット方式」と「一括コミット方式」のどちらを使うかは、ジョブの性質に応じてどちらを優先すべきかを決定してほしい。
もちろん、バッチシステム内のジョブすべてをどちらか一方で実現する必要はない。 基本的には「中間コミット方式」を採用するが、特殊なジョブのみ「一括コミット方式」を採用する(または、その逆とする)ことは自然である。

以下に、「中間コミット方式」と「一括コミット方式」のメリット・デメリット、採用ポイントをまとめる。

表 1. 方式別特徴一覧
コミット方式 メリット デメリット 採用ポイント

中間コミット方式

エラー発生時の影響を局所化する

リカバリ処理が複雑になる可能性がある

大量データを一定のマシンリソースで処理したい場合

一括コミット方式

データの整合性を担保する

大量件数処理時に高負荷になる可能性がある

永続化リソースに対する処理結果をAll or Nothingとしたい場合
小規模のバッチ処理に向いている

データベースの同一テーブルへ入出力する際の注意点

データベースの仕組み上、コミット方式を問わず、 同一テーブルへ入出力する処理で大量データを取り扱う際に注意が必要な点がある。

  • 読み取り一貫性を担保するための情報が出力(UPDATEの発行)により失われた結果、 入力(SELECT)にてエラーが発生することがある。

これを回避するには、以下の対策がある。

  • 情報を確保する領域を大きくする。

    • 拡張する際には、リソース設計にて十分検討の上実施してほしい。

    • 拡張方法は使用するデータベースに依存するため、マニュアルを参照。

  • 入力データを分割し多重処理を行う。

Architecture

Spring Batchにおけるトランザクション制御

ジョブのトランザクション制御はSpring Batchがもつ仕組みを活用する。

以下に2種類のトランザクションを定義する。

フレームワークトランザクション

Spring Batchが制御するトランザクション

ユーザトランザクション

ユーザが制御するトランザクション

チャンクモデルにおけるトランザクション制御の仕組み

チャンクモデルにおけるトランザクション制御は、中間コミット方式のみとなる。 一括コミット方式は実現できない。

チャンクモデルにおける一括コミット方式についてはJIRAにレポートされている。
Spring Batch/BATCH-647
結果、chunk completion policyをカスタマイズしてチャンクサイズを動的に変更することで解決している。 しかし、この方法では全データを1チャンクに格納してしまいメモリを圧迫してしまうため、方式として採用することはできない。

この方式の特徴は、チャンク単位にトランザクションが繰り返し行われることである。

正常系でのトランザクション制御

正常系でのトランザクション制御を説明する。

Transaction Control Chunk Model Normal Process
図 1. 正常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • 入力データがなくなるまで2から8までの処理を繰り返す。

    • チャンク単位で、フレームワークトランザクションを開始する。

    • チャンクサイズに達するまで2から5までの処理を繰り返す。

  2. ステップは、ItemReaderから入力データを取得する。

  3. ItemReaderは、ステップに入力データを返却する。

  4. ステップは、ItemProcessorで入力データに対して処理を行う。

  5. ItemProcessorは、ステップに処理結果を返却する。

  6. ステップはチャンクサイズ分のデータをItemWriterで出力する。

  7. ItemWriterは、対象となるリソースへ出力を行う。

  8. ステップはフレームワークトランザクションをコミットする。

異常系でのトランザクション制御

異常系でのトランザクション制御を説明する。

Transaction Control Chunk Model Abnormal Process
図 2. 異常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • 入力データがなくなるまで2から7までの処理を繰り返す。

    • チャンク単位で、フレームワークトランザクションを開始する。

    • チャンクサイズに達するまで2から5までの処理を繰り返す。

  2. ステップは、ItemReaderから入力データを取得する。

  3. ItemReaderは、ステップに入力データを返却する。

  4. ステップは、ItemProcessorで入力データに対して処理を行う。

  5. ItemProcessorは、ステップに処理結果を返却する。

  6. ステップはチャンクサイズ分のデータをItemWriterで出力する。

  7. ItemWriterは、対象となるリソースへ出力を行う。

    • 2から7までの処理過程で例外が発生すると、その時点で実行中の処理を中断し、以降の処理を行う。

  1. ステップはフレームワークトランザクションをロールバックする。

タスクレットモデルにおけるトランザクション制御の仕組み

タスクレットモデルにおけるトランザクション制御は、下記のいずれかの方式を利用できる。

以下、これらを順に説明する。

タスクレットモデルにおける一括コミット方式

一括コミット方式では、Spring Batchがタスクレット起動時に開始されるトランザクション制御の仕組みをそのまま利用する。
この方式の特徴は、1つのトランザクション内で繰り返しデータ処理を行うことである。

正常系でのトランザクション制御

正常系でのトランザクション制御を説明する。

Single Transaction Control Tasklet Model Normal Process
図 3. 正常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • ステップはフレームワークトランザクションを開始する。

  2. ステップはタスクレットを実行する。

    • 入力データがなくなるまで3から7までの処理を繰り返す。

  3. タスクレットは、Repositoryから入力データを取得する。

  4. Repositoryは、タスクレットに入力データを返却する。

  5. タスクレットは、入力データを処理する。

  6. タスクレットは、Repositoryへ出力データを渡す。

  7. Repositoryは、対象となるリソースへ出力を行う。

  8. タスクレットはステップへ処理終了を返却する。

  9. ステップはフレームワークトランザクションをコミットする。

タスクレットモデルにおける一括コミット方式での注意点

タスクレットモデルで一括コミット方式を利用する場合、Taskletはフレームワークトランザクションの管理下で実行されるため、 StepがTasklet実行前にフレームワークトランザクションを開始し、TaskletがStepに処理終了を返すことでコミットまたはロールバックされ、トランザクションが終了する。
そのため、たとえば、常駐型のバッチプロセスを作る目的で、Tasklet実装内に無限ループを起こすような処理を記述すると、フレームワークトランザクションが終了しなくなる。 利用するDBMSの仕様によっては、上記のようなトランザクションの長期間滞留は性能劣化を引き起こす原因となり得るため、プロセス常駐を目的としたTaskletの実装は避けるべきである。

プロセス常駐型のジョブ実行方式として、非同期実行方式(非同期実行(DBポーリング)非同期実行(Webコンテナ))が適用可能であるかを検討してほしい。

異常系でのトランザクション制御

異常系でのトランザクション制御を説明する。

Single Transaction Control Tasklet Model Abormal Process
図 4. 異常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • ステップはフレームワークトランザクションを開始する。

  2. ステップはタスクレットを実行する。

    • 入力データがなくなるまで3から7までの処理を繰り返す。

  3. タスクレットは、Repositoryから入力データを取得する。

  4. Repositoryは、タスクレットに入力データを返却する。

  5. タスクレットは、入力データを処理する。

  6. タスクレットは、Repositoryへ出力データを渡す。

  7. Repositoryは、対象となるリソースへ出力を行う。

    • 2から7までの処理過程で例外が発生すると、その時点で実行中の処理を中断し、以降の処理を行う。

  1. タスクレットはステップへ例外をスローする。

  2. ステップはフレームワークトランザクションをロールバックする。

タスクレットモデルにおける中間コミット方式

中間コミット方式では、ユーザにてトランザクションを直接操作する。
この方式の特徴は、リソースの操作を行えないフレームワークトランザクションを利用することで、ユーザトランザクションのみにリソースの操作を行わせることである。
transaction-manager属性に、リソースが紐づかないorg.springframework.batch.support.transaction.ResourcelessTransactionManagerを指定する。

正常系でのトランザクション制御

正常系でのトランザクション制御を説明する。

Chunk Transaction Control Tasklet Model Normal Process
図 5. 正常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • ステップはフレームワークトランザクションを開始する。

  2. ステップはタスクレットを実行する。

    • 入力データがなくなるまで3から10までの処理を繰り返す。

  3. タスクレットは、TransactionManagerよりユーザトランザクションを開始する。

    • チャンクサイズに達するまで4から8までの処理を繰り返す。

  4. タスクレットは、Repositoryから入力データを取得する。

  5. Repositoryは、タスクレットに入力データを返却する。

  6. タスクレットは、入力データを処理する。

  7. タスクレットは、Repositoryへ出力データを渡す。

  8. Repositoryは、対象となるリソースへ出力を行う。

  9. タスクレットは、TransactionManagerによりユーザトランザクションのコミットを実行する。

  10. TransactionManagerは、対象となるリソースへコミットを発行する。

  11. タスクレットはステップへ処理終了を返却する。

  12. ステップはフレームワークトランザクションをコミットする。

ここでは1件ごとにリソースへ出力しているが、 チャンクモデルと同様に、チャンク単位で一括更新し処理スループットの向上を狙うことも可能である。 その際に、SqlSessionTemplateexecutorTypeBATCHに設定することで、BatchUpdateを利用することもできる。 これは、MyBatisのItemWriterを利用する場合と同様の動作になるため、MyBatisのItemWriterを利用して更新してもよい。 MyBatisのItemWriterについて、詳細は MyBatisBatchItemWriterを参照。

異常系でのトランザクション制御

異常系でのトランザクション制御を説明する。

Chunk Transaction Control Tasklet Model Abormal Process
図 6. 異常系のシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • ステップはフレームワークトランザクションを開始する。

  2. ステップはタスクレットを実行する。

    • 入力データがなくなるまで3から11までの処理を繰り返す。

  3. タスクレットは、TransactionManagerよりユーザトランザクションを開始する。

    • チャンクサイズに達するまで4から8までの処理を繰り返す。

  4. タスクレットは、Repositoryから入力データを取得する。

  5. Repositoryは、タスクレットに入力データを返却する。

  6. タスクレットは、入力データを処理する。

  7. タスクレットは、Repositoryへ出力データを渡す。

  8. Repositoryは、対象となるリソースへ出力を行う。

    • 3から8までの処理過程で例外が発生すると、その時点で実行中の処理を中断し、以降の処理を行う。

  1. タスクレットは、発生した例外に対する処理を行う。

  2. タスクレットは、TransactionManagerによりユーザトランザクションのロールバックを実行する。

  3. TransactionManagerは、対象となるリソースへロールバックを発行する。

  4. タスクレットはステップへ例外をスローする。

  5. ステップはフレームワークトランザクションをロールバックする。

処理の継続について

ここでは、例外をハンドリングして処理をロールバック後、処理を異常終了しているが、 継続して次のチャンクを処理することも可能である。 いずれの場合も、途中でエラーが発生したことをステップのステータス・終了コードを変更することで後続の処理に通知する必要がある。

フレームワークトランザクションについて

ここでは、ユーザトランザクションをロールバック後に例外をスローしてジョブを異常終了させているが、 ステップへ処理終了を返却しジョブを正常終了させることも出来る。 この場合、フレームワークトランザクションは、コミットされる。

モデル別トランザクション制御の選定方針

Macchinetta Batch 2.xの基盤となるSpring Batchでは、チャンクモデルでは中間コミット方式しか実現できない。 しかし、タスクレットモデルでは、中間コミット方式、一括コミット方式のいずれも実現できる。

よって、Macchinetta Batch 2.xでは、一括コミット方式が必要な際は、タスクレットモデルにて実装する。

起動方式ごとのトランザクション制御の差

起動方式によってはジョブの起動前後にSpring Batchの管理外となるトランザクションが発生する。 ここでは、2つの非同期実行処理方式におけるトランザクションについて説明する。

DBポーリングのトランザクションについて

DBポーリングが行うジョブ要求テーブルへの処理については、Spring Batch管理外のトランザクション処理が行われる。 また、ジョブで発生した例外については、ジョブ内で対応が完結するため、JobRequestPollTaskが行うトランザクションには影響を与えない。

下図にトランザクションに焦点を当てた簡易的なシーケンス図を示す。

With Database polling transaction
図 7. DBポーリング処理のトランザクション
シーケンス図の説明
  1. 非同期バッチデーモンでJobRequestPollTaskが周期実行される。

  2. JobRequestPollTaskは、Spring Batch管理外のトランザクションを開始する。

  3. JobRequestPollTaskは、ジョブ要求テーブルから非同期実行対象ジョブを取得する。

  4. JobRequestPollTaskは、Spring Batch管理外のトランザクションをコミットする。

  5. JobRequestPollTaskは、Spring Batch管理外のトランザクションを開始する。

  6. JobRequestPollTaskは、ジョブ要求テーブルのポーリングステータスをINITからPOLLEDへ更新する。

  7. JobRequestPollTaskは、Spring Batch管理外のトランザクションをコミットする。

  8. JobRequestPollTaskは、ジョブを実行する。

  9. ジョブ内では、管理用データベース(JobRepository)へのトランザクション管理はSpring Batchが行う。

  10. ジョブ内では、ジョブ用データベースへのトランザクション管理はSpring Batchが行う。

  11. JobRequestPollTaskにjob_execution_idが返却される。

  12. JobRequestPollTaskは、Spring Batch管理外のトランザクションを開始する。

  13. JobRequestPollTaskは、ジョブ要求テーブルのポーリングステータスをPOLLEDからEXECUTEへ更新する。

  14. JobRequestPollTaskは、Spring Batch管理外のトランザクションをコミットする。

SELECT発行時のコミットについて

データベースによっては、SELECT発行時に暗黙的にトランザクションを開始する場合がある。 そのため、明示的にコミットを発行することでトランザクションを確定させ、他のトランザクションと明確に区別し影響を与えないようにしている。

WebAPサーバ処理のトランザクションについて

WebAPが対象とするリソースへの処理については、Spring Batch管理外のトランザクション処理が行われる。 また、ジョブで発生した例外については、ジョブ内で対応が完結するため、WebAPが行うトランザクションには影響を与えない。

下図にトランザクションに焦点を当てた簡易的なシーケンス図を示す。

With Web Application transaction
図 8. WebAPサーバ処理のトランザクション
シーケンス図の説明
  1. クライアントからリクエストによりWebAPの処理が実行される。

  2. WebAPは、Spring Batch管理外のトランザクションを開始する。

  3. WebAPは、ジョブ実行前にWebAPでのリソースに対して読み書きを行う。

  4. WebAPは、ジョブを実行する。

  5. ジョブ内では、管理用データベース(JobRepository)へのトランザクション管理はSpring Batchが行う。

  6. ジョブ内では、ジョブ用データベースへのトランザクション管理はSpring Batchが行う。

  7. WebAPにjob_execution_idが返却される。

  8. WebAPは、ジョブ実行後にWebAPでのリソースに対して読み書きを行う。

  9. WebAPは、Spring Batch管理外のトランザクションをコミットする。

  10. WebAPは、クライアントにレスポンスを返す。

How to use

ここでは、1ジョブにおけるトランザクション制御について、以下の場合に分けて説明する。

データソースとは、データの格納先(データベース、ファイル等)を指す。 単一データソースとは1つのデータソースを、複数データソースとは2つ以上のデータソースを指す。

単一データソースを処理するケースは、データベースのデータを加工するケースが代表的である。
複数データソースを処理するケースは、以下のようにいくつかバリエーションがある。

  • 複数のデータベースの場合

  • データベースとファイルの場合

単一データソースの場合

1つのデータソースに対して入出力するジョブのトランザクション制御について説明する。

以下にMacchinetta Batch 2.xでの設定例を示す。

データソースの設定(com.example.batch.config.LaunchContextConfig.java)
// Job-common definitions
public BasicDataSource jobDataSource(@Value("${jdbc.driver}") String driverClassName,
                                     @Value("${jdbc.url}") String url,
                                     @Value("${jdbc.username}") String username,
                                     @Value("${jdbc.password}") String password) {
    final BasicDataSource basicDataSource = new BasicDataSource();
    basicDataSource.setDriverClassName(driverClassName);
    basicDataSource.setUrl(url);
    basicDataSource.setUsername(username);
    basicDataSource.setPassword(password);
    basicDataSource.setMaxTotal(10);
    basicDataSource.setMinIdle(1);
    basicDataSource.setMaxWaitMillis(5000);
    basicDataSource.setDefaultAutoCommit(false);
    return basicDataSource;
}
トランザクションマネージャの設定(com.example.batch.config.LaunchContextConfig.java)
// (1)
@Bean
public PlatformTransactionManager jobTransactionManager(@Qualifier("jobDataSource") DataSource jobDataSource) {
    final DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
    dataSourceTransactionManager.setDataSource(jobDataSource);
    dataSourceTransactionManager.setRollbackOnCommitFailure(true);
    return dataSourceTransactionManager;
}
データソースの設定(META-INF/spring/launch-context.xml)
<!-- Job-common definitions -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
      destroy-method="close"
      p:driverClassName="${jdbc.driver}"
      p:url="${jdbc.url}"
      p:username="${jdbc.username}"
      p:password="${jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false" />
トランザクションマネージャの設定(META-INF/spring/launch-context.xml)
<!-- (1) -->
<bean id="jobTransactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
      p:dataSource-ref="jobDataSource"
      p:rollbackOnCommitFailure="true" />
項番 説明

(1)

トランザクションマネージャのBean定義
データソースは上記で定義したjobDataSourceを設定する。
コミットに失敗した場合はロールバックをするように設定済み。

トランザクション制御の実施

ジョブモデルおよびコミット方式により制御方法が異なる。

チャンクモデルの場合

チャンクモデルの場合は、中間コミット方式となり、Spring Batchにトランザクション制御を委ねる。 ユーザにて制御することは一切行わないようにする。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader<SalesPlanDetail> reader,
                   ItemWriter<SalesPlanDetail> writer,
                   LoggingItemReaderListener listener) {
    return new StepBuilder("jobSalesPlan01.step01", jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(10, // (2)
                    transactionManager) // (1)
            .reader(reader)
            .writer(writer)
            .listener(listener)
            .build();
}

@Bean
public Job jobSalesPlan01(JobRepository jobRepository, Step step01,
                          LoggingItemReaderListener listener) {
    return new JobBuilder("jobSalesPlan01", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<batch:job id="jobSalesPlan01" job-repository="jobRepository">
    <batch:step id="jobSalesPlan01.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"> <!-- (1) -->
            <batch:chunk reader="detailCSVReader"
                         writer="detailWriter"
                         commit-interval="10" /> <!-- (2) -->
        </batch:tasklet>
    </batch:step>
</batch:job>
項番 説明

(1)

StepBuilderのchunkメソッドの引数transactionManager/<batch:tasklet>要素のtransaction-manager属性に定義済みのjobTransactionManagerを設定する。
ここに設定したトランザクションマネージャで中間コミット方式のトランザクションを制御する。

(2)

StepBuilderのchunkメソッドの引数chunkSize/commit-interval属性にチャンクサイズを設定する。この例では10件処理するごとに1回コミットが発行される。

タスクレットモデルの場合

タスクレットモデルの場合は、一括コミット方式、中間コミット方式でトランザクション制御の方法が異なる。

一括コミット方式

Spring Batchにトランザクション制御を委ねる。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   Tasklet salesPlanSingleTranTask) {
    return new StepBuilder("jobSalesPlan01.step01", jobRepository)
            .tasklet(salesPlanSingleTranTask, transactionManager) // (1)
            .build();
}

@Bean
public Job jobSalesPlan01(JobRepository jobRepository, Step step01,
                          LoggingItemReaderListener listener) {
    return new JobBuilder("jobSalesPlan01", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<batch:job id="jobSalesPlan01" job-repository="jobRepository">
    <batch:step id="jobSalesPlan01.step01">
        <!-- (1) -->
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="salesPlanSingleTranTask" />
    </batch:step>
</batch:job>
項番 説明

(1)

StepBuilderのtaskletメソッドの引数transactionManager/<batch:tasklet>要素のtransaction-manager属性に定義済みのjobTransactionManagerを設定する。
ここに設定したトランザクションマネージャで一括コミット方式のトランザクションを制御する。

中間コミット方式

ユーザにてトランザクション制御を行う。

  • 処理の途中でコミットを発行する場合は、TransactionManagerをInjectして手動で行う。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobResourcelessTransactionManager") PlatformTransactionManager transactionManager,
                   Tasklet salesPlanSingleTranTask) {
    return new StepBuilder("jobSalesPlan01.step01", jobRepository)
            .tasklet(salesPlanSingleTranTask, transactionManager) // (1)
            .build();
}

@Bean
public Job jobSalesPlan01(JobRepository jobRepository, Step step01,
                          LoggingItemReaderListener listener) {
    return new JobBuilder("jobSalesPlan01", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<batch:job id="jobSalesPlan01" job-repository="jobRepository">
    <batch:step id="jobSalesPlan01.step01">
        <!-- (1) -->
        <batch:tasklet transaction-manager="jobResourcelessTransactionManager"
                       ref="salesPlanChunkTranTask" />
    </batch:step>
</batch:job>
実装例
@Component
public class SalesPlanChunkTranTask implements Tasklet {

    @Inject
    ItemStreamReader<SalesPlanDetail> itemReader;

     // (2)
    @Inject
    @Named("jobTransactionManager")
    PlatformTransactionManager transactionManager;

    @Inject
    SalesPlanDetailRepository repository;

    private static final int CHUNK_SIZE = 10;

    @Override
    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {

        DefaultTransactionDefinition definition = new DefaultTransactionDefinition();
        TransactionStatus status = null;
        int count = 0;

        try {
            // omitted.

            itemReader.open(executionContext);

            while ((item = itemReader.read()) != null) {

                if (count % CHUNK_SIZE == 0) {
                    status = transactionManager.getTransaction(definition); // (3)
                }
                count++;
                repository.create(item);
                if (count % CHUNK_SIZE == 0) {
                    transactionManager.commit(status);  // (4)
                }
            }
            if (status != null && !status.isCompleted()) {
                transactionManager.commit(status);    // (5)
            }
        } catch (Exception e) {
            logger.error("Exception occurred while reading.", e);
            if (status != null && !status.isCompleted()) {
                    transactionManager.rollback(status);    // (6)
            }
            throw e;
        } finally {
            if (status != null && !status.isCompleted()) {
                try {
                    transactionManager.rollback(status);    // (7)
                } catch (TransactionException e2) {
                        logger.error("TransactionException occurred while rollback.", e2);
                }
            }
            itemReader.close();
        }

        return RepeatStatus.FINISHED;
    }
}
項番 説明

(1)

StepBuilderのtaskletメソッドの引数transactionManager/<batch:tasklet>要素のtransaction-manager属性に定義済みのjobResourcelessTransactionManagerを設定する。

(2)

トランザクションマネージャをInjectする。
@NamedアノテーションでjobTransactionManagerを指定して利用するBeanを特定させる。

(3)

チャンクの開始時にトランザクションを開始する。

(4)

チャンク終了時にトランザクションをコミットする。

(5)

最後のチャンクについて、トランザクションをコミットする。

(6)

例外発生時にはトランザクションをロールバックする。

(7)

トランザクションが終わっていない場合、トランザクションをロールバックする。

ItemWriterによる更新

上記の例では、Repositoryを使用しているが、ItemWriterを利用してデータを更新することもできる。 ItemWriterを利用することで実装がシンプルになる効果があり、特にファイルを更新する場合はFlatFileItemWriterを利用するとよい。

非トランザクショナルなデータソースに対する補足

ファイルの場合はトランザクションの設定や操作は不要である。

FlatFileItemWriterを利用する場合、擬似的なトランザクション制御が行える。 これは、リソースへの書き込みを遅延し、コミットタイミングで実際に書き出すことで実現している。 正常時にはチャンクサイズに達したときに、実際のファイルにチャンク分データを出力し、例外が発生するとそのチャンクのデータ出力が行われない。

FlatFileItemWriterは、transactionalプロパティでトランザクション制御の有無を切替えられる。デフォルトはtrueでトランザクション制御が有効になっている。 transactionalプロパティがfalseの場合、FlatFileItemWriterは、トランザクションとは無関係にデータの出力を行う。

一括コミット方式を採用する場合、transactionalプロパティをfalseにすることを推奨する。 上記の説明にあるとおりコミットのタイミングでリソースへ書き出すため、それまではメモリ内に全出力分のデータを保持することになる。 そのため、データ量が多い場合にはメモリ不足になりエラーとなる可能性が高くなる。

ファイルしか扱わないジョブにおけるTransactionManagerの設定について

以下に示すジョブ定義のように、StepBuilderのchunkメソッドの引数transactionManager/batch:taskletのtransaction-manager属性は必須のため省略できない。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader<SalesPlanDetail> reader,
                   ItemWriter<SalesPlanDetail> writer) {
    return new StepBuilder("jobSalesPlan01.step01", jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(100,
                    transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" writer="writer" commit-interval="100" />
</batch:tasklet>

そのため、jobTransactionManagerを常に指定すること。この時、以下の挙動となる。

  • transactionalがtrueの場合

    • 指定したTransactionManagerに同期してリソースに出力する。

  • transactionalがfalseの場合

    • 指定したTransactionManagerのトランザクション処理は空振りし、トランザクションと関係なくリソースに出力する。

この時、jobTransactionManagerが参照するリソース(たとえば、データベース)に対してトランザクションが発行されるが、 テーブルアクセスは伴わないため実害がない。

また、実害がある場合や空振りでも参照するトランザクションを発行したくない場合は、リソースを必要としないResourcelessTransactionManagerを使用することができる。 ResourcelessTransactionManagerLaunchContextConfig.java/launch-context.xmljobResourcelessTransactionManagerとして定義済みである。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobResourcelessTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader<SalesPlanDetail> reader,
                   ItemWriter<SalesPlanDetail> writer) {
    return new StepBuilder("jobSalesPlan01.step01", jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(100,
                    transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}
<batch:tasklet transaction-manager="jobResourcelessTransactionManager">
    <batch:chunk reader="reader" writer="writer" commit-interval="100" />
</batch:tasklet>

複数データソースの場合

複数データソースに対して入出力するジョブのトランザクション制御について説明する。 入力と出力で考慮点が異なるため、これらを分けて説明する。

複数データソースからの取得

複数データソースからのデータを取得する場合、処理の軸となるデータと、それに付随する追加データを分けて取得する。 以降は、処理の軸となるデータを処理対象レコード、それに付随する追加データを付随データと呼ぶ。

Spring Batchの構造上、ItemReaderは1つのリソースから処理対象レコードを取得することを前提としているためである。 これは、リソースの種類を問わず同じ考え方となる。

  1. 処理対象レコードの取得

    • ItemReaderにて取得する。

  2. 付随データの取得

    • 付随データは、そのデータに対す変更の有無と件数に応じて、以下の取得方法を選択する必要がある。これは、択一ではなく、併用してもよい。

      • ステップ実行前に一括取得

      • 処理対象レコードに応じて都度取得

ステップ実行前に一括取得する場合

以下を行うListenerを実装し、以降のStepからデータを参照する。

  • データを一括して取得する

  • スコープがJobまたはStepのBeanに情報を格納する

    • Spring BatchのExecutionContextを活用してもよいが、 可読性や保守性のために別途データ格納用のクラスを作成してもよい。 ここでは、簡単のためExecutionContextを活用した例で説明する。

マスタデータなど、処理対象データに依存しないデータを読み込む場合にこの方法を採用する。 ただし、マスタデータと言えど、メモリを圧迫するような大量件数が対象である場合は、都度取得したほうがよいかを検討すること。

一括取得するListenerの実装
@Component
// (1)
public class BranchMasterReadStepListener implements StepExecutionListener {

    @Inject
    BranchRepository branchRepository;

    @Override
    public void beforeStep(StepExecution stepExecution) {   // (2)

        List<Branch> branches = branchRepository.findAll(); //(3)

        Map<String, Branch> map = branches.stream()
                .collect(Collectors.toMap(Branch::getBranchId,
                        UnaryOperator.identity()));  // (4)

        stepExecution.getExecutionContext().put("branches", map); // (5)
    }
}
@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader<Customer> reader,
                   ItemWriter<Customer> writer,
                   RetrieveBranchFromContextItemProcessor processor,
                   BranchMasterReadStepListener listener) {
    return new StepBuilder("outputAllCustomerList01.step01",
            jobRepository)
            .<Customer, Customer> chunk(10, transactionManager)
            .reader(reader)
            .processor(processor)
            .listener(listener) // (6)
            .writer(writer)
            .build();
}

@Bean
public Job outputAllCustomerList01(JobRepository jobRepository,
                                        Step step01) {
    return new JobBuilder("outputAllCustomerList01",jobRepository)
            .start(step01)
            .build();
}
<batch:job id="outputAllCustomerList01" job-repository="jobRepository">
    <batch:step id="outputAllCustomerList01.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="retrieveBranchFromContextItemProcessor"
                         writer="writer" commit-interval="10"/>
            <batch:listeners>
                <batch:listener ref="branchMasterReadStepListener"/> <!-- (6) -->
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>
一括取得したデータを後続ステップのItemProcessorで参照する例
@Component
public class RetrieveBranchFromContextItemProcessor implements
        ItemProcessor<Customer, CustomerWithBranch> {

    private Map<String, Branch> branches;

    @BeforeStep       // (7)
    @SuppressWarnings("unchecked")
    public void beforeStep(StepExecution stepExecution) {
        branches = (Map<String, Branch>) stepExecution.getExecutionContext()
                .get("branches"); // (8)
    }

    @Override
    public CustomerWithBranch process(Customer item) throws Exception {
        CustomerWithBranch newItem = new CustomerWithBranch(item);
        newItem.setBranch(branches.get(item.getChargeBranchId()));    // (9)
        return newItem;
    }
}
項番 説明

(1)

StepExecutionListenerインタフェースを実装する。

(2)

ステップ実行前にデータを取得するため、beforeStepメソッドを実装する。

(3)

マスタデータを取得する処理を実装する。

(4)

後続処理が利用しやすいようにList型からMap型へ変換を行う。

(5)

ステップのコンテキストに取得したマスタデータをbranchesという名前で設定する。

(6)

対象となるジョブへ作成したListenerを登録する。

(7)

ItemProcessorのステップ実行前にマスタデータを取得するため、@BeforeStepアノテーションでListener設定を行う。

(8)

@BeforeStepアノテーションが付与されたメソッド内で、ステップのコンテキストから(5)で設定されたマスタデータを取得する。

(9)

ItemProcessorのprocessメソッド内で、マスタデータからデータ取得を行う。

コンテキストへ格納するオブジェクト

コンテキスト(ExecutionContext)へ格納するオブジェクトは、java.io.Serializableを実装したクラスでなければならない。 これは、ExecutionContextJobRepositoryへ格納されるためである。

処理対象レコードに応じて都度取得する場合

業務処理のItemProcessorとは別に、都度取得専用のItemProcessorにて取得する。 これにより、各ItemProcessorの処理を簡素化する。

  1. 都度取得用のItemProcessorを定義し、業務処理と分離する。

    • この際、テーブルアクセス時はMyBatisをそのまま使う。

  2. 複数のItemProcessorをCompositeItemProcessorを使用して連結する。

    • ItemProcessorはdelegates属性に指定した順番に処理されることに留意する。

都度取得用のItemProcessorの実装例
@Component
public class RetrieveBranchFromRepositoryItemProcessor implements
        ItemProcessor<Customer, CustomerWithBranch> {

    @Inject
    BranchRepository branchRepository;  // (1)

    @Override
    public CustomerWithBranch process(Customer item) throws Exception {
        CustomerWithBranch newItem = new CustomerWithBranch(item);
        newItem.setBranch(branchRepository.findOne(
                item.getChargeBranchId())); // (2)
        return newItem; // (3)
    }
}
@Bean
public CompositeItemProcessor<Customer, CustomerWithBranch> compositeItemProcessor(
        RetrieveBranchFromRepositoryItemProcessor retrieveBranchFromRepositoryItemProcessor,
        BusinessLogicItemProcessor businessLogicItemProcessor) {
    return new CompositeItemProcessorBuilder<Customer, CustomerWithBranch>()
            .delegates(retrieveBranchFromRepositoryItemProcessor, businessLogicItemProcessor) // (4), (5)
            .build();
}
<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <ref bean="retrieveBranchFromRepositoryItemProcessor"/> <!-- (4) -->
            <ref bean="businessLogicItemProcessor"/>  <!-- (5) -->
        </list>
    </property>
</bean>
項番 説明

(1)

MyBatisを利用した都度データ取得用のRepositoryをInjectする。

(2)

入力データ(処理対象レコード)に対して、Repositoryから付随データを取得する。

(3)

処理対象レコードと付随データを一緒にしたデータを返却する。
このデータが次のItemProcessorへの入力データになることに注意する。

(4)

都度取得用のItemProcessorを設定する。

(5)

ビジネスロジックのItemProcessorを設定する。

複数データソースへの出力(複数ステップ)

データソースごとにステップを分割し、各ステップで単一データソースを処理することで、ジョブ全体で複数データソースを処理する。

  • 1ステップ目で加工したデータをテーブルに格納し、2ステップ目でファイルに出力する、といった要領となる。

  • 各ステップがシンプルになりリカバリしやすい反面、2度手間になる可能性がある。

    • この結果、以下のような弊害を生む場合は、1ステップで複数データソースを処理することを検討する。

      • 処理時間が伸びてしまう

      • ビジネスロジックが冗長となる

複数データソースへの出力(1ステップ)

一般的に、複数のデータソースに対するトランザクションを1つにまとめる場合は、2phase-commitによる分散トランザクションを利用する。 しかし、以下の様なデメリットがあることも同時に知られている。

  • XAResourceなど分散トランザクションAPIにミドルウエアが対応している必要があり、それにもとづいた特殊な設定が必要になる

  • バッチプログラムのようなスタンドアロンJavaで、分散トランザクションのJTA実装ライブラリを追加する必要がある

  • 障害時のリカバリが難しい

Spring Batchでも分散トランザクションを活用することは可能だが、JTAによるグローバルトランザクションを使用する方法では、プロトコルの特性上、性能面のオーバーヘッドがかかる。 より簡易に複数データソースをまとめて処理する方法として、Best Efforts 1PCパターンによる実現手段を推奨する。

Best Efforts 1PCパターンとは

端的に言うと、複数データソースをローカルトランザクションで扱い、同じタイミングで逐次コミットを発行するという手法を指す。 下図に概念図を示す。

Best Efforts 1PC Overview
図 9. Best Efforts 1PCパターンの概念図
図の説明
1.

ユーザがChainedTransactionManagerにトランザクション開始を指示する。

2~7.

ChainedTransactionManagerは、登録されているトランザクションマネージャの逐次トランザクションを開始する。

8~10.

ユーザは各リソースへトランザクショナルな操作を行う。

11.

ユーザがChainedTransactionManagerにコミットを指示する。

12~17.

ChainedTransactionManagerは、登録されているトランザクションマネージャの逐次コミットを発行する。なお、コミット(またはロールバック)の発行はトランザクション開始と逆順になる。

この方法は分散トランザクションではないため、2番目以降のトランザクションマネージャにおけるcommit/rollback時に障害(例外)が発生した場合に、 データの整合性が保てない可能性がある。 そのため、トランザクション境界で障害が発生した場合のリカバリ方法を設計する必要があるが、リカバリ頻度を低減し、リカバリ手順を簡潔にできる効果がある。

複数のトランザクショナルリソースを同時に処理する場合

複数のデータベースを同時に処理する場合や、データベースとMQを処理する場合などに活用する。

以下のように、ChainedTransactionManagerを使用して複数トランザクションマネージャを1つにまとめて定義することで1phase-commitとして処理する。 なお、ChainedTransactionManagerはSpring Dataが提供するクラスである。

pom.xml
<dependencies>
    <!-- omitted -->
    <!-- (1) -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-commons</artifactId>
    </dependency>
<dependencies>
// (2)
@Bean
public ChainedTransactionManager chainedTransactionManager(
        @Qualifier("transactionManager1") PlatformTransactionManager transactionManager1,
        @Qualifier("transactionManager2") PlatformTransactionManager transactionManager2) {
    return new ChainedTransactionManager(transactionManager1, transactionManager2); // (3)
}

@Bean
public Step step01(JobRepository jobRepository,
                @Qualifier("chainedTransactionManager") PlatformTransactionManager transactionManager, // (4)
                ItemReader<SalesPlanDetail> reader,
                ItemWriter<SalesPlanDetail> writer) {
    return new StepBuilder("jobSalesPlan01.step01",
            jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}

@Bean
public Job jobSalesPlan01(JobRepository jobRepository,
                                        Step step01) {
    return new JobBuilder("jobSalesPlan01", jobRepository)
            .start(step01)
            .build();
}
<!-- Chained Transaction Manager -->
<!-- (2) -->
<bean id="chainedTransactionManager"
      class="org.springframework.data.transaction.ChainedTransactionManager">
   <constructor-arg>
        <!-- (3) -->
        <list>
            <ref bean="transactionManager1"/>
            <ref bean="transactionManager2"/>
        </list>
    </constructor-arg>
</bean>

<batch:job id="jobSalesPlan01" job-repository="jobRepository">
    <batch:step id="jobSalesPlan01.step01">
        <!-- (4) -->
        <batch:tasklet transaction-manager="chainedTransactionManager">
            <!-- omitted -->
        </batch:tasklet>
    </batch:step>
</batch:job>
項番 説明

(1)

ChainedTransactionManagerを利用するために、依存関係を追加する。

(2)

ChainedTransactionManagerのBean定義を行う。
尚、 spring-data-commons 2.5.0でChainedTransactionManagerは非推奨であり、予告なく削除される場合があるため注意すること。
spring-data-commons/ChainedTransactionManager

(3)

まとめたい複数のトランザクションマネージャをリストで定義する。

(4)

ジョブが利用するトランザクションマネージャに(2)で定義したBeanIDを指定する。

トランザクショナルリソースと非トランザクショナルリソースを同時に処理する場合

この方法は、データベースとファイルを同時に処理する場合に活用する。

データベースについては単一データソースの場合と同様。

ファイルについてはFlatFileItemWriterのtransactionalプロパティをtrueに設定することで、前述の「Best Efforts 1PCパターン」と同様の効果となる。
詳細は非トランザクショナルなデータソースに対する補足を参照。

この設定は、データベースのトランザクションをコミットする直前までファイルへの書き込みを遅延させるため、2つのデータソースで同期がとりやすくなる。 ただし、この場合でもデータベースへのコミット後、ファイル出力処理中に異常が発生した場合はデータの整合性が保てない可能性があるため、 リカバリ方法を設計する必要がある。

中間方式コミットでの注意点

非推奨ではあるがItemWriterで処理データをスキップする場合は、チャンクサイズの設定値が強制変更される。 そのことがトランザクションに非常に大きく影響することに注意する。詳細は、 スキップを参照。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28