Overview
リスナーとは、ジョブやステップを実行する前後に処理を挿入するためのインタフェースである。
本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。
リスナーには多くのインタフェースがあるため、それぞれの役割について説明する。 その後に、設定および実装方法について説明をする。
リスナーの種類
Spring Batchでは、実に多くのリスナーインタフェースが定義されている。 ここではそのすべてを説明するのではなく、利用頻度が高いものを中心に扱う。
まず、リスナーは2種類に大別される。
- JobListener
-
ジョブの実行に対して処理を挟み込むためのインタフェース
- StepListener
-
ステップの実行に対して処理を挟み込むためのインタフェース
JobListenerについて
Spring Batchには、 |
JobListener
JobListener
のインタフェースは、JobExecutionListener
の1つのみとなる。
- JobExecutionListener
-
ジョブの開始前、終了後に処理を挟み込む。
public interface JobExecutionListener {
void beforeJob(JobExecution jobExecution);
void afterJob(JobExecution jobExecution);
}
StepListener
StepListener
のインタフェースは以下のように多くの種類がある。
- StepListener
-
以降に紹介する各種リスナーのマーカーインタフェース。
- StepExecutionListener
-
ステップ実行の開始前、終了後に処理を挟み込む。
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
- ChunkListener
-
1つのチャンクを処理する前後と、エラーが発生した場合に処理を挟み込む。
public interface ChunkListener extends StepListener {
static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
ROLLBACK_EXCEPTION_KEYの用途
使用例
|
例外ハンドリングについては、ChunkListenerインタフェースによる例外ハンドリングを参照。
- ItemReadListener
-
ItemReaderが1件のデータを取得する前後と、エラーが発生した場合に処理を挟み込む。
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
- ItemProcessListener
-
ItemProcessorが1件のデータを加工する前後と、エラーが発生した場合に処理を挟み込む。
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
- ItemWriteListener
-
ItemWriterが1つのチャンクを出力する前後と、エラーが発生した場合に処理を挟み込む。
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(Chunk<? extends S> items);
void afterWrite(Chunk<? extends S> items);
void onWriteError(Exception exception, Chunk<? extends S> items);
}
本ガイドラインでは、以下のリスナーについては説明をしない。
これらのリスナーは例外ハンドリングでの使用を想定したものであるが、 本ガイドラインではこれらのリスナーを用いた例外ハンドリングは行わない方針である。 詳細は、例外ハンドリングを参照。 |
前処理でデータベース更新を行う必要がある場合はフロー制御を参照し、データベース更新を行う前処理と後続処理のステップを分けて、 |
How to use
リスナーの実装と設定方法について説明する。
リスナーの実装
リスナーの実装と設定方法について説明する。
-
リスナーインタフェースを
implements
して実装する。 -
コンポーネントにメソッドベースでアノテーションを付与して実装する。
どちらで実装するかは、リスナーの役割に応じて選択する。基準は後述する。
インタフェースを実装する場合
各種リスナーインタフェースをimplements
して実装する。必要に応じて、複数のインタフェースを同時に実装してもよい。
以下に実装例を示す。
@Component
public class JobExecutionLoggingListener implements JobExecutionListener { // (1)
private static final Logger logger =
LoggerFactory.getLogger(JobExecutionLoggingListener.class);
@Override
public void beforeJob(JobExecution jobExecution) { // (2)
logger.info("job started. [JobName:{}]", jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) { // (3)
logger.info("job finished.[JobName:{}][ExitStatus:{}]", jobExecution.getJobInstance().getJobName(),
jobExecution.getExitStatus().getExitCode());
}
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader<SalesPlanDetail> reader,
ItemWriter<SalesPlanDetail> writer,
AmountCheckProcessor processor,
@Qualifier("loggingEachProcessInStepListener") StepExecutionListener listener) {
return new StepBuilder("chunkJobWithListener.step01",
jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.listener(listener)
.build();
}
@Bean
public Job chunkJobWithListener(JobRepository jobRepository,
Step step01,
@Qualifier("jobExecutionLoggingListener") JobExecutionLoggingListener listener) {
return new JobBuilder("chunkJobWithListener",
jobRepository)
.start(step01)
.listener(listener) // (4)
.build();
}
<batch:job id="chunkJobWithListener" job-repository="jobRepository">
<batch:step id="chunkJobWithListener.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="processor"
writer="writer" commit-interval="10"/>
<batch:listeners>
<batch:listener ref="loggingEachProcessInStepListener"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="jobExecutionLoggingListener"/> <!-- (4) -->
</batch:listeners>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
|
(4) |
Bean定義の |
リスナーのサポートクラス
複数のリスナーインタフェースを サポートクラス
|
アノテーションを付与する場合
各種リスナーインタフェースに対応したアノテーションを付与する。必要に応じて、複数のアノテーションを同時に実装してもよい。
リスナーインタフェース | アノテーション |
---|---|
|
|
|
|
|
|
|
|
|
|
|
これらアノテーションはコンポーネント化された実装のメソッドに付与することで目的のスコープで動作する。 以下に実装例を示す。
@Component
public class AnnotationAmountCheckProcessor implements
ItemProcessor<SalesPlanDetail, SalesPlanDetail> {
private static final Logger logger =
LoggerFactory.getLogger(AnnotationAmountCheckProcessor.class);
@Override
public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
if (item.getAmount().signum() == -1) {
throw new IllegalArgumentException("amount is negative.");
}
return item;
}
// (1)
/*
@BeforeProcess
public void beforeProcess(Object item) {
logger.info("before process. [Item :{}]", item);
}
*/
// (2)
@AfterProcess
public void afterProcess(Object item, Object result) {
logger.info("after process. [Result :{}]", result);
}
// (3)
@OnProcessError
public void onProcessError(Object item, Exception e) {
logger.error("on process error.", e);
}
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader<SalesPlanDetail> reader,
ItemWriter<SalesPlanDetail> writer,
AnnotationAmountCheckProcessor annotationAmountCheckProcessor) {
return new StepBuilder("chunkJobWithListenerAnnotation.step01",
jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
.reader(reader)
.processor(annotationAmountCheckProcessor) // (4)
.writer(writer)
.build();
}
@Bean
public Job chunkJobWithListenerAnnotation(JobRepository jobRepository,
Step step01) {
return new JobBuilder("chunkJobWithListenerAnnotation",
jobRepository)
.start(step01)
.build();
}
<batch:job id="chunkJobWithListenerAnnotation" job-repository="jobRepository">
<batch:step id="chunkJobWithListenerAnnotation.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader"
processor="annotationAmountCheckProcessor"
writer="writer" commit-interval="10"/> <! -- (4) -->
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
アノテーションで実装する場合は、処理が必要なタイミングのアノテーションのみを付与すればよい。 |
(2) |
ItemProcessの処理後に行う処理を実装する。 |
(3) |
ItemProcessでエラーが発生したときの処理を実装する。 |
(4) |
アノテーションでリスナー実装がされているItemProcessorを |
アノテーションを付与するメソッドの制約
アノテーションを付与するメソッドはどのようなメソッドでもよいわけではない。 対応するリスナーインタフェースのメソッドと、シグネチャを一致させる必要がある。 この点は、各アノテーションのjavadocに明記されている。 |
JobExecutionListenerをアノテーションで実装したときの注意
JobExecutionListenerは、他のリスナーとスコープが異なるため、上記の設定では自動的にリスナー登録がされない。
そのため、 |
Tasklet実装へのアノテーションによるリスナー実装
Tasklet実装へのアノテーションによるリスナー実装した場合、以下の設定では一切リスナーが起動しないため注意する。 Taskletの場合
Taskletの場合
タスクレットモデルの場合は、インタフェースとアノテーションの使い分けに従ってリスナーインタフェースを利用するのがよい。 |
リスナーの設定
リスナーを設定する方法は、下記となる。
-
JavaConfig : Builderの
listener()
メソッド -
XMLConfig :
<batch:listener>
要素
上記のメソッドもしくは要素を用いてリスナーを設定できる位置は複数あるが、設定可能なリスナーは各位置で決められている。
リスナーの設定位置は以下を参照のこと。
// for chunk mode
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader<SalesPlanDetail> reader,
ItemWriter<SalesPlanDetail> writer,
AmountCheckProcessor amountCheckProcessor,
StepExecutionListener stepExecutionListener,
ChunkListener chunkListener) {
return new StepBuilder("chunkJob.step01", jobRepository)
.listener(stepExecutionListener) // (5)
.<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
.listener(chunkListener) // (1)
.reader(reader) // (2)
.processor(amountCheckProcessor) // (3)
.writer(writer) // (4)
.build();
}
@Bean
public Job chunkJob(JobRepository jobRepository,
Step step01,
JobExecutionListener listener) {
return new JobBuilder("chunkJob", jobRepository)
.start(step01)
.listener(listener) // (6)
.build();
}
// for tasklet mode
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
SalesPlanDetailRegisterTasklet tasklet,
StepExecutionListener listener) {
return new StepBuilder("taskletJob.step01", jobRepository)
.listener(listener) // (5)
.tasklet(tasklet, transactionManager)
.build();
}
@Bean
public Job taskletJob(JobRepository jobRepository,
Step step01,
@Qualifier("jobExecutionListener") JobExecutionListener listener) {
return new JobBuilder("taskletJob", jobRepository)
.start(step01)
.listener(listener) // (6)
.build();
}
項番 | 説明 |
---|---|
(1) |
ChunkListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。 |
(2) |
ItemReadListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。 |
(3) |
ItemProcessListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。 |
(4) |
ItemWriteListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。 |
(5) |
StepExecutionListenerに属するリスナーインタフェース実装を設定する。 |
(6) |
JobListenerに属するリスナーを設定する。 |
<!-- for chunk mode -->
<batch:job id="chunkJob" job-repository="jobRepository">
<batch:step id="chunkJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="(1)"
processor="(1)"
writer="(1)" commit-interval="10"/>
<batch:listeners>
<batch:listener ref="(2)"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="(3)"/>
</batch:listeners>
</batch:job>
<!-- for tasklet mode -->
<batch:job id="taskletJob" job-repository="jobRepository">
<batch:step id="taskletJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager" ref="tasklet">
<batch:listeners>
<batch:listener ref="(2)"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="(3)"/>
</batch:listeners>
</batch:job>
項番 | 説明 |
---|---|
(1) |
StepListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。 |
(2) |
StepListenerに属するリスナーインタフェース実装を設定する。 |
(3) |
JobListenerに属するリスナーを設定する。 |
複数リスナーの設定
<batch:listeners>
要素には複数のリスナーを設定することができる。
複数のリスナーを登録したときに、リスナーがどのような順番で起動されるかを以下に示す。
-
ItemProcessListener実装
-
listenerA, listenerB
-
-
JobExecutionListener実装
-
listenerC, listenerD
-
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader<SalesPlanDetail> reader,
ItemWriter<SalesPlanDetail> writer,
AmountCheckProcessor processor,
@Qualifier("listenerA") ItemProcessListener listenerA,
@Qualifier("listenerB") ItemProcessListener listenerB) {
return new StepBuilder("chunkJob.step01", jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
.listener(listenerA)
.listener(listenerB)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job taskletJob(JobRepository jobRepository,
Step step01,
@Qualifier("listenerC") JobExecutionListener listenerC,
@Qualifier("listenerD") JobExecutionListener listenerD) {
return new JobBuilder("taskletJob", jobRepository)
.start(step01)
.listener(listenerC)
.listener(listenerD)
.build();
}
<batch:job id="chunkJob" job-repository="jobRepository">
<batch:step id="chunkJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader"
processor="pocessor"
writer="writer" commit-interval="10"/>
<batch:listeners>
<batch:listener ref="listenerA"/>
<batch:listener ref="listenerB"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
<batch:listeners>
<batch:listener ref="listenerC"/>
<batch:listener ref="listenerD"/>
</batch:listeners>
</batch:job>
-
前処理に該当する処理は、リスナーの登録順に起動される。
-
後処理またはエラー処理に該当する処理は、リスナー登録の逆順に起動される。
インタフェースとアノテーションの使い分け
リスナーインタフェースとアノテーションによるリスナーの使い分けを説明する。
- リスナーインタフェース
-
job、step、chunkにおいて共通する横断的な処理の場合に利用する。
- アノテーション
-
ビジネスロジック固有の処理を行いたい場合に利用する。
原則として、ItemProcessorに対してのみ実装する。
StepExecutionListenerでの前処理における例外発生
前処理(beforeStep
メソッド)で例外が発生した場合、モデルによりリソースのオープン/クローズの実行有無が変わる。それぞれのモデルにおいて前処理で例外が発生した場合について説明する。
- チャンクモデル
-
リソースのオープン前に前処理が実行されるため、リソースのオープンは行われない。
リソースのクローズは、リソースのオープンがされていない場合でも実行されるため、ItemReader
/ItemWriter
を実装する場合にはこのことに注意する必要がある。 - タスクレットモデル
-
タスクレットモデルでは、
execute
メソッド内で明示的にリソースのオープン/クローズを行う。
前処理で例外が発生すると、execute
メソッドは実行されないため、当然リソースのオープン/クローズも行われない。
前処理(StepExecutionListener#beforeStep())でのジョブの打ち切り
ジョブを実行する条件が整っていない場合、ジョブを実行する前に処理を打ち切りたい場合がある。
そのような場合は、前処理(beforeStep
メソッド)にて例外をスローすることでジョブ実行前に処理を打ち切ることができる。
ここでは以下の要件を実装する場合を例に説明する。
-
StepExecutionListener
が定義しているbeforeStep
メソッドで入力ファイルと出力ファイルの起動パラメータの妥当性検証を行う。 -
起動パラメータのいずれかが未指定の場合、例外をスローする。
しかし、Macchinetta Batch 2.xでは起動パラメータの妥当性検証は、JobParametersValidator
の使用を推奨している。
ここでは、あくまでも前処理中断のサンプルとしてわかりやすい妥当性検証を利用しているため、実際に起動パラメータの妥当性検証を行う場合は"パラメータの妥当性検証"を参照。
以下に実装例を示す。
@Component
@Scope("step")
public class CheckingJobParameterErrorStepExecutionListener implements StepExecutionListener {
@Value("#{jobParameters['inputFile']}") // (1)
private File inputFile;
@Value("#{jobParameters['outputFile']}") // (1)
private File outputFile;
@Override
public void beforeStep(StepExecution stepExecution) {
if (inputFile == null) {
throw new BeforeStepException("The input file must be not null."); // (2)
}
else if (outputFile == null) {
throw new BeforeStepException("The output file must be not null."); // (2)
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
// omitted.
}
}
@Bean
@StepScope
public LoggingReader reader(
@Value("#{jobParameters['inputFile']}") File inputFile) { // (3)
LoggingReader reader = new LoggingReader();
reader.setResource(new FileSystemResource(inputFile));
return reader;
}
@Bean
@StepScope
public LoggingWriter writer(
@Value("#{jobParameters['outputFile']}") File outputFile) { // (3)
LoggingWriter writer = new LoggingWriter();
writer.setResource(new FileSystemResource(outputFile));
return writer;
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
LoggingReader reader,
LoggingWriter writer,
CheckingJobParameterErrorStepExecutionListener listener) {
return new StepBuilder("chunkJobWithAbortListener.step01",
jobRepository)
.listener(listener) // (4)
.<LoggingReader, LoggingWriter> chunk(10, transactionManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Job chunkJobWithAbortListener(JobRepository jobRepository,
Step step01) {
return new JobBuilder("chunkJobWithAbortListener",
jobRepository)
.start(step01)
.build();
}
<bean id="reader" class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch04.listener.LoggingReader" scope="step"
p:resource="file:#{jobParameters['inputFile']}"/> <!-- (3) -->
<bean id="writer" class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch04.listener.LoggingWriter" scope="step"
p:resource="file:#{jobParameters['outputFile']}"/> <!-- (3) -->
<batch:job id="chunkJobWithAbortListener" job-repository="jobRepository">
<batch:step id="chunkJobWithAbortListener.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" writer="writer" commit-interval="10"/>
</batch:tasklet>
<batch:listeners>
<batch:listener ref="checkingJobParameterErrorStepExecutionListener"/> <!-- (4) -->
</batch:listeners>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
@Valueアノテーションを使用して参照するパラメータを指定する。 |
(2) |
例外をスローする。 |
(3) |
参照するパラメータを指定する。 |
(4) |
リスナーインタフェース実装を設定する。 |