Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.1.RELEASE, 2018-3-9
> INDEX

Overview

Macchinetta Batch 2.xでは、データベースアクセスの方法として、MyBatis3(以降、「MyBatis」と呼ぶ)を利用する。 MyBatisによるデータベースアクセスの基本的な利用方法は、Macchinetta Server 1.x 開発ガイドラインの以下を参照してほしい。

本節では、Macchinetta Batch 2.x特有の使い方を中心に説明する。

Linux環境でのOracle JDBC利用時の留意事項について

Linux環境でのOracle JDBCを利用時は、Oracle JDBCが使用するOSの乱数生成器によるロックが発生する。 そのため、ジョブを並列実行しようとしても逐次実行になる事象や片方のコネクションがタイムアウトする事象が発生する。
本事象に対する2パターンの回避策を以下に示す。

  • Javaコマンド実行時に、システムプロパティで以下を設定する。

    • -Djava.security.egd=file:///dev/urandom

  • ${JAVA_HOME}/jre/lib/security/java.security内のsecurerandom.source=/dev/randomsecurerandom.source=/dev/urandomに変更する。

How to use

Macchinetta Batch 2.xでのデータベースアクセス方法を説明する。

なお、チャンクモデルとタスクレットモデルにおけるデータベースアクセス方法の違いに留意する。

Macchinetta Batch 2.xでのデータベースアクセスは、以下の2つの方法がある。
これらはデータベースアクセスするコンポーネントによって使い分ける。

  1. MyBatis用のItemReaderおよびItemWriterを利用する。

    • チャンクモデルでのデータベースアクセスによる入出力で使用する。

      • org.mybatis.spring.batch.MyBatisCursorItemReader

      • org.mybatis.spring.batch.MyBatisBatchItemWriter

  2. Mapperインターフェースを利用する

    • チャンクモデルでのビジネスロジック処理で使用する。

      • ItemProcessor実装で利用する。

    • タスクレットモデルでのデータベースアクセス全般で使用する。

      • Tasklet実装で利用する。

共通設定

データベースアクセスにおいて必要な共通設定について説明を行う。

データソースの設定

Macchinetta Batch 2.xでは、2つのデータソースを前提としている。 launch-context.xmlでデフォルト設定している2つのデータソースを示す。

データソース一覧
データソース名 説明

adminDataSource

Spring BatchやMacchinetta Batch 2.xが利用するデータソース
JobRepositoryや非同期実行(DBポーリング)で利用している。

jobDataSource

ジョブが利用するデータソース

以下に、launch-context.xmlと接続情報のプロパティを示す。
これらをユーザの環境に合わせて設定すること。

resources\META-INF\spring\launch-context.xml
<!-- (1) -->
<bean id="adminDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
      destroy-method="close"
      p:driverClassName="${admin.h2.jdbc.driver}"
      p:url="${admin.h2.jdbc.url}"
      p:username="${admin.h2.jdbc.username}"
      p:password="${admin.h2.jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false"/>

<!-- (2) -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource" 
      destroy-method="close"
      p:driverClassName="${jdbc.driver}"
      p:url="${jdbc.url}"
      p:username="${jdbc.username}"
      p:password="${jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false" />
batch-application.properties
# (3)
# Admin DataSource settings.
admin.h2.jdbc.driver=org.h2.Driver
admin.h2.jdbc.url=jdbc:h2:mem:batch;DB_CLOSE_DELAY=-1
admin.h2.jdbc.username=sa
admin.h2.jdbc.password=

# (4)
# Job DataSource settings.
jdbc.driver=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/postgres
jdbc.username=postgres
jdbc.password=postgres
説明
項番 説明

(1)

adminDataSourceの定義。(3)の接続情報が設定される。

(2)

jobDataSourceの定義。(4)の接続情報が設定される。

(3)

adminDataSourceで利用するデータベースへの接続情報
この例では、H2を利用している。

(4)

jobDataSourceで利用するデータベースへの接続情報
この例では、PostgreSQLを利用している。

MyBatisの設定

Macchinetta Batch 2.xで、MyBatisの設定をする上で重要な点について説明をする。

バッチ処理を実装する際の重要なポイントの1つとして「大量のデータを一定のリソースで効率よく処理する」が挙げられる。
これに関する設定を説明する。

  • fetchSize

    • 一般的なバッチ処理では、大量のデータを処理する際の通信コストを低減するために、 JDBCドライバに適切なfetchSizeを指定することが必須である。 fetchSizeとは、JDBCドライバとデータベース間とで1回の通信で取得するデータ件数を設定するパラメータである。 この値は出来る限り大きい値を設定することが望ましいが、大きすぎるとメモリを圧迫するため、注意が必要である。 ユーザにてチューニングする必要がある箇所と言える。

    • MyBatisでは、全クエリ共通の設定としてdefaultFetchSizeを設定することができ、さらにクエリごとのfetchSize設定で上書きできる。

  • executorType

    • 一般的なバッチ処理では、同一トランザクション内で同じSQLを全データ件数/fetchSizeの回数分実行することになる。 この際、都度ステートメントを作成するのではなく再利用することで効率よく処理できる。

    • MyBatisの設定における、defaultExecutorTypeREUSEを設定することでステートメントの再利用ができ、 処理スループット向上に寄与する。

    • 大量のデータを一度に更新する場合、JDBCのバッチ更新を利用することで性能向上が期待できる。
      そのため、MyBatisBatchItemWriterで利用するSqlSessionTemplateには、
      executorTypeに(REUSEではなく)BATCHが設定されている。

