Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

Macchinetta Batch 2.xでは、データベースアクセスの方法として、MyBatis3(以降、「MyBatis」と呼ぶ)を利用する。 MyBatisによるデータベースアクセスの基本的な利用方法は、Macchinetta Server 1.x 開発ガイドラインの以下を参照。

本節では、Macchinetta Batch 2.x特有の使い方を中心に説明する。

Linux環境でのOracle JDBC利用時の留意事項について

Linux環境でのOracle JDBCを利用時は、Oracle JDBCが使用するOSの乱数生成器によるロックが発生する。 そのため、ジョブを並列実行しようとしても逐次実行になる事象や片方のコネクションがタイムアウトする事象が発生する。
本事象に対する2パターンの回避策を以下に示す。

  • Javaコマンド実行時に、システムプロパティで以下を設定する。

    • -Djava.security.egd=file:///dev/urandom

  • ${JAVA_HOME}/jre/lib/security/java.security内のsecurerandom.source=/dev/randomsecurerandom.source=/dev/urandomに変更する。

How to use

Macchinetta Batch 2.xでのデータベースアクセス方法を説明する。

なお、チャンクモデルとタスクレットモデルにおけるデータベースアクセス方法の違いに留意する。

Macchinetta Batch 2.xでのデータベースアクセスは、以下の2つの方法がある。
これらはデータベースアクセスするコンポーネントによって使い分ける。

  1. MyBatis用のItemReaderおよびItemWriterを利用する。

    • チャンクモデルでのデータベースアクセスによる入出力で使用する。

      • org.mybatis.spring.batch.MyBatisCursorItemReader

      • org.mybatis.spring.batch.MyBatisBatchItemWriter

  2. Mapperインタフェースを利用する

    • チャンクモデルでのビジネスロジック処理で使用する。

      • ItemProcessor実装で利用する。

    • タスクレットモデルでのデータベースアクセス全般で使用する。

      • Tasklet実装で利用する。

共通設定

データベースアクセスにおいて必要な共通設定について説明を行う。

データソースの設定

Macchinetta Batch 2.xでは、2つのデータソースを前提としている。 LaunchContextConfig.java/launch-context.xmlでデフォルト設定している2つのデータソースを示す。

表 1. データソース一覧
データソース名 説明

adminDataSource

Spring BatchやMacchinetta Batch 2.xが利用するデータソース
JobRepositoryや非同期実行(DBポーリング)で利用している。

jobDataSource

ジョブが利用するデータソース

JobRepositoryのトランザクション

JobRepositoyと業務ジョブは利用するデータソースおよび、トランザクションが異なる。 そのため、JobRepositoyへの更新は、ユーザが使用するデータベースのトランザクションとは独立したトランザクションで行われる。 つまり、業務処理に対する障害のみがリカバリ対象となる。

以下に、LaunchContextConfig.java/launch-context.xmlと接続情報のプロパティを示す。
これらをユーザの環境に合わせて設定すること。

jp.co.ntt.fw.macchinetta.batch.functionaltest.config.LaunchContextConfig.java
// (1)
@Bean
public BasicDataSource adminDataSource(@Value("${admin.h2.jdbc.driver}") String driverClassName,
                                       @Value("${admin.h2.jdbc.url}") String url,
                                       @Value("${admin.h2.jdbc.username}") String username,
                                       @Value("${admin.h2.jdbc.password}") String password) {
    final BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName(driverClassName);
    dataSource.setUrl(url);
    dataSource.setUsername(username);
    dataSource.setPassword(password);
    dataSource.setMaxTotal(10);
    dataSource.setMinIdle(1);
    dataSource.setMaxWaitMillis(5000);
    dataSource.setDefaultAutoCommit(false);
    return dataSource;
}

// (2)
@Bean
public BasicDataSource jobDataSource(@Value("${jdbc.driver}") String driverClassName,
                                     @Value("${jdbc.url}") String url,
                                     @Value("${jdbc.username}") String username,
                                     @Value("${jdbc.password}") String password) {
    final BasicDataSource basicDataSource = new BasicDataSource();
    basicDataSource.setDriverClassName(driverClassName);
    basicDataSource.setUrl(url);
    basicDataSource.setUsername(username);
    basicDataSource.setPassword(password);
    basicDataSource.setMaxTotal(10);
    basicDataSource.setMinIdle(1);
    basicDataSource.setMaxWaitMillis(5000);
    basicDataSource.setDefaultAutoCommit(false);
    return basicDataSource;
}
resources\META-INF\spring\launch-context.xml
<!-- (1) -->
<bean id="adminDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
      destroy-method="close"
      p:driverClassName="${admin.h2.jdbc.driver}"
      p:url="${admin.h2.jdbc.url}"
      p:username="${admin.h2.jdbc.username}"
      p:password="${admin.h2.jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false"/>

<!-- (2) -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource" 
      destroy-method="close"
      p:driverClassName="${jdbc.driver}"
      p:url="${jdbc.url}"
      p:username="${jdbc.username}"
      p:password="${jdbc.password}"
      p:maxTotal="10"
      p:minIdle="1"
      p:maxWaitMillis="5000"
      p:defaultAutoCommit="false" />
batch-application.properties
# (3)
# Admin DataSource settings.
admin.h2.jdbc.driver=org.h2.Driver
admin.h2.jdbc.url=jdbc:h2:mem:batch;DB_CLOSE_DELAY=-1
admin.h2.jdbc.username=sa
admin.h2.jdbc.password=

# (4)
# Job DataSource settings.
jdbc.driver=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/postgres
jdbc.username=postgres
jdbc.password=postgres
表 2. 説明
項番 説明

(1)

adminDataSourceの定義。(3)の接続情報が設定される。

(2)

jobDataSourceの定義。(4)の接続情報が設定される。

(3)

adminDataSourceで利用するデータベースへの接続情報
この例では、H2を利用している。

(4)

jobDataSourceで利用するデータベースへの接続情報
この例では、PostgreSQLを利用している。

MyBatisの設定

Macchinetta Batch 2.xで、MyBatisの設定をする上で重要な点について説明をする。

バッチ処理を実装する際の重要なポイントの1つとして「大量のデータを一定のリソースで効率よく処理する」が挙げられる。
これに関する設定を説明する。

  • fetchSize

    • 一般的なバッチ処理では、大量のデータを処理する際の通信コストを低減するために、 JDBCドライバに適切なfetchSizeを指定することが必須である。 fetchSizeとは、JDBCドライバとデータベース間とで1回の通信で取得するデータ件数を設定するパラメータである。 この値は出来る限り大きい値を設定することが望ましいが、大きすぎるとメモリを圧迫するため、注意が必要である。 ユーザにてチューニングする必要がある箇所と言える。

    • MyBatisでは、全クエリ共通の設定としてdefaultFetchSizeを設定することができ、さらにクエリごとのfetchSize設定で上書きできる。

  • executorType

    • 一般的なバッチ処理では、同一トランザクション内で同じSQLを全データ件数/fetchSizeの回数分実行することになる。 この際、都度ステートメントを作成するのではなく再利用することで効率よく処理できる。

    • MyBatisの設定における、defaultExecutorTypeREUSEを設定することでステートメントの再利用ができ、 処理スループット向上に寄与する。

    • 大量のデータを一度に更新する場合、JDBCのバッチ更新を利用することで性能向上が期待できる。
      そのため、MyBatisBatchItemWriterで利用するSqlSessionTemplateには、
      executorTypeに(REUSEではなく)BATCHが設定されている。

