Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

一般的に、バッチウィンドウ(バッチ処理のために使用できる時間)がシビアなバッチシステムでは、 複数ジョブを並列に動作させる(以降、並列処理と呼ぶ)ことで全体の処理時間を可能な限り短くするように設計する。
しかし、1ジョブの処理データが大量であるために処理時間がバッチウィンドウに収まらない場合がある。
その際は、1ジョブの処理データを分割して多重走行させる(以降、多重処理と呼ぶ)ことで処理時間を短縮させる手法が用いられる。
この、並列処理と多重処理は同じような意味合いで扱われることもあるが、ここでは以下の定義とする。

並列処理

複数の異なるジョブを、同時に実行する。

Parallel Step
図 1. 並列処理の概略図
多重処理

1ジョブの処理対象を分割して、同時に実行する。

Partition Step
図 2. 多重処理の概略図

並列処理と多重処理ともにジョブスケジューラで行う方法とMacchinetta Batch 2.xで行う方法がある。
なお、Macchinetta Batch 2.xでの並列処理および多重処理は フロー制御の上に成り立っている。

表 1. 並列処理および多重処理の実現方法
実現方法 並列処理 多重処理

ジョブスケジューラ

依存関係がない複数の異なるジョブを同時に実行するように定義する。

複数の同じジョブを異なるデータ範囲で実行するように定義する。各ジョブに処理対象のデータを絞るための情報を引数などで渡す。
たとえば、1年間のデータを月ごとに分割する、エリアや支店などの単位で分割する、など

Macchinetta Batch 2.x

Parallel Step (並列処理)
ステップ単位で並列処理を行う。
各ステップは同じ処理である必要はなく、データベースとファイルというような種類が異なるリソースに対して並列で処理を行う事も可能である。

Partitioning Step (多重処理)
Managerステップでは対象データを分割するためのキーを取得し、 Workerステップではこのキーにもとづいて分割したデータを処理する。
Parallel Stepとは異なりWorkerステップの処理は同一処理となる。

ジョブスケジューラを使用する場合

1ジョブに1プロセスが割り当てられるため複数プロセスで起動される。 そのため、1つのジョブを設計・実装する難易度は低い。
しかし、複数プロセスで起動するため、同時実行数が増えるとマシンリソースへの負荷が高くなる。
よって、同時実行数が3、4程度であれば、ジョブスケジューラを利用するとよい。
もちろん、この数値は絶対的なものではない。実行環境やジョブの実装に依存するため目安としてほしい。

Macchinetta Batch 2.xを使用する場合

各ステップがスレッドに割り当てられるため、1プロセス複数スレッドで動作する。そのため、1つのジョブへの設計・実装の難易度はジョブスケジューラを使用する場合より高くなる。
しかし、複数スレッドで起動するため、同時実行数が増えてもマシンリソースへの負荷がジョブスケジューラを使用する場合ほど高くはならない。 よって、同時実行数が多い(5以上の)場合であれば、Macchinetta Batch 2.xを利用するのがよい。
もちろん、この数値は絶対的なものではない。実行環境やシステム特性に依存するため目安としてほしい。

Spring Batchで実行可能な並列処理方法の1つにMulti Thread Stepがあるが、以下の理由によりMacchinetta Batch 2.xでの利用は非推奨とする。

Multi Thread Stepとは

チャンク単位で複数スレッドで並列処理を行う方法。

非推奨理由

Spring Batchが提供しているReaderやWriterのほとんどが、マルチスレッドでの利用を想定して設計されていない。 そのため、データロストや重複処理が発生する可能性があり、処理の信頼性が低い。また、複数スレッドで動作するため、一定の処理順序とならない。
ItemReader/ItemProcessor/ItemWriterを自作する場合でもスレッドセーフなどMulti Thread Stepを使うためには考慮すべき点が多く実装および運用の難易度が高くなる。 これらの理由により、Multi Thread Stepは非推奨としている。
代わりにPartitioning Step (多重処理)を利用することを推奨する。

並列処理・多重処理で1つのデータベースに対して更新する場合は、リソース競合とデッドロックが発生する可能性がある。 ジョブ設計の段階から潜在的な競合発生を排除すること。

マルチプロセスや複数筐体への分散処理は、Spring Batchに機能があるが、Macchinetta Batch 2.xとしては障害設計が困難になるため扱わないこととする。

本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる

ジョブスケジューラによる並列処理と多重処理

ここでは、ジョブスケジューラによる並列処理と多重処理の概念について説明を行う。

ジョブ登録、スケジュール設定などについては、使用するジョブスケジューラのマニュアルを参照。