Macchinetta Batch 2.xでは、同時に2つの異なるExecutorTypeが存在する。 一方のExecutorTypeで実装する場合が多いと想定するが、併用時は特に注意が必要である。 この点は、Mapperインターフェース(入力)にて詳しく説明する。

MyBatisのその他のパラメータ

その他のパラメータに関しては以下リンクを参照し、 アプリケーションの特性にあった設定を行うこと。
http://www.mybatis.org/mybatis-3/configuration.html

以下にデフォルト提供されている設定を示す。

META-INF/spring/launch-context.xml
<bean id="jobSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="jobDataSource">
    <!-- (1) -->
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

<!-- (2) -->
<bean id="batchModeSqlSessionTemplate"
      class="org.mybatis.spring.SqlSessionTemplate"
      c:sqlSessionFactory-ref="jobSqlSessionFactory"
      c:executorType="BATCH"/>
説明
項番 説明

(1)

MyBatisの各種設定を行う。
デフォルトでは、fetchSizeを1000に設定している。

(2)

MyBatisBatchItemWriterのために、executorTypeBATCHSqlSessionTemplateを定義している。

adminDataSourceを利用したSqlSessionFactoryの定義箇所について

同期実行をする場合は、adminDataSourceを利用したSqlSessionFactoryは不要であるため、定義がされていない。 非同期実行(DBポーリング)を利用する場合、ジョブ要求テーブルへアクセスするために META-INF/spring/async-batch-daemon.xml内に定義されている。

META-INF/spring/async-batch-daemon.xml
<bean id="adminSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="adminDataSource" >
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

Mapper XMLの定義

Macchinetta Batch 2.x特有の説明事項はないので、Macchinetta Server 1.x 開発ガイドラインの データベースアクセス処理の実装を参照してほしい。

MyBatis-Springの設定

MyBatis-Springが提供するItemReaderおよびItemWriterを使用する場合、MapperのConfigで使用するMapper XMLを設定する必要がある。

設定方法としては、以下の2つが考えられる。

  1. 共通設定として、すべてのジョブで使用するMapper XMLを登録する。

    • META-INF/spring/launch-context.xmlにすべてのMapper XMLを記述することになる。

  2. 個別設定として、ジョブ単位で利用するMapper XMLを登録する。

    • META-INF/jobs/配下のBean定義に、個々のジョブごとに必要なMapper XMLを記述することになる。

共通設定をしてしまうと、同期実行をする際に実行するジョブのMapper XMLだけでなく、その他のジョブが使用するMapper XMLも読み込んでしまうために以下に示す弊害が生じる。

  • ジョブの起動までに時間がかかる

  • メモリリソースの消費が大きくなる

これを回避するために、Macchinetta Batch 2.xでは、個別設定として、個々のジョブ定義でそのジョブが必要とするMapper XMLだけを指定する設定方法を採用する。

基本的な設定方法については、Macchinetta Server 1.x 開発ガイドラインの MyBatis-Springの設定を参照してほしい。

Macchinetta Batch 2.xでは、複数のSqlSessionFactoryおよびSqlSessionTemplateが定義されているため、 どれを利用するか明示的に指定する必要がある。
基本的にはjobSqlSessionFactoryを指定すればよい。

以下に設定例を示す。

META-INF/jobs/common/jobCustomerList01.xml
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
    factory-ref="jobSqlSessionFactory"/>
説明
項番 説明

(1)

<mybatis:scan>factory-ref属性にjobSqlSessionFactoryを設定する。

入力

データベースアクセスの入力について以下のとおり説明する。

MyBatisCursorItemReader

ここではItemReaderとして MyBatis-Springが提供するMyBatisCursorItemReaderによるデータベースアクセスについて説明する。

機能概要

MyBatis-Springが提供するItemReaderとして下記の2つが存在する。

  • org.mybatis.spring.batch.MyBatisCursorItemReader

  • org.mybatis.spring.batch.MyBatisPagingItemReader

MyBatisPagingItemReaderは、Macchinetta Server 1.x 開発ガイドラインの Entityのページネーション検索(SQL絞り込み方式)で 説明している仕組みを利用したItemReaderである。
一定件数を取得した後に再度SQLを発行するため、データの一貫性が保たれない可能性がある。 そのため、バッチ処理で利用するには危険であることから、Macchinetta Batch 2.xでは原則使用しない。
Macchinetta Batch 2.xではMyBatisと連携して、Cursorを利用し取得データを返却するMyBatisCursorItemReaderのみを利用する。

Macchinetta Batch 2.xでは、MyBatis-Springの設定で説明したとおり、 mybatis:scanによって動的にMapper XMLを登録する方法を採用している。 そのため、Mapper XMLに対応するインターフェースを用意する必要がある。 詳細については、Macchinetta Server 1.x 開発ガイドラインの データベースアクセス処理の実装を参照。

MyBatisCursorItemReaderにおけるクローズ時の注意事項について

Mybatis-Spring1.3.1までの不具合により、MyBatisCursorItemReaderをオープンせずにクローズする場合(@BeforeStepアノテーションで異常終了する場合等)、java.lang.NullPointerExceptionが発生する。 その場合は、MyBatisCursorItemReaderを拡張するなどして、クローズ時に例外を捕捉し、正常終了するように実装する必要がある。

MyBatisCursorItemReaderを利用してデータベースを参照するための実装例を処理モデルごとに以下に示す。