Macchinetta Batch 2.xでは、同時に2つの異なるExecutorTypeが存在する。 一方のExecutorTypeで実装する場合が多いと想定するが、併用時は特に注意が必要である。 この点は、Mapperインタフェース(入力)にて詳しく説明する。

MyBatisのその他のパラメータ

その他のパラメータに関しては以下リンクを参照し、 アプリケーションの特性にあった設定を行うこと。
https://mybatis.org/mybatis-3/configuration.html

以下にデフォルト提供されている設定を示す。

jp.co.ntt.fw.macchinetta.batch.functionaltest.config.LaunchContextConfig.java
// (1)
@Bean
public SqlSessionFactory jobSqlSessionFactory(@Qualifier("jobDataSource") DataSource jobDataSource) throws Exception {
    final SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(jobDataSource);

    final org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
    configuration.setLocalCacheScope(LocalCacheScope.STATEMENT);
    configuration.setLazyLoadingEnabled(true);
    configuration.setAggressiveLazyLoading(false);
    configuration.setDefaultFetchSize(1000);
    configuration.setDefaultExecutorType(ExecutorType.REUSE);
     sqlSessionFactoryBean.setConfiguration(configuration);
    return sqlSessionFactoryBean.getObject();
}

// (2)
@Bean
public SqlSessionTemplate batchModeSqlSessionTemplate(@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
    return new SqlSessionTemplate(jobSqlSessionFactory, ExecutorType.BATCH);
}
META-INF/spring/launch-context.xml
<bean id="jobSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="jobDataSource">
    <!-- (1) -->
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

<!-- (2) -->
<bean id="batchModeSqlSessionTemplate"
      class="org.mybatis.spring.SqlSessionTemplate"
      c:sqlSessionFactory-ref="jobSqlSessionFactory"
      c:executorType="BATCH"/>
表 3. 説明
項番 説明

(1)

MyBatisの各種設定を行う。
デフォルトでは、fetchSizeを1000に設定している。

(2)

MyBatisBatchItemWriterのために、executorTypeBATCHSqlSessionTemplateを定義している。

adminDataSourceを利用したSqlSessionFactoryの定義箇所について

同期実行をする場合は、adminDataSourceを利用したSqlSessionFactoryは不要であるため、定義がされていない。 非同期実行(DBポーリング)を利用する場合、ジョブ要求テーブルへアクセスするために AsyncBatchDaemonConfig.java/META-INF/spring/async-batch-daemon.xml内に定義されている。

jp.co.ntt.fw.macchinetta.batch.functionaltest.config.AsyncBatchDaemonConfig.java
@Bean
public SqlSessionFactory adminSqlSessionFactory(@Qualifier("adminDataSource") DataSource adminDataSource,
                                                DatabaseIdProvider databaseIdProvider) throws Exception {
    final SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(adminDataSource);
    sqlSessionFactoryBean.setDatabaseIdProvider(databaseIdProvider);
    final org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
    configuration.setLocalCacheScope(LocalCacheScope.STATEMENT);
    configuration.setLazyLoadingEnabled(true);
    configuration.setAggressiveLazyLoading(false);
    configuration.setDefaultFetchSize(1000);
    configuration.setDefaultExecutorType(ExecutorType.REUSE);
    sqlSessionFactoryBean.setConfiguration(configuration);
    return sqlSessionFactoryBean.getObject();
}
META-INF/spring/async-batch-daemon.xml
<bean id="adminSqlSessionFactory"
      class="org.mybatis.spring.SqlSessionFactoryBean"
      p:dataSource-ref="adminDataSource" >
    <property name="configuration">
        <bean class="org.apache.ibatis.session.Configuration"
              p:localCacheScope="STATEMENT"
              p:lazyLoadingEnabled="true"
              p:aggressiveLazyLoading="false"
              p:defaultFetchSize="1000"
              p:defaultExecutorType="REUSE"/>
    </property>
</bean>

Mapper XMLの定義

Macchinetta Batch 2.x特有の説明事項はないので、Macchinetta Server 1.x 開発ガイドラインの データベースアクセス処理の実装を参照。

MyBatis-Springの設定

MyBatis-Springが提供するItemReaderおよびItemWriterを使用する場合、MapperのConfigで使用するMapper XMLを設定する必要がある。

設定方法としては、以下の2つが考えられる。

  1. 共通設定として、すべてのジョブで使用するMapper XMLを登録する。

    • LaunchContextConfig.java/launch-context.xmlにすべてのMapper XMLを記述することになる。

  2. 個別設定として、ジョブ単位で利用するMapper XMLを登録する。

    • jobs/配下のBean定義に、個々のジョブごとに必要なMapper XMLを記述することになる。

基本的な設定方法については、Macchinetta Server 1.x 開発ガイドラインの MyBatis-Springの設定を参照。

共通設定をした場合の性能面での弊害

共通設定をしてしまうと、同期実行をする際に実行するジョブのMapper XMLだけでなく、その他のジョブが使用するMapper XMLも読み込んでしまうために以下に示す弊害が生じる。

  • ジョブの起動までに時間がかかる

  • メモリリソースの消費が大きくなる

これを回避するために、Macchinetta Batch 2.xでは、個別設定として、個々のジョブ定義でそのジョブが必要とするMapper XMLだけを指定する設定方法を採用する。 この方法においては、<mybatis:scan>によるスキャン範囲を最小限にするため、ジョブごとに必要となるMapper XMLを分けて格納するパッケージ構成であることが望ましい。

Macchinetta Batch 2.xでは、複数のSqlSessionFactoryおよびSqlSessionTemplateが定義されているため、 どれを利用するか明示的に指定する必要がある。
基本的にはjobSqlSessionFactoryを指定すればよい。

以下に設定例を示す。

org.terasoluna.batch.jobs.common.JobSalesPlan01Config.java
@MapperScan(value = "org.terasoluna.batch.functionaltest.app.repository.plan", sqlSessionFactoryRef = "jobSqlSessionFactory")
jobSalesPlan01.xml
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
    factory-ref="jobSqlSessionFactory"/>
表 4. 説明
項番 説明

(1)

@MapperScanのvalue属性/<mybatis:scan>要素のbase-package属性にMapper XMLをスキャンするパッケージを設定する。 指定するのはこのジョブが必要とするMapper XMLが直下に格納されているパッケージである。
sqlSessionFactoryRef属性/factory-ref属性にはjobSqlSessionFactoryを設定する。

入力

データベースアクセスの入力について以下のとおり説明する。

MyBatisCursorItemReader

ここではItemReaderとして MyBatis-Springが提供するMyBatisCursorItemReaderによるデータベースアクセスについて説明する。

機能概要

MyBatis-Springが提供するItemReaderとして下記の2つが存在する。

  • org.mybatis.spring.batch.MyBatisCursorItemReader

  • org.mybatis.spring.batch.MyBatisPagingItemReader

MyBatisPagingItemReaderは、Macchinetta Server 1.x 開発ガイドラインの Entityのページネーション検索(SQL絞り込み方式)で 説明している仕組みを利用したItemReaderである。
一定件数を取得した後に再度SQLを発行するため、データの一貫性が保たれない可能性がある。 そのため、バッチ処理で利用するには危険であることから、Macchinetta Batch 2.xでは原則使用しない。
Macchinetta Batch 2.xではMyBatisと連携して、Cursorを利用し取得データを返却するMyBatisCursorItemReaderのみを利用する。