ジョブスケジューラによるジョブの並列化

並列実行させたい処理をそれぞれジョブとして登録、それぞれのジョブが同時に開始するようにスケジュールを設定する。 各々のジョブは異なる処理として登録してよい。

ジョブスケジューラによるジョブの多重化

多重実行させたい処理を複数登録し、パラメータにより対象データの抽出範囲を指定する。 その上で、それぞれのジョブが同時に開始するようにスケジュールを設定する。 各々のジョブは同じ処理ではあるが、処理対象データ範囲は独立していることが必要となる。

How to use

Macchinetta Batch 2.xでの並列処理および多重処理を行う方法を説明する。

Parallel Step (並列処理)

Parallel Step (並列処理)の方法を説明する。

Parallel Step
図 3. Parallel Stepの概要図
概要図の説明

各ステップに別々な処理を定義することができ、それらを並列に実行することができる。 各ステップごとにスレッドが割り当てられる。

Parallel Stepの概要図を例にしたParallel Stepの定義方法を以下に示す。

// (1)
@Bean
public TaskExecutor parallelTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setQueueCapacity(200);
    return executor;
}

// (4)
@Bean
public Step stepChunkDb(JobRepository jobRepository,
                  ItemReader fileReader,
                  ItemWriter databaseWriter,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelStepJob.step.chunk.db", jobRepository)
            .chunk(100, transactionManager)
            .reader(fileReader)
            .writer(databaseWriter)
            .build();
}

// (5)
@Bean
public Step stepTaskletChunk(JobRepository jobRepository,
                  Tasklet chunkTransactionTasklet,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelStepJob.step.tasklet.chunk", jobRepository)
            .tasklet(chunkTransactionTasklet, transactionManager)
            .build();
}

// (6)
@Bean
public Step stepTaskletSingle(JobRepository jobRepository,
                  Tasklet singleTransactionTasklet,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelStepJob.step.tasklet.single", jobRepository)
            .tasklet(singleTransactionTasklet, transactionManager)
            .build();
}

// (7)
@Bean
public Step stepChunkDb(JobRepository jobRepository,
                  ItemReader databaseReader,
                  ItemWriter fileWriter,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelStepJob.step.chunk.file", jobRepository)
            .chunk(200, transactionManager)
            .reader(databaseReader)
            .writer(fileWriter)
            .build();
}

// (3)
@Bean
public Flow flow1(Step stepChunkDb) {
    return new FlowBuilder<SimpleFlow>("flow1")
            .start(stepChunkDb)
            .build();
}

// (3)
@Bean
public Flow flow2(Step stepTaskletChunk) {
    return new FlowBuilder<SimpleFlow>("flow2")
            .start(stepTaskletChunk)
            .build();
}

// (3)
@Bean
public Flow flow3(Step stepTaskletSingle) {
    return new FlowBuilder<SimpleFlow>("flow3")
            .start(stepTaskletSingle)
            .build();
}

// (3)
@Bean
public Flow flow4(Step stepChunkDb) {
    return new FlowBuilder<SimpleFlow>("flow4")
            .start(stepChunkDb)
            .build();
}

// (2)
@Bean
public Flow splitFlow(
        @Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
        Flow flow1, Flow flow2, Flow flow3, Flow flow4) {
    return new FlowBuilder<SimpleFlow>("parallelStepJob.split")
            .split(taskExecutor)
            .add(flow1, flow2, flow3, flow4)
            .build();
}

@Bean
public Job parallelSummarizeJob(JobRepository jobRepository,
                                Flow splitFlow) {
    return new JobBuilder("parallelStepJob", jobRepository)
            .start(splitFlow)
            .end()
            .build();
}
<!-- Task Executor -->
<!-- (1) -->
<task:executor id="parallelTaskExecutor" pool-size="10" queue-capacity="200"/>

