Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

ジョブ実行時に発生する例外のハンドリング方法について説明する。

本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。

まず、例外の分類について説明し、例外の種類に応じたハンドリング方法を説明する。

例外の分類

ジョブ実行時に発生する例外は、以下の3つに分類される。

表 1. 例外の分類一覧

項番

分類

説明

例外の種類

(1)

ジョブの再実行(パラメータ、入力データの変更/修正など)によって発生原因が解消できる例外

ジョブの再実行で発生原因が解消できる例外は、アプリケーションコードで例外をハンドリングし、例外処理を行う。

ビジネス例外
正常稼働時に発生するライブラリ例外

(2)

ジョブの再実行によって発生原因が解消できない例外

ジョブの再実行で発生原因が解消できる例外は、以下のパターンにてハンドリングする。

1. StepListenerで例外の捕捉が可能な場合は、 アプリケーションコードで例外をハンドリングする。

2. StepListenerで例外の捕捉が不可能な場合は、 フレームワークで例外処理をハンドリングする。

システム例外
予期しないシステム例外
致命的なエラー

(3)

(非同期実行時に)ジョブ要求のリクエスト不正により発生する例外

ジョブ要求のリクエスト不正により発生する例外は、フレームワークで例外処理をハンドリングし、例外処理を行う。

非同期実行(DBポーリング)の場合は、 ポーリング処理ではジョブ要求に対する妥当性検証をしない。そのため、ジョブ要求を登録するアプリケーションで事前にリクエストに対する入力チェックが行われていることが望ましい。

非同期実行(Webコンテナ)の場合は、 Webアプリケーションにより事前にリクエストに対する入力チェックが行われていることを前提としている。

そのため、ジョブ要求やリクエストを受け付けるアプリケーションで例外ハンドリングを行う。

ジョブ要求リクエスト不正エラー

例外処理内でトランザクショナルな処理は避ける

例外処理内でデータベースへの書き込みを始めとするトランザクショナルな処理を行うと、 二次例外を引き起こしてしまう可能性がある。 例外処理は、解析用ログ出力と終了コード設定を基本とすること。

例外の種類

例外の種類について説明する。

ビジネス例外

ビジネス例外とは、ビジネスルールの違反を検知したことを通知する例外である。
本例外は、ステップのロジック内で発生させる。
アプリケーションとして想定される状態なので、システム運用者による対処は不要である。

ビジネス例外の例
  • 在庫引当時に在庫切れの場合

  • 予定日より日数が超過した場合

  • etc …​

該当する例外クラス
  • java.lang.RuntimeExceptionまたはそのサブクラス

    • ビジネス例外クラスをユーザにて作成することを推奨する

正常稼働時に発生するライブラリ例外

正常稼働時に発生するライブラリ例外とは、フレームワーク、およびライブラリ内で発生する例外のうち、システムが正常稼働している時に発生する可能性のある例外のことを指す。
フレームワーク、およびライブラリ内で発生する例外とは、Spring Frameworkや、その他のライブラリ内で発生する例外クラスを対象とする。
アプリケーションとして想定される状態なので、システム運用者による対処は不要である。

正常稼働時に発生するライブラリ例外の例
  • オンライン処理との排他制御で発生する楽観ロック例外

  • 複数ジョブやオンライン処理からの同一データを同時登録する際に発生する一意制約例外

  • etc …​

該当する例外クラス
  • org.springframework.dao.EmptyResultDataAccessException (楽観ロックをした時、データ更新件数が0件の場合に発生する例外)

  • org.springframework.dao.DuplicateKeyException (一意制約違反となった場合に発生する例外)

  • etc …​

システム例外

システム例外とは、システムが正常稼働している時に、発生してはいけない状態を検知したことを通知する例外である。
本例外は、ステップのロジック内で発生させる。
システム運用者による対処が必要となる。

システム例外の例
  • 事前に存在しているはずのマスタデータ、ディレクトリ、ファイルなどが存在しない場合。

  • フレームワーク、ライブラリ内で発生する検査例外のうち、システム異常に分類される例外を捕捉した場合(ファイル操作時のIOExceptionなど)。

  • etc…​

該当する例外クラス
  • java.lang.RuntimeExceptionまたはそのサブクラス

    • システム例外クラスを作成することを推奨する

予期しないシステム例外

予期しないシステム例外とは、システムが正常稼働している時には発生しない非検査例外である。
システム運用者による対処、またはシステム開発者による解析が必要となる。

予期しないシステム例外は、以下の処理をする以外はハンドリングしない。ハンドリングした場合は、例外を再度スローすること。

  • 捕捉例外を解析用にログ出力を行い、該当する終了コードの設定する。

予期しないシステム例外の例
  • アプリケーション、フレームワーク、ライブラリにバグが潜んでいる場合。

  • データベースサーバなどがダウンしている場合。

  • etc…​

該当する例外クラス
  • java.lang.NullPointerException (バグ起因で発生する例外)

  • org.springframework.dao.DataAccessResourceFailureException(データベースサーバがダウンしている場合に発生する例外)

  • etc …​

致命的なエラー

致命的なエラーとは、システム(アプリケーション)全体に影響を及ぼす、致命的な問題が発生している事を通知するエラーである。
システム運用者、またはシステム開発者による対処・リカバリが必要となる。

致命的なエラーは、以下の処理をする以外はハンドリングしない。ハンドリングした場合は、例外を再度スローすること。

  • 捕捉例外を解析用にログ出力を行い、該当する終了コードの設定する。

致命的なエラーの例
  • Java仮想マシンで使用できるメモリが不足している場合。

  • etc…​

該当する例外クラス
  • java.lang.Errorを継承しているクラス

    • java.lang.OutOfMemoryError (メモリ不足時に発生するエラー)など

  • etc …​

ジョブ要求リクエスト不正エラー

ジョブ要求リクエスト不正エラーとは、非同期実行時にジョブ要求のリクエストに問題が発生していることを通知するエラーである。
システム運用者による対処・リカバリが必要となる。

ジョブ要求リクエスト不正エラーは、ジョブ要求のリクエストを処理するアプリケーションでの例外ハンドリングを前提にするため、 本ガイドラインでは説明はしない。