Macchinetta Batch 2.xでは、MyBatis-Springの設定で説明したとおり、 mybatis:scanによって動的にMapper XMLを登録する方法を採用している。 そのため、Mapper XMLに対応するインタフェースを用意する必要がある。 詳細については、Macchinetta Server 1.x 開発ガイドラインの データベースアクセス処理の実装を参照。

MyBatisCursorItemReaderのトランザクションについて

MyBatisCursorItemReaderselectステートメントを発行する際に、ステップ処理の一部として生成された他のトランザクション(チャンクモデルにおけるチャンクごとのトランザクション)には参加しない。 これは、MyBatisCursorItemReaderが異なるコネクションを利用しているためである。そのため、適切に排他制御を行っていない場合、デッドロックが発生する可能性があるため留意すること。

MyBatisCursorItemReaderを利用してデータベースを参照するための実装例を処理モデルごとに以下に示す。

チャンクモデルにおける利用方法

チャンクモデルでMyBatisCursorItemReaderを利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReaderの実装例と、実装したMyBatisCursorItemReaderを利用してデータベースから取得したデータを処理するItemProcessorの実装例を説明する。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = {"org.terasoluna.batch.functionaltest.app.common",
    "org.terasoluna.batch.functionaltest.ch05.transaction.component",
    "org.terasoluna.batch.functionaltest.ch05.transaction.listener"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.ch05.transaction.repository.admin", sqlSessionFactoryRef = "adminSqlSessionFactory")
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.app.repository.mst", sqlSessionFactoryRef = "jobSqlSessionFactory") // (1)
public class OutputAllCustomerList01Config {

    // (2)
    @Bean
    @StepScope
    public MyBatisCursorItemReader<Customer> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<Customer>()
                .sqlSessionFactory(jobSqlSessionFactory) // (4)
                .queryId(
                        "org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll") // (3)
                .build();
    }
    // omitted
    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       ItemReader<Customer> reader,
                       ItemWriter<Customer> writer,
                       RetrieveBranchFromContextItemProcessor processor,
                       BranchMasterReadStepListener listener) {
        return new StepBuilder("outputAllCustomerList01.step01",
                jobRepository)
                .<Customer, Customer> chunk(10, transactionManager)
                .reader(reader) // (5)
                .processor(processor)
                .listener(listener)
                .writer(writer)
                .build();
    }

    @Bean
    public Job outputAllCustomerList01(JobRepository jobRepository,
                                            Step step01) {
        return new JobBuilder("outputAllCustomerList01",jobRepository)
                .start(step01)
                .build();
    }
}
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<batch:job id="outputAllCustomerList01" job-repository="jobRepository">
    <batch:step id="outputAllCustomerList01.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- (5) -->
            <batch:chunk reader="reader"
                         processor="retrieveBranchFromContextItemProcessor"
                         writer="writer" commit-interval="10"/>
            <!-- omitted -->
        </batch:tasklet>
    </batch:step>
</batch:job>
Mapper XML
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository">

    <!-- omitted -->

    <!-- (7) -->
    <select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.mst.Customer">
        <![CDATA[
        SELECT
            customer_id AS customerId,
            customer_name AS customerName,
            customer_address AS customerAddress,
            customer_tel AS customerTel,
            charge_branch_id AS chargeBranchId,
            create_date AS createDate,
            update_date AS updateDate
        FROM
            customer_mst
        ORDER by
            charge_branch_id ASC, customer_id ASC
        ]]>
    </select>

    <!-- omitted -->
</mapper>
Mapperインタフェース
public interface CustomerRepository {
    // (8)
    Cursor<Customer> findAll();

    // omitted.
}
ItemProcessor実装
@Component
@Scope("step")
public class RetrieveBranchFromContextItemProcessor implements ItemProcessor<Customer, CustomerWithBranch> {
    // omitted.

    @Override
    public CustomerWithBranch process(Customer item) throws Exception { // (9)
        CustomerWithBranch newItem = new CustomerWithBranch(item);
        newItem.setBranch(branches.get(item.getChargeBranchId())); // (10)
        return newItem;
    }
}
表 5. 説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisCursorItemReaderを定義する。

(3)

MyBatisCursorItemReaderBuilderのqueryIdメソッド/<bean>要素のqueryId属性に、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

MyBatisCursorItemReaderBuilderのsqlSessionFactoryメソッド/<bean>要素のsqlSessionFactory-ref属性に、アクセスするデータベースのSqlSessionFactoryを指定する。

(5)

(2)で定義したMyBatisCursorItemReaderStepBuilderのreaderメソッド/<batch:chunk>要素のreader属性に指定する。

(6)

Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。

(7)

SQLを定義する。

(8)

(7)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。

(9)

引数として受け取るitemの型は、 このクラスで実装しているItemProcessorインタフェースの型引数で指定した入力オブジェクトの型であるCustomerとする。

(10)

引数に渡されたitemを利用して各カラムの値を取得する。

タスクレットモデルにおける利用方法

タスクレットモデルでMyBatisCursorItemReaderを利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReaderの実装例と、実装したMyBatisCursorItemReaderを利用してデータベースから取得したデータを処理するTaskletの実装例を説明する。

タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。

タスクレットモデルではチャンクモデルと異なり、Tasklet実装においてリソースを明示的にオープン/クローズする必要がある。 また、入力データの読み込みも明示的に行う。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = {"org.terasoluna.batch.functionaltest.app.common",
    "org.terasoluna.batch.functionaltest.ch07.jobmanagement"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.app.repository.plan", sqlSessionFactoryRef = "jobSqlSessionFactory") // (1)
public class CustomizedJobExitCodeTaskletJobConfig {

    // (2)
    @Bean
    public MyBatisCursorItemReader<SalesPlanSummary> summarizeDetails(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<SalesPlanSummary>()
                .sqlSessionFactory(jobSqlSessionFactory) // (4)
                .queryId(
                        "org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.summarizeDetails") // (3)
                .build();
    }
    // omitted
    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       CheckAmountTasklet tasklet) {
        return new StepBuilder("customizedJobExitCodeTaskletJob.step01",
                jobRepository)
                .tasklet(tasklet, transactionManager)
                .build();
    }

    @Bean
    public Job customizedJobExitCodeTaskletJob(JobRepository jobRepository,
                                                 Step step01,
                                                 JobExitCodeChangeListener jobExitCodeChangeListener) {
        return new JobBuilder("customizedJobExitCodeTaskletJob",
                jobRepository)
                .start(step01)
                .listener(jobExitCodeChangeListener)
                .build();
    }
    // omitted
}
<!-- (1) -->
<mybatis:scan
    base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
    factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="summarizeDetails" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
          p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.summarizeDetails"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<batch:job id="customizedJobExitCodeTaskletJob" job-repository="jobRepository">
    <batch:step id="customizedJobExitCodeTaskletJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager" ref="checkAmountTasklet"/>
    </batch:step>
    <!-- omitted -->
</batch:job>
Mapper XML
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- omitted -->

    <!-- (6) -->
    <select id="summarizeDetails" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanSummary">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, SUM(amount) AS amount
        FROM
            sales_plan_detail
        GROUP BY
            branch_id, year, month
        ORDER BY
            branch_id ASC, year ASC, month ASC
         ]]>
    </select>

</mapper>
Mapperインタフェース
public interface SalesPlanDetailRepository {
    // (7)
    Cursor<SalesPlanSummary> summarizeDetails();

    // omitted.
}
Tasklet実装
@Component
@Scope("step")
public class CheckAmountTasklet implements Tasklet {
    // (8)
    @Inject
    ItemStreamReader<SalesPlanSummary> reader;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        SalesPlanSummary item = null;