<!-- Job Definition -->
<!-- (2) -->
<batch:job id="parallelStepJob" job-repository="jobRepository">
  <batch:split id="parallelStepJob.split" task-executor="parallelTaskExecutor">
      <batch:flow>  <!-- (3)  -->
          <batch:step id="parallelStepJob.step.chunk.db">
               <!-- (4) -->
              <batch:tasklet transaction-manager="jobTransactionManager">
                  <batch:chunk reader="fileReader" writer="databaseWriter"
                          commit-interval="100"/>
              </batch:tasklet>
          </batch:step>
      </batch:flow>

      <batch:flow>  <!-- (3)  -->
          <batch:step id="parallelStepJob.step.tasklet.chunk">
               <!-- (5) -->
              <batch:tasklet transaction-manager="jobTransactionManager"
                             ref="chunkTransactionTasklet"/>
          </batch:step>
      </batch:flow>

      <batch:flow>  <!-- (3)  -->
          <batch:step id="parallelStepJob.step.tasklet.single">
               <!-- (6) -->
              <batch:tasklet transaction-manager="jobTransactionManager"
                             ref="singleTransactionTasklet"/>
          </batch:step>
      </batch:flow>

      <batch:flow>  <!-- (3)  -->
          <batch:step id="parallelStepJob.step.chunk.file">
              <batch:tasklet transaction-manager="jobTransactionManager">
                   <!-- (7) -->
                  <batch:chunk reader="databaseReader" writer="fileWriter"
                          commit-interval="200"/>
              </batch:tasklet>
          </batch:step>
      </batch:flow>

  </batch:split>
</batch:job>
表 2. 説明
項番 説明

(1)

並列処理のために、各スレッドに割り当てるためのスレッドプールを定義する。

(2)

FlowBuilderのaddメソッドの引数flows/<batch:split>要素内に並列実行するステップをBean定義したFlowクラス/<batch:flow>要素を使用した形式で定義をする。
splitメソッドの引数executor/task-executor属性に(1)で定義したスレッドプールのBeanを設定する

(3)

Bean定義したFlow/<batch:flow>ごとに並列位処理したいBean定義したStep/<batch:step>を定義する。

(4)

概要図のStep1:チャンクモデルの中間コミット方式処理を定義する。

(5)

概要図のStep2:タスクレットモデルの中間コミット方式処理を定義する。

(6)

概要図のStep3:タスクレットモデルの一括コミット方式処理を定義する。

(7)

概要図のStep4:チャンクモデルの非トランザクショナルなリソースに対する中間コミット方式処理を定義する。

並列処理したために処理性能が低下するケース

並列処理では多重処理同様にデータ範囲を変えて同じ処理を並列走行させることが可能である。この場合、データ範囲はパラメータなどで与える。
この際に、個々の処理ごとに対象となるデータ量が小さい場合、 稼働時に占有するリソース量や処理時間などのフットプリントが並列処理では不利に働き、 かえって処理性能が低下することがある。

フットプリントの例
  • 入力リソースに対するオープンから最初のデータ範囲を取得するまでの処理

    • リソースオープンは、データ取得に比べて処理時間がかかる

    • データ範囲のメモリ領域を初期化する処理も同様に時間がかかる

また、Parallel Stepの前後に共通処理のステップを定義することも可能である。

@Bean
public Step stepPreprocess(JobRepository jobRepository,
                        StepExecutionLoggingListener listener,
                        DeleteDetailTasklet deleteDetailTasklet,
                        @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelRegisterJob.step.preprocess", jobRepository)
            .tasklet(deleteDetailTasklet, transactionManager)
            .listener(listener)
            .build();
}

@Bean
public Step stepPlan(JobRepository jobRepository,
                  StepExecutionLoggingListener listener,
                  @Qualifier("planReader") ItemReader<SalesPlanDetail> reader,
                  @Qualifier("planWriter") ItemWriter<SalesPlanDetail> writer,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelRegisterJob.step.plan", jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(1,
                    transactionManager)
            .listener(listener)
            .reader(reader)
            .writer(writer)
            .build();
}

