Overview
タスクレットモデルジョブの作成方法について説明する。 タスクレットモデルのアーキテクチャについては、Spring Batchのアーキテクチャを参照。
How to use
ここでは、実際にタスクレットモデルジョブを実装する方法について、以下の順序で説明する。
ジョブの設定
Bean定義ファイルにて、タスクレットモデルジョブを定義する。 以下に例を示す。
@Configuration
@Import(JobBaseContextConfig.class) // (1)
@ComponentScan("org.terasoluna.batch.functionaltest.app.common") // (2)
public class JobSimpleJobConfig {
// (3)
// Tasklet
// Tasklet in order that based on the Bean defined by the annotations, not defined here
@Bean
public Step step01(JobRepository jobRepository, // (5)
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
SimpleJobTasklet simpleJobTasklet) {
return new StepBuilder("jobSimpleJob.step01", jobRepository) // (6)
.tasklet(simpleJobTasklet, transactionManager) // (7)
.build();
}
@Bean
public Job jobSimpleJob(JobRepository jobRepository, // (5)
Step step01) {
return new JobBuilder("jobSimpleJob", jobRepository) // (4)
.start(step01)
.build();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd">
<!-- (1) -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>
<!-- (2) -->
<context:component-scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common"/>
<!-- (3) -->
<!-- Tasklet -->
<!-- Tasklet in order that based on the Bean defined by the annotations, not defined here -->
<!-- (4) -->
<batch:job id="simpleJob" job-repository="jobRepository"> <!-- (5) -->
<batch:step id="simpleJob.step01"> <!-- (6) -->
<batch:tasklet transaction-manager="jobTransactionManager"
ref="simpleJobTasklet"/> <!-- (7) -->
</batch:step>
</batch:job>
</beans>
package jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common;
@Component // (3)
public class SimpleJobTasklet implements Tasklet {
// omitted.
}
項番 | 説明 |
---|---|
(1) |
Macchinetta Batch 2.xを利用する際に、常に必要なBean定義を読み込む設定をインポートする。 |
(2) |
コンポーネントスキャン対象のベースパッケージを設定する。 |
(3) |
Taskletは、(2)によりアノテーションにて定義することができ、Bean定義ファイルで定義する必要がない。 |
(4) |
ジョブの設定。 |
(5) |
|
(6) |
ステップの設定。 |
(7) |
タスクレットの設定。 また、 |
アノテーション利用時のBean名
|
シンプルなTaskletの実装
ログを出力するのみのTasklet実装を通じ、最低限のポイントを説明する。
package jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common;
// omitted.
@Component
public class SimpleJobTasklet implements Tasklet { // (1)
private static final Logger logger =
LoggerFactory.getLogger(SimpleJobTasklet.class);
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception { // (2)
logger.info("called tasklet."); // (3)
return RepeatStatus.FINISHED; // (4)
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
任意の処理を実装する。ここではINFOログを出力している。 |
(4) |
Taskletの処理が完了したかどうかを返却する。 |
チャンクモデルのコンポーネントを利用するTasklet実装
Spring Batch では、Tasklet実装の中でチャンクモデルの各種コンポーネントを利用することに言及していない。 Macchinetta Batch 2.xでは、以下のような状況に応じてこれを選択してよい。
-
複数のリソースを組み合わせながら処理するため、チャンクモデルの形式に沿いにくい
-
チャンクモデルでは処理が複数箇所に実装することになるため、タスクレットモデルの方が全体像を把握しやすい
-
リカバリをシンプルにするため、チャンクモデルの中間コミットではなく、タスクレットモデルの一括コミットを使いたい
また、チャンクモデルのコンポーネントを利用してTasklet実装するうえで処理の単位についても考慮してほしい。 出力件数の単位としては以下の3パターンが考えられる。
出力件数 | 特徴 |
---|---|
1件 |
データを1件ずつ、入力、処理、出力しているため、処理のイメージがしやすい。 |
全件 |
データを1件ずつ、入力、処理してメモリ上に貯めておき、最後に全件一括で出力する。 |
一定件数 |
データを1件ずつ、入力、処理してメモリ上に貯めておき、一定件数まできたところで出力する。 |
以下に、チャンクモデルのコンポーネントであるItemReader
やItemWriter
を利用するTasklet実装について説明する。
この実装例は、1件単位に処理している例である。
@Component
@Scope("step") // (1)
public class SalesPlanChunkTranTask implements Tasklet {
@Inject
@Named("detailCSVReader") // (2)
ItemStreamReader<SalesPlanDetail> itemReader; // (3)
@Inject
SalesPlanDetailRepository repository; // (4)
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
SalesPlanDetail item;
try {
itemReader.open(chunkContext.getStepContext().getStepExecution()
.getExecutionContext()); // (5)
while ((item = itemReader.read()) != null) { // (6)
// do some processes.
repository.create(item); // (7)
}
} finally {
itemReader.close(); // (8)
}
return RepeatStatus.FINISHED;
}
}
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = "org.terasoluna.batch.functionaltest.ch05.transaction.component", scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.app.repository.plan", sqlSessionFactoryRef = "jobSqlSessionFactory") // (9)
public class CreateSalesPlanChunkTranTaskConfig {
// (10)
@Bean
@StepScope
public FlatFileItemReader<SalesPlanDetail> detailCSVReader(
@Value("#{jobParameters['inputFile']}") File inputFile) {
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setNames("branchId", "year", "month", "customerId", "amount");
BeanWrapperFieldSetMapper<SalesPlanDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(SalesPlanDetail.class);
DefaultLineMapper<SalesPlanDetail> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(lineTokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return new FlatFileItemReaderBuilder<SalesPlanDetail>()
.name(ClassUtils.getShortName(FlatFileItemReader.class))
.resource(new FileSystemResource(inputFile))
.lineMapper(lineMapper)
.build();
}
// (11)
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobResourcelessTransactionManager") PlatformTransactionManager transactionManager,
SalesPlanChunkTranTask salesPlanChunkTranTask) {
return new StepBuilder("createSalesPlanChunkTranTask.step01",
jobRepository)
.tasklet(salesPlanChunkTranTask, transactionManager)
.build();
}
@Bean
public Job createSalesPlanChunkTranTask(JobRepository jobRepository,
Step step01) {
return new JobBuilder("createSalesPlanChunkTranTask",jobRepository)
.start(step01)
.build();
}
}
<!-- omitted -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>
<context:component-scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.plan" />
<context:component-scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.transaction.component" />
<!-- (9) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
factory-ref="jobSqlSessionFactory"/>
<!-- (10) -->
<bean id="detailCSVReader"
class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
p:resource="file:#{jobParameters['inputFile']}">
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
p:names="branchId,year,month,customerId,amount"/>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail"/>
</property>
</bean>
</property>
</bean>
<!-- (11) -->
<batch:job id="createSalesPlanChunkTranTask" job-repository="jobRepository">
<batch:step id="createSalesPlanChunkTranTask.step01">
<batch:tasklet transaction-manager="jobTransactionManager"
ref="salesPlanChunkTranTask"/>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
本クラス内で利用するItemReaderのBeanスコープに合わせ、stepスコープとする。 |
(2) |
入力リソース(この例ではフラットファイル)へのアクセスは |
(3) |
|
(4) |
出力リソース(この例ではデータベース)へのアクセスはMyBatisのMapperを通じて行う。 |
(5) |
入力リソースをオープンする。 |
(6) |
入力リソース全件を逐次ループ処理する。 |
(7) |
データベースへ出力する。 |
(8) |
リソースは必ずクローズすること。 |
(9) |
MyBatis-Springの設定。 |
(10) |
ファイルから入力するため、 |
(11) |
各種コンポーネントはアノテーションによって解決するため、 |
スコープの統一について
Tasklet実装クラスと、InjectするBeanのスコープは、同じスコープに統一すること。 たとえば、 仮にTasklet実装クラスのスコープを |
@Injectを付与するフィールドの型について
利用する実装クラスに応じて、以下のいずれかとする。
必ずjavadocを確認してどちらを利用するか判断すること。以下に代表例を示す。
|
この実装例は、一定件数単位に処理するチャンクモデルを模倣した例である
@Component
@Scope("step")
public class SalesPerformanceTasklet implements Tasklet {
@Inject
ItemStreamReader<SalesPerformanceDetail> reader;
@Inject
ItemWriter<SalesPerformanceDetail> writer; // (1)
int chunkSize = 10; // (2)
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
try {
reader.open(chunkContext.getStepContext().getStepExecution()
.getExecutionContext());
List<SalesPerformanceDetail> items = new ArrayList<>(chunkSize); // (2)
SalesPerformanceDetail item = null;
do {
// Pseudo operation of ItemReader
for (int i = 0; i < chunkSize; i++) { // (3)
item = reader.read();
if (item == null) {
break;
}
// Pseudo operation of ItemProcessor
// do some processes.
items.add(item);
}
// Pseudo operation of ItemWriter
if (!items.isEmpty()) {
writer.write(new Chunk(items)); // (4)
items.clear();
}
} while (item != null);
} finally {
try {
reader.close();
} catch (Exception e) {
// do nothing.
}
}
return RepeatStatus.FINISHED;
}
}
@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = { "org.terasoluna.batch.functionaltest.app.common",
"org.terasoluna.batch.functionaltest.ch06.exceptionhandling" }, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(value = "org.terasoluna.batch.functionaltest.app.repository.performance", sqlSessionFactoryRef = "jobSqlSessionFactory")
public class JobSalesPerfTaskletConfig {
@Bean
@StepScope
public FlatFileItemReader<SalesPerformanceDetail> detailCSVReader(
@Value("#{jobParameters['inputFile']}") File inputFile) {
final DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("branchId", "year", "month", "customerId", "amount");
final BeanWrapperFieldSetMapper<SalesPerformanceDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(SalesPerformanceDetail.class);
final DefaultLineMapper<SalesPerformanceDetail> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return new FlatFileItemReaderBuilder<SalesPlanDetail>()
.name(ClassUtils.getShortName(FlatFileItemReader.class))
.resource(new FileSystemResource(inputFile))
.lineMapper(lineMapper)
.build();
}
// (1)
@Bean
public MyBatisBatchItemWriter<SalesPerformanceDetail> detailWriter(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
SqlSessionTemplate batchModeSqlSessionTemplate) {
return new MyBatisBatchItemWriterBuilder<SalesPerformanceDetail>()
.sqlSessionFactory(jobSqlSessionFactory)
.statementId(
"org.terasoluna.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.create")
.sqlSessionTemplate(batchModeSqlSessionTemplate).build();
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
SalesPerformanceTasklet salesPerformanceTasklet) {
return new StepBuilder("jobSalesPerfTasklet.step01",
jobRepository)
.tasklet(salesPerformanceTasklet, transactionManager)
.build();
}
@Bean
public Job jobSalesPerfTasklet(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfTasklet", jobRepository)
.start(step01)
.listener(listener)
.build();
}
}
<!-- omitted -->
<import resource="classpath:META-INF/spring/job-base-context.xml"/>
<context:component-scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.common,
jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.exceptionhandling"/>
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance"
factory-ref="jobSqlSessionFactory"/>
<bean id="detailCSVReader"
class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
p:resource="file:#{jobParameters['inputFile']}">
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
p:names="branchId,year,month,customerId,amount"/>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail"/>
</property>
</bean>
</property>
</bean>
<!-- (1) -->
<bean id="detailWriter"
class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.create"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<batch:job id="jobSalesPerfTasklet" job-repository="jobRepository">
<batch:step id="jobSalesPerfTasklet.step01">
<batch:tasklet ref="salesPerformanceTasklet"
transaction-manager="jobTransactionManager"/>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
チャンクモデルの動作にそって、 |
(4) |
|
ItemReader
やItemWriter
の実装クラスを利用するかどうかは都度判断してほしいが、
ファイルアクセスはItemReader
やItemWriter
の実装クラスを利用するとよい。
それ以外のデータベースアクセス等は無理に使う必要はない。性能向上のために使えばよい。