チャンクモデルにおける利用方法

チャンクモデルでMyBatisCursorItemReaderを利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReaderの実装例と、実装したMyBatisCursorItemReaderを利用してデータベースから取得したデータを処理するItemProcessorの実装例を説明する。

Bean定義
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<batch:job id="outputAllCustomerList01" job-repository="jobRepository">
    <batch:step id="outputAllCustomerList01.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="retrieveBranchFromContextItemProcessor"
                         writer="writer" commit-interval="10"/>
            <!-- omitted -->
        </batch:tasklet>
    </batch:step>
</batch:job>
Mapper XML
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository">

    <!-- omitted -->

    <!-- (7) -->
    <select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.mst.Customer">
        <![CDATA[
        SELECT
            customer_id AS customerId,
            customer_name AS customerName,
            customer_address AS customerAddress,
            customer_tel AS customerTel,
            charge_branch_id AS chargeBranchId,
            create_date AS createDate,
            update_date AS updateDate
        FROM
            customer_mst
        ORDER by
            charge_branch_id ASC, customer_id ASC
        ]]>
    </select>

    <!-- omitted -->
</mapper>
Mapperインターフェース
public interface CustomerRepository {
    // (8)
    List<Customer> findAll();

    // omitted.
}
ItemProcessor実装
@Component
@Scope("step")
public class RetrieveBranchFromContextItemProcessor implements ItemProcessor<Customer, CustomerWithBranch> {
    // omitted.

    @Override
    public CustomerWithBranch process(Customer item) throws Exception { // (9)
        CustomerWithBranch newItem = new CustomerWithBranch(item);
        newItem.setBranch(branches.get(item.getChargeBranchId())); // (10)
        return newItem;
    }
}
説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisCursorItemReaderを定義する。

(3)

queryIdのプロパティに、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionFactory-refのプロパティに、アクセスするデータベースのSqlSessionFactoryを指定する。

(5)

(2)で定義したMyBatisCursorItemReaderをreader属性に指定する。

(6)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(7)

SQLを定義する。

(8)

(7)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。
この例では検索条件のパラメータとしてbranchIdを@Paramアノテーションにより渡している。条件が無い場合は不要である。

(9)

引数として受け取るitemの型は、 このクラスで実装しているItemProcessorインターフェースの型引数で指定した入力オブジェクトの型であるSalesPerformanceDetailとする。

(10)

引数に渡されたitemを利用して各カラムの値を取得する。

タスクレットモデルにおける利用方法

タスクレットモデルでMyBatisCursorItemReaderを利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReaderの実装例と、実装したMyBatisCursorItemReaderを利用してデータベースから取得したデータを処理するTaskletの実装例を説明する。

タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。

タスクレットモデルではチャンクモデルと異なり、Tasklet実装においてリソースを明示的にオープン/クローズする必要がある。 また、入力データの読み込みも明示的に行う。

Bean定義
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="summarizeDetails" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
          p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.summarizeDetails"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<batch:job id="customizedJobExitCodeTaskletJob" job-repository="jobRepository">
    <batch:step id="customizedJobExitCodeTaskletJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager" ref="checkAmountTasklet"/>
    </batch:step>
    <!-- omitted -->
</batch:job>
Mapper XML
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- omitted -->

    <!-- (6) -->
    <select id="summarizeDetails" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanSummary">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, SUM(amount) AS amount
        FROM
            sales_plan_detail
        GROUP BY
            branch_id, year, month
        ORDER BY
            branch_id ASC, year ASC, month ASC
         ]]>
    </select>

</mapper>
Mapperインターフェース
public interface SalesPlanDetailRepository {
    // (7)
    List<SalesPlanSummary> summarizeDetails();

    // omitted.
}
Tasklelet実装
@Component
@Scope("step")
public class CheckAmountTasklet implements Tasklet {
    // (8)
    @Inject
    ItemStreamReader<SalesPlanSummary> reader;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        SalesPlanSummary item = null;

        List<SalesPlanSummary> items = new ArrayList<>(CHUNK_SIZE);
        int errorCount = 0;

        try {
            // (9)
            reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());
            while ((item = reader.read()) != null) { // (10)
                if (item.getAmount().signum() == -1) {
                    logger.warn("amount is negative. skip item [item: {}]", item);
                    errorCount++;
                    continue;
                }

                // omitted.
            }

            // catch block is omitted.
        } finally {
            // (11)
            reader.close();
        }
    }
    // omitted.

    return RepeatStatus.FINISHED;
}
説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisCursorItemReaderを定義する。

(3)

queryIdのプロパティに、(6)で定義しているSQLのIDを(5)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionFactory-refのプロパティに、アクセスするデータベースのSqlSessionFactoryを指定する。

(5)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。

(8)

@Injectアノテーションを付与して、ItemStreamReaderの実装をインジェクションする。
対象となるリソースへのオープン・クローズを実施する必要があるため、 ItemReaderにリソースのopen/closeメソッドを追加したItemStreamReaderインタフェースで実装をインジェクションする。

(9)

入力リソースをオープンする。

(10)

入力データを1件ずつ読み込む。

(11)

入力リソースをクローズする。
リソースは必ずクローズすること。なお、ここで例外が発生した場合、タスクレット全体のトランザクションがロールバックされ、 例外のスタックトレースを出力しジョブが異常終了する。そのため、必要に応じて例外処理を実装すること。

検索条件の指定方法

