Macchinetta Batch Framework (2.x) Development Guideline - version 2.3.0.RELEASE, 2021-3-26, commit-id:9b11e9d
> INDEX

Overview

タスクレットモデルジョブの作成方法について説明する。 タスクレットモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照。

構成要素

タスクレットモデルジョブでは、複数の構成要素は登場しない。 org.springframework.batch.core.step.tasklet.Taskletを実装し、Bean定義で設定するのみである。 また、発展的な実装手段としてチャンクモデルの構成要素であるItemReaderItemWriterをコンポーネントとして使うことも可能である。

How to use

ここでは、実際にタスクレットモデルジョブを実装する方法について、以下の順序で説明する。

ジョブの設定

Bean定義ファイルにて、タスクレットモデルジョブを定義する。 以下に例を示す。

Bean定義ファイルの例(タスクレットモデル)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- (1) -->
    <import resource="classpath:META-INF/spring/job-base-context.xml"/>

    <!-- (2) -->
    <context:component-scan
          base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common"/>

    <!-- (3) -->
    <!-- Tasklet -->
    <!-- Tasklet in order that based on the Bean defined by the annotations, not defined here -->

    <!-- (4) -->
    <batch:job id="simpleJob" job-repository="jobRepository"> <!-- (5) -->
        <batch:step id="simpleJob.step01"> <!-- (6) -->
            <batch:tasklet transaction-manager="jobTransactionManager"
                           ref="simpleJobTasklet"/> <!-- (7) -->
        </batch:step>
    </batch:job>

</beans>
Tasklet実装クラスの例
package jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common;

@Component // (3)
public class SimpleJobTasklet implements Tasklet {
  // omitted.
}
項番 説明

(1)

Macchinetta Batch 2.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。

(2)

コンポーネントスキャン対象のベースパッケージを設定する。
タスクレットモデルはアノテーションによるBean定義を基本とし、Tasklet実装クラスのBean定義はXML上では不要とする。

(3)

Taskletは、(2)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。

(4)

ジョブの設定。
id属性は、1つのバッチアプリケーションに含まれる全ジョブの範囲内で一意とする必要がある。

(5)

JobRepositoryの設定。
job-repository属性に設定する値は、特別な理由がない限りjobRepository固定とすること。
これにより、すべてのジョブが1つのJobRepositoryで管理できる。 jobRepositoryのBean定義は、(1)により解決する。

(6)

ステップの設定。
id属性は、1つのバッチアプリケーションに含まれる全ジョブの範囲内で一意とする必要はないが、障害発生時に追跡しやすくなる等の様々なメリットがあるため一意とする。
特別な理由がない限り、(3)で指定したid属性に[step+連番]を付加する形式とする。

(7)

タスクレットの設定。
transaction-manager属性に設定する値は、特別な理由がない限りjobTransactionManager固定とすること。
これにより、タスクレット全体の処理が1つのトランザクションで管理される。 詳細については、トランザクション制御を参照。
jobTransactionManagerのBean定義は、(1)により解決する。

また、ref属性は、(2)により解決するTaskletの実装クラスのBeanIDを指定する。
ここでは、Tasklet実装クラス名SimpleJobTaskletの先頭を小文字にしたsimpleJobTaskletとなる。

アノテーション利用時のBean名

@Componentアノテーション利用時のBean名は、デフォルトでは org.springframework.context.annotation.AnnotationBeanNameGenerator を通じて生成されるため、命名ルールについては本クラスのJavadocを参照。

Taskletの実装

まずはシンプルな実装で概要を理解し、次にチャンクモデルのコンポーネントを利用する実装へと進む。

以下の順序で説明する。

シンプルなTaskletの実装

ログを出力するのみのTasklet実装を通じ、最低限のポイントを説明する。

シンプルなTasklet実装クラスの例
package jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common;

// omitted.

@Component
public class SimpleJobTasklet implements Tasklet { // (1)

    private static final Logger logger =
            LoggerFactory.getLogger(SimpleJobTasklet.class);

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {  // (2)
        logger.info("called tasklet."); // (3)
        return RepeatStatus.FINISHED; // (4)
    }
}
項番 説明

(1)

org.springframework.batch.core.step.tasklet.Taskletインタフェースをimplementsして実装する。

(2)

Taskletインタフェースが定義するexecuteメソッドを実装する。 引数のStepContribution, ChunkContextは必要に応じて利用するが、ここでは説明を割愛する。

(3)

任意の処理を実装する。ここではINFOログを出力している。

(4)

Taskletの処理が完了したかどうかを返却する。
常にreturn RepeatStatus.FINISHED;と明示する。

チャンクモデルのコンポーネントを利用するTasklet実装

