Macchinetta Batch Framework (2.x) Development Guideline - version 2.2.0.RELEASE, 2020-6-29
> INDEX

Overview

リスナーとは、ジョブやステップを実行する前後に処理を挿入するためのインタフェースである。

本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。

リスナーには多くのインタフェースがあるため、それぞれの役割について説明する。 その後に、設定および実装方法について説明をする。

リスナーの種類

Spring Batchでは、実に多くのリスナーインタフェースが定義されている。 ここではそのすべてを説明するのではなく、利用頻度が高いものを中心に扱う。

まず、リスナーは2種類に大別される。

JobListener

ジョブの実行に対して処理を挟み込むためのインタフェース

StepListener

ステップの実行に対して処理を挟み込むためのインタフェース

JobListenerについて

Spring Batchには、JobListenerという名前のインタフェースは存在しない。 StepListenerとの対比のため 、本ガイドラインでは便宜的に定義している。
Java Batch(jBatch)には、javax.batch.api.listener.JobListenerというインタフェースが存在するので、実装時には間違えないように注意すること。 また、StepListenerもシグネチャが異なる同名インタフェース(javax.batch.api.listener.StepListener)が存在するので、同様に注意すること。

JobListener

JobListenerのインタフェースは、JobExecutionListenerの1つのみとなる。

JobExecutionListener

ジョブの開始前、終了後に処理を挟み込む。

JobExecutionListenerインタフェース
public interface JobExecutionListener {
  void beforeJob(JobExecution jobExecution);
  void afterJob(JobExecution jobExecution);
}

StepListener

StepListenerのインタフェースは以下のように多くの種類がある。

StepListener

以降に紹介する各種リスナーのマーカーインタフェース。

StepExecutionListener

ステップ実行の開始前、終了後に処理を挟み込む。

StepExecutionListenerインタフェース
public interface StepExecutionListener extends StepListener {
  void beforeStep(StepExecution stepExecution);
  ExitStatus afterStep(StepExecution stepExecution);
}
ChunkListener

1つのチャンクを処理する前後と、エラーが発生した場合に処理を挟み込む。

ChunkListenerインタフェース
public interface ChunkListener extends StepListener {
  static final String ROLLBACK_EXCEPTION_KEY = "sb_rollback_exception";
  void beforeChunk(ChunkContext context);
  void afterChunk(ChunkContext context);
  void afterChunkError(ChunkContext context);
}
ROLLBACK_EXCEPTION_KEYの用途

afterChunkErrorメソッドにて、発生した例外を取得したい場合に利用する。 Spring Batchはチャンク処理中にエラーが発生した場合、ChunkContextsb_rollback_exceptionというキー名で 例外を格納した上でChunkListenerを呼び出すため、以下の要領でアクセスできる。

使用例
public void afterChunkError(ChunkContext context) {
    logger.error("Exception occurred while chunk. [context:{}]", context,
            context.getAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY));
}

例外ハンドリングについては、ChunkListenerインタフェースによる例外ハンドリングを参照。

ItemReadListener

ItemReaderが1件のデータを取得する前後と、エラーが発生した場合に処理を挟み込む。

ItemReadListenerインタフェース
public interface ItemReadListener<T> extends StepListener {
  void beforeRead();
  void afterRead(T item);
  void onReadError(Exception ex);
}
ItemProcessListener

ItemProcessorが1件のデータを加工する前後と、エラーが発生した場合に処理を挟み込む。

ItemProcessListenerインタフェース
public interface ItemProcessListener<T, S> extends StepListener {
  void beforeProcess(T item);
  void afterProcess(T item, S result);
  void onProcessError(T item, Exception e);
}
ItemWriteListener

ItemWriterが1つのチャンクを出力する前後と、エラーが発生した場合に処理を挟み込む。

ItemWriteListenerインタフェース
public interface ItemWriteListener<S> extends StepListener {
  void beforeWrite(List<? extends S> items);
  void afterWrite(List<? extends S> items);
  void onWriteError(Exception exception, List<? extends S> items);
}

本ガイドラインでは、以下のリスナーについては説明をしない。

  • リトライ系リスナー

  • スキップ系リスナー

これらのリスナーは例外ハンドリングでの使用を想定したものであるが、 本ガイドラインではこれらのリスナーを用いた例外ハンドリングは行わない方針である。 詳細は、例外ハンドリングを参照。

JobExecutionListenerや、StepExecutionListenerは、Spring Batchにおけるトランザクション制御で説明するフレームワークトランザクションによる制御範囲外となる。 加えて、リスナーでのデータベースアクセスにおける制約があるため、本ガイドラインではリスナーによるデータベース更新は基本的に推奨しない。

