Overview
チャンクモデルジョブの作成方法について説明する。 チャンクモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照。
ここでは、チャンクモデルジョブの構成要素について説明する。
構成要素
チャンクモデルジョブの構成要素を以下に示す。 これらの構成要素をBean定義にて組み合わせることで1つのジョブを実現する。
項番 | 名称 | 役割 | 設定必須 | 実装必須 |
---|---|---|---|---|
1 |
ItemReader |
様々なリソースからデータを取得するためのインタフェース。 |
||
2 |
ItemProcessor |
入力から出力へデータを加工するためのインタフェース。 |
||
3 |
ItemWriter |
様々なリソースへデータを出力するためのインタフェース。 |
この表のポイントは以下である。
-
入力リソースから出力リソースへ単純にデータを移し替えるだけであれば、設定のみで実現できる。
-
ItemProcessor
は、必要が生じた際にのみ実装すればよい。
以降、これらの構成要素を用いたジョブの実装方法について説明する。
How to use
ここでは、実際にチャンクモデルジョブを実装する方法について、以下の順序で説明する。
ジョブの設定
Bean定義ファイルにて、チャンクモデルジョブを構成する要素の組み合わせ方を定義する。 以下に例を示し、構成要素の繋がりを説明する。
@Configuration
@Import(JobBaseContextConfig.class) // (1)
@ComponentScan("org.terasoluna.batch.functionaltest.app.common") // (2)
@MapperScan(value = "org.terasoluna.batch.functionaltest.app.repository.mst", sqlSessionFactoryRef = "jobSqlSessionFactory") // (3)
public class JobCustomerList01Config {
// (4)
@Bean
@StepScope
public MyBatisCursorItemReader<Customer> reader(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
return new MyBatisCursorItemReaderBuilder<Customer>()
.sqlSessionFactory(jobSqlSessionFactory)
.queryId(
"org.terasoluna.batch.functionaltest.app.repository.mst.CustomerRepository.findAll")
.build();
}
// (5)
// Item Processor
// Item Processor in order that based on the Bean defined by the annotations, not defined here
// (6)
@Bean
@StepScope
public FlatFileItemWriter<Customer> writer(
@Value("#{jobParameters['outputFile']}") File outputFile) {
final BeanWrapperFieldExtractor<Customer> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(
new String[] { "customerId", "customerName", "customerAddress",
"customerTel", "chargeBranchId" });
final DelimitedLineAggregator<Customer> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<Customer>()
.name(ClassUtils.getShortName(FlatFileItemWriter.class))
.resource(new FileSystemResource(outputFile))
.transactional(false)
.lineAggregator(lineAggregator)
.build();
}
@Bean
public Step step01(JobRepository jobRepository, // (8)
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("reader") ItemReader<Customer> reader,
@Qualifier("processor") ItemProcessor<Customer, Customer> processor,
@Qualifier("writer") ItemWriter<Customer> writer,
@Qualifier("loggingItemReaderListener") LoggingItemReaderListener listener) {
return new StepBuilder("jobCustomerList01.step01", jobRepository) // // (9)
.<Customer, Customer> chunk(10, transactionManager) // (10)
.reader(reader) // (11)
.processor(processor)
.writer(writer)
.listener(listener)
.build();
}
@Bean
public Job jobCustomerList01(JobRepository jobRepository, // (8)
Step step01,
@Qualifier("jobExecutionLoggingListener") JobExecutionLoggingListener listener) {
return new JobBuilder("jobCustomerList01", jobRepository) // (7)
.start(step01)
.listener(listener)
.build();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd
http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">
<!-- (1) -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>
<!-- (2) -->
<context:component-scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common" />
<!-- (3) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
factory-ref="jobSqlSessionFactory"/>
<!-- (4) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- (5) -->
<!-- Item Processor -->
<!-- Item Processor in order that based on the Bean defined by the annotations, not defined here -->
<!-- (6) -->
<bean id="writer"
class="org.springframework.batch.item.file.FlatFileItemWriter"
scope="step"
p:resource="file:#{jobParameters['outputFile']}">
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
p:names="customerId,customerName,customerAddress,customerTel,chargeBranchId"/>
</property>
</bean>
</property>
</bean>
<!-- (7) -->
<batch:job id="jobCustomerList01" job-repository="jobRepository"> <!-- (8) -->
<batch:step id="jobCustomerList01.step01"> <!-- (9) -->
<batch:tasklet transaction-manager="jobTransactionManager"> <!-- (10) -->
<batch:chunk reader="reader"
processor="processor"
writer="writer"
commit-interval="10" /> <!-- (11) -->
</batch:tasklet>
</batch:step>
</batch:job>
</beans>
@Component("processor") // (5)
public class CustomerProcessor implements ItemProcessor<Customer, Customer> {
// omitted.
}
項番 | 説明 |
---|---|
(1) |
Macchinetta Batch 2.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。 |
(2) |
コンポーネントスキャン対象のベースパッケージを設定する。 |
(3) |
MyBatis-Springの設定。 |
(4) |
ItemReaderの設定。 |
(5) |
ItemProcessorは、(2)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。 |
(6) |
ItemWriterの設定。 |
(7) |
ジョブの設定。 |
(8) |
|
(9) |
ステップの設定。 |
(10) |
タスクレットの設定。 |
(11) |
チャンクモデルジョブの設定。 |
アノテーションベースのBeanの依存性解決の有効化方法
アノテーションベースのBeanの依存性解決を有効化する方法は、JavaConfig/XMLConfigで異なる。
|
chunkSize/commit-intervalのチューニング
前述の例では10件としているが、利用できるマシンリソースやジョブの特性によって適切な件数は異なる。 複数のリソースにアクセスしてデータを加工するジョブであれば10件から100件程度で処理スループットが頭打ちになることもある。 一方、入出力リソースが1:1対応しておりデータを移し替える程度のジョブであれば5000件や10000件でも処理スループットが伸びることがある。 ジョブ実装時の |
コンポーネントの実装
ここでは主に、ItemProcessorを実装する方法について説明する。
他のコンポーネントについては、以下を参照。
-
ItemReader、ItemWriter
-
Listener
ItemProcessorの実装
ItemProcessorの実装方法を説明する。
ItemProcessorは、以下のインタフェースが示すとおり、入力リソースから取得したデータ1件をもとに、 出力リソースに向けたデータ1件を作成する役目を担う。 つまり、ItemProcessorはデータ1件に対するビジネスロジックを実装する箇所、と言える。
public interface ItemProcessor<I, O> {
O process(I item) throws Exception;
}
なお、インタフェースが示すI
とO
は以下のとおり同じ型でも異なる型でもよい。
同じ型であれば入力データを一部修正することを意味し、
異なる型であれば入力データをもとに出力データを生成することを意味する。
@Component
public class AmountUpdateItemProcessor implements
ItemProcessor<SalesPlanDetail, SalesPlanDetail> {
@Override
public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
item.setAmount(new BigDecimal("1000"));
return item;
}
}
@Component
public class UpdateItemFromDBProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
@Inject
CustomerRepository customerRepository;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
Customer customer = customerRepository.findOne(readItem.getCustomerId());
SalesPlanDetail writeItem = new SalesPlanDetail();
writeItem.setBranchId(customer.getChargeBranchId());
writeItem.setYear(readItem.getYear());
writeItem.setMonth(readItem.getMonth());
writeItem.setCustomerId(readItem.getCustomerId());
writeItem.setAmount(readItem.getAmount());
return writeItem;
}
}
ItemProcessorからnullを返却することの意味
ItemProcessorからnullを返却することは、当該データを後続処理(Writer)に渡さないことを意味し、 言い換えるとデータをフィルタすることになる。 これは、入力データの妥当性検証を実施する上で有効活用できる。 詳細については、入力チェックを参照。 |
ItemProcessorの処理スループットをあげるには
前述した実装例のように、ItemProcessorの実装クラスではデータベースやファイルを始めとしたリソースにアクセスしなければならないことがある。 ItemProcessorは入力データ1件ごとに実行されるため、入出力が少しでも発生するとジョブ全体では大量の入出力が発生することになる。 そのため、極力入出力を抑えることが処理スループットをあげる上で重要となる。 1つの方法として、後述のListenerを活用することで事前に必要なデータをメモリ上に確保しておき、 ItemProcessorにおける処理の大半を、CPU/メモリ間で完結するように実装する手段がある。 ただし、1ジョブあたりのメモリを大量に消費することにも繋がるので、何でもメモリ上に確保すればよいわけではない。 入出力回数やデータサイズをもとに、メモリに格納するデータを検討すること。 この点については、データの入出力でも紹介する。 |
PassThroughItemProcessorの省略
ジョブを定義する場合は、ItemProcessorの設定を省略することができる。 省略した場合、PassThroughItemProcessorと同様に何もせずに入力データをItemWriterへ受け渡すことになる。 @Bean public Step exampleStep(JobRepository jobRepository, @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager, ItemReader reader, ItemWriter writer) { return new StepBuilder("exampleJob.exampleStep", jobRepository) .<ItemReader, ItemWriter> chunk(10, transactionManager) .reader(reader) .writer(writer) .build(); } <batch:job id="exampleJob"> <batch:step id="exampleStep"> <batch:tasklet> <batch:chunk reader="reader" writer="writer" commit-interval="10" /> </batch:tasklet> </batch:step> </batch:job> |
複数のItemProcessorを同時に利用する
汎用的なItemProcessorを用意し、個々のジョブに適用したい場合は、
Spring Batchが提供する
|