Spring Batch では、Tasklet実装の中でチャンクモデルの各種コンポーネントを利用することに言及していない。 Macchinetta Batch 2.xでは、以下のような状況に応じてこれを選択してよい。

  • 複数のリソースを組み合わせながら処理するため、チャンクモデルの形式に沿いにくい

  • チャンクモデルでは処理が複数箇所に実装することになるため、タスクレットモデルの方が全体像を把握しやすい

  • リカバリをシンプルにするため、チャンクモデルの中間コミットではなく、タスクレットモデルの一括コミットを使いたい

また、チャンクモデルのコンポーネントを利用してTasklet実装するうえで処理の単位についても考慮してほしい。 出力件数の単位としては以下の3パターンが考えられる。

表 1. 出力件数の単位と特徴
出力件数 特徴

1件

データを1件ずつ、入力、処理、出力しているため、処理のイメージがしやすい。
大量データの場合は入出力の多発により性能劣化を引き起こす可能性があるので留意が必要である。

全件

データを1件ずつ、入力、処理してメモリ上に貯めておき、最後に全件一括で出力する。
少量データの場合はデータの整合性を担保するとともに性能向上を期待できる。 ただし、大量データの場合はリソース(CPU、メモリ)に高負荷がかかる可能性があるので留意が必要である。

一定件数

データを1件ずつ、入力、処理してメモリ上に貯めておき、一定件数まできたところで出力する。
大量データを一定のリソース(CPU、メモリ)で効率よく処理できることにより性能向上を期待できる。
また、一定件数ごとに処理するため、トランザクション制御を実装することにより中間コミット方式にも移行できる。 ただし、中間コミット方式とする場合、ジョブが異常終了した後のリカバリは処理済みデータと未処理データが混在する可能性があるので留意が必要である。

以下に、チャンクモデルのコンポーネントであるItemReaderItemWriterを利用するTasklet実装について説明する。

この実装例は、1件単位に処理している例である。

チャンクモデルのコンポーネントを利用するTasklet実装例1
@Component
@Scope("step") // (1)
public class SalesPlanChunkTranTask implements Tasklet {

    @Inject
    @Named("detailCSVReader") // (2)
    ItemStreamReader<SalesPlanDetail> itemReader; // (3)

    @Inject
    SalesPlanDetailRepository repository; // (4)

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        SalesPlanDetail item;

        try {
            itemReader.open(chunkContext.getStepContext().getStepExecution()
                    .getExecutionContext()); // (5)

            while ((item = itemReader.read()) != null) { // (6)

                // do some processes.

                repository.create(item); // (7)
            }
        } finally {
            itemReader.close(); // (8)
        }
        return RepeatStatus.FINISHED;
    }
}
Bean定義例1
<!-- omitted -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>

<context:component-scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.plan" />
<context:component-scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.transaction.component" />

<!-- (9) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
    factory-ref="jobSqlSessionFactory"/>

<!-- (10) -->
<bean id="detailCSVReader"
      class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
      p:resource="file:#{jobParameters['inputFile']}">
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
                      p:names="branchId,year,month,customerId,amount"/>
            </property>
            <property name="fieldSetMapper">
                <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                      p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail"/>
            </property>
        </bean>
    </property>
</bean>

<!-- (11) -->
<batch:job id="createSalesPlanChunkTranTask" job-repository="jobRepository">
    <batch:step id="createSalesPlanChunkTranTask.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="salesPlanChunkTranTask"/>
    </batch:step>
</batch:job>
項番 説明

(1)

本クラス内で利用するItemReaderのBeanスコープに合わせ、stepスコープとする。

(2)

入力リソース(この例ではフラットファイル)へのアクセスはItemReaderを通じて行う。
ここでは、detailCSVReaderというBean名を指定するが、わかりやすさのためなので任意とする。

(3)

ItemReaderのサブインタフェースである、ItemStreamReaderとして型を定義する。
これは、(5), (8)のリソースオープン/クローズを実装する必要があるためである。 後ほど補足する。

(4)

出力リソース(この例ではデータベース)へのアクセスはMyBatisのMapperを通じて行う。
ここでは、簡単のためMapperを直接利用している。常にItemWriterを用いる必要はない。 もちろん、MyBatisBatchItemWriterを用いてもよい。

(5)

入力リソースをオープンする。

(6)

入力リソース全件を逐次ループ処理する。
ItemReader#readは、入力データがすべて読み取り末端に到達した場合、nullを返却する。

(7)

データベースへ出力する。

(8)

リソースは必ずクローズすること。
なお、例外処理は必要に応じて実装すること。 ここで例外が発生した場合、タスクレット全体のトランザクションがロールバックされ、 例外のスタックトレースを出力し、ジョブが異常終了する。

(9)

MyBatis-Springの設定。
MyBatis-Springの設定の詳細は、データベースアクセスを参照。