データベースアクセスの際に検索条件を指定して検索を行いたい場合は、 Bean定義にてMap形式でジョブパラメータから値を取得し、キーを設定することで検索条件を指定することができる。 以下にジョブパラメータを指定したジョブ起動コマンドの例と実装例を示す。

ジョブパラメータを指定した場合のジョブ起動コマンド
java -cp ${CLASSPATH} org.springframework.batch.core.launch.support.CommandLineJobRunner
 /META-INF/job/job001 job001 year=2017 month=12
MapperXMLの実装例
<!-- (1) -->
<select id="findByYearAndMonth"
    resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
    <![CDATA[
    SELECT
        branch_id AS branchId, year, month, amount
    FROM
        sales_performance_summary
    WHERE
        year = #{year} AND month = #{month}
    ORDER BY
        branch_id ASC
    ]]>
</select>

<!-- omitted -->
Bean定義
<!-- omitted -->

<!-- (2) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues"> <!-- (3) -->
        <map>
            <!-- (4) -->
            <entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
            <entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>

            <!-- omitted -->
        </map>
    </property>
</bean>

<!-- omitted -->
説明
項番 説明

(1)

検索条件を指定して取得するSQLを定義する。

(2)

データベースからデータを取得するためのItemReaderを定義する。

(3)

プロパティ名にparameterValuesを設定する。

(4)

検索条件にする値をジョブパラメータから取得し、キーに設定することで検索条件を指定する。 SQLの引数が数値型で定義されているため、value-typeIntegerに変換して渡している。

StepExectionContextによる検索指定方法について

@beforeStepなどジョブの前処理で検索条件を指定する場合は、StepExecutionContextに設定することで、JobParameters同様に取得することができる。

Mapperインターフェース(入力)

ItemReader以外でデータベースの参照を行うにはMapperインターフェースを利用する。
ここではMapperインターフェースを利用したデータベースの参照について説明する。

機能概要

Mapperインターフェースを利用するにあたって、Macchinetta Batch 2.xでは以下の制約を設けている。

Mapperインターフェースの利用可能な箇所
処理 ItemProcessor Tasklet リスナー

参照

利用可

利用可

利用可

更新

条件付で利用可

利用可

利用不可

ItemProcessorでの制約

MyBatisには、同一トランザクション内で2つ以上のExecutorTypeで実行してはいけないという制約がある。
「ItemWriterにMyBatisBatchItemWriterを使用する」と「ItemProcessorでMapperインターフェースを使用し参照更新をする」を 同時に満たす場合は、この制約に抵触する。
制約を回避するには、ItemProcessorではExecutorTypeBATCHのMapperインターフェースによって データベースアクセスすることになる。
加えて、MyBatisBatchItemWriterではSQL実行後のステータスチェックにより、自身が発行したSQLかどうかチェックしているのだが、 当然ItemProcessorによるSQL実行は管理できないためエラーが発生してしまう。
よって、MyBatisBatchItemWriterを利用している場合は、Mapperインターフェースによる更新はできなくなり、参照のみとなる。

MyBatisBatchItemWriterのエラーチェックを無効化する設定ができるが、予期せぬ動作が起きる可能性があるため無効化は禁止する。

Taskletでの制約

Taskletでは、Mapperインターフェースを利用することが基本であるため、ItemProcessorのような影響はない。
MyBatisBatchItemWriterをInjectして利用することも考えられるが、その場合はMapperインターフェース自体を BATCH設定で処理すればよい。つまり、Taskletでは、MyBatisBatchItemWriterをInjectして使う必要は基本的にない。

チャンクモデルにおける利用方法

チャンクモデルでMapperインターフェースを利用してデータベースを参照する実装例を以下に示す。

ItemProcessorでの実装例
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {

        // (2)
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        // omitted.

        return writeItem;
    }
}
Bean定義
<!-- (3) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>

<!-- (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- omitted job definition -->

MapperインターフェースとMapper XMLについてはMyBatisCursorItemReader で説明している内容以外に特筆すべきことがないため省略する。

説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

Mapperインターフェースで検索処理を実行する。

(3)

Mapper XMLの登録を行う。
template-ref属性にBATCH設定されているbatchModeSqlSessionTemplateを指定することで、 ItemProcessorでのデータベースアクセスはBATCHとなる。

(4)

MyBatisCursorItemReaderを定義する。
sqlSessionFactory-refプロパティに、アクセスするデータベースのSqlSessionFactoryを指定する。

MyBatisCursorItemReader設定の補足

以下に示す定義例のように、MyBatisCursorItemReaderとMyBatisBatchItemWriterで異なるExecutorTypeを使用しても問題ない。 これは、MyBatisCursorItemReaderによるトランザクションはItemWriterのトランザクションとは別であるからである。

<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="xxx"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="yyy"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
タスクレットモデルにおける利用方法

タスクレットモデルでMapperインターフェースを利用してデータベースを参照する実装例を以下に示す。

Taskletでの実装例
@Component
public class OptimisticLockTasklet implements Tasklet {

    // (1)
    @Inject
    ExclusiveControlRepository repository;

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        Branch branch = repository.branchFindOne(branchId); // (2)

        // omitted.

        return RepeatStatus.FINISHED;
    }
}
Bean定義
<!-- (3) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
        factory-ref="jobSqlSessionFactory"/>

<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="taskletOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="optimisticLockTasklet"> <!-- (4) -->
        </batch:tasklet>
    </batch:step>
</batch:job>

MapperインターフェースとMapper XMLについてはMyBatisCursorItemReader で説明している内容以外に特筆すべきことがないため省略する。

