Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

チャンクモデルジョブの作成方法について説明する。 チャンクモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照。

ここでは、チャンクモデルジョブの構成要素について説明する。

構成要素

チャンクモデルジョブの構成要素を以下に示す。 これらの構成要素をBean定義にて組み合わせることで1つのジョブを実現する。

表 1. チャンクモデルジョブの構成要素
項番 名称 役割 設定必須 実装必須

1

ItemReader

様々なリソースからデータを取得するためのインタフェース。
Spring Batchにより、フラットファイルや
データベースを対象とした実装が提供されているため、ユーザにて作成する必要はない。

2

ItemProcessor

入力から出力へデータを加工するためのインタフェース。
ユーザは必要に応じてこのインタフェースをimplementsし、ビジネスロジックを実装する。

3

ItemWriter

様々なリソースへデータを出力するためのインタフェース。
ItemReaderと対になるインタフェースと考えてよい。
Spring Batchにより、フラットファイルや
データベースのための実装が提供されているため、ユーザにて作成する必要はない。

この表のポイントは以下である。

  • 入力リソースから出力リソースへ単純にデータを移し替えるだけであれば、設定のみで実現できる。

  • ItemProcessorは、必要が生じた際にのみ実装すればよい。

以降、これらの構成要素を用いたジョブの実装方法について説明する。

How to use

ここでは、実際にチャンクモデルジョブを実装する方法について、以下の順序で説明する。

ジョブの設定

Bean定義ファイルにて、チャンクモデルジョブを構成する要素の組み合わせ方を定義する。 以下に例を示し、構成要素の繋がりを説明する。

Bean定義ファイルの例(チャンクモデル)
@Configuration
@Import(JobBaseContextConfig.class) // (1)
@ComponentScan("org.terasoluna.batch.functionaltest.app.common") // (2)
@MapperScan(value = "org.terasoluna.batch.functionaltest.app.repository.mst", sqlSessionFactoryRef = "jobSqlSessionFactory") // (3)
public class JobCustomerList01Config {

    // (4)
    @Bean
    @StepScope
    public MyBatisCursorItemReader<Customer> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
        return new MyBatisCursorItemReaderBuilder<Customer>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .queryId(
                        "org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll")
                .build();
    }

    // (5)
    // Item Processor
    // Item Processor in order that based on the Bean defined by the annotations, not defined here

    // (6)
    @Bean
    @StepScope
    public FlatFileItemWriter<Customer> writer(
            @Value("#{jobParameters['outputFile']}") File outputFile) {
        final BeanWrapperFieldExtractor<Customer> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(
                new String[] { "customerId", "customerName", "customerAddress",
                        "customerTel", "chargeBranchId" });
        final DelimitedLineAggregator<Customer> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(fieldExtractor);
        return new FlatFileItemWriterBuilder<Customer>()
                .name(ClassUtils.getShortName(FlatFileItemWriter.class))
                .resource(new FileSystemResource(outputFile))
                .transactional(false)
                .lineAggregator(lineAggregator)
                .build();
    }

    @Bean
    public Step step01(JobRepository jobRepository, // (8)
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       @Qualifier("reader") ItemReader<Customer> reader,
                       @Qualifier("processor") ItemProcessor<Customer, Customer> processor,
                       @Qualifier("writer") ItemWriter<Customer> writer,
                       @Qualifier("loggingItemReaderListener") LoggingItemReaderListener listener) {
        return new StepBuilder("jobCustomerList01.step01", jobRepository) // // (9)
                .<Customer, Customer> chunk(10, transactionManager) // (10)
                .reader(reader) // (11)
                .processor(processor)
                .writer(writer)
                .listener(listener)
                .build();
    }

    @Bean
    public Job jobCustomerList01(JobRepository jobRepository, // (8)
                                 Step step01,
                                 @Qualifier("jobExecutionLoggingListener") JobExecutionLoggingListener listener) {
        return new JobBuilder("jobCustomerList01", jobRepository) // (7)
                .start(step01)
                .listener(listener)
                .build();
    }
}
Bean定義ファイルの例(チャンクモデル)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <!-- (1) -->
    <import resource="classpath:META-INF/spring/job-base-context.xml"/>

    <!-- (2) -->
    <context:component-scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common" />

    <!-- (3) -->
    <mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
        factory-ref="jobSqlSessionFactory"/>

    <!-- (4) -->
    <bean id="reader"
          class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
          p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
          p:sqlSessionFactory-ref="jobSqlSessionFactory"/>

    <!-- (5) -->
    <!-- Item Processor -->
    <!-- Item Processor in order that based on the Bean defined by the annotations, not defined here -->

    <!-- (6) -->
    <bean id="writer"
          class="org.springframework.batch.item.file.FlatFileItemWriter"
          scope="step"
          p:resource="file:#{jobParameters['outputFile']}">
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                          p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
                </property>
            </bean>
        </property>
    </bean>

    <!-- (7) -->
    <batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (8) -->
        <batch:step id="jobCustomerList01.step01"> <!-- (9) -->
            <batch:tasklet transaction-manager="jobTransactionManager"> <!-- (10) -->
                <batch:chunk reader="reader"
                             processor="processor"
                             writer="writer"
                             commit-interval="10" /> <!-- (11) -->
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
ItemProcessor実装クラスの設定
@Component("processor") // (5)
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
  // omitted.
}
項番 説明