前処理でデータベース更新を行う必要がある場合はフロー制御を参照し、データベース更新を行う前処理と後続処理のステップを分けて、JobExecutionListenerStepExecutionListenerではデータベース更新を行わない設計・実装を行うことを検討してほしい。

How to use

リスナーの実装と設定方法について説明する。

リスナーの実装

リスナーの実装と設定方法について説明する。

  1. リスナーインタフェースをimplementsして実装する。

  2. コンポーネントにメソッドベースでアノテーションを付与して実装する。

どちらで実装するかは、リスナーの役割に応じて選択する。基準は後述する。

インタフェースを実装する場合

各種リスナーインタフェースをimplementsして実装する。必要に応じて、複数のインタフェースを同時に実装してもよい。 以下に実装例を示す。

JobExecutionListenerの実装例
@Component
public class JobExecutionLoggingListener implements JobExecutionListener { // (1)

    private static final Logger logger =
            LoggerFactory.getLogger(JobExecutionLoggingListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) { // (2)
        logger.info("job started. [JobName:{}]", jobExecution.getJobInstance().getJobName());
    }

    @Override
    public void afterJob(JobExecution jobExecution) { // (3)

        logger.info("job finished.[JobName:{}][ExitStatus:{}]", jobExecution.getJobInstance().getJobName(),
                jobExecution.getExitStatus().getExitCode());
    }
}
リスナーの設定例
<batch:job id="chunkJobWithListener" job-repository="jobRepository">
     <batch:step id="chunkJobWithListener.step01">
         <batch:tasklet transaction-manager="jobTransactionManager">
             <batch:chunk reader="reader" processor="processor"
                          writer="writer" commit-interval="10"/>
             <batch:listeners>
                 <batch:listener ref="loggingEachProcessInStepListener"/>
             </batch:listeners>
         </batch:tasklet>
     </batch:step>
     <batch:listeners>
         <batch:listener ref="jobExecutionLoggingListener"/> <!-- (4) -->
     </batch:listeners>
 </batch:job>
表 1. 説明
項番 説明

(1)

JobExecutionListenerimplementsして実装する。

(2)

JobExecutionListenerが定義しているbeforeJobメソッドを実装する。
この例では、ジョブ開始ログを出力する。

(3)

JobExecutionListenerが定義しているafterJobメソッドを実装する。
この例では、ジョブ終了ログを出力する。

(4)

Bean定義の<batch:listeners>要素で、(1)で実装したリスナーを設定する。
設定方法の詳細は、リスナーの設定で説明する。

リスナーのサポートクラス

複数のリスナーインタフェースをimplementsした場合、処理が不要な部分についても空実装をする必要がある。 この作業を簡略化するため、あらかじめ空実装を施したサポートクラスがSpring Batchには用意されている。 インタフェースではなく、サポートクラスを活用してもよいが、その場合implementsではなくextendsになるため注意すること。

サポートクラス
  • org.springframework.batch.core.listener.ItemListenerSupport

  • org.springframework.batch.core.listener.StepListenerSupport

アノテーションを付与する場合

各種リスナーインタフェースに対応したアノテーションを付与する。必要に応じて、複数のアノテーションを同時に実装してもよい。

表 2. リスナーインタフェースとの対応表
リスナーインタフェース アノテーション

JobExecutionListener

@BeforeJob
@AfterJob

StepExecutionListener

@BeforeStep
@AfterStep

ChunkListener

@BeforeChunk
@AfterChunk
@AfterChunkError

ItemReadListener

@BeforeRead
@AfterRead
@OnReadError

ItemProcessListener

@BeforeProcess
@AfterProcess
@OnProcessError

ItemWriteListener

@BeforeWrite
@AfterWrite
@OnWriteError

これらアノテーションはコンポーネント化された実装のメソッドに付与することで目的のスコープで動作する。 以下に実装例を示す。