説明
項番 説明

(1)

MapperインターフェースをInjectする。

(2)

Mapperインターフェースで検索処理を実行する。

(3)

Mapper XMLの登録を行う。
factory-ref属性にREUSE設定されているjobSqlSessionFactoryを指定する。

(4)

MapperインターフェースをInjectしTaskletを設定する。

出力

データベースアクセスの出力について以下のとおり説明する。

MyBatisBatchItemWriter

ここではItemWriterとして MyBatis-Springが提供するMyBatisBatchItemWriterによるデータベースアクセスについて説明する。

機能概要

MyBatis-Springが提供するItemWriterは以下の1つのみである。

  • org.mybatis.spring.batch.MyBatisBatchItemWriter

MyBatisBatchItemWriterはMyBatisと連携してJDBCのバッチ更新機能を利用するItemWriterであり、 大量のデータを一度に更新する場合に性能向上が期待できる。
基本的な設定については、MyBatisCursorItemReaderと同じである。 MyBatisBatchItemWriterでは、MyBatisの設定で説明した batchModeSqlSessionTemplateを指定する必要がある。

MyBatisBatchItemWriterを利用してデータベースを更新するための実装例を以下に示す。

チャンクモデルにおける利用方法

チャンクモデルでMyBatisBatchItemWriterを利用してデータベースを更新(登録)する実装例を以下に示す。
ここでは、MyBatisBatchItemWriterの実装例と、実装したMyBatisBatchItemWriterを利用するItemProcessorの実装例を説明する。 ItemProcessor実装では取得したデータをMyBatisBatchItemWriterを利用してデータベースの更新を行っている。

Bean定義
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="writer"
      class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
      p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>

<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="chunkOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader" processor="branchEditItemProcessor"
                         writer="writer" commit-interval="10"/> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
</batch:job>
Mapper XML
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- (7) -->
    <insert id="create"
            parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        INSERT INTO
            sales_plan_detail(branch_id, year, month, customer_id, amount)
        VALUES (
            #{branchId}, #{year}, #{month}, #{customerId}, #{amount}
        )
        ]]>
    </insert>

    <!-- omitted -->
</mapper>
Mapperインターフェース
public interface SalesPlanDetailRepository {

    // (8)
    void create(SalesPlanDetail salesPlanDetail);

    // omitted.
}
ItemProcessor実装
@Component
@Scope("step")
public class BranchEditItemProcessor implements ItemProcessor<Branch, ExclusiveBranch> {
    // omitted.

    @Override
    public ExclusiveBranch process(Branch item) throws Exception { // (9)
        ExclusiveBranch branch = new ExclusiveBranch();
        branch.setBranchId(item.getBranchId());
        branch.setBranchName(item.getBranchName() + " - " + identifier);
        branch.setBranchAddress(item.getBranchAddress() + " - " + identifier);
        branch.setBranchTel(item.getBranchTel());
        branch.setCreateDate(item.getUpdateDate());
        branch.setUpdateDate(new Timestamp(clock.millis()));
        branch.setOldBranchName(item.getBranchName());

        // (10)
        return branch;
    }
}
説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisBatchItemWriterを定義する。

(3)

statementIdのプロパティに、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionTemplate-refのプロパティに、アクセスするデータベースのSessionTemplateを指定する。
指定するSessionTemplateは、executorTypeBATCHに設定されていることが必須である。

(5)

(2)で定義したMyBatisBatchItemWriterをwriter属性に指定する。

(6)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(7)

SQLを定義する。

(8)

(7)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。

(9)

返り値の型は、このクラスで実装しているItemProcessorインターフェースの型引数で指定した 出力オブジェクトの型であるExclusiveBranchとする。

(10)

更新データを設定したDTOオブジェクトを返すことでデータベースへ出力する。

タスクレットモデルにおける利用方法

タスクレットモデルでMyBatisBatchItemWriterを利用してデータベースを更新(登録)する実装例を以下に示す。
ここでは、MyBatisBatchItemWriterの実装例と実装したMyBatisBatchItemWriterを利用するTaskletの実装例を説明する。 タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。

Bean定義
<!-- (1) -->
<mybatis:scan base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
            factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
          p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
          p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<batch:job id="taskletJobWithListenerWithinJobScope" job-repository="jobRepository">
    <batch:step id="taskletJobWithListenerWithinJobScope.step01">
        <batch:tasklet transaction-manager="jobTransactionManager" ref="salesPlanDetailRegisterTasklet"/>
    </batch:step>
    <!-- omitted. -->
</batch:job>
Mapper XML
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- (6) -->
    <insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        INSERT INTO
            sales_plan_detail(branch_id, year, month, customer_id, amount)
        VALUES (
            #{branchId}, #{year}, #{month}, #{customerId}, #{amount}
        )
        ]]>
    </insert>

    <!-- omitted -->
</mapper>
Mapperインターフェース
public interface SalesPlanDetailRepository {
    // (7)
    void create(SalesPlanDetail salesPlanDetail);

    // omitted.
}
Tasklet実装
@Component
@Scope("step")
public class SalesPlanDetailRegisterTasklet implements Tasklet {

    // omitted.

    // (8)
    @Inject
    ItemWriter<SalesPlanDetail> writer;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        SalesPlanDetail item = null;

        try {
            reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());

            List<SalesPlanDetail> items = new ArrayList<>(); // (9)