@Bean
public Step stepPerformance(JobRepository jobRepository,
                  StepExecutionLoggingListener listener,
                  @Qualifier("performanceReader") ItemReader<SalesPerformanceDetail> reader,
                  @Qualifier("performanceWriter") ItemWriter<SalesPerformanceDetail> writer,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("parallelRegisterJob.step.performance",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(1,
                    transactionManager)
            .listener(listener)
            .reader(reader)
            .writer(writer)
            .build();
}

@Bean
public Flow flow1(Step stepPlan) {
    return new FlowBuilder<SimpleFlow>("flow1")
            .start(stepPlan)
            .build();
}

@Bean
public Flow flow2(Step stepPerformance) {
    return new FlowBuilder<SimpleFlow>("flow2")
            .start(stepPerformance)
            .build();
}

// (2)
@Bean
public Flow splitFlow(
        @Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
        Flow flow1, Flow flow2) {
    return new FlowBuilder<SimpleFlow>("parallelRegisterJob.split")
            .split(taskExecutor) // (1)
            .add(flow1, flow2)
            .build();
}

@Bean
public Job parallelRegisterJob(JobRepository jobRepository,
                                Flow splitFlow,
                                Step stepPreprocess) {
    return new JobBuilder("parallelRegisterJob", jobRepository)
            .start(stepPreprocess)
            .on("*")
            .to(splitFlow)
            .end()
            .build();
}
<batch:job id="parallelRegisterJob" job-repository="jobRepository">
    <!-- (1) -->
    <batch:step id="parallelRegisterJob.step.preprocess"
                next="parallelRegisterJob.split">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="deleteDetailTasklet" />
    </batch:step>

    <!--(2) -->
    <batch:split id="parallelRegisterJob.split" task-executor="parallelTaskExecutor">
        <batch:flow>
            <batch:step id="parallelRegisterJob.step.plan">
                <batch:tasklet transaction-manager="jobTransactionManager">
                    <batch:chunk reader="planReader" writer="planWriter"
                            commit-interval="1000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
        <batch:flow>
            <batch:step id="parallelRegisterJob.step.performance">
                <batch:tasklet transaction-manager="jobTransactionManager">
                    <batch:chunk reader="performanceReader" writer="performanceWriter"
                            commit-interval="1000" />
                </batch:tasklet>
            </batch:step>
        </batch:flow>
    </batch:split>
</batch:job>
表 3. 説明
項番 説明

(1)

前処理として処理するステップを定義する。FlowBuilderクラスのsplitメソッドの引数executor/next属性Bean定義したTaskExecutorクラス/<batch:split>に設定したidを指定する。
addメソッド/next属性による後続ステップの指定に関する詳細は"シーケンシャルフロー"を参照。

(2)

Parallel Stepを定義する。
Bean定義したFlow/<batch:flow>ごとに並列処理したいBean定義したStep/<batch:step>を定義する。

Partitioning Step (多重処理)

Partitioning Step(多重処理)の方法を説明する

Partitioning Step
図 4. Partitioning Stepの概要図
概要図の説明

Partitioning Stepでは、ManagerステップとWorkerステップの処理フェーズに分割される。

  1. Managerステップでは、Partitionerにより各Workerステップが処理するデータ範囲を特定するためのParition Keyを生成する。 Parition Keyはステップコンテキストに格納される。

  2. Workerステップでは、ステップコンテキストから自身に割り当てられたParition Keyを取得し、それを使い処理対象データを特定する。 特定した処理対象データに対して定義したステップの処理を実行する。

Partitioning Stepでは処理データを分割必要があるが、分割数については可変数と固定数のどちらにも対応できる。

分割数
可変数の場合

部門別で分割や、特定のディレクトリに存在するファイル単位での処理

固定数の場合

全データを個定数で分割してデータを処理

Spring Batchでは、固定数のことをgrid-sizeといい、Partitionergrid-sizeになるようにデータ分割範囲を決定する。

Partitioning Stepでは、分割数をスレッドサイズより大きくすることができる。 この場合、スレッド数分で多重実行され、スレッドに空きが出るまで、処理が未実行のまま保留となるステップが発生する。

以下にPartitioning Stepのユースケースを示す。

表 4. Partitioning Stepのユースケース
ユースケース Manager(Patitioner) Worker 分割数

マスタ情報からトランザクション情報を分割・多重化するケース
部門別や月別の集計処理など

DB(マスタ情報)

DB(トランザクション情報)

可変

複数ファイルのリストから1ファイル単位に多重化するケース
各支店からの転送データを支店別に多重処理(支店別集計処理など)

複数ファイル

単一ファイル

可変

大量データを一定数で分割・多重化するケース

障害発生時にリラン以外のリカバリ設計が難しくなるため、実運用では利用されることはあまりないケース。
リランする場合は、全件やり直しなので分割したメリットが薄れてしまう。

grid-sizeとトランザクション情報件数からデータ範囲を特定

DB(トランザクション情報)

固定

分割数が可変の場合

Partitioning Stepで分割数を可変とする方法を説明する。
下記に処理イメージ図を示す。

Variable Partiton Number
図 5. 処理イメージ図

処理イメージを例とした実装方法を示す。

Repository(SQLMapper)の定義 (PostgreSQL)
<!-- (1) -->
<select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.mst.Branch">
    <![CDATA[
    SELECT
        branch_id AS branchId,
        branch_name AS branchName,
        branch_address AS branchAddrss,
        branch_tel AS branchTel,
        create_date AS createDate,
        update_date AS updateDate
    FROM
        branch_mst
    ]]>
</select>

<!-- (2) -->
<select id="summarizeInvoice"
        resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail">
    <![CDATA[
    SELECT
        branchId, year, month, customerId, SUM(amount) AS amount
    FROM (
        SELECT
            t2.charge_branch_id AS branchId,
            date_part('year', t1.invoice_date) AS year,
            date_part('month', t1.invoice_date) AS month,
            t1.customer_id AS customerId,
            t1.invoice_amount AS amount
        FROM invoice t1
        INNER JOIN customer_mst t2 ON t1.customer_id = t2.customer_id
        WHERE
            t2.charge_branch_id = #{branchId}
        ) t3
    GROUP BY branchId, year, month, customerId
    ORDER BY branchId ASC, year ASC, month ASC, customerId ASC
    ]]>
</select>

<!-- omitted -->
Partitionerの実装例
@Component
public class BranchPartitioner implements Partitioner {

    @Inject
    BranchRepository branchRepository; // (3)

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> map = new HashMap<>();
        List<Branch> branches = branchRepository.findAll();

        int index = 0;
        for (Branch branch : branches) {
            ExecutionContext context = new ExecutionContext();
            context.putString("branchId", branch.getBranchId()); // (4)
            map.put("partition" + index, context);  // (5)
            index++;
        }

        return map;
    }
}
// (6)
@Bean
public TaskExecutor parallelTaskExecutor(
        @Value("${thread.size}") int threadSize) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadSize);
    executor.setQueueCapacity(10);
    return executor;
}