        List<SalesPlanSummary> items = new ArrayList<>(CHUNK_SIZE);
        int errorCount = 0;

        try {
            // (9)
            reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());
            while ((item = reader.read()) != null) { // (10)
                if (item.getAmount().signum() == -1) {
                    logger.warn("amount is negative. skip item [item: {}]", item);
                    errorCount++;
                    continue;
                }

                // omitted.
            }

            // catch block is omitted.
        } finally {
            // (11)
            reader.close();
        }
    }
    // omitted.

    return RepeatStatus.FINISHED;
}
表 6. 説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisCursorItemReaderを定義する。

(3)

MyBatisCursorItemReaderBuilderのqueryIdメソッド/<bean>要素のqueryId属性に、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

MyBatisCursorItemReaderBuilderのsqlSessionFactoryメソッド/<bean>要素のsqlSessionFactory-ref属性に、アクセスするデータベースのSqlSessionFactoryを指定する。

(5)

Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。

(8)

@Injectアノテーションを付与して、ItemStreamReaderの実装をインジェクションする。
対象となるリソースへのオープン・クローズを実施する必要があるため、 ItemReaderにリソースのopen/closeメソッドを追加したItemStreamReaderインタフェースで実装をインジェクションする。

(9)

入力リソースをオープンする。

(10)

入力データを1件ずつ読み込む。

(11)

入力リソースをクローズする。
リソースは必ずクローズすること。なお、ここで例外が発生した場合、タスクレット全体のトランザクションがロールバックされ、 例外のスタックトレースを出力しジョブが異常終了する。そのため、必要に応じて例外処理を実装すること。

検索条件の指定方法

データベースアクセスの際に検索条件を指定して検索を行いたい場合は、 Bean定義にてMap形式でジョブパラメータから値を取得し、キーを設定することで検索条件を指定することができる。 以下にジョブパラメータを指定したジョブ起動コマンドの例と実装例を示す。

ジョブパラメータを指定した場合のジョブ起動コマンド
$ java -cp ${CLASSPATH} org.springframework.batch.core.launch.support.CommandLineJobRunner /META-INF/job/job001 job001 year=2017 month=12
MapperXMLの実装例
<!-- (1) -->
<select id="findByYearAndMonth"
    resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
    <![CDATA[
    SELECT
        branch_id AS branchId, year, month, amount
    FROM
        sales_performance_summary
    WHERE
        year = #{year} AND month = #{month}
    ORDER BY
        branch_id ASC
    ]]>
</select>

<!-- omitted -->
// omitted
// (2)
@Bean
@StepScope
public MyBatisCursorItemReader<SalesPerformanceSummary> reader(
        @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
        @Value("#{jobParameters['year']}") Integer year,
        @Value("#{jobParameters['month']}") Integer month,
        @Value("#{stepExecutionContext['dataSize']}") Integer dataSize,
        @Value("#{stepExecutionContext['offset']}") Integer offset) {
    Map<String, Object> parameterValues = new HashMap<>();
    parameterValues.put("year", year); // (4)
    parameterValues.put("month", month); // (4)
    // omitted
    return new MyBatisCursorItemReaderBuilder<SalesPerformanceSummary>()
            .sqlSessionFactory(jobSqlSessionFactory)
            .queryId(
                    "org.terasoluna.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth")
            .parameterValues(parameterValues) // (3)
            .build();
}
// omited
<!-- omitted -->

<!-- (2) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues"> <!-- (3) -->
        <map>
            <!-- (4) -->
            <entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
            <entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>

            <!-- omitted -->
        </map>
    </property>
</bean>

<!-- omitted -->
表 7. 説明
項番 説明

(1)

検索条件を指定して取得するSQLを定義する。

(2)

データベースからデータを取得するためのItemReaderを定義する。

(3)

MyBatisCursorItemReaderBuilderのparameterValuesメソッド/<property>要素のname属性にparameterValuesを設定する。

(4)

検索条件にする値をジョブパラメータから取得し、キーに設定することで検索条件を指定する。 SQLの引数が数値型で定義されているため、Integerに変換して渡している。

StepExectionContextによる検索指定方法について

@BeforeStepなどジョブの前処理で検索条件を指定する場合は、StepExecutionContextに設定することで、JobParameters同様に取得することができる。

Mapperインタフェース(入力)

ItemReader以外でデータベースの参照を行うにはMapperインタフェースを利用する。
ここではMapperインタフェースを利用したデータベースの参照について説明する。

機能概要

Mapperインタフェースを利用するにあたって、Macchinetta Batch 2.xでは以下の制約を設けている。

表 8. Mapperインタフェースの利用可能な箇所
処理 ItemProcessor Tasklet リスナー

参照

利用可

利用可

利用可

更新

条件付で利用可

利用可

利用不可

チャンクモデルにおける利用方法

チャンクモデルでMapperインタフェースを利用してデータベースを参照する実装例を以下に示す。