            while ((item = reader.read()) != null) {

                items.add(processor.process(item)); // (10)
                if (items.size() == 10) {
                    writer.write(items); // (11)
                    items.clear();
                }
            }
            // omitted.
        }
        // omitted.

        return RepeatStatus.FINISHED;
    }
}

MapperインターフェースとMapper XMLについてはMyBatisBatchItemWriter で説明している内容以外に特筆すべきことがないため省略する。

説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisBatchItemWriterを定義する。

(3)

statementIdのプロパティに、(6)で定義しているSQLのIDを(5)のnamespace + <メソッド名>で指定する。

(4)

sqlSessionTemplate-refのプロパティに、アクセスするデータベースのSessionTemplateを指定する。
指定するSessionTemplateは、executorTypeBATCHに設定されていることが必須である。

(5)

Mapper XMLを定義する。namespaceの値とインターフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインターフェースに定義する。

(8)

@Injectアノテーションを付与して、ItemWriterの実装をインジェクションする。
ItemReaderとは異なり、データベースの更新ではリソースのopen/closeが不要であるため、ItemStreamWriterではなくItemWriterインターフェースにインジェクションする。

(9)

出力データを格納するリストを定義する。
ItemWriterでは一定件数のデータをまとめて出力する。

(10)

リストに更新データを設定する。

(11)

更新データを設定したリストを引数に指定して、データベースへ出力する。

Mapperインターフェース(出力)

ItemWriter以外でデータベースの更新を行うにはMapperインターフェースを利用する。
ここではMapperインターフェースを利用したデータベースの更新について説明する。

機能概要

Mapperインターフェースを利用してデータベースアクセスするうえでのMacchinetta Batch 2.xとしての制約はMapperインターフェース(入力)を参照のこと。

チャンクモデルにおける利用方法

チャンクモデルでMapperインターフェースを利用してデータベースを更新(登録)する実装例を以下に示す。

ItemProcessorでの実装例
@Component
public class UpdateCustomerItemProcessor implements ItemProcessor<Customer, Customer> {

    // omitted.

    // (1)
    @Inject
    DBAccessCustomerRepository customerRepository;

    @Override
    public Customer process(Customer item) throws Exception {
        item.setCustomerName(String.format("%s updated by mapper if", item.getCustomerName()));
        item.setCustomerAddress(String.format("%s updated by item writer", item.getCustomerAddress()));
        item.setUpdateDate(new Timestamp(clock.millis()));

        // (2)
        long cnt = customerRepository.updateName(item);

        // omitted.

        return item;
    }
}
Bean定義
<!-- (3) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository;jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>

<!-- (4) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

    <batch:job id="updateMapperAndItemWriterBatchModeJob" job-repository="jobRepository">
        <batch:step id="updateMapperAndItemWriterBatchModeJob.step01">
            <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="updateCustomerItemProcessor"
                         writer="writer" commit-interval="10"/> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
    <!-- omitted -->
</batch:job>

MapperインターフェースとMapper XMLについてはMyBatisBatchItemWriter で説明している内容以外に特筆すべきことがないため省略する。

説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

DTOオブジェクトを生成して更新データを設定し、DTOオブジェクトを返却することでデータベースの更新を行う。

(3)

Mapper XMLの登録を行う。
template-ref属性にBATCH設定されているbatchModeSqlSessionTemplateを指定することで、ItemProcessorでのデータベースアクセスはBATCHとなる。 ここで、factory-ref="jobSqlSessionFactory"としてしまうと、前述の制約に抵触し、MyBatisBatchItemWriter実行時に例外が発生してしまう。

(4)

MyBatisBatchItemWriterを定義する。
sqlSessionTemplate-refプロパティにBATCH設定されているbatchModeSqlSessionTemplateを指定する。

(5)

(4)で定義したMyBatisBatchItemWriterをwriter属性に指定する。

タスクレットモデルにおける利用方法

タスクレットモデルでMapperインターフェースを利用してデータベースを更新(登録)する実装例を以下に示す。

Taskletでの実装例
@Component
public class OptimisticLockTasklet implements Tasklet {

    // (1)
    @Inject
    ExclusiveControlRepository repository;

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        Branch branch = repository.branchFindOne(branchId);

        // (2)
        ExclusiveBranch exclusiveBranch = new ExclusiveBranch();
        exclusiveBranch.setBranchId(branch.getBranchId());
        exclusiveBranch.setBranchName(branch.getBranchName() + " - " + identifier);
        exclusiveBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
        exclusiveBranch.setBranchTel(branch.getBranchTel());
        exclusiveBranch.setCreateDate(branch.getUpdateDate());
        exclusiveBranch.setUpdateDate(new Timestamp(clock.millis()));
        exclusiveBranch.setOldBranchName(branch.getBranchName());

        // (3)
        int result = repository.branchExclusiveUpdate(exclusiveBranch);

        // omitted.

        return RepeatStatus.FINISHED;
    }
}
Bean定義
<!-- (4) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
        factory-ref="jobSqlSessionFactory"/>

<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="taskletOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="optimisticLockTasklet"> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
</batch:job>

MapperインターフェースとMapper XMLは省略する。

説明
項番 説明

(1)

MapperインターフェースをInjectする。

(2)

DTOオブジェクトを生成して、更新データを設定する。

(3)

更新データを設定したDTOオブジェクトを引数に指定して、Mapperインターフェースで更新処理を実行する。

(4)

Mapper XMLの登録を行う。
factory-ref属性にREUSE設定されているjobSqlSessionFactoryを指定する。