// (7)
@Bean
@StepScope
public MyBatisCursorItemReader<SalesPerformanceDetail> reader(
        @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
        @Value("#{stepExecutionContext['branchId']}") String branchId) { // (8)
    final Map<String, Object> parameterValues = new LinkedHashMap<>();
    parameterValues.put("branchId", branchId);
    return new MyBatisCursorItemReaderBuilder<SalesPerformanceDetail>()
            .sqlSessionFactory(jobSqlSessionFactory)
            .queryId(
                    "org.terasoluna.batch.functionaltest.app.repository.performance.InvoiceRepository.summarizeInvoice")
            .parameterValues(parameterValues)
            .build();
}

// omitted

// (12)
@Bean
public Step step1(JobRepository jobRepository,
                  StepExecutionLoggingListener listener,
                  @Qualifier("reader") ItemReader<SalesPerformanceDetail> reader,
                  @Qualifier("writer") ItemWriter<SalesPerformanceDetail> writer,
                  @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("multipleInvoiceSummarizeJob.worker",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(1,
                    transactionManager)
            .listener(listener)
            .reader(reader)
            .writer(writer)
            .build();
}

@Bean
public PartitionHandler partitionHandler(
        @Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
        @Value("${grid.size}") int gridSize, Step step1) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(taskExecutor);
    handler.setStep(step1);
    handler.setGridSize(gridSize); // (11)
    return handler;
}

// (9)
@Bean
public Step step1Manager(JobRepository jobRepository,
                         BranchPartitioner partitioner,
                         PartitionHandler partitionHandlerandler) {
    return new StepBuilder("multipleInvoiceSummarizeJob.manager",
            jobRepository)
            .partitioner("multipleInvoiceSummarizeJob.worker", partitioner) // (10)
            .partitionHandler(partitionHandlerandler)
            .build();
}

@Bean
public Job multipleInvoiceSummarizeJob(JobRepository jobRepository,
                                       JobExecutionLoggingListener listener,
                                       Step step1Manager) {
    return new JobBuilder("multipleInvoiceSummarizeJob", jobRepository)
            .start(step1Manager)
            .listener(listener)
            .build();
}
<!-- (6) -->
<task:executor id="parallelTaskExecutor"
               pool-size="${thread.size}" queue-capacity="10"/>

<!-- (7) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.InvoiceRepository.summarizeInvoice"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues">
        <map>
            <!-- (8) -->
            <entry key="branchId" value="#{stepExecutionContext['branchId']}" />
        </map>
    </property>
</bean>

<!-- omitted -->

<batch:job id="multipleInvoiceSummarizeJob" job-repository="jobRepository">
    <!-- (9) -->
    <batch:step id="multipleInvoiceSummarizeJob.manager">
        <!-- (10) -->
        <batch:partition partitioner="branchPartitioner"
                         step="multipleInvoiceSummarizeJob.worker">
            <!-- (11) -->
            <batch:handler grid-size="0" task-executor="parallelTaskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

<!-- (12) -->
<batch:step id="multipleInvoiceSummarizeJob.worker">
    <batch:tasklet transaction-manager="jobTransactionManager">
        <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
    </batch:tasklet>
</batch:step>
表 5. 説明
項番 説明

(1)

マスタデータから処理対象を取得するSQLを定義する。

(2)