(10)

ファイルから入力するため、FlatFileItemReaderのBean定義を追加する。詳細はここでは割愛する。

(11)

各種コンポーネントはアノテーションによって解決するため、
シンプルなTaskletの実装の場合と同様となる。

スコープの統一について

Tasklet実装クラスと、InjectするBeanのスコープは、同じスコープに統一すること。

たとえば、FlatFileItemReaderが引数から入力ファイルパスを受け取る場合にはBeanスコープをstepにする必要がある。 この時、Tasklet実装クラスのスコープもstepにする必要がある。

仮にTasklet実装クラスのスコープをsingletonとしたケースを説明する。 この時、アプリケーション起動時のApplicationContext生成時にTasklet実装クラスをインスタンス化した後、 FlatFileItemReaderのインスタンスを解決してInjectしようとする。 しかし、FlatFileItemReaderstepスコープでありstep実行時に生成するためまだ存在しない。 結果、Tasklet実装クラスをインスタンス化できないと判断しApplicationContext生成に失敗してしまう。

@Injectを付与するフィールドの型について

利用する実装クラスに応じて、以下のいずれかとする。

  • ItemReader/ItemWriter

    • 対象となるリソースへのオープン・クローズを実施する必要がない場合に利用する。

  • ItemSteamReader/ItemStreamWriter

    • 対象となるリソースへのオープン・クローズを実施する必要がある場合に利用する。

必ずjavadocを確認してどちらを利用するか判断すること。以下に代表例を示す。

FlatFileItemReader/Writerの場合

ItemSteamReader/ItemStreamWriterにて扱う

MyBatisCursorItemReaderの場合

ItemStreamReaderにて扱う

MyBatisBatchItemWriterの場合

ItemWriterにて扱う

この実装例は、一定件数単位に処理するチャンクモデルを模倣した例である

チャンクモデルのコンポーネントを利用するTasklet実装例2
@Component
@Scope("step")
public class SalesPerformanceTasklet implements Tasklet {


    @Inject
    ItemStreamReader<SalesPerformanceDetail> reader;

    @Inject
    ItemWriter<SalesPerformanceDetail> writer; // (1)

    int chunkSize = 10; // (2)

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        try {
            reader.open(chunkContext.getStepContext().getStepExecution()
                    .getExecutionContext());

            List<SalesPerformanceDetail> items = new ArrayList<>(chunkSize); // (2)
            SalesPerformanceDetail item = null;
            do {
                // Pseudo operation of ItemReader
                for (int i = 0; i < chunkSize; i++) { // (3)
                    item = reader.read();
                    if (item == null) {
                        break;
                    }
                    // Pseudo operation of ItemProcessor
                    // do some processes.

                    items.add(item);
                }

                // Pseudo operation of ItemWriter
                if (!items.isEmpty()) {
                    writer.write(items); // (4)
                    items.clear();
                }
            } while (item != null);
        } finally {
            try {
                reader.close();
            } catch (Exception e) {
                // do nothing.
            }
        }

        return RepeatStatus.FINISHED;
    }
}
Bean定義例2
<!-- omitted -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>

<context:component-scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common,
        jp.co.ntt.fw.macchinetta.batch.functionaltest.app.performance,
        jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.exceptionhandling"/>
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance"
    factory-ref="jobSqlSessionFactory"/>

<bean id="detailCSVReader"
      class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
      p:resource="file:#{jobParameters['inputFile']}">
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
                      p:names="branchId,year,month,customerId,amount"/>
            </property>
            <property name="fieldSetMapper">
                <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                      p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail"/>
            </property>
        </bean>
    </property>
</bean>

<!-- (1) -->
<bean id="detailWriter"
      class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>


<batch:job id="jobSalesPerfTasklet" job-repository="jobRepository">
    <batch:step id="jobSalesPerfTasklet.step01">
        <batch:tasklet ref="salesPerformanceTasklet"
                       transaction-manager="jobTransactionManager"/>
    </batch:step>
</batch:job>
項番 説明

(1)

ItemWriterの実装としてMyBatisBatchItemWriterを利用する。

(2)

ItemWriterは一定件数をまとめて出力する。
ここでは10件ごとに処理し出力する。

(3)

チャンクモデルの動作にそって、
read→process→read→process→…​→writeとなるようにする。

(4)

ItemWriterを通じてまとめて出力する。

ItemReaderItemWriterの実装クラスを利用するかどうかは都度判断してほしいが、 ファイルアクセスはItemReaderItemWriterの実装クラスを利用するとよい。 それ以外のデータベースアクセス等は無理に使う必要はない。性能向上のために使えばよい。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.3.0.RELEASE, 2021-3-26, commit-id:9b11e9d