例外への対応方法

例外への対応方法について説明する。

例外への対応パターンは次のとおり。

  1. 例外発生時にジョブの継続可否を決める (3種類)

  2. 中断したジョブの再実行方法を決める (2種類)

表 2. ジョブの継続可否を決定する方法
項番 例外への対応方法 説明

(1)

スキップ

エラーレコードをスキップし、処理を継続する。

(2)

リトライ

エラーレコードを指定した条件(回数、時間等)に達するまで再処理する。

(3)

処理中断

処理を中断する。

例外が発生していなくても、ジョブが想定以上の処理時間になったため処理途中で停止する場合がある。
この場合は、ジョブの停止を参照。

表 3. 中断したジョブの再実行方法
項番 例外への対応方法 説明

(1)

ジョブのリラン

中断したジョブを最初から再実行する。

(2)

ジョブのリスタート

中断したジョブを中断した箇所から再実行する。

中断したジョブの再実行方法についての詳細は、処理の再実行を参照。

スキップ

スキップとは、バッチ処理を止めずにエラーデータを飛ばして処理を継続する方法である。

スキップを行う例
  • 入力データ内に不正なレコードが存在する場合

  • ビジネス例外が発生した場合

  • etc …​

スキップレコードの再処理

スキップを行う場合は、スキップした不正なレコードについてどのように対応するか設計すること。 不正なレコードを抽出して再処理する場合、次回実行時に含めて処理する場合、などといった方法が考えられる。

リトライ

リトライとは、特定の処理に失敗したレコードに対して指定した回数や時間に達するまで再試行を繰り返す対応方法である。
処理失敗の原因が実行環境に依存しており、かつ、時間の経過により解決される見込みのある場合にのみ用いる。

リトライを行う例
  • 排他制御により、処理対象のレコードがロックされている場合

  • ネットワークの瞬断によりメッセージ送信が失敗する場合

  • etc …​

リトライの適用

リトライをあらゆる場面で適用してしまうと、異常発生時に処理時間がむやみに伸びてしまい、異常の検出が遅れる危険がある。
よって、リトライは処理のごく一部に適用することが望ましく、 その対象は外部システム連携など信頼性が担保しにくいものに限定するとよい。

処理中断

処理中断とは、文字どおり処理を途中で中断する対応方式である。
処理の継続が不可能な内容のエラーが検知された場合や、レコードのスキップを許容しない要件の場合に用いる。

処理中断を行う例
  • 入力データ内に不正なレコードが存在する場合

  • ビジネス例外が発生した場合

  • etc …​

How to use

例外ハンドリングの実現方法について説明をする。

バッチアプリケーション運用時のユーザインタフェースはログが主体である。よって、例外発生の監視もログを通じて行うことになる。

Spring Batch では、ステップ実行時に例外が発生した場合はログを出力し異常終了するため、ユーザにて追加実装をせずとも要件を満たせる可能性がある。 以降の説明は、ユーザにてシステムに応じたログ出力を行う必要があるときのみ、ピンポイントに実装するとよい。 すべての処理を実装しなくてはならないケースは基本的にはない。

例外ハンドリングの共通であるログ設定については、ロギングを参照。

ステップ単位の例外ハンドリング

ステップ単位での例外ハンドリング方法について説明する。

ChunkListenerインタフェースによる例外ハンドリング

処理モデルによらず、発生した例外を統一的にハンドリングしたい場合は、 ChunkListenerインタフェースを利用する。
チャンクよりスコープの広い、ステップやジョブのリスナーを利用しても実現できるが、 出来る限り発生した直後にハンドリングすることを重視し、ChunkListenerを採用する。

各処理モデルごとの例外ハンドリング方法は以下のとおり。

チャンクモデルにおける例外ハンドリング

Spring Batch提供の各種Listenerインタフェースを使用して機能を実現する。

タスクレットモデルにおける例外ハンドリング

タスクレット実装内にて独自に例外ハンドリングを実装する。

ChunkListenerで統一的にハンドリングできるのはなぜか

ChunkListenerによってタスクレット実装内で発生した例外をハンドリングできることに違和感を感じるかもしれない。 これは、Spring Batch においてビジネスロジックの実行はチャンクを基準に考えられており、 1回のタスクレット実行は、1つのチャンク処理として扱われているためである。

この点はorg.springframework.batch.core.step.tasklet.Taskletのインタフェースにも表れている。

public interface Tasklet {
  RepeatStatus execute(StepContribution contribution,
          ChunkContext chunkContext) throws Exception;
}

ChunkListenerインタフェースによる例外ハンドリング

ChunkListenerインタフェースのafterChunkErrorメソッドを実装する。
afterChunkErrorメソッドの引数であるChunkContextからChunkListener.ROLLBACK_EXCEPTION_KEYをキーにしてエラー情報を取得する。

リスナーの設定方法については、リスナーの設定を参照。

ChunkListenerの実装例
@Component
public class ChunkAroundListener implements ChunkListener {

    private static final Logger logger =
            LoggerFactory.getLogger(ChunkAroundListener.class);

    @Override
    public void beforeChunk(ChunkContext context) {
        logger.info("before chunk. [context:{}]", context);
    }

    @Override
    public void afterChunk(ChunkContext context) {
        logger.info("after chunk. [context:{}]", context);
    }

    // (1)
    @Override
    public void afterChunkError(ChunkContext context) {
        logger.error("Exception occurred while chunk. [context:{}]",
                context.getAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY)); // (2)
    }
}
表 4. 説明
項番 説明

(1)

afterChunkErrorメソッドを実装する。

(2)

ChunkContextからChunkListener.ROLLBACK_EXCEPTION_KEYをキーにしてエラー情報を取得する。
この例では、取得した例外のスタックトレースをログ出力している。

処理モデルの違いによるChunkListenerの挙動の違い

チャンクモデルでは、リソースのオープン・クローズで発生した例外は、ChunkListenerインタフェースが捕捉するスコープ外となる。 そのため、afterChunkErrorメソッドでハンドリングが行われない。 概略図を以下に示す。