ItemProcessorでの実装例
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {

        // (2)
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        // omitted.

        return writeItem;
    }
}
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan({ "org.terasoluna.batch.functionaltest.app.common",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess" })
@MapperScan(basePackages = {
        "org.terasoluna.batch.functionaltest.app.repository",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository" }, sqlSessionTemplateRef = "batchModeSqlSessionTemplate") // (3)
public class DBAccessByItemProcessorConfig {

    // (4)
    @Bean
    public MyBatisCursorItemReader<SalesPerformanceDetail> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<SalesPerformanceDetail>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .queryId(
                        "org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll")
                .build();
    }
    // omitted
<!-- (3) -->
<mybatis:scan
base-package="org.terasoluna.batch.functionaltest.app.repository"
template-ref="batchModeSqlSessionTemplate"/>

<!-- (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- omitted job definition -->

MapperインタフェースとMapper XMLについてはMyBatisCursorItemReader で説明している内容以外に特筆すべきことがないため省略する。

表 9. 説明

項番

説明

(1)

MapperインタフェースをInjectする。

(2)

Mapperインタフェースで検索処理を実行する。

(3)

Mapper XMLの登録を行う。
@MapperScanのsqlSessionTemplateRef属性/<mybatis:scan>要素のtemplate-ref属性BATCH設定されているbatchModeSqlSessionTemplateを指定することで、 ItemProcessorでのデータベースアクセスはBATCHとなる。

(4)

MyBatisCursorItemReaderを定義する。
MyBatisCursorItemReaderBuilderのsqlSessionFactoryメソッド/<bean>要素のsqlSessionFactory-ref属性に、アクセスするデータベースのSqlSessionFactoryを指定する。

MyBatisCursorItemReader設定の補足

以下に示す定義例のように、MyBatisCursorItemReaderとMyBatisBatchItemWriterで異なるExecutorTypeを使用しても問題ない。 これは、MyBatisCursorItemReaderによるトランザクションはItemWriterのトランザクションとは別であるからである。

@Bean
public MyBatisCursorItemReader reader(
        @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
    return new MyBatisCursorItemReaderBuilder()
            .sqlSessionFactory(jobSqlSessionFactory)
            .queryId("xxx")
            .build();
}

@Bean
public MyBatisBatchItemWriter writer(
        SqlSessionTemplate batchModeSqlSessionTemplate) {
    return new MyBatisBatchItemWriterBuilder()
            .statementId("yyy")
            .sqlSessionTemplate(batchModeSqlSessionTemplate)
            .build();
}
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="xxx"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="yyy"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
タスクレットモデルにおける利用方法

タスクレットモデルでMapperインタフェースを利用してデータベースを参照する実装例を以下に示す。
大量データを処理する場合、Cursorを利用して取得データを1件ずつ処理することでメモリ容量をひっ迫せずに効率良く処理することができる。 Macchinetta Batch 2.xでは、タスクレットモデルでMapperインタフェースを利用してデータベースアクセスする場合はCursorを利用することを基本とする。
Cursor同様に大量データを処理するうえで、ResultHandlerを利用することも有効である。 ResultHandlerについては、Macchinetta Server 1.x 開発ガイドラインの ResultHandlerの実装を参照。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan({ "org.terasoluna.batch.functionaltest.app.common",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess" })
@MapperScan(basePackages = {
        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository",
        "org.terasoluna.batch.functionaltest.app.repository" }, sqlSessionFactoryRef = "jobSqlSessionFactory") // (1)
public class JobSalesPlanCursorTaskletConfig {

    @Bean
    public Step step01(JobRepository jobRepository,
                       SalesPlanCursorTasklet tasklet,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
        return new StepBuilder("jobSalesPlanCursorTasklet.step01",
                jobRepository)
                .tasklet(tasklet, transactionManager)
                .build();
    }

    @Bean
    public Job jobSalesPlanCursorTasklet(JobRepository jobRepository,
                                         Step step01) {
        return new JobBuilder("jobSalesPlanCursorTasklet", jobRepository)
                .start(step01)
                .build();
    }
}
<!-- (1) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository;jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
        factory-ref="jobSqlSessionFactory"/>

    <batch:job id="jobSalesPlanCursorTasklet" job-repository="jobRepository">
        <batch:step id="jobSalesPlanCursorTasklet.step01">
            <batch:tasklet transaction-manager="jobTransactionManager" ref="salesPlanCursorTasklet"/>
        </batch:step>
    </batch:job>
Mapper XML
<!-- (2) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">

    <!-- omitted -->

    <!-- (3) -->
    <select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, customer_id AS customerId, amount
        FROM
            sales_plan_detail
        ORDER BY
            branch_id ASC, year ASC, month ASC, customer_id ASC
        ]]>
    </select>

</mapper>
Mapperインタフェース
public interface SalesRepository {
    // (4)
    Cursor<SalesPlanDetail> findAll();
}
Taskletでの実装例
@Component
public class SalesPlanCursorTasklet implements Tasklet {

    // omitted.

    // (5)
    @Inject
    SalesRepository repository;

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        try (Cursor<SalesPlanDetail> cursor = repository.findAll()) {  // (6)
            for (SalesPlanDetail salesPlan : cursor) { // (7)
                // omitted.
            }
        }

        return RepeatStatus.FINISHED;
    }
}
表 10. 説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。

(3)

SQLを定義する。

(4)

(3)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。
メソッドの戻り値の型はorg.apache.ibatis.cursor.Cursorとする。

(5)

MapperインタフェースをInjectする。

(6)

Cursorから1件ずつデータを取得する。
処理の途中でエラーが発生した場合、MyBatisはCursorのクローズを保証しないため、try-with-resources文によりCursorを確実にクローズする。

(7)

拡張for文によりCursorをフェッチする。

Cursor利用時の注意点

Cursorで読み取り中のテーブルを他の処理で更新をかけると、 読み取り済みの古いデータを処理してしまうことになり、データの不整合が発生する可能性があるため注意すること。 これを防ぐために安易にロックをかけるとデッドロックを引き起こしかねないため、 排他制御を参照し適切にロックをかけるか、 テーブルアクセスが競合しないようにジョブ設計することを検討してほしい。

出力

データベースへの出力は、MyBatisBatchItemWriterもしくはMapperインタフェースを利用することで実現できる。
MyBatisBatchItemWriterとMapperインタフェースによる出力を併用した場合、MyBatisBatchItemWriterでエラーが発生する。 よって、Mapperインタフェースを用いた出力はチャンクモデルでの利用に適さないため、利用方法を記載していない。
詳細はMapperインタフェース(出力)の機能概要で後述するため、そちらを参照されたい。

上記の内容について以下のとおり説明する。

MyBatisBatchItemWriter

ここではItemWriterとして MyBatis-Springが提供するMyBatisBatchItemWriterによるデータベースアクセスについて説明する。

機能概要

MyBatis-Springが提供するItemWriterは以下の1つのみである。

  • org.mybatis.spring.batch.MyBatisBatchItemWriter

MyBatisBatchItemWriterはMyBatisと連携してJDBCのバッチ更新機能を利用するItemWriterであり、 大量のデータを一度に更新する場合に性能向上が期待できる。
基本的な設定については、MyBatisCursorItemReaderと同じである。 MyBatisBatchItemWriterでは、MyBatisの設定で説明した batchModeSqlSessionTemplateを指定する必要がある。

MyBatisBatchItemWriterを利用してデータベースに登録する実装例を以下に示す。

チャンクモデルにおける利用方法

チャンクモデルでMyBatisBatchItemWriterを利用してデータベースに登録する実装例を以下に示す。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan("org.terasoluna.batch.functionaltest.app.common")
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.app.repository.performance",
                            sqlSessionFactoryRef = "jobSqlSessionFactory") // (1)
public class DBAccessByMaybatisScanConfig {
    // omitted
    // (2)
    @Bean
    public MyBatisBatchItemWriter<SalesPerformanceSummary> writer(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            SqlSessionTemplate batchModeSqlSessionTemplate) {
        return new MyBatisBatchItemWriterBuilder<SalesPerformanceSummary>()
                .sqlSessionFactory(jobSqlSessionFactory) // (4)
                .statementId(
                        "org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceSummaryRepository.create") // (3)
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .build();
    }

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       ItemReader<SalesPerformanceSummary> reader,
                       ItemWriter<SalesPerformanceSummary> writer) {
        return new StepBuilder("DBAccessByMaybatisScan.step01",
                jobRepository)
                .<SalesPerformanceSummary, SalesPerformanceSummary> chunk(10, transactionManager)
                .reader(reader)
                .writer(writer) // (5)
                .build();
    }

    @Bean
    public Job DBAccessByMaybatisScan(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
        return new JobBuilder("DBAccessByMaybatisScan",
                jobRepository).start(step01).listener(listener).build();
    }
}
<!-- (1) -->
<mybatis:scan base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance"
                  factory-ref="jobSqlSessionFactory"/>

<!-- omitted -->

<!-- (2) (3) (4) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceSummaryRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<batch:job id="DBAccessByMaybatisScan" job-repository="jobRepository">
    <batch:step id="DBAccessByMaybatisScan.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         writer="writer" commit-interval="10"/> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
    <!-- omitted -->
</batch:job>
Mapper XML
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceSummaryRepository">

    <!-- (7) -->
    <insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
        <![CDATA[
        INSERT INTO
            sales_performance_summary(branch_id, year, month, amount)
        VALUES (
            #{branchId}, #{year}, #{month}, #{amount}
        )
        ]]>
    </insert>

    <!-- omitted -->
</mapper>
Mapperインタフェース
public interface SalesPerformanceSummaryRepository {

    // (8)
    void create(SalesPerformanceSummary salesPerformanceSummary);

    // omitted.
}
表 11. 説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisBatchItemWriterを定義する。

(3)

MyBatisBatchItemWriterBuilderのstatementIdメソッド/<bean>要素のstatementId属性に、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

MyBatisBatchItemWriterBuilderのsqlSessionFactoryメソッド/<bean>要素のsqlSessionTemplate-ref属性に、アクセスするデータベースのSessionTemplateを指定する。
指定するSessionTemplateは、executorTypeBATCHに設定されていることが必須である。

(5)

(2)で定義したMyBatisBatchItemWriterStepBuilderのwriterメソッド/<batch:chunk>要素のwriter属性に指定する。

(6)

Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。

(7)

SQLを定義する。

(8)

(7)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。

タスクレットモデルにおける利用方法

タスクレットモデルでMyBatisBatchItemWriterを利用してデータベースに登録する実装例を以下に示す。
ここでは、MyBatisBatchItemWriterの実装例と実装したMyBatisBatchItemWriterを利用するTaskletの実装例を説明する。 タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = {"org.terasoluna.batch.functionaltest.app.plan",
    "org.terasoluna.batch.functionaltest.ch04.listener"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.app.repository.plan", sqlSessionFactoryRef = "jobSqlSessionFactory") // (1)
public class TaskletJobWithListenerConfig {
    // omitted

    // (2)
    @Bean
    public MyBatisBatchItemWriter<SalesPlanDetail> writer(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            SqlSessionTemplate batchModeSqlSessionTemplate) {
        return new MyBatisBatchItemWriterBuilder<SalesPlanDetail>()
                .sqlSessionFactory(jobSqlSessionFactory) // (4)
                .statementId(
                        "org.terasoluna.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create") // (3)
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .build();
    }

    @Bean
    public Step jobScopeStep01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       SalesPlanDetailRegisterTasklet salesPlanDetailRegisterTasklet) {
        return new StepBuilder("taskletJobWithListenerWithinJobScope.step01",
                jobRepository)
                .tasklet(salesPlanDetailRegisterTasklet, transactionManager)
                .build();
    }

    @Bean
    public Job taskletJobWithListenerWithinJobScope(JobRepository jobRepository,
                                            Step jobScopeStep01,
                                            @Qualifier("allProcessListener") JobExecutionListener listener) {
        return new JobBuilder("taskletJobWithListenerWithinJobScope",
                jobRepository)
                .start(jobScopeStep01)
                .listener(listener)
                .build();
    }

    // omitted
}
<!-- (1) -->
<mybatis:scan base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
            factory-ref="jobSqlSessionFactory"/>

<!-- (2) (3) (4) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
          p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
          p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<batch:job id="taskletJobWithListenerWithinJobScope" job-repository="jobRepository">
    <batch:step id="taskletJobWithListenerWithinJobScope.step01">
        <batch:tasklet transaction-manager="jobTransactionManager" ref="salesPlanDetailRegisterTasklet"/>
    </batch:step>
    <!-- omitted. -->
</batch:job>
Mapper XML
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">

    <!-- (6) -->
    <insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        INSERT INTO
            sales_plan_detail(branch_id, year, month, customer_id, amount)
        VALUES (
            #{branchId}, #{year}, #{month}, #{customerId}, #{amount}
        )
        ]]>
    </insert>

    <!-- omitted -->