アノテーションを付与したItemProcessorの実装例
@Component
public class AnnotationAmountCheckProcessor implements
        ItemProcessor<SalesPlanDetail, SalesPlanDetail> {

    private static final Logger logger =
            LoggerFactory.getLogger(AnnotationAmountCheckProcessor.class);

    @Override
    public SalesPlanDetail process(SalesPlanDetail item) throws Exception {
        if (item.getAmount().signum() == -1) {
            throw new IllegalArgumentException("amount is negative.");
        }
        return item;
    }

    // (1)
    /*
    @BeforeProcess
    public void beforeProcess(Object item) {
        logger.info("before process. [Item :{}]", item);
    }
    */

    // (2)
    @AfterProcess
    public void afterProcess(Object item, Object result) {
        logger.info("after process. [Result :{}]", result);
    }

    // (3)
    @OnProcessError
    public void onProcessError(Object item, Exception e) {
        logger.error("on process error.", e);
    }
}
リスナーの設定例
<batch:job id="chunkJobWithListenerAnnotation" job-repository="jobRepository">
    <batch:step id="chunkJobWithListenerAnnotation.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="annotationAmountCheckProcessor"
                         writer="writer" commit-interval="10"/>  <! -- (4) -->
        </batch:tasklet>
    </batch:step>
</batch:job>
表 3. 説明
項番 説明

(1)

アノテーションで実装する場合は、処理が必要なタイミングのアノテーションのみを付与すればよい。
この例では、ItemProcessの処理前には何もする必要がないため、@BeforeProcessを付与した実装は不要となる。

(2)

ItemProcessの処理後に行う処理を実装する。
この例では処理結果をログを出力している。

(3)

ItemProcessでエラーが発生したときの処理を実装する。
この例では発生した例外をログを出力している。

(4)

アノテーションでリスナー実装がされているItemProcessを<batch:chunk>要素に設定する。
リスナーインタフェースとは異なり、<batch:listener>要素で設定しなくても、自動的にリスナーが登録される。

アノテーションを付与するメソッドの制約

アノテーションを付与するメソッドはどのようなメソッドでもよいわけではない。 対応するリスナーインタフェースのメソッドと、シグネチャを一致させる必要がある。 この点は、各アノテーションのjavadocに明記されている。

JobExecutionListenerをアノテーションで実装したときの注意

JobExecutionListenerは、他のリスナーとスコープが異なるため、上記の設定では自動的にリスナー登録がされない。 そのため、<batch:listener>要素で明示的に設定する必要がある。詳細は、リスナーの設定を参照。

Tasklet実装へのアノテーションによるリスナー実装

Tasklet実装へのアノテーションによるリスナー実装した場合、以下の設定では一切リスナーが起動しないため注意する。

Taskletの場合
<batch:job id="taskletJobWithListenerAnnotation" job-repository="jobRepository">
    <batch:step id="taskletJobWithListenerAnnotation.step01">
        <batch:tasklet transaction-manager="jobTransactionManager"
                       ref="annotationSalesPlanDetailRegisterTasklet"/>
    </batch:step>
</batch:job>

タスクレットモデルの場合は、インタフェースとアノテーションの使い分けに従ってリスナーインタフェースを利用するのがよい。

リスナーの設定

リスナーは、Bean定義の<batch:listeners>.<batch:listener>要素によって設定する。 XMLスキーマ定義では様々な箇所に記述できるが、インタフェースの種類によっては意図とおり動作しないものが存在するため、 以下の位置に設定すること。

リスナーを設定する位置
<!-- for chunk mode -->
<batch:job id="chunkJob" job-repository="jobRepository">
    <batch:step id="chunkJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="(1)"
                         processor="(1)"
                         writer="(1)" commit-interval="10"/>
            <batch:listeners>
                <batch:listener ref="(2)"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    <batch:listeners>
        <batch:listener ref="(3)"/>
    </batch:listeners>
</batch:job>

<!-- for tasklet mode -->
<batch:job id="taskletJob" job-repository="jobRepository">
    <batch:step id="taskletJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager" ref="tasklet">
            <batch:listeners>
                <batch:listener ref="(2)"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    <batch:listeners>
        <batch:listener ref="(3)"/>
    </batch:listeners>
</batch:job>
表 4. 設定値の説明
項番 説明

(1)

StepListenerに属するアノテーションによる実装を含んだコンポーネントを設定する。
アノテーションの場合、必然的にこの場所に設定することになる。

(2)

StepListenerに属するリスナーインタフェース実装を設定する。
タスクレットモデルの場合、ItemReadListener,ItemProcessListener,ItemWriteListenerは利用できない。

(3)

JobListenerに属するリスナーを設定する。
インタフェースとアノテーション、どちらの実装でもここに設定する必要がある。

複数リスナーの設定

<batch:listeners>要素には複数のリスナーを設定することができる。

複数のリスナーを登録したときに、リスナーがどのような順番で起動されるかを以下に示す。

  • ItemProcessListener実装

    • listenerA, listenerB

  • JobExecutionListener実装

    • listenerC, listenerD