Difference in resource open timing by chunk model
図 1. チャンクモデルでの例外ハンドリング概略図

タスクレットモデルでは、リソースのオープン・クローズで発生した例外は、ChunkListenerインタフェースが捕捉するスコープ内となる。 そのため、afterChunkErrorメソッドでハンドリングが行わる。 概略図を以下に示す。

Difference in resource open timing by tasklet model
図 2. タスクレットモデルでの例外ハンドリング概略図

この挙動の差を吸収して統一的に例外をハンドリングしたい場合は、 StepExecutionListenerインタフェースで例外の発生有無をチェックすることで実現できる。 ただし、ChunkListenerよりも実装が少々複雑になる。

StepExecutionListenerの実装例
@Component
public class StepErrorLoggingListener implements StepExecutionListener {

    private static final Logger logger =
            LoggerFactory.getLogger(StepErrorLoggingListener.class);

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // do nothing.
    }

    // (1)
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // (2)
        List<Throwable> exceptions = stepExecution.getFailureExceptions();
        // (3)
        if (exceptions.isEmpty()) {
            return ExitStatus.COMPLETED;
        }

        // (4)
        logger.info("This step has occurred some exceptions as follow. " +
                "[step-name:{}] [size:{}]",
                stepExecution.getStepName(), exceptions.size());
        for (Throwable th : exceptions) {
            logger.error("exception has occurred in job.", th);
        }
        return ExitStatus.FAILED;
    }
表 5. 説明
項番 説明

(1)

afterStepメソッドを実装する。

(2)

引数のstepExecutionからエラー情報を取得する。複数の例外をまとめて扱う必要がある点に注意する。

(3)

エラー情報がない場合は、正常終了とする。

(4)

エラー情報がある場合は、例外ハンドリングを行う。
この例では、発生した例外をすべてスタックトレース付きのログ出力を行っている。

チャンクモデルにおける例外ハンドリング

チャンクモデルでは、 StepListenerを継承したListenerで例外ハンドリングする。

リスナーの設定方法については、リスナーの設定を参照。

コーディングポイント(ItemReader編)

ItemReadListenerインタフェースの onReadErrorメソッドを実装することで、ItemReader内で発生した例外をハンドリングする。

ItemReadListener#onReadErrorの実装例
@Component
public class ChunkComponentListener implements ItemReadListener<Object> {

    private static final Logger logger =
            LoggerFactory.getLogger(ChunkComponentListener.class);

    // omitted.

    // (1)
    @Override
    public void onReadError(Exception ex) {
        logger.error("Exception occurred while reading.", ex);  // (2)
    }

    // omitted.
}
表 6. 説明
項番 説明

(1)

onReadErrorメソッドを実装する。

(2)

例外ハンドリングを実装する
この例では、引数から取得した例外のスタックトレースをログ出力している。

コーディングポイント(ItemProcessor編)

ItemProcessorでの例外ハンドリングには、2つの方法があり、要件に応じて使い分ける。

  1. ItemProcessor 内でtry~catchをする方法

  2. ItemProcessListenerインタフェースを使用する方法

使い分ける理由について説明する。
ItemProcessorの処理内で例外発生時に実行されるonProcessErrorメソッドの引数は、処理対処のアイテムと例外の2つである。
システムの要件によっては、ItemProcessListenerインタフェース内でログ出力等の例外をハンドリングする際に、この2つの引数で要件を満たせない場合が出てくる。 その場合は、ItemProcessor内でtry~catchにて例外をcatchし例外ハンドリング処理を行うことを推奨する。
注意点として、ItemProcessor内でtry~catchを実装した上で、ItemProcessListenerインタフェースを実装すると二重処理になる場合があるため、注意が必要である。
きめ細かい例外ハンドリングを行いたい場合は、ItemProcessor内でtry~catchをする方法を採用すること。

それぞれの方法について説明する。

ItemProcessor 内でtry~catchする方法

きめ細かい例外ハンドリングが必要になる場合はこちらを使用する。
後述するスキップの項で説明するが、エラーレコードのスキップを行う際にはこちらを使用することとなる。

ItemProcessor内でtry~catchする実装例
@Component
public class AmountCheckProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {

    // omitted.

    @Override
    public SalesPerformanceDetail process(SalesPerformanceDetail item)
            throws Exception {
        // (1)
        try {
            checkAmount(item.getAmount(), amountLimit);
        } catch (ArithmeticException ae) {
            // (2)
            logger.error(
                "Exception occurred while processing. [item:{}]", item, ae);
            // (3)
            throw new IllegalStateException("check error at processor.", ae);
        }
        return item;
    }
}
表 7. 説明
項番 説明

(1)

try~catchで実装する。ここでは、特定の例外(ArithmeticException)のみ特別なハンドリングをしている。

(2)

例外ハンドリングを実装する
この例では、引数から取得した例外のスタックトレースをログ出力している。

(3)

トランザクションのロールバック例外をスローする。
また、この例外スローによりItemProcessListenerで共通の例外ハンドリングをすることもできる。

ItemProcessListenerインタフェースを使用する方法

業務例外に対するハンドリングが共通化できる場合はこちらを使用する。

ItemProcessListener#onProcessErrorの実装例
@Component
public class ChunkComponentListener implements ItemProcessListener<Object, Object> {

    private static final Logger logger =
            LoggerFactory.getLogger(ChunkComponentListener.class);

    // omitted.

    // (1)
    @Override
    public void onProcessError(Object item, Exception e) {
        // (2)
        logger.error("Exception occurred while processing. [item:{}]", item, e);
    }

    // omitted.
}
表 8. 説明
項番 説明

(1)

onProcessErrorメソッドを実装する。

(2)

例外ハンドリングを実装する
この例では、引数から取得した処理対象データと例外のスタックトレースをログ出力している。

コーディングポイント(ItemWriter編)

ItemWriteListenerインタフェースの onWriteErrorメソッドを実装することで、ItemWriter内で発生した例外をハンドリングする。

ItemWriteListener#onWriteErrorの実装例
@Component
public class ChunkComponentListener implements ItemWriteListener<Object> {

    private static final Logger logger =
            LoggerFactory.getLogger(ChunkComponentListener.class);