(5)

MapperインターフェースをInjectしTaskletを設定する。

リスナーでのデータベースアクセス

リスナーでのデータベースアクセスは他のコンポーネントと連携することが多い。 使用するリスナー及び実装方法によっては、Mapperインターフェースで取得したデータを、 他のコンポーネントへ引き渡す仕組みを追加で用意する必要がある。

リスナーでMapperインターフェースを利用してデータベースアクセスを実装するにあたり、以下の制約がある。

リスナーでの制約

リスナーでもItemProcessorでの制約と同じ制約が成立する。 加えて、リスナーでは、更新を必要とするユースケースが考えにくい。よって、リスナーでは、更新系処理を禁止する。

リスナーで想定される更新処理の代替
ジョブの状態管理

Spring BatchのJobRepositoryによって行われている

データベースへのログ出力

ログのAppenderで実施すべき。ジョブのトランザクションとも別管理する必要がある。

ここでは一例として、StepExecutionListenerで ステップ実行前にデータを取得して、ItemProcessorで取得したデータを利用する例を示す。

リスナーでの実装例
public class CacheSetListener extends StepExecutionListenerSupport {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    // (2)
    @Inject
    CustomerCache cache;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // (3)
        for(Customer customer : customerRepository.findAll()) {
            cache.addCustomer(customer.getCustomerId(), customer);
        }
    }
}
ItemProcessorでの利用例
@Component
public class UpdateItemFromCacheProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (4)
    @Inject
    CustomerCache cache;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        Customer customer = cache.getCustomer(readItem.getCustomerId());  // (5)

        SalesPlanDetail writeItem = new SalesPlanDetail();

        // omitted.
        writerItem.setCustomerName(customer.getCustomerName); // (6)

        return writeItem;
    }
}
キャッシュクラス
// (7)
@Component
public class CustomerCache {

    Map<String, Customer> customerMap = new HashMap<>();

    public Customer getCustomer(String customerId) {
        return customerMap.get(customerId);
    }

    public void addCustomer(String id, Customer customer) {
        customerMap.put(id, customer);
    }
}
Bean定義
<!-- omitted -->

<!-- (8) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="cacheSetListener"
      class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.CacheSetListener"/>

<!-- omitted -->

<batch:job id="DBAccessByItemListener" job-repository="jobRepository">
    <batch:step id="DBAccessByItemListener.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="updateItemFromCacheProcessor"
                         writer="writer" commit-interval="10"/> <!-- (10) -->
            <!-- (11) -->
            <batch:listeners>
                <batch:listener ref="cacheSetListener"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>
説明

項番

説明

(1)

MapperインターフェースをInjectする。

(2)

Mapperインターフェースから取得したデータをキャッシュするためのBeanをInjectする。

(3)

リスナーにて、Mapperインターフェースからデータを取得してキャッシュする。
ここでは、StepExecutionListener#beforeStepにてステップ実行前にキャッシュを作成し、 以降の処理ではキャッシュを参照することで、I/Oを低減し処理効率を高めている。

(4)

(2)で設定したキャッシュと同じBeanをInjectする。

(5)

キャッシュから該当するデータを取得する。

(6)

更新データにキャッシュからのデータを反映する。

(7)

キャッシュクラスをコンポーネントとして実装する。
ここではBeanスコープはsingletonにしている。ジョブに応じて設定すること。

(8)

Mapper XMLの登録を行う。
template-ref属性にBATCHが設定されているbatchModeSqlSessionTemplateを指定する。

(9)

Mapperインターフェースを利用するリスナーを定義する。

(10)

キャッシュを利用するItemProcessorを指定する。

(11)

(9)で定義したリスナーを登録する。

リスナーでのSqlSessionFactoryの利用

上記の例では、batchModeSqlSessionTemplateを設定しているが、jobSqlSessionFactoryを設定してもよい。

チャンクのスコープ外で動作するリスナーについては、トランザクション外で処理されるため、 jobSqlSessionFactoryを設定しても問題ない。

How To Extend

CompositeItemWriterにおける複数テーブルの更新

チャンクモデルで、1つの入力データに対して複数のテーブルへ更新を行いたい場合は、Spring Batchが提供するCompositeItemWriterを利用し、 各テーブルに対応したMyBatisBatchItemWriterを連結することで実現できる。

ここでは、売上計画と売上実績の2つのテーブルを更新する場合の実装例を示す。

ItemProcessorの実装例
@Component
public class SalesItemProcessor implements ItemProcessor<SalesPlanDetail, SalesDTO> {
    @Override
    public SalesDTO process(SalesPlanDetail item) throws Exception { // (1)

        SalesDTO salesDTO = new SalesDTO();

        // (2)
        SalesPerformanceDetail spd = new SalesPerformanceDetail();
        spd.setBranchId(item.getBranchId());
        spd.setYear(item.getYear());
        spd.setMonth(item.getMonth());
        spd.setCustomerId(item.getCustomerId());
        spd.setAmount(new BigDecimal(0L));
        salesDTO.setSalesPerformanceDetail(spd);

        // (3)
        item.setAmount(item.getAmount().add(new BigDecimal(1L)));
        salesDTO.setSalesPlanDetail(item);

        return salesDTO;
    }
}
DTOの実装例
public class SalesDTO implements Serializable {

    // (4)
    private SalesPlanDetail salesPlanDetail;

    // (5)
    private SalesPerformanceDetail salesPerformanceDetail;