</mapper>
Mapperインタフェース
public interface SalesPlanDetailRepository {
    // (7)
    void create(SalesPlanDetail salesPlanDetail);

    // omitted.
}
Tasklet実装
@Component
@Scope("step")
public class SalesPlanDetailRegisterTasklet implements Tasklet {

    // omitted.

    // (8)
    @Inject
    ItemWriter<SalesPlanDetail> writer;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        SalesPlanDetail item = null;

        try {
            reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());

            List<SalesPlanDetail> items = new ArrayList<>(); // (9)

            while ((item = reader.read()) != null) {

                items.add(processor.process(item)); // (10)
                if (items.size() == 10) {
                    writer.write(new Chunk(items)); // (11)
                    items.clear();
                }
            }
            // omitted.
        }
        // omitted.

        return RepeatStatus.FINISHED;
    }
}

MapperインタフェースとMapper XMLについてはMyBatisBatchItemWriter で説明している内容以外に特筆すべきことがないため省略する。

表 12. 説明
項番 説明

(1)

Mapper XMLの登録を行う。

(2)

MyBatisBatchItemWriterを定義する。

(3)

MyBatisBatchItemWriterBuilderのstatementIdメソッド/<bean>要素のstatementId属性に、(7)で定義しているSQLのIDを(6)のnamespace + <メソッド名>で指定する。

(4)

MyBatisBatchItemWriterBuilderのsqlSessionFactoryメソッド/<bean>要素のsqlSessionTemplate-ref属性に、アクセスするデータベースのSessionTemplateを指定する。
指定するSessionTemplateは、executorTypeBATCHに設定されていることが必須である。

(5)

Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。

(6)

SQLを定義する。

(7)

(6)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。

(8)

@Injectアノテーションを付与して、ItemWriterの実装をインジェクションする。
ItemReaderとは異なり、データベースの更新ではリソースのopen/closeが不要であるため、ItemStreamWriterではなくItemWriterインタフェースにインジェクションする。

(9)

出力データを格納するリストを定義する。
ItemWriterでは一定件数のデータをまとめて出力する。

(10)

リストに更新データを設定する。

(11)

更新データを設定したリストを引数に指定して、データベースへ出力する。

Mapperインタフェース(出力)

ItemWriter以外でデータベースの更新を行うにはMapperインタフェースを利用する。
ここではMapperインタフェースを利用したデータベースの更新について説明する。

機能概要

Mapperインタフェースを利用してデータベースアクセスするうえでのMacchinetta Batch 2.xとしての制約はMapperインタフェース(入力)を参照。
さらにItemProcessor、TaskletでMapperインタフェースを利用したデータベースの更新を行う際には以下の制約がある。

ItemProcessorでの制約

ItemWriterにMyBatisBatchItemWriterを利用する場合、ItemProcessorではMapperインタフェースを利用してデータベースの更新処理を行うことができないという制約がある。
これは、MyBatisBatchItemWriterがSQL実行後に自身が発行したSQLによる更新処理が行われたかをチェックしており、 その際に同一トランザクション内での他の更新処理を検知するとエラーを発生させることによるものである。
よって、ItemProcessorでMapperインタフェースを利用したデータベースの更新はできず、参照のみが可能となる。
ItemProcessor内で更新処理を行いたいケースとして、「特定の条件に当てはまる入力データについて、 ItemWriterとは異なる更新処理をするためにItemProcessorで個別に更新処理を行う」などが考えられるが、上記の制約によりこのような利用はできない。
代替手段として、後述のタスクレットモデルによる実装、もしくはItemProcessor、ItemWriterでの更新をステップやジョブで分離したバッチ処理の設計を検討されたい。

MyBatisBatchItemWriterのエラーチェックを無効化する設定ができるが、予期せぬ動作が起きる可能性があるため無効化は禁止する。

Taskletでの制約

Taskletでは、Mapperインタフェースを利用することが基本であるため、ItemProcessorのような影響はない。
MyBatisBatchItemWriterをInjectして利用することも考えられるが、その場合はMapperインタフェース自体を BATCH設定で処理すればよい。つまり、Taskletでは、MyBatisBatchItemWriterをInjectして使う必要は基本的にない。

タスクレットモデルにおける利用方法

タスクレットモデルでMapperインタフェースを利用してデータベースを更新(登録)する実装例を以下に示す。

Taskletでの実装例
@Component
public class OptimisticLockTasklet implements Tasklet {

    // (1)
    @Inject
    ExclusiveControlRepository repository;

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        Branch branch = repository.branchFindOne(branchId);