マスタデータからの取得値を検索条件とするSQLを定義する。

(3)

定義したRepository(SQLMapper)をInjectする。

(4)

1つのWorkerステップが処理するマスタ値をステップコンテキストに格納する。

(5)

各Workerが該当するコンテキストを取得できるようMapに格納する。

(6)

多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。
Managerステップはメインスレッドで処理される。

(7)

マスタ値によるデータ取得のItemReaderを定義する。

(8)

(4)で設定したマスタ値をステップコンテキストから取得し、検索条件に追加する。

(9)

Managerステップを定義する。

(10)

データの分割条件を生成する処理を定義する。
StepBuilderのpartitionerメソッドの引数partitioner/partitioner属性には、Partitionerインタフェース実装を設定する。
partitionerメソッドの引数stepName/step属性には、(12)で定義するWorkerステップのBeanIDを設定する。

(11)

partitionerではgrid-sizeを使用しないため、TaskExecutorPartitionHandlerのsetGridSizeメソッドの引数gridSize/grid-size属性には任意の値を設定する。 setTaskExecutorメソッドの引数taskExecutor/task-executor属性に(6)で定義したスレッドプールのBeanIDを設定する。

(12)

Workerステップを定義する。
StepBuilderのreaderメソッド/reader属性に(7)で定義したItemReaderを設定する。

複数ファイルのリストから1ファイル単位に多重化する場合は、Spring Batchが提供している以下のPartitionerを利用することができる。

  • org.springframework.batch.core.partition.support.MultiResourcePartitioner

MultiResourcePartitionerの利用例を以下に示す。

// (1)
@Bean
public TaskExecutor parallelTaskExecutor(
        @Value("${thread.size}") int threadSize) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadSize);
    executor.setQueueCapacity(200);
    return executor;
}

// (2)
@Bean
@StepScope
public FlatFileItemReader<SalesPlanDetail> reader(
        @Value("#{stepExecutionContext['fileName']}") File fileName) {
    final DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("branchId", "year", "month", "customerId", "amount");
    final BeanWrapperFieldSetMapper<SalesPlanDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(SalesPlanDetail.class);
    final DefaultLineMapper<SalesPlanDetail> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return new FlatFileItemReaderBuilder<SalesPlanDetail>()
            .name(ClassUtils.getShortName(FlatFileItemReader.class))
            .resource(new FileSystemResource(fileName)) // (3)
            .lineMapper(lineMapper)
            .build();
}

// (4)
@Bean
@StepScope
public MultiResourcePartitioner partitioner(
        @Value("#{jobParameters['inputdir']}") File inputdir) throws Exception { // (5)
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
    Resource[] resources = resolver.getResources(
            new FileSystemResource(inputdir).getURL() + File.separator + "salesPlanDetail_*.csv");
    partitioner.setResources(resources);
    return partitioner;
}

// (7)
@Bean
public Step stepWorker(JobRepository jobRepository,
                        ItemReader<SalesPlanDetail> reader,
                        LoggingItemWriter writer,
                        @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("jobEvaluationReport.worker", jobRepository)
            .<SalesPlanDetail, SalesPlanDetail> chunk(20,
                    transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}

@Bean
public PartitionHandler partitionHandler(
        TaskExecutor parallelTaskExecutor,
        Step stepWorker) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(parallelTaskExecutor);
    handler.setStep(stepWorker);
    handler.setGridSize(0);
    return handler;
}

// (6)
@Bean
public Step stepManager(JobRepository jobRepository,
                        MultiResourcePartitioner partitioner,
                        PartitionHandler partitionHandler) {
    return new StepBuilder("multiplePartitioninglStepFileJob.manager", jobRepository)
            .partitioner("multiplePartitioninglStepFileJob.worker", partitioner)
            .partitionHandler(partitionHandler)
            .build();
}

@Bean
public Job multiplePartitioninglStepFileJob(JobRepository jobRepository,
                                       Step stepManager) {
    return new JobBuilder("multiplePartitioninglStepFileJob", jobRepository)
            .start(stepManager)
            .build();
}
<!-- (1) -->
<task:executor id="parallelTaskExecutor" pool-size="10" queue-capacity="200"/>

<!-- (2) -->
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
      p:resource="#{stepExecutionContext['fileName']}"> <!-- (3) -->
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
                      p:names="branchId,year,month,customerId,amount"/>
            </property>
            <property name="fieldSetMapper">
                <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                      p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail"/>
            </property>
        </bean>
    </property>
</bean>

<!-- (4) -->
<bean id="partitioner"
      class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
      scope="step"
      p:resources="file:#{jobParameters['inputdir']}/salesPlanDetail_*.csv"/> <!-- (5) -->