(1)

Macchinetta Batch 2.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。

(2)

コンポーネントスキャン対象のベースパッケージを設定する。

(3)

MyBatis-Springの設定。
MyBatis-Springの設定の詳細は、データベースアクセスを参照。

(4)

ItemReaderの設定。
ItemReaderの詳細は、データベースアクセスファイルアクセスを参照。

(5)

ItemProcessorは、(2)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。

(6)

ItemWriterの設定。
ItemWriterの詳細は、データベースアクセスファイルアクセスを参照。

(7)

ジョブの設定。
JobBuilderのコンストラクタの引数name/<batch:job>のid属性は、1つのバッチアプリケーションに含まれる全ジョブの範囲内で一意とする必要がある。

(8)

JobRepositoryの設定。
JobBuilder、StepBuilderのコンストラクタの引数jobRepository/<batch:job>のjob-repository属性に設定する値は、特別な理由がない限りjobRepository固定とすること。
これにより、すべてのジョブが1つのJobRepositoryで管理できる。 jobRepositoryのBean定義は、(1)により解決する。

(9)

ステップの設定。
StepBuilderのコンストラクタの引数name/<batch:job>のid属性は、1つのバッチアプリケーションに含まれる全ジョブの範囲内で一意とする必要はないが、障害発生時に追跡しやすくなる等の様々なメリットがあるため一意とする。
特別な理由がない限り、(7)で指定したJobBuilderのコンストラクタの引数name/<batch:job>のid属性に[step+連番]を付加する形式とする。

(10)

タスクレットの設定。
StepBuilderのchunkメソッドの引数transactionManager/<batch:tasklet>のtransaction-manager属性に設定する値は、特別な理由がない限りjobTransactionManager固定とすること。
これにより、(11)のcommit-intervalごとにトランザクションが管理される。 詳細については、トランザクション制御を参照。
jobTransactionManagerのBean定義は、(1)により解決する。

(11)

チャンクモデルジョブの設定。
StepBuilderのreaderメソッド、writerメソッド/<batch:chunk>のreader属性、writer属性に、前項までに定義したItemReaderItemWriterのBeanIDを指定する。
StepBuilderのprocessorメソッド/processor属性に、ItemProcessorの実装クラスのBeanIDを指定する。
StepBuilderのchunkメソッドの引数chunkSize/<batch:chunk>のcommit-interval属性に1チャンクあたりの入力データ件数を設定する。

アノテーションベースのBeanの依存性解決の有効化方法

アノテーションベースのBeanの依存性解決を有効化する方法は、JavaConfig/XMLConfigで異なる。

  • JavaConfig

    • ジョブのBean定義ファイルのクラスに付与する@Configurationによって有効化される。

  • XMLConfig

    • ジョブのBean定義ファイルに<context:component-scan>を設定することで有効化される(正確には、<context:component-scan>が内部で呼び出す<context:annotation-config>によって有効化される)。コンポーネントスキャンを利用しない場合は<context:annotation-config>を設定する必要がある。

chunkSize/commit-intervalのチューニング

chunkSize/commit-intervalはチャンクモデルジョブにおける、性能上のチューニングポイントである。

前述の例では10件としているが、利用できるマシンリソースやジョブの特性によって適切な件数は異なる。 複数のリソースにアクセスしてデータを加工するジョブであれば10件から100件程度で処理スループットが頭打ちになることもある。 一方、入出力リソースが1:1対応しておりデータを移し替える程度のジョブであれば5000件や10000件でも処理スループットが伸びることがある。