        // (2)
        ExclusiveBranch exclusiveBranch = new ExclusiveBranch();
        exclusiveBranch.setBranchId(branch.getBranchId());
        exclusiveBranch.setBranchName(branch.getBranchName() + " - " + identifier);
        exclusiveBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
        exclusiveBranch.setBranchTel(branch.getBranchTel());
        exclusiveBranch.setCreateDate(branch.getUpdateDate());
        exclusiveBranch.setUpdateDate(new Timestamp(clock.millis()));
        exclusiveBranch.setOldBranchName(branch.getBranchName());

        // (3)
        int result = repository.branchExclusiveUpdate(exclusiveBranch);

        // omitted.

        return RepeatStatus.FINISHED;
    }
}
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = { "org.terasoluna.batch.functionaltest.app.common",
        "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository", sqlSessionFactoryRef = "jobSqlSessionFactory") // (4)
public class TaskletOptimisticLockCheckJobConfig {

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       OptimisticLockTasklet optimisticLockTasklet) {
        return new StepBuilder("taskletOptimisticLockCheckJob.step01",
                jobRepository)
                .tasklet(optimisticLockTasklet, transactionManager) // (5)
                .build();
    }

    @Bean
    public Job taskletOptimisticLockCheckJob(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
        return new JobBuilder("taskletOptimisticLockCheckJob",jobRepository)
                .start(step01)
                .listener(listener)
                .build();
    }
}
<!-- (4) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
        factory-ref="jobSqlSessionFactory"/>

<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="taskletOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="optimisticLockTasklet"> <!-- (5) -->
        </batch:tasklet>
    </batch:step>
</batch:job>

MapperインタフェースとMapper XMLは省略する。

表 13. 説明
項番 説明

(1)

MapperインタフェースをInjectする。

(2)

DTOオブジェクトを生成して、更新データを設定する。

(3)

更新データを設定したDTOオブジェクトを引数に指定して、Mapperインタフェースで更新処理を実行する。

(4)

Mapper XMLの登録を行う。
@MapperScanのsqlSessionFactoryRef属性/<mybatis:scan>要素のfactory-ref属性REUSE設定されているjobSqlSessionFactoryを指定する。

(5)

MapperインタフェースをInjectしTaskletを設定する。

リスナーでのデータベースアクセス

リスナーでのデータベースアクセスは他のコンポーネントと連携することが多い。 使用するリスナー及び実装方法によっては、Mapperインタフェースで取得したデータを、 他のコンポーネントへ引き渡す仕組みを追加で用意する必要がある。

リスナーでMapperインタフェースを利用してデータベースアクセスを実装するにあたり、以下の制約がある。

リスナーでの制約

リスナーでもItemProcessorでの制約と同じ制約が成立する。 加えて、リスナーでは、更新を必要とするユースケースが考えにくい。よって、リスナーでは、更新系処理を行うことを推奨しない。

リスナーで想定される更新処理の代替
ジョブの状態管理

Spring BatchのJobRepositoryによって行われている

データベースへのログ出力

ログのAppenderで実施すべき。ジョブのトランザクションとも別管理する必要がある。

ここでは一例として、StepExecutionListenerで ステップ実行前にデータを取得して、ItemProcessorで取得したデータを利用する例を示す。

リスナーでの実装例
public class CacheSetListener implements StepExecutionListener {

    // (1)
    @Inject
    CustomerRepository customerRepository;

    // (2)
    @Inject
    CustomerCache cache;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // (3)
        for(Customer customer : customerRepository.findAllAtOnce()) {
            cache.addCustomer(customer.getCustomerId(), customer);
        }
    }
}
ItemProcessorでの利用例
@Component
public class UpdateItemFromCacheProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    // (4)
    @Inject
    CustomerCache cache;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        Customer customer = cache.getCustomer(readItem.getCustomerId());  // (5)

        SalesPlanDetail writeItem = new SalesPlanDetail();

        // omitted.
        writerItem.setCustomerName(customer.getCustomerName); // (6)

        return writeItem;
    }
}
キャッシュクラス
// (7)
@Component
public class CustomerCache {

    Map<String, Customer> customerMap = new HashMap<>();

    public Customer getCustomer(String customerId) {
        return customerMap.get(customerId);
    }