    // omitted.

    // (1)
    @Override
    public void onWriteError(Exception ex, Chunk item) {
        // (2)
        logger.error("Exception occurred while processing. [items:{}]", item, ex);
    }

    // omitted.
}
表 9. 説明
項番 説明

(1)

onWriteErrorメソッドを実装する。

(2)

例外ハンドリングを実装する
この例では、引数から取得した出力対象のチャンクと例外のスタックトレースをログ出力している。

タスクレットモデルにおける例外ハンドリング

タスクレットモデルの例外ハンドリングはタスクレット内で独自に実装する。

トランザクション処理を行う場合は、ロールバックさせるために必ず例外を再度スローすること。

タスクレットモデルでの例外ハンドリング実装例
@Component
public class SalesPerformanceTasklet implements Tasklet {

    private static final Logger logger =
            LoggerFactory.getLogger(SalesPerformanceTasklet.class);

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        // (1)
        try {
            reader.open(chunkContext.getStepContext().getStepExecution()
                    .getExecutionContext());

            List<SalesPerformanceDetail> items = new ArrayList<>(10);
            SalesPerformanceDetail item = null;
            do {
                // Pseudo operation of ItemReader
                // omitted.

                // Pseudo operation of ItemProcessor
                checkAmount(item.getAmount(), amountLimit);


                // Pseudo operation of ItemWriter
                // omitted.

            } while (item != null);
        } catch (Exception e) {
            logger.error("exception in tasklet.", e);   // (2)
            throw e;    // (3)
        } finally {
            try {
                reader.close();
            } catch (Exception e) {
                // do nothing.
            }
        }

        return RepeatStatus.FINISHED;
    }
}
表 10. 説明
項番 説明

(1)

try-catchを実装する。

(2)

例外ハンドリングを実装する
この例では、発生した例外のスタックトレースをログ出力している。

(3)

トランザクションをロールバックするため、例外を再度スローする。

ジョブ単位の例外ハンドリング

ジョブ単位に例外ハンドリング方法を説明する。
チャンクモデルとタスクレットモデルとで共通のハンドリング方法となる。

システム例外や致命的エラー等エラーはジョブ単位に JobExecutionListenerインタフェースの実装を行う。

例外ハンドリング処理を集約して定義するために、ステップごとにハンドリング処理を定義はせずジョブ単位でハンドリングを行う。
ここでの例外ハンドリングは、ログ出力、およびExitCodeの設定を行い、トランザクション処理は実装しないこと。

トランザクション処理の禁止

JobExecutionListenerで行われる処理は、業務トランザクション管理の範囲外となる。 よってジョブ単位の例外ハンドリングでトランザクション処理を実施することは禁止する。

ここでは、ItemProcessorで例外が発生したときのハンドリング例を示す。 リスナーの設定方法については、リスナーの設定を参照。

ItemProcessorの実装例
@Component
public class AmountCheckProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {

    // omitted.

    private StepExecution stepExecution;

    // (1)
    @BeforeStep
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    @Override
    public SalesPerformanceDetail process(SalesPerformanceDetail item)
            throws Exception {
        // (2)
        try {
            checkAmount(item.getAmount(), amountLimit);
        } catch (ArithmeticException ae) {
            // (3)
            stepExecution.getExecutionContext().put("ERROR_ITEM", item);
            // (4)
            throw new IllegalStateException("check error at processor.", ae);
        }
        return item;
    }
}
JobExecutionListenerでの例外ハンドリング実装
@Component
public class JobErrorLoggingListener implements JobExecutionListener {

    private static final Logger logger =
            LoggerFactory.getLogger(JobErrorLoggingListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // do nothing.
    }

    // (5)
    @Override
    public void afterJob(JobExecution jobExecution) {

        // whole job execution
        List<Throwable> exceptions = jobExecution.getAllFailureExceptions(); // (6)
        // (7)
        if (exceptions.isEmpty()) {
            return;
        }
        // (8)
        logger.info("This job has occurred some exceptions as follow. " +
                "[job-name:{}] [size:{}]",
                jobExecution.getJobInstance().getJobName(), exceptions.size());
        for (Throwable th : exceptions) {
            logger.error("exception has occurred in job.", th);
        }
        // (9)
        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
            Object errorItem = stepExecution.getExecutionContext()
                    .get("ERROR_ITEM"); // (10)
            if (errorItem != null) {
                logger.error("detected error on this item processing. " +
                        "[step:{}] [item:{}]", stepExecution.getStepName(),
                        errorItem);
            }
        }

    }
}
表 11. 説明
項番 説明

(1)

JobExecutionListenerでエラーデータを出力するため、ステップ実行前にStepExecutionインスタンスを取得する。

(2)

try-catchを実装する。

(3)

例外ハンドリングを実装する
この例では、StepExecutionインスタンスのコンテキストにエラーデータをERROR_ITEMというキーで格納している。

(4)

JobExecutionListenerで例外ハンドリングをするために、例外をスローする。

(5)

afterJobメソッドに例外ハンドリングを実装する。

(6)

引数のjobExecutionからジョブ全体で発生したエラー情報を取得する。

(7)

エラー情報がない場合は、正常終了とする。

(8)

エラー情報がある場合は、例外ハンドリングを行う。
この例では、発生した例外をすべてスタックトレース付きのログ出力を行っている。

(9)

この例では、エラーデータがある場合はログ出力を行うようにしている。
ジョブで定義されたすべてのステップからStepExecutionインスタンスを取得し、ERROR_ITEMというキーでエラーデータが格納されているかチェックする。 格納されていた場合は、エラーデータとしてログ出力する。

ExecutionContextへ格納するオブジェクト

ExecutionContextへ格納するオブジェクトは、java.io.Serializableを実装したクラスでなければならない。 これは、ExecutionContextJobRepositoryへ格納されるためである。

処理継続可否の決定

例外発生時にジョブの処理継続可否を決定する実装方法を説明する。

処理継続可否方法一覧

スキップ

エラーレコードをスキップして、処理を継続する方法を説明する。

チャンクモデル

チャンクモデルでは、各処理のコンポーネントで実装方法が異なる。