複数リスナーの設定例
<batch:job id="chunkJob" job-repository="jobRepository">
    <batch:step id="chunkJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader"
                         processor="pocessor"
                         writer="writer" commit-interval="10"/>
            <batch:listeners>
                <batch:listener ref="listenerA"/>
                <batch:listener ref="listenerB"/>
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    <batch:listeners>
        <batch:listener ref="listenerC"/>
        <batch:listener ref="listenerD"/>
    </batch:listeners>
</batch:job>
Listener execution order
図 1. リスナーの起動順序
  • 前処理に該当する処理は、リスナーの登録順に起動される。

  • 後処理またはエラー処理に該当する処理は、リスナー登録の逆順に起動される。

インタフェースとアノテーションの使い分け

リスナーインタフェースとアノテーションによるリスナーの使い分けを説明する。

リスナーインタフェース

job、step、chunkにおいて共通する横断的な処理の場合に利用する。

アノテーション

ビジネスロジック固有の処理を行いたい場合に利用する。
原則として、ItemProcessorに対してのみ実装する。

StepExecutionListenerでの前処理における例外発生

前処理(beforeStepメソッド)で例外が発生した場合、モデルによりリソースのオープン/クローズの実行有無が変わる。それぞれのモデルにおいて前処理で例外が発生した場合について説明する。

チャンクモデル

リソースのオープン前に前処理が実行されるため、リソースのオープンは行われない。
リソースのクローズは、リソースのオープンがされていない場合でも実行されるため、ItemReader/ItemWriterを実装する場合にはこのことに注意する必要がある。

タスクレットモデル

タスクレットモデルでは、executeメソッド内で明示的にリソースのオープン/クローズを行う。
前処理で例外が発生すると、executeメソッドは実行されないため、当然リソースのオープン/クローズも行われない。

前処理(StepExecutionListener#beforeStep())でのジョブの打ち切り

ジョブを実行する条件が整っていない場合、ジョブを実行する前に処理を打ち切りたい場合がある。

そのような場合は、前処理(beforeStepメソッド)にて例外をスローすることでジョブ実行前に処理を打ち切ることができる。
ここでは以下の要件を実装する場合を例に説明する。

  1. StepExecutionListenerが定義しているbeforeStepメソッドで入力ファイルと出力ファイルの起動パラメータの妥当性検証を行う。

  2. 起動パラメータのいずれかが未指定の場合、例外をスローする。

しかし、Macchinetta Batch 2.xでは起動パラメータの妥当性検証は、JobParametersValidatorの使用を推奨している。 ここでは、あくまでも前処理中断のサンプルとしてわかりやすい妥当性検証を利用しているため、実際に起動パラメータの妥当性検証を行う場合は"パラメータの妥当性検証"を参照。

以下に実装例を示す。

起動パラメータの妥当性検証を行うStepExecutionListenerの実装例
@Component
@Scope("step")
public class CheckingJobParameterErrorStepExecutionListener implements StepExecutionListener {

    @Value("#{jobParameters['inputFile']}") // (1)
    private File inputFile;

    @Value("#{jobParameters['outputFile']}") // (1)
    private File outputFile;

    @Override
    public void beforeStep(StepExecution stepExecution) {
        if (inputFile == null) {
            throw new BeforeStepException("The input file must be not null."); // (2)
        }
        else if (outputFile == null) {
            throw new BeforeStepException("The output file must be not null."); // (2)
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // omitted.
    }
}
リスナーの設定例
<bean id="reader" class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch04.listener.LoggingReader" scope="step"
      p:resource="file:#{jobParameters['inputFile']}"/> <!-- (3) -->
<bean id="writer" class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch04.listener.LoggingWriter" scope="step"
      p:resource="file:#{jobParameters['outputFile']}"/> <!-- (3) -->

<batch:job id="chunkJobWithAbortListener" job-repository="jobRepository">
    <batch:step id="chunkJobWithAbortListener.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader" writer="writer" commit-interval="10"/>
        </batch:tasklet>
        <batch:listeners>
            <batch:listener ref="checkingJobParameterErrorStepExecutionListener"/> <!-- (4) -->
        </batch:listeners>
    </batch:step>
</batch:job>
表 5. 説明
項番 説明

(1)

@Valueアノテーションを使用して参照するパラメータを指定する。

(2)

例外をスローする。
この例ではRuntimeExceptionクラスを継承し、自作した例外クラスを使用している。

(3)

参照するパラメータを指定する。
パラメータをセットしているクラスはそれぞれItemStreamReaderItemStreamWriterを実装した独自クラスである。

(4)

リスナーインタフェース実装を設定する。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.2.0.RELEASE, 2020-6-29