<!--(6) -->
<batch:job id="multiplePartitioninglStepFileJob" job-repository="jobRepository">
    <batch:step id="multiplePartitioninglStepFileJob.step.manager">
        <batch:partition partitioner="partitioner"
                         step="multiplePartitioninglStepFileJob.step.worker">
            <batch:handler grid-size="0" task-executor="parallelTaskExecutor"/>
        </batch:partition>
    </batch:step>
</batch:job>

<!-- (7) -->
<batch:step id="multiplePartitioninglStepFileJob.step.worker">
    <batch:tasklet transaction-manager="jobTransactionManager">
        <batch:chunk reader="reader" writer="writer" commit-interval="20"/>
    </batch:tasklet>
</batch:step>
表 6. 説明
項番 説明

(1)

多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。
Managerステップはメインスレッドで処理される。

(2)

1つのファイルを読み込むためのItemReaderを定義する。

(3)

FlatFileItemReaderBuilderのresourceメソッドの引数resource/resouce属性に、MultiResourcePartitionerで分割されたファイルを入力ファイルに指定する。
MultiResourcePartitionerは、"fileName"というキーでステップコンテキストにファイルパスを格納している。

(4)

MultiResourcePartitionerpartitionerとして定義する。

(5)

*を用いたパターンを使用することで、複数ファイルを対象にすることができる。

(6)

Managerステップを定義する。
定義内容は上記で説明したPartitioning Stepの内容と同じ。

(7)

Workerステップを定義する。
StepBuilderのreaderメソッド/reader属性に(2)で定義したItemReaderを設定する。

分割数が固定の場合

Partitioning Stepで分割数を固定する方法を説明する。
下記に処理イメージ図を示す。

Fixing Partiton Number
図 6. 処理イメージ図

処理イメージを例とした実装方法を示す。

Repository(SQLMapper)の定義 (PostgreSQL)
<!-- (1) -->
<select id="findByYearAndMonth"
    resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
    <![CDATA[
    SELECT
        branch_id AS branchId, year, month, amount
    FROM
        sales_performance_summary
    WHERE
        year = #{year} AND month = #{month}
    ORDER BY
        branch_id ASC
    LIMIT
        #{dataSize}
    OFFSET
        #{offset}
    ]]>
</select>

<!-- (2) -->
<select id="countByYearAndMonth" resultType="_int">
    <![CDATA[
    SELECT
        count(*)
    FROM
        sales_performance_summary
    WHERE
        year = #{year} AND month = #{month}
    ]]>
</select>

<!-- omitted -->
Partitionerの実装例
@Component
public class SalesDataPartitioner implements Partitioner {

    @Inject
    SalesSummaryRepository repository;  // (3)

    // omitted.

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> map = new HashMap<>();
        int count = repository.countByYearAndMonth(year, month);
        int dataSize = (count / gridSize) + 1;        // (4)
        int offset = 0;

        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("dataSize", dataSize);     // (5)
            context.putInt("offset", offset);         // (6)
            offset += dataSize;
            map.put("partition:" + i, context);       // (7)
        }

        return map;
    }
}
// (8)
@Bean
public TaskExecutor parallelTaskExecutor(
        @Value("${thread.size}") int threadSize) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threadSize);
    executor.setQueueCapacity(10);
    return executor;
}

// (9)
@Bean
@StepScope
public MyBatisCursorItemReader<SalesPerformanceSummary> reader(
        @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
        @Value("#{jobParameters['year']}") Integer year,
        @Value("#{jobParameters['month']}") Integer month,
        @Value("#{stepExecutionContext['dataSize']}") Integer dataSize, // (10)
        @Value("#{stepExecutionContext['offset']}") Integer offset) { // (11)
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("year", year);
    parameterValues.put("month", month);
    parameterValues.put("dataSize", dataSize);
    parameterValues.put("offset", offset);
    return new MyBatisCursorItemReaderBuilder<SalesPerformanceSummary>()
            .sqlSessionFactory(jobSqlSessionFactory)
            .queryId(
                    "org.terasoluna.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth")
            .parameterValues(parameterValues)
            .build();
}

// omitted

// (15)
@Bean
public Step stepWorker(JobRepository jobRepository,
                        StepExecutionLoggingListener listener,
                        ItemReader<SalesPerformanceSummary> reader,
                        ItemWriter<SalesPlanSummary> writer,
                        AddProfitsItemProcessor addProfitsItemProcessor,
                        @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
    return new StepBuilder("multipleCreateSalesPlanSummaryJob.worker", jobRepository)
            .<SalesPerformanceSummary, SalesPlanSummary> chunk(10,
                    transactionManager)
            .listener(listener)
            .reader(reader)
            .processor(addProfitsItemProcessor)
            .writer(writer)
            .build();
}