ここで説明する内容を適用する前に、必ず<skippable-exception-classes>を使わない理由についてを一読すること。

ItemReaderでのスキップ

StepBuilderのskipPolicyメソッド/<batch:chunk>のskip-policy属性にスキップ方法を指定する。 skipメソッド/<batch:skippable-exception-classes>に、スキップ対象とするItemReaderで発生する例外クラスを指定する。
skipPolicyメソッドの引数skipPolicy/skip-policy属性には、Spring Batchが提供している下記に示すいづれかのクラスを使用する。

表 12. skip-policy一覧
クラス名 説明

AlwaysSkipItemSkipPolicy

常にスキップをする。

NeverSkipItemSkipPolicy

スキップをしない。

LimitCheckingItemSkipPolicy

指定したスキップ数の上限に達するまでスキップをする。
上限値に達した場合は、以下の例外が発生する。
org.springframework.batch.core.step.skip.SkipLimitExceededException

skip-policyを省略した時にデフォルトで使われるスキップ方法である。

ExceptionClassifierSkipPolicy

例外ごとに適用するskip-policyを変えたい場合に利用する。

スキップの実装例を説明する。

FlatFileItemReaderでCSVファイルを読み込む際、不正なレコードが存在するケースを扱う。
なお、この時以下の例外が発生する。

  • org.springframework.batch.item.ItemReaderException(ベースとなる例外クラス)

    • org.springframework.batch.item.file.FlatFileParseException (発生する例外クラス)

skip-policy別に定義方法を示す。

@Bean
@StepScope
public FlatFileItemReader<SalesPerformanceDetail> detailCSVReader(
        @Value("#{jobParameters['inputFile']}") File inputFile) {
    final DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    tokenizer.setNames("branchId", "year", "month", "customerId", "amount");
    final BeanWrapperFieldSetMapper<SalesPerformanceDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<SalesPerformanceDetail>();
    fieldSetMapper.setTargetType(SalesPerformanceDetail.class);
    final DefaultLineMapper<SalesPerformanceDetail> lineMapper = new DefaultLineMapper<SalesPerformanceDetail>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    return new FlatFileItemReaderBuilder<SalesPerformanceDetail>()
            .name(ClassUtils.getShortName(FlatFileItemReader.class))
            .resource(new FileSystemResource(inputFile))
            .lineMapper(lineMapper)
            .build();
}
<bean id="detailCSVReader"
      class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="file:#{jobParameters['inputFile']}"/>
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
                      p:names="branchId,year,month,customerId,amount"/>
            </property>
            <property name="fieldSetMapper">
                <bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                      p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail"/>
            </property>
        </bean>
    </property>
</bean>
AlwaysSkipItemSkipPolicy

.AlwaysSkipItemSkipPolicyの指定例