ジョブ実装時のchunkSize/commit-intervalは100件程度で仮置きしておき、 その後に実施した性能測定の結果に応じてジョブごとにチューニングするとよい。

コンポーネントの実装

ここでは主に、ItemProcessorを実装する方法について説明する。

他のコンポーネントについては、以下を参照。

ItemProcessorの実装

ItemProcessorの実装方法を説明する。

ItemProcessorは、以下のインタフェースが示すとおり、入力リソースから取得したデータ1件をもとに、 出力リソースに向けたデータ1件を作成する役目を担う。 つまり、ItemProcessorはデータ1件に対するビジネスロジックを実装する箇所、と言える。

ItemProcessorインタフェース
public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

なお、インタフェースが示すIOは以下のとおり同じ型でも異なる型でもよい。 同じ型であれば入力データを一部修正することを意味し、 異なる型であれば入力データをもとに出力データを生成することを意味する。

ItemProcessor実装例(入出力が同じ型)
@Component
public class AmountUpdateItemProcessor implements
        ItemProcessor<SalesPlanDetail, SalesPlanDetail> {

    @Override
    public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
        item.setAmount(new BigDecimal("1000"));
        return item;
    }
}
ItemProcessor実装例(入出力が異なる型)
@Component
public class UpdateItemFromDBProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {

    @Inject
    CustomerRepository customerRepository;

    @Override
    public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
        Customer customer = customerRepository.findOne(readItem.getCustomerId());

        SalesPlanDetail writeItem = new SalesPlanDetail();
        writeItem.setBranchId(customer.getChargeBranchId());
        writeItem.setYear(readItem.getYear());
        writeItem.setMonth(readItem.getMonth());
        writeItem.setCustomerId(readItem.getCustomerId());
        writeItem.setAmount(readItem.getAmount());
        return writeItem;
    }
}
ItemProcessorからnullを返却することの意味

ItemProcessorからnullを返却することは、当該データを後続処理(Writer)に渡さないことを意味し、 言い換えるとデータをフィルタすることになる。 これは、入力データの妥当性検証を実施する上で有効活用できる。 詳細については、入力チェックを参照。

ItemProcessorの処理スループットをあげるには

前述した実装例のように、ItemProcessorの実装クラスではデータベースやファイルを始めとしたリソースにアクセスしなければならないことがある。 ItemProcessorは入力データ1件ごとに実行されるため、入出力が少しでも発生するとジョブ全体では大量の入出力が発生することになる。 そのため、極力入出力を抑えることが処理スループットをあげる上で重要となる。

1つの方法として、後述のListenerを活用することで事前に必要なデータをメモリ上に確保しておき、 ItemProcessorにおける処理の大半を、CPU/メモリ間で完結するように実装する手段がある。 ただし、1ジョブあたりのメモリを大量に消費することにも繋がるので、何でもメモリ上に確保すればよいわけではない。 入出力回数やデータサイズをもとに、メモリに格納するデータを検討すること。

この点については、データの入出力でも紹介する。

PassThroughItemProcessorの省略

ジョブを定義する場合は、ItemProcessorの設定を省略することができる。 省略した場合、PassThroughItemProcessorと同様に何もせずに入力データをItemWriterへ受け渡すことになる。

@Bean
public Step exampleStep(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader reader,
                   ItemWriter writer) {
    return new StepBuilder("exampleJob.exampleStep",
            jobRepository)
            .<ItemReader, ItemWriter> chunk(10, transactionManager)
            .reader(reader)
            .writer(writer)
            .build();
}
<batch:job id="exampleJob">
    <batch:step id="exampleStep">
        <batch:tasklet>
            <batch:chunk reader="reader" writer="writer" commit-interval="10" />
        </batch:tasklet>
    </batch:step>
</batch:job>
複数のItemProcessorを同時に利用する

汎用的なItemProcessorを用意し、個々のジョブに適用したい場合は、 Spring Batchが提供するCompositeItemProcessorを利用し連結することで実現できる。

@Bean
public CompositeItemProcessor processor(
        ItemProcessor commonItemProcessor,
        ItemProcessor businessLogicItemProcessor) {
    return new CompositeItemProcessorBuilder<>()
            .delegates(commonItemProcessor)
            .delegates(businessLogicItemProcessor)
            .build();
}
<bean id="processor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <ref bean="commonItemProcessor"/>
            <ref bean="businessLogicItemProcessor"/>
        </list>
    </property>
</bean>

delegates属性に指定した順番に処理されることに留意すること。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28