    public void addCustomer(String id, Customer customer) {
        customerMap.put(id, customer);
    }
}
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan({ "org.terasoluna.batch.functionaltest.app.common",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess" })
@MapperScan(basePackages = {
        "org.terasoluna.batch.functionaltest.app.repository",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository" }, sqlSessionTemplateRef = "batchModeSqlSessionTemplate") // (8)
public class DBAccessByItemListenerConfig {

    // omitted

    // (9)
    @Bean
    public CacheSetListener cacheSetListener() {
        return new CacheSetListener();
    }

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       ItemReader<SalesPerformanceDetail> reader,
                       @Qualifier("updateItemFromDBProcessor") ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> processor,
                       ItemWriter<SalesPlanDetail> writer,
                       CacheSetListener cacheSetListener) {
        return new StepBuilder("DBAccessByItemListener.step01",
                jobRepository)
                .<SalesPerformanceDetail, SalesPlanDetail> chunk(10, transactionManager)
                .listener(cacheSetListener) // (11)
                .reader(reader).processor(processor) // (10)
                .writer(writer)
                .build();
    }

    @Bean
    public Job DBAccessByItemListener(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
        return new JobBuilder("DBAccessByItemListener",
                jobRepository).start(step01).listener(listener).build();
    }
}
<!-- omitted -->

<!-- (8) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
        template-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="cacheSetListener"
      class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.CacheSetListener"/>

<!-- omitted -->

<batch:job id="DBAccessByItemListener" job-repository="jobRepository">
    <batch:step id="DBAccessByItemListener.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="updateItemFromCacheProcessor"
                         writer="writer" commit-interval="10"/> <!-- (10) -->
            <!-- (11) -->
            <batch:listeners>
                <batch:listener ref="cacheSetListener"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 14. 説明

項番

説明

(1)

MapperインタフェースをInjectする。

(2)

Mapperインタフェースから取得したデータをキャッシュするためのBeanをInjectする。

(3)

リスナーにて、Mapperインタフェースからデータを取得してキャッシュする。
ここでは、StepExecutionListener#beforeStepにてステップ実行前にキャッシュを作成し、 以降の処理ではキャッシュを参照することで、入出力を低減し処理効率を高めている。

(4)

(2)で設定したキャッシュと同じBeanをInjectする。

(5)

キャッシュから該当するデータを取得する。

(6)

更新データにキャッシュからのデータを反映する。

(7)

キャッシュクラスをコンポーネントとして実装する。
ここではBeanスコープはsingletonにしている。ジョブに応じて設定すること。

(8)

Mapper XMLの登録を行う。
@MapperScanのsqlSessionTemplateRef属性/<mybatis:scan>要素のtemplate-ref属性BATCHが設定されているbatchModeSqlSessionTemplateを指定する。

(9)

Mapperインタフェースを利用するリスナーを定義する。

(10)

キャッシュを利用するItemProcessorを指定する。

(11)

(9)で定義したリスナーを登録する。

リスナーでのSqlSessionFactoryの利用

上記の例では、batchModeSqlSessionTemplateを設定しているが、jobSqlSessionFactoryを設定してもよい。

チャンクのスコープ外で動作するリスナーについては、トランザクション外で処理されるため、 jobSqlSessionFactoryを設定しても問題ない。

How to extend

CompositeItemWriterにおける複数テーブルの更新

チャンクモデルで、1つの入力データに対して複数のテーブルへ更新を行いたい場合は、Spring Batchが提供するCompositeItemWriterを利用し、 各テーブルに対応したMyBatisBatchItemWriterを連結することで実現できる。

ここでは、売上計画と売上実績の2つのテーブルを更新する場合の実装例を示す。

ItemProcessorの実装例
@Component
public class SalesItemProcessor implements ItemProcessor<SalesPlanDetail, SalesDTO> {
    @Override
    public SalesDTO process(SalesPlanDetail item) throws Exception { // (1)

        SalesDTO salesDTO = new SalesDTO();

        // (2)
        SalesPerformanceDetail spd = new SalesPerformanceDetail();
        spd.setBranchId(item.getBranchId());
        spd.setYear(item.getYear());
        spd.setMonth(item.getMonth());
        spd.setCustomerId(item.getCustomerId());
        spd.setAmount(new BigDecimal(0L));
        salesDTO.setSalesPerformanceDetail(spd);

        // (3)
        item.setAmount(item.getAmount().add(new BigDecimal(1L)));
        salesDTO.setSalesPlanDetail(item);

        return salesDTO;
    }
}
DTOの実装例
public class SalesDTO implements Serializable {

    // (4)
    private SalesPlanDetail salesPlanDetail;

    // (5)
    private SalesPerformanceDetail salesPerformanceDetail;

    // omitted
}
MapperXMLの実装例
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">

    <select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
        <![CDATA[
        SELECT
            branch_id AS branchId, year, month, customer_id AS customerId, amount
        FROM
            sales_plan_detail
        ORDER BY
            branch_id ASC, year ASC, month ASC, customer_id ASC
        ]]>
    </select>

    <!-- (6) -->
    <update id="update" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        UPDATE
            sales_plan_detail
        SET
            amount = #{salesPlanDetail.amount}
        WHERE
            branch_id = #{salesPlanDetail.branchId}
        AND
            year = #{salesPlanDetail.year}
        AND
            month = #{salesPlanDetail.month}
        AND
            customer_id = #{salesPlanDetail.customerId}
        ]]>
    </update>

    <!-- (7) -->
    <insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
        <![CDATA[
        INSERT INTO
            sales_performance_detail(
                branch_id,
                year,
                month,
                customer_id,
                amount
            )
        VALUES (
            #{salesPerformanceDetail.branchId},
            #{salesPerformanceDetail.year},
            #{salesPerformanceDetail.month},
            #{salesPerformanceDetail.customerId},
            #{salesPerformanceDetail.amount}
        )
        ]]>
    </insert>

</mapper>
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = { "org.terasoluna.batch.functionaltest.app.common",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess" }, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = {
        "org.terasoluna.batch.functionaltest.app.repository",
        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository" }, sqlSessionTemplateRef = "batchModeSqlSessionTemplate")
public class UseCompositeItemWriterConfig {

    @Bean
    public MyBatisCursorItemReader<SalesPlanDetail> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<SalesPlanDetail>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .queryId(
                        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.findAll")
                .build();
    }

    // (8)
    @Bean
    public MyBatisBatchItemWriter<SalesDTO> planWriter(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            @Qualifier("batchModeSqlSessionTemplate") SqlSessionTemplate batchModeSqlSessionTemplate) {
        return new MyBatisBatchItemWriterBuilder<SalesDTO>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .statementId(
                        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.update")
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .build();
    }

    // (9)
    @Bean
    public MyBatisBatchItemWriter<SalesDTO> performanceWriter(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            @Qualifier("batchModeSqlSessionTemplate") SqlSessionTemplate batchModeSqlSessionTemplate) {
        return new MyBatisBatchItemWriterBuilder<SalesDTO>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .statementId(
                        "org.terasoluna.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.create")
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .build();
    }

    // (10)
    @Bean
    public CompositeItemWriter<SalesDTO> writer(
            @Qualifier("performanceWriter") MyBatisBatchItemWriter<SalesDTO> performanceWriter,
            @Qualifier("planWriter") MyBatisBatchItemWriter<SalesDTO> planWriter) {
        return new CompositeItemWriter<SalesDTO>(performanceWriter, planWriter); // (11)
    }

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       @Qualifier("reader") ItemReader<SalesPlanDetail> reader,
                       @Qualifier("salesItemProcessor") ItemProcessor<SalesPlanDetail, SalesDTO> processor,
                       @Qualifier("writer") ItemWriter<SalesDTO> writer) {
        return new StepBuilder("useCompositeItemWriter.step01",
                jobRepository)
                .<SalesPlanDetail, SalesDTO> chunk(3, transactionManager)
                .reader(reader).processor(processor)
                .writer(writer) // (12)
                .build();
    }

    @Bean
    public Job useCompositeItemWriter(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
        return new JobBuilder("useCompositeItemWriter",
                jobRepository).start(step01).listener(listener).build();
    }

}
<!-- reader using MyBatisCursorItemReader -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.findAll"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

<!-- writer MyBatisBatchItemWriter -->
<!-- (8) -->
<bean id="planWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.update"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (9) -->
<bean id="performanceWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.create"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>

<!-- (10) -->
<bean id="writer" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
      <!-- (11)-->
        <list>
            <ref bean="performanceWriter"/>
            <ref bean="planWriter"/>
        </list>
    </property>
</bean>

<!-- (12) -->
<batch:job id="useCompositeItemWriter" job-repository="jobRepository">
    <batch:step id="useCompositeItemWriter.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="salesItemProcessor"
                         writer="writer" commit-interval="3"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 15. 説明

項番

説明

(1)

入力データに対して2つのテーブルを更新するための各エンティティを保持するDTOを出力とするItemProcessorを実装する。
ItemWriterには2つのテーブルを更新するために異なるオブジェクトを渡すことはできないため、更新に必要なオブジェクトを集約するDTOを定義している。

(2)

売上実績(SalesPerformanceDetail)を新規作成するためのエンティティを作成し、DTOに格納する。

(3)

入力データでもある売上計画(SalesPlanDetail)を更新するため、入力データを更新してDTOに格納する。

(4)

売上計画(SalesPlanDetail)を保持するようにDTOに定義する。

(5)

売上実績(SalesPerformanceDetail)を保持するようにDTOに定義する。

(6)

DTOから取得した売上計画(SalesPlanDetail)で、売上計画テーブル(sales_plan_detail)を更新するSQLを定義する。

(7)

DTOから取得した売上実績(SalesPlanDetail)で、売上実績テーブル(sales_performance_detail)を新規作成するSQLを定義する。

(8)

売上計画テーブル(sales_plan_detail)を更新するMyBatisBatchItemWriterを定義する。

(9)

売上実績テーブル(sales_performance_detail)を新規作成するMyBatisBatchItemWriterを定義する。

(10)

(8),(9)を順番に実行するためにCompositeItemWriterを定義する。

(11)

CompositeItemWriterのコンストラクタの引数/<list>要素内に(8),(9)を設定する。指定した順番にItemWriterが実行される。

(12)

StepBuilderのwriterメソッド/<batch:chunk>要素のwriter属性に(10)で定義したBeanを指定する。processor属性には(1)のItemProcessorを指定する。

複数データソースへの出力(1ステップ)で説明した org.springframework.data.transaction.ChainedTransactionManagerと同時に使用することで複数データソースに対しての更新もできる。

また、CompositeItemWriterは、ItemWriter実装であれば連結できるので、 MyBatisBatchItemWriterとFlatFileItemWriterを設定することで、データベース出力とファイル出力を同時に行うこともできる。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28