// (1)
@Bean
public AlwaysSkipItemSkipPolicy skipPolicy() {
    AlwaysSkipItemSkipPolicy skipPolicy = new AlwaysSkipItemSkipPolicy();
    return skipPolicy;
}

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
                   AlwaysSkipItemSkipPolicy skipPolicy,
                   SkipLoggingListener listener) {
    return new StepBuilder("jobSalesPerfAtSkipAllReadError.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipPolicy(skipPolicy) // (2)
            .listener(listener)
            .build();
}

@Bean
public Job jobSalesPerfAtSkipAllReadError(JobRepository jobRepository,
                                          Step step01,
                                          JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfAtSkipAllReadError", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- (1) -->
<bean id="skipPolicy"
      class="org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy"/>

<batch:job id="jobSalesPerfAtSkipAllReadError" job-repository="jobRepository">
    <batch:step id="jobSalesPerfAtSkipAllReadError.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"
                         skip-policy="skipPolicy"> <!-- (2) -->
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 13. 説明
項番 説明

(1)

AlwaysSkipItemSkipPolicyをBean定義する。

(2)

StepBuilderのskipPolicyメソッド/<batch:chunk>のskip-policy属性に(1)で定義したBeanを設定する。

NeverSkipItemSkipPolicy

.NeverSkipItemSkipPolicyの指定例

// (1)
@Bean
public NeverSkipItemSkipPolicy skipPolicy() {
    return new NeverSkipItemSkipPolicy();
}

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
                   SkipLoggingListener skipLoggingListener,
                   NeverSkipItemSkipPolicy skipPolicy) {
    return new StepBuilder("jobSalesPerfAtSkipNeverReadError.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .listener(skipLoggingListener)
            .skipPolicy(skipPolicy) // (2)
            .build();
}

@Bean
public Job jobSalesPerfAtSkipNeverReadError(JobRepository jobRepository,
                                          Step step01,
                                          JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfAtSkipNeverReadError", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- (1) -->
<bean id="skipPolicy"
      class="org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy"/>

<batch:job id="jobSalesPerfAtSkipNeverReadError" job-repository="jobRepository">
    <batch:step id="jobSalesPerfAtSkipNeverReadError.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"
                         skip-policy="skipPolicy"> <!-- (2) -->
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 14. 説明
項番 説明

(1)

NeverSkipItemSkipPolicyをBean定義する。

(2)

StepBuilderのskipPolicyメソッド/<batch:chunk>のskip-policy属性に(1)で定義したBeanを設定する。

LimitCheckingItemSkipPolicy

.LimitCheckingItemSkipPolicyの指定例

// (1)
// @Bean
// public LimitCheckingItemSkipPolicy skipPolicy() {
//     return new LimitCheckingItemSkipPolicy();
// }

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
                   SkipLoggingListener listener) {
    return new StepBuilder("jobSalesPerfAtValidSkipReadError.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipLimit(2) // (2)
            .skip(ItemReaderException.class) // (3) (4)
            .listener(listener)
            .build();
}

@Bean
public Job jobSalesPerfAtValidSkipReadError(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfAtValidSkipReadError", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- (1) -->
<!--
<bean id="skipPolicy"
      class="org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy"/>
-->

<batch:job id="jobSalesPerfAtValidSkipReadError" job-repository="jobRepository">
    <batch:step id="jobSalesPerfAtValidSkipReadError.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"
                         skip-limit="2">  <!-- (2) -->
                <!-- (3) -->
                <batch:skippable-exception-classes>
                    <!-- (4) -->
                    <batch:include
                        class="org.springframework.batch.item.ItemReaderException"/>
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 15. 説明
項番 説明

(1)

LimitCheckingItemSkipPolicyをBean定義する。
StepBuilderのskipPolicyメソッド/skip-policy属性省略時のデフォルトであるため、定義しなくてもよい。

(2)

StepBuilderのskipLimitメソッド/<batch:chunk>のskip-limit属性にスキップ数の上限値を設定する。
skipPolicyメソッド/skip-policy属性はデフォルトを使用ため省略。

(3)

StepBuilderのskipメソッド/<batch:skippable-exception-classes>を定義し、要素内に対象となる例外を設定する。

(4)

ItemReaderExceptionをスキップ対象クラスとして設定を行う。

ExceptionClassifierSkipPolicy

.ExceptionClassifierSkipPolicyの指定例

// (1)
@Bean
public ExceptionClassifierSkipPolicy skipPolicy(
        AlwaysSkipItemSkipPolicy alwaysSkip) {
    ExceptionClassifierSkipPolicy skipPolicy = new ExceptionClassifierSkipPolicy();
    Map<Class<? extends Throwable>, SkipPolicy> policyMap = new HashMap<>();
    policyMap.put(ItemReaderException.class, alwaysSkip); // (2)
    skipPolicy.setPolicyMap(policyMap);
    return skipPolicy;
}

// (3)
@Bean
public AlwaysSkipItemSkipPolicy alwaysSkip() {
    return new AlwaysSkipItemSkipPolicy();
}

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
                   SkipLoggingListener skipLoggingListener,
                   ExceptionClassifierSkipPolicy skipPolicy) {
    return new StepBuilder("jobSalesPerfAtValidNolimitSkipReadError.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipPolicy(skipPolicy) // (4)
            .listener(skipLoggingListener)
            .build();
}

@Bean
public Job jobSalesPerfAtValidNolimitSkipReadError(JobRepository jobRepository,
                                          Step step01,
                                          JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfAtValidNolimitSkipReadError", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- (1) -->
<bean id="skipPolicy"
      class="org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy">
    <property name="policyMap">
        <map>
            <!-- (2) -->
            <entry key="org.springframework.batch.item.ItemReaderException"
                   value-ref="alwaysSkip"/>
        </map>
    </property>
</bean>
<!-- (3) -->
<bean id="alwaysSkip"
      class="org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy"/>

<batch:job id="jobSalesPerfAtValidNolimitSkipReadError"
           job-repository="jobRepository">
    <batch:step id="jobSalesPerfAtValidNolimitSkipReadError.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- skip-limit value is dummy. -->
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"
                         skip-policy="skipPolicy"> <!-- (4) -->
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 16. 説明
項番 説明

(1)

ExceptionClassifierSkipPolicyをBean定義する。

(2)

policyMapに、キーを例外クラス、値をスキップ方法にしたマップを設定する。
この例では、ItemReaderExceptionが発生したときに(3)で定義したスキップ方法になるように設定している。

(3)

例外別に実行したいスキップ方法を定義する。
この例では、AlwaysSkipItemSkipPolicyを定義している。

(4)

StepBuilderのskipPolicyメソッド/<batch:chunk>のskip-policy属性に(1)で定義したBeanを設定する。

ItemProcessorでのスキップ

ItemProcessor内でtry~catchをして、nullを返却する。
skip-policyによるスキップは、ItemProcessorで再処理が発生するため利用しない。
詳細は、<skippable-exception-classes>を使わない理由についてを参照。

ItemProcessorにおける例外ハンドリンクの制約

<skippable-exception-classes>を使わない理由についてにあるように、 ItemProcessorでは、skipメソッド/<batch:skippable-exception-classes>を利用したスキップは禁止している。 そのため、コーディングポイント(ItemProcessor編)で説明している 「ItemProcessListenerインタフェースを使用する方法」を応用したスキップはできない。

スキップの実装例を説明する。

ItemProcessor 内でtry~catchする例
@Component
public class AmountCheckProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {

    // omitted.

    @Override
    public SalesPerformanceDetail process(SalesPerformanceDetail item) throws Exception {
        // (1)
        try {
            checkAmount(item.getAmount(), amountLimit);
        } catch (ArithmeticException ae) {
            logger.warn("Exception occurred while processing. Skipped. [item:{}]",
                    item, ae); // (2)
            return null; // (3)
        }
        return item;
    }
}
表 17. 説明
項番 説明

(1)

try~catchで実装する。

(2)

例外ハンドリングを実装する
この例では、引数から取得した例外のスタックトレースをログ出力している。

(3)

nullを返却することでエラーデータをスキップする。

ItemWriterでのスキップ

ItemWriterにおいてスキップ処理は原則として行わない。
スキップが必要な場合でも、 skip-policyによるスキップは、チャンクサイズが変動するので利用しない。
詳細は、<skippable-exception-classes>を使わない理由についてを参照。

タスクレットモデル

ビジネスロジック内で例外をハンドリングし、独自にエラーレコードをスキップする処理を実装する。

タスクレットモデルでの実装例
@Component
public class SalesPerformanceTasklet implements Tasklet {

    private static final Logger logger =
            LoggerFactory.getLogger(SalesPerformanceTasklet.class);

    // omitted.

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        // (1)
        try {
            reader.open(chunkContext.getStepContext().getStepExecution()
                    .getExecutionContext());

            List<SalesPerformanceDetail> items = new ArrayList<>(10);
            SalesPerformanceDetail item = null;
            do {
                // Pseudo operation of ItemReader
                // omitted.

                // Pseudo operation of ItemProcessor
                checkAmount(item.getAmount(), amountLimit);


                // Pseudo operation of ItemWriter
                // omitted.

            } while (item != null);
        } catch (Exception e) {
            logger.warn("exception in tasklet. Skipped.", e);   // (2)
            continue;    // (3)
        } finally {
            try {
                reader.close();
            } catch (Exception e) {
                // do nothing.
            }
        }

        return RepeatStatus.FINISHED;
    }
}
表 18. 説明
項番 説明

(1)

try-catchを実装する。

(2)

例外ハンドリングを実装する
この例では、発生した例外のスタックトレースをログ出力している。

(3)

continueにより、エラーデータの処理をスキップする。

リトライ

例外を検知した場合に、規定回数に達するまで再処理する方法を説明する。

リトライには、状態管理の有無やリトライが発生するシチュエーションなどさまざまな要素を考慮する必要があり、 確実な方法は存在しないうえに、むやみにリトライするとかえって状況を悪化させてしまう。

そのため、本ガイドラインでは、局所的なリトライを実現するorg.springframework.retry.support.RetryTemplateを利用する方法を説明する。

スキップと同様に<retryable-exception-classes>で対象となる例外クラスを指定する方法もある。 しかし、<skippable-exception-classes>を使わない理由についてと同様に 性能劣化を招く副作用があるため、Macchinetta Batch 2.xでは利用しない。

RetryTemplate実装コード
public class RetryableAmountCheckProcessor implements
        ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {

    // omitted.

    // (1)
    private RetryPolicy retryPolicy;

    @Override
    public SalesPerformanceDetail process(SalesPerformanceDetail item)
            throws Exception {

        // (2)
        RetryTemplate rt = new RetryTemplate();
        if (retryPolicy != null) {
            rt.setRetryPolicy(retryPolicy);
        }

        try {
            // (3)
            rt.execute(new RetryCallback<SalesPerformanceDetail, Exception>() {
                @Override
                public SalesPerformanceDetail doWithRetry(RetryContext context) throws Exception {
                    logger.info("execute with retry. [retry-count:{}]", context.getRetryCount());
                    // retry mocking
                    if (context.getRetryCount() == adjustTimes) {
                        item.setAmount(item.getAmount().divide(new BigDecimal(10)));
                    }
                    checkAmount(item.getAmount(), amountLimit);
                    return null;
                }
            });
        } catch (ArithmeticException ae) {
            // (4)
            throw new IllegalStateException("check error at processor.", ae);
        }
        return item;
    }

    public void setRetryPolicy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
    }
}
@Bean
@StepScope
public RetryableAmountCheckProcessor amountCheckProcessor(
        @Value("#{jobParameters['adjust-times']}") int adjustTimes,
        SimpleRetryPolicy retryPolicy) {
    RetryableAmountCheckProcessor processor = new RetryableAmountCheckProcessor();
    processor.setAmountLimit(BigDecimal.valueOf(10000));
    processor.setAdjustTimes(adjustTimes);
    processor.setRetryPolicy(retryPolicy); // (5)
    return processor;
}

// (6)
@Bean
public SimpleRetryPolicy retryPolicy(
        Map<Class<? extends Throwable>, Boolean> exceptionMap) {
    return new SimpleRetryPolicy(3, exceptionMap); // (7) (8)
}

// (9)
@Bean
public Map<Class<? extends Throwable>, Boolean> exceptionMap() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
    exceptionMap.put(UnsupportedOperationException.class, true);
    return exceptionMap;
}

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer) {
    return new StepBuilder("jobSalesPerfWithRetryPolicy.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

@Bean
public Job jobSalesPerfWithRetryPolicy(JobRepository jobRepository,
                                          Step step01,
                                          JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfWithRetryPolicy", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- omitted -->

<bean id="amountCheckProcessor"
      class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.exceptionhandling.RetryableAmountCheckProcessor"
      scope="step"
      p:retryPolicy-ref="retryPolicy"/> <!-- (5) -->

<!-- (6) (7) (8)-->
<bean id="retryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy"
      c:maxAttempts="3"
      c:retryableExceptions-ref="exceptionMap"/>

<!-- (9) -->
<util:map id="exceptionMap">
    <entry key="java.lang.ArithmeticException" value="true"/>
</util:map>

<batch:job id="jobSalesPerfWithRetryPolicy" job-repository="jobRepository">
    <batch:step id="jobSalesPerfWithRetryPolicy.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 19. 説明
項番 説明

(1)

リトライ条件を格納する

(2)

RetryTemplateのインスタンスを作成する。
デフォルトは、リトライ回数=3、すべての例外がリトライ対象である。

(3)

RetryTemplate#executeメソッドで、リトライを行いたいビジネスロジックを実行するようにする。
ビジネスロジック全体ではなく、リトライしたい部分のみをRetryTemplate#executeメソッドで実行するようにする。

(4)

リトライ回数が規定回数を超えた場合の例外ハンドリング。
ビジネスロジックで発生する例外がそのままスローされてくる。

(5)

(6)で定義するリトライ条件を指定する。

(6)

リトライ条件を、org.springframework.retry.RetryPolicyを実装したクラスで定義する。
この例では、Spring Batchから提供されているSimpleRetryPolicyを利用している。

(7)

コンストラクタ引数のmaxAttemptsにリトライ回数の指定をする。

(8)

コンストラクタ引数のretryableExceptionsに(9)で定義するリトライ対象の例外を定義したマップを指定する。

(9)

キーにリトライ対象の例外クラス、値に真偽値を設定したマップを定義する。
真偽値がtrueであれば、リトライ対象の例外となる。

処理中断

ステップ実行を打ち切りたい場合、スキップ・リトライ対象以外のRuntimeExceptionもしくはそのサブクラスをスローする。

LimitCheckingItemSkipPolicyをもとに、スキップの実装例を示す。

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
                   SkipLoggingListener listener) {
    return new StepBuilder("jobSalesPerfAtValidSkipReadError.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .skipLimit(2)
            .skip(ValidationException.class) // (1)
            .listener(listener)
            .build();
}

@Bean
public Job jobSalesPerfAtValidSkipReadError(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfAtValidSkipReadError", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<batch:job id="jobSalesPerfAtValidSkipReadError" job-repository="jobRepository">
    <batch:step id="jobSalesPerfAtValidSkipReadError.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"
                         skip-limit="2">
                <batch:skippable-exception-classes>
                    <!-- (1) -->
                    <batch:include class="org.springframework.batch.item.validator.ValidationException"/>
                </batch:skippable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 20. 説明
項番 説明

(1)

ValidationException以外の例外が発生すれば処理が中断される。

リトライをもとに、リトライの実装例を示す。

@Bean
public SimpleRetryPolicy retryPolicy(
        Map<Class<? extends Throwable>, Boolean> exceptionMap) {
    return new SimpleRetryPolicy(3, exceptionMap);
}

@Bean
public Map<Class<? extends Throwable>, Boolean> exceptionMap() {
    Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
    exceptionMap.put(UnsupportedOperationException.class, true); // (1)
    return exceptionMap;
}

@Bean
public Step step01(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   @Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
                   @Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
                   @Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer) {
    return new StepBuilder("jobSalesPerfWithRetryPolicy.step01",
            jobRepository)
            .<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
                    transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}

@Bean
public Job jobSalesPerfWithRetryPolicy(JobRepository jobRepository,
                                          Step step01,
                                          JobExecutionLoggingListener listener) {
    return new JobBuilder("jobSalesPerfWithRetryPolicy", jobRepository)
            .start(step01)
            .listener(listener)
            .build();
}
<!-- omitted -->

<bean id="retryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy"
      c:maxAttempts="3"
      c:retryableExceptions-ref="exceptionMap"/>

<util:map id="exceptionMap">
    <!-- (1) -->
    <entry key="java.lang.UnsupportedOperationException" value="true"/>
</util:map>

<batch:job id="jobSalesPerfWithRetryPolicy" job-repository="jobRepository">
    <batch:step id="jobSalesPerfWithRetryPolicy.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="detailCSVReader"
                         processor="amountCheckProcessor"
                         writer="detailWriter" commit-interval="10"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
表 21. 説明
項番 説明

(1)

UnsupportedOperationException以外の例外が発生すれば処理が中断される。

Appendix

<skippable-exception-classes>を使わない理由について

Spring Batchでは、ジョブ全体を対象としてスキップする例外を指定し、例外が発生したアイテムへの処理をスキップして処理を継続させる機能を提供している。

その機能は、以下のようにStepBuilderのskipメソッド/<chunk>要素配下に<skippable-exception-classes>要素を設定し、スキップ対象の例外を指定する形で実装する。

@Bean
public Step retryStep(JobRepository jobRepository,
                   @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                   ItemReader itemReader,
                   ItemProcessor itemProcessor,
                   ItemWriter itemWriter) {
    return new StepBuilder("flowJob.retryStep",
            jobRepository)
            .chunk(20, transactionManager)
            .reader(itemReader)
            .processor(itemProcessor)
            .writer(itemWriter)
            .faultTolerant()
            .skipLimit(10)
            .skip(Exception.class)
            .noSkip(NullPointerException.class)
            .build();
}

@Bean
public Job flowJob(JobRepository jobRepository,
                                            Step retryStep,
                                            JobExecutionLoggingListener listener) {
    return new JobBuilder("flowJob", jobRepository)
            .start(retryStep)
            .listener(listener)
            .build();
}
<job id="flowJob">
    <step id="retryStep">
        <tasklet>
            <chunk reader="itemReader" writer="itemWriter"
                   processor="itemProcessor" commit-interval="20"
                   skip-limit="10">
                <skippable-exception-classes>
                    <!-- specify exceptions to the skipped -->
                    <include class="java.lang.Exception"/>
                    <exclude class="java.lang.NullPointerException"/>
                </skippable-exception-classes>
            </chunk>
        </tasklet>
    </step>
</job>

この機能を利用することによって、入力チェックエラーが発生したレコードをスキップして後続データの処理を継続することは可能だが、 Macchinetta Batch 2.xでは以下の理由により使用しない。

  • skipメソッド/<skippable-exception-classes>要素を利用して例外をスキップした場合、 1つのチャンクに含まれるデータ件数が変動するため、性能劣化を引き起こす可能性がある。

    • これは、例外の発生箇所(ItemReader/ItemProcessor/ItemWriter)によって変わる。詳細は後述する。

skip/<skippable-exception-classes>を定義せずにSkipPolicyを利用することは必ず避ける

暗黙的にすべての例外が登録された状況になり、性能劣化の可能性が飛躍的に高まる。

例外発生箇所(ItemReader/ItemProcessor/ItemWriter)ごとの挙動についてそれぞれ説明する。
なお、トランザクションの動作は例外の発生箇所によらず、例外が発生した場合は必ずロールバックした後、再度処理される。

ItemReaderで例外が発生した場合
  • ItemReaderの処理内で例外が発生した場合は、次のitemへ処理対象が移る。

  • これによる副作用はない

ItemProcessorで例外が発生した場合
  • ItemProcessorの処理内で例外が発生した場合は、チャンクの最初に戻り1件目から再処理する。

  • 再処理の対象にスキップされるitemは含まれない。

  • 1度目の処理と再処理時のチャンクサイズは変わらない。

ItemWriterで例外が発生した場合
  • ItemWriterの処理内で例外が発生した場合は、チャンクの最初に戻り1件目から再処理する。

  • 再処理はChunkSize=1に固定し、1件ずつ実行される。

  • 再処理対象にスキップされるitemも含まれる。

ItemProcessorにて例外が発生した場合、ChunkSize=1000の場合を例に考えると、 1000件目で例外が発生すると1件目から再処理が行われ、合計で1999件分の処理が実行されてしまう。

ItemWriterにて例外が発生した場合、ChunkSize=1に固定し再処理される。 仮にChunkSize=1000の場合を例に考えると、 本来1回のトランザクションにも関わらず1000回のトランザクションに分割し処理されてしまう。

これらはジョブ全体の処理時間が長期化することを意味し、異常時に状況を悪化させる可能性が高い。 また、二重処理すること自体が問題化する可能性を秘めており、設計製造に追加の考慮事項を生む。

よって、skipメソッド/<skippable-exception-classes>を使用することは推奨しない。 ItemReaderでエラーになったデータをスキップすることはこれらの問題を引き起こさないが、 事故を未然に防ぐためには基本的に避けるようにし、どうしても必要な場合に限定的に適用すること。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28