@Bean
public PartitionHandler partitionHandler(
        TaskExecutor parallelTaskExecutor,
        @Value("${grid.size}") int gridSize,
        Step stepWorker) {
    TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
    handler.setTaskExecutor(parallelTaskExecutor); // (14)
    handler.setStep(stepWorker);
    handler.setGridSize(gridSize); // (14)
    return handler;
}

// (12)
@Bean
public Step stepManager(JobRepository jobRepository,
                        SalesDataPartitioner salesDataPartitioner,
                        PartitionHandler partitionHandler) {
    return new StepBuilder("multipleCreateSalesPlanSummaryJob.manager", jobRepository)
            .partitioner("multipleCreateSalesPlanSummaryJob.worker", salesDataPartitioner) // (13)
            .partitionHandler(partitionHandler)
            .build();
}

@Bean
public Job multipleCreateSalesPlanSummaryJob(JobRepository jobRepository,
                                       JobExecutionLoggingListener listener,
                                       Step stepManager) {
    return new JobBuilder("multipleCreateSalesPlanSummaryJob", jobRepository)
            .start(stepManager)
            .listener(listener)
            .build();
}
<!-- (8) -->
<task:executor id="parallelTaskExecutor"
               pool-size="${thread.size}" queue-capacity="10"/>

<!-- (9) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues">
        <map>
            <entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
            <entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>
            <!-- (10) -->
            <entry key="dataSize" value="#{stepExecutionContext['dataSize']}" />
            <!-- (11) -->
            <entry key="offset" value="#{stepExecutionContext['offset']}" />
        </map>
    </property>
</bean>

<!-- omitted -->

<batch:job id="multipleCreateSalesPlanSummaryJob" job-repository="jobRepository">
    <!-- (12) -->
    <batch:step id="multipleCreateSalesPlanSummaryJob.manager">
        <!-- (13) -->
        <batch:partition partitioner="salesDataPartitioner"
              step="multipleCreateSalesPlanSummaryJob.worker">
            <!-- (14) -->
            <batch:handler grid-size="4" task-executor="parallelTaskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

<!-- (15) -->
<batch:step id="multipleCreateSalesPlanSummaryJob.worker">
    <batch:tasklet transaction-manager="jobTransactionManager">
        <batch:chunk reader="reader" processor="addProfitsItemProcessor"
              writer="writer" commit-interval="10"/>
    </batch:tasklet>
</batch:step>
表 7. 説明
項番 説明

(1)

特定のデータ範囲を取得するためにページネーション検索(SQL絞り込み方式)を定義する。
ページネーション検索(SQL絞り込み方式)の詳細は、Macchinetta Server 1.x 開発ガイドラインの Entityのページネーション検索(SQL絞り込み方式)を参照。

(2)

処理対象の全件数を取得するSQLを定義する。

(3)

定義したRepository(SQLMapper)をInjectする。

(4)

1つのWorkerステップが処理するデータ件数を算出する。

(5)

(4)のデータ件数をステップコンテキストに格納する。

(6)

各Workerステップの検索開始位置をステップコンテキストに格納する。

(7)

各Workerが該当するコンテキストを取得できるようMapに格納する。

(8)

多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。
Managerステップはメインスレッドで処理される。

(9)

ページネーション検索(SQL絞り込み方式)によるデータ取得のItemReaderを定義する。

(10)

(5)で設定したデータ件数をステップコンテキストから取得し、検索条件に追加する。

(11)

(6)で設定した検索開始位置をステップコンテキストから取得し、検索条件に追加する。

(12)

Managerステップを定義する。

(13)

データの分割条件を生成する処理を定義する。
StepBuilderのpartitionerメソッドの引数partitioner/partitioner属性には、Partitionerインタフェース実装を設定する。
partitionerメソッドの引数stepName/step属性には、(15)で定義するWorkerステップのBeanIDを設定する。

(14)

TaskExecutorPartitionHandlerのsetGridSizeメソッドの引数gridSize/grid-size属性に分割数(固定値)を設定する。
setTaskExecutorメソッドの引数taskExecutor/task-executor属性に(8)で定義したスレッドプールのBeanIDを設定する。

(15)

Workerステップを定義する。
StepBuilderのreaderメソッド/reader属性に(9)で定義したItemReaderを設定する。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28