    // omitted
}
MapperXMLの実装例
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">

    <select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, customer_id AS customerId, amount
        FROM
            sales_plan_detail
        ORDER BY
            branch_id ASC, year ASC, month ASC, customer_id ASC
        ]]>
    </select>

    <!-- (6) -->
    <update id="update" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        UPDATE
            sales_plan_detail
        SET
            amount = #{salesPlanDetail.amount}
        WHERE
            branch_id = #{salesPlanDetail.branchId}
        AND
            year = #{salesPlanDetail.year}
        AND
            month = #{salesPlanDetail.month}
        AND
            customer_id = #{salesPlanDetail.customerId}
        ]]>
    </update>

    <!-- (7) -->
    <insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        INSERT INTO
            sales_performance_detail(
                branch_id,
                year,
                month,
                customer_id,
                amount
            )
        VALUES (
            #{salesPerformanceDetail.branchId},
            #{salesPerformanceDetail.year},
            #{salesPerformanceDetail.month},
            #{salesPerformanceDetail.customerId},
            #{salesPerformanceDetail.amount}
        )
        ]]>
    </insert>

</mapper>
CompositeItemWriterの適用例
<!-- reader using MyBatisCursorItemReader -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- writer MyBatisBatchItemWriter -->
<!-- (8) -->
<bean id="planWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.update"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (9) -->
<bean id="performanceWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (10) -->
<bean id="writer" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
      <!-- (11)-->
        <list>
            <ref bean="performanceWriter"/>
            <ref bean="planWriter"/>
        </list>
    </property>
</bean>

<!-- (12) -->
<batch:job id="useCompositeItemWriter" job-repository="jobRepository">
    <batch:step id="useCompositeItemWriter.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="salesItemProcessor"
                         writer="writer" commit-interval="3"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
説明

項番

説明

(1)

入力データに対して2つのテーブルを更新するための各エンティティを保持するDTOを出力とするItemProcessorを実装する。
ItemWriterには2つのテーブルを更新するために異なるオブジェクトを渡すことはできないため、更新に必要なオブジェクトを集約するDTOを定義している。

(2)

売上実績(SalesPerformanceDetail)を新規作成するためのエンティティを作成し、DTOに格納する。

(3)

入力データでもある売上計画(SalesPlanDetail)を更新するため、入力データを更新してDTOに格納する。

(4)

売上計画(SalesPlanDetail)を保持するようにDTOに定義する。

(5)

売上実績(SalesPerformanceDetail)を保持するようにDTOに定義する。

(6)

DTOから取得した売上計画(SalesPlanDetail)で、売上計画テーブル(sales_plan_detail)を更新するSQLを定義する。

(7)

DTOから取得した売上実績(SalesPlanDetail)で、売上実績テーブル(sales_performance_detail)を新規作成するSQLを定義する。

(8)

売上計画テーブル(sales_plan_detail)を更新するMyBatisBatchItemWriterを定義する。

(9)

売上実績テーブル(sales_performance_detail)を新規作成するMyBatisBatchItemWriterを定義する。

(10)

(8),(9)を順番に実行するためにCompositeItemWriterを定義する。

(11)

<list>タグ内に(8),(9)を設定する。指定した順番にItemWriterが実行される。

(12)

チャンクのwriter属性に(10)で定義したBeanを指定する。processor属性には(1)のItemProcessorを指定する。

複数データソースへの出力(1ステップ)で説明した org.springframework.data.transaction.ChainedTransactionManagerと同時に使用することで複数データソースに対しての更新もできる。

また、CompositeItemWriterは、ItemWriter実装であれば連結できるので、 MyBatisBatchItemWriterとFlatFileItemWriterを設定することで、データベース出力とファイル出力を同時に行うこともできる。

検索条件の指定方法

データベースアクセスの際に検索条件を指定して検索を行いたい場合は、 Bean定義にてMap形式でジョブパラメータから値を取得し、キーを設定することで検索条件を指定することができる。 以下にジョブパラメータを指定したジョブ起動コマンドの例と実装例を示す。

ジョブパラメータを指定した場合のジョブ起動コマンド
java -cp ${CLASSPATH} org.springframework.batch.core.launch.support.CommandLineJobRunner
 /META-INF/job/job001 job001 year=2017 month=12
MapperXMLの実装例
<!-- (1) -->
<select id="findByYearAndMonth"
    resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
    <![CDATA[
    SELECT
        branch_id AS branchId, year, month, amount
    FROM
        sales_performance_summary
    WHERE
        year = #{year} AND month = #{month}
    ORDER BY
        branch_id ASC
    ]]>
</select>

<!-- omitted -->
Bean定義
<!-- omitted -->

<!-- (2) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues"> <!-- (3) -->
        <map>
            <!-- (4) -->
            <entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
            <entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>

            <!-- omitted -->
        </map>
    </property>
</bean>

<!-- omitted -->
説明
項番 説明

(1)

検索条件を指定して取得するSQLを定義する。

(2)

データベースからデータを取得するためのItemReaderを定義する。

(3)

プロパティ名にparameterValuesを設定する。

(4)

検索条件にする値をジョブパラメータから取得し、キーに設定することで検索条件を指定する。 SQLの引数が数値型で定義されているため、value-typeIntegerに変換して渡している。

StepExectionContextによる検索指定方法について

@beforeStepなどジョブの前処理で検索条件を指定する場合は、StepExecutionContextに設定することで、JobParameters同様に取得することができる。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.1.RELEASE, 2018-3-9