Macchinetta Batch Framework (2.x) Development Guideline - version 2.2.0.RELEASE, 2020-6-29
> INDEX
前提

チュートリアルの進め方で説明しているとおり、 入力データの妥当性検証を行うジョブに対して、 例外ハンドリングの実装を追加していく形式とする。なお、例外ハンドリング方式にはtry-catchやChunkListenerなど様々な方式がある。
ただし、記述はデータベースアクセスするジョブに実装を追加した場合の説明としているため留意すること。

概要

try-catchで例外ハンドリングを行うジョブを作成する。

なお、詳細についてはMacchinetta Batch 2.x 開発ガイドラインのItemProcessor内でtry~catchする方法および タスクレットモデルにおける例外ハンドリングを参照。

終了コードの意味合いについて

本節では、終了コードは2つの意味合いで扱われており、それぞれの説明を以下に示す。

  • COMPLETED、FAILEDなどの文字列の終了コードは、ジョブやステップの終了コードである。

  • 0、255などの数値の終了コードは、Javaプロセスの終了コードである。

作成するアプリケーションの説明の 背景、処理概要、業務仕様を以下に再掲する。

背景

とある量販店では、会員に対してポイントカードを発行している。
会員には「ゴールド会員」「一般会員」の会員種別が存在し、会員種別に応じたサービスを提供している。
今回そのサービスの一環として、月内に商品を購入した会員のうち、 会員種別が「ゴールド会員」の場合は100ポイント、「一般会員」の場合は10ポイントを月末に加算することにした。

処理概要

会員種別に応じてポイント加算を行うアプリケーションを 月次バッチ処理としてMacchinetta Batch 2.xを使用して実装する。
入力データにポイントの上限値を超えるデータが存在するか妥当性検証を行う処理を追加実装し、 エラーの場合は警告メッセージを出力し、スキップして処理を継続する。その際にスキップしたことを示す終了コードを出力する。

業務仕様

業務仕様は以下のとおり。

  • 入力データのポイントが1,000,000ポイントを超過していないことをチェックする

    • チェックエラーとなる場合は、警告メッセージをログに出力し、対象レコードはスキップして処理を継続する

    • スキップした場合は、スキップしたことを示すために終了コードを"200"(SKIPPED)に変換する

  • 商品購入フラグが"1"(処理対象)の場合に、会員種別に応じてポイントを加算する

    • 会員種別が"G"(ゴールド会員)の場合は100ポイント、"N"(一般会員)の場合は10ポイント加算する

  • 商品購入フラグはポイント加算後に"0"(初期状態)に更新する

  • ポイントの上限値は1,000,000ポイントとする

  • ポイント加算後に1,000,000ポイントを超えた場合は、1,000,000ポイントに補正する

テーブル仕様

入出力リソースとなる会員情報テーブルの仕様は以下のとおり。
前提のとおりデータベースアクセスするジョブの場合の説明となるため、ファイルアクセスするジョブの場合の 入出力のリソース仕様はファイル仕様を参照。

表 1. 会員情報テーブル(member_info)
No 属性名 カラム名 PK データ型 桁数 説明

1

会員番号

id

CHAR

8

会員を一意に示す8桁固定の番号を表す。

2

会員種別

type

-

CHAR

1

会員の種別を以下のとおり表す。
"G"(ゴールド会員)、"N"(一般会員)

3

商品購入フラグ

status

-

CHAR

1

月内に商品を買ったかどうかを表す。
商品購入で"1"(処理対象)、月次バッチ処理で"0"(初期状態)に更新される。

4

ポイント

point

-

INT

7

会員の保有するポイントを表す。
初期値は0。

ジョブの概要

ここで作成する入力チェックを行うジョブの概要を把握するために、 処理フローおよび処理シーケンスを以下に示す。

前提のとおりデータベースアクセスするジョブの場合の説明となるため、 ファイルアクセスするジョブの場合の処理フローおよび処理シーケンスとは異なる部分があるため留意する。

処理フロー概要

処理フローの概要を以下に示す。

ProcessFlow of ExceptionHandlingWithTryCatch Job
図 1. 例外ハンドリングを行うジョブの処理フロー
チャンクモデルの場合の処理シーケンス

チャンクモデルの場合の処理シーケンスを説明する。
本ジョブは異常系データを利用することを前提として説明しているため、 このシーケンス図は入力チェックでエラー(警告終了)となった場合を示している。

橙色のオブジェクトは今回実装するクラスを表す。

ProcessSequence of ExceptionHandlingWithTryCatch Job by ChunkModel
図 2. チャンクモデルのシーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

  2. ステップは、リソースをオープンする。

  3. MyBatisCursorItemReaderは、member_infoテーブルから会員情報をすべて取得(select文の発行)する。

    • 入力データがなくなるまで、以降の処理を繰り返す。

    • チャンク単位で、フレームワークトランザクションを開始する。

    • チャンクサイズに達するまで4から12までの処理を繰り返す。

  4. ステップは、MyBatisCursorItemReaderから入力データを1件取得する。

  5. MyBatisCursorItemReaderは、member_infoテーブルから入力データを1件取得する。

  6. member_infoテーブルは、MyBatisCursorItemReaderに入力データを返却する。

  7. MyBatisCursorItemReaderは、ステップに入力データを返却する。

  8. ステップは、PointAddItemProcessorで入力データに対して処理を行う。

  9. PointAddItemProcessorは、SpringValidatorに入力チェック処理を依頼する。

  10. SpringValidatorは、入力チェックルールに基づき入力チェックを行い、チェックエラーの場合は例外(ValidationException)をスローする。

  11. PointAddItemProcessorは、入力データを読み込んでポイント加算処理を行う。例外(ValidationException)をキャッチした場合はnullを返却してエラーレコードをスキップする。

  12. PointAddItemProcessorは、ステップに処理結果を返却する。

  13. ステップは、チャンクサイズ分のデータをMyBatisBatchItemWriterで出力する。

  14. MyBatisBatchItemWriterは、member_infoテーブルに対して会員情報の更新(update文の発行)を行う。

  15. ステップはフレームワークトランザクションをコミットする。

  16. ステップはStepExitStatusChangeListenerを実行する。

  17. StepExitStatusChangeListenerは、入力データと出力データの件数が異なる場合にStepExecutionに独自の終了コードとしてSKIPPEDを設定する。

  18. ステップはジョブに終了コード(ここでは正常終了:0)を返却する。

  19. ジョブはJobExitCodeChangeListenerを実行する。

  20. JobExitCodeChangeListenerStepExecutionから終了コードを取得する。

  21. StepExecutionJobExitCodeChangeListenerに終了コードを返却する。

  22. JobExitCodeChangeListenerは最終的なジョブの終了コードとして、ジョブにSKIPPED(ここでは警告終了:200)を返却する。

タスクレットモデルの場合の処理シーケンス

タスクレットモデルの場合の処理シーケンスについて説明する。
本ジョブは異常系データを利用することを前提として説明しているため、 このシーケンス図は入力チェックでエラー(警告終了)となった場合を示している。

橙色のオブジェクトは今回実装するクラスを表す。

ProcessSequence of ExceptionHandlingWithTryCatch Job by TaskletModel
図 3. タスクレットモデルの処理シーケンス図
シーケンス図の説明
  1. ジョブからステップが実行される。

    • ステップはフレームワークトランザクションを開始する。

  2. ステップはPointAddTaskletを実行する。

  3. PointAddTaskletは、リソースをオープンする。

  4. MyBatisCursorItemReaderは、member_infoテーブルから会員情報をすべて取得(select文の発行)する。

    • 入力データがなくなるまで5から13までの処理を繰り返す。

    • 一定件数に達するまで5から11までの処理を繰り返す。

  5. PointAddTaskletは、MyBatisCursorItemReaderから入力データを1件取得する。

  6. MyBatisCursorItemReaderは、member_infoテーブルから入力データを1件取得する。

  7. member_infoテーブルは、MyBatisCursorItemReaderに入力データを返却する。

  8. MyBatisCursorItemReaderは、タスクレットに入力データを返却する。

  9. PointAddTaskletは、SpringValidatorに入力チェック処理を依頼する。

  10. SpringValidatorは、入力チェックルールに基づき入力チェックを行い、チェックエラーの場合は例外(ValidationException)をスローする。

  11. PointAddTaskletは、入力データを読み込んでポイント加算処理を行う。例外(ValidationException)をキャッチした場合はcontinueで処理を継続してエラーレコードをスキップする。

    • スキップした場合、以降の処理はせず5から処理を行う。

  12. PointAddTaskletは、一定件数分のデータをMyBatisBatchItemWriterで出力する。

  13. MyBatisBatchItemWriterは、member_infoテーブルに対して会員情報の更新(update文の発行)を行う。

  14. PointAddTaskletは、StepExecutionに独自の終了コードとしてSKIPPEDを設定する。

  15. PointAddTaskletはステップへ処理終了を返却する。

  16. ステップはフレームワークトランザクションをコミットする。

  17. ステップはジョブに終了コード(ここでは正常終了:0)を返却する。

  18. ステップはJobExitCodeChangeListenerを実行する。

  19. JobExitCodeChangeListenerStepExecutionから終了コードを取得する。

  20. StepExecutionJobExitCodeChangeListenerに終了コードを返却する。

  21. ステップはジョブに終了コード(ここでは警告終了:200)を返却する。

処理モデルによるスキップ実装について

チャンクモデルとタスクレットモデルではスキップ処理の実装方法が異なる。

チャンクモデル

ItemProcessor内でtry-catchを実装し、catchブロック内でnullを返却することでエラーデータをスキップする。 ItemReader、ItemWriterでのスキップはスキップを参照。

タスクレットモデル

ビジネスロジック内でtry-catchを実装し、catchブロック内でcontinueなど以降の処理をせずに次のデータを取得するようにして処理を継続することでエラーデータをスキップする。

以降で、チャンクモデル、タスクレットモデルそれぞれの実装方法を説明する。

チャンクモデルでの実装

チャンクモデルで入力チェックを行うジョブの作成から実行までを以下の手順で実施する。

メッセージ定義の追加

コード体系のばらつき防止や、監視対象のキーワードとしての抽出を設計しやすくするため、 ログメッセージはメッセージ定義を使用し、ログ出力時に使用する。

チャンクモデル/タスクレットモデルで共通して利用するため、既に作成している場合は読み飛ばしてよい。

application-messages.propertiesおよびlaunch-context.xmlを以下のとおり設定する。
なお、launch-context.xmlの設定はブランクプロジェクトに設定済みである。

src/main/resources/i18n/application-messages.properties
# (1)
errors.maxInteger=The {0} exceeds {1}.
src/main/resources/META-INF/spring/launch-context.xml
<!-- omitted -->

<bean id="messageSource" class="org.springframework.context.support.ResourceBundleMessageSource"
      p:basenames="i18n/application-messages" /> <!-- (2) -->

<!-- omitted -->
表 2. 説明
項番 説明

(1)

ポイント上限超過時に出力するメッセージを設定する。
{0)に項目名、{1}に上限値を割り当てる。

(2)

プロパティファイルからメッセージを使用するために、MessageSourceを設定する。
basenamesに、プロパティファイルの格納場所を指定する。

終了コードのカスタマイズ

ジョブ終了時のjavaプロセスの終了コードをカスタマイズする。
詳細は終了コードのカスタマイズを参照。

以下の作業を実施する。

StepExecutionListenerの実装

StepExecutionListenerインタフェースを利用してステップの終了コードを条件により変更する。
ここでは、StepExecutionListenerインタフェースの実装クラスとして、 入力データと出力データの件数が異なる場合に終了コードをSKIPPEDに変更する処理を実装する。

なお、このクラスはタスクレットモデルでは作成する必要がない。 タスクレットモデルではTaskletの実装クラス内でStepExecutionクラスに独自の終了コードを設定することができるためである。

com.example.batch.tutorial.common.listener.StepExitStatusChangeListener
package com.example.batch.tutorial.common.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;

@Component
public class StepExitStatusChangeListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // do nothing.
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        ExitStatus exitStatus = stepExecution.getExitStatus();
        if (conditionalCheck(stepExecution)) {
            exitStatus = new ExitStatus("SKIPPED"); // (1)
        }
        return exitStatus;
    }

    private boolean conditionalCheck(StepExecution stepExecution) {
        return (stepExecution.getWriteCount() != stepExecution.getReadCount()); // (2)
    }
}
表 3. 説明
項番 説明

(1)

ステップの実行結果に応じて独自の終了コードを設定する。
ここでは、スキップした場合に終了コードとしてSKIPPEDを指定する。

(2)

スキップしたことを判定するため、入力データと出力データの件数の比較を行う。
スキップした場合、入力データと出力データの件数が異なるため、件数の差を利用してスキップの判定を行っている。 ここで件数に差があった場合に(1)が実行される。

JobExecutionListenerの実装

JobExecutionListenerインタフェースを利用してジョブの終了コードを条件により変更する。
ここでは、JobExecutionListenerインタフェースの実装クラスとして、 最終的なジョブの終了コードを各ステップの終了コードに合わせて変更する処理を実装する。

com.example.batch.tutorial.common.listener.JobExitCodeChangeListener
package com.example.batch.tutorial.common.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.stereotype.Component;

import java.util.Collection;

@Component
public class JobExitCodeChangeListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // do nothing.
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) { // (1)
            if ("SKIPPED".equals(stepExecution.getExitStatus().getExitCode())) {
                jobExecution.setExitStatus(new ExitStatus("SKIPPED"));
                break;
            }
        }
    }
}
表 4. 説明
項番 説明

(1)

ジョブの実行結果に応じて、最終的なジョブの終了コードをJobExecutionに設定する。
ここではステップから返却された終了コードのいずれかにSKIPPEDが含まれている場合、 終了コードをSKIPPEDとしている。

ジョブBean定義ファイルの設定

作成したリスナーを利用するためのジョブBean定義ファイルの設定を以下に示す。

src/main/resources/META-INF/jobs/dbaccess/jobPointAddChunk.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <!-- omitted -->

    <context:component-scan base-package="com.example.batch.tutorial.dbaccess.chunk,
            com.example.batch.tutorial.common.listener"/> <!-- (1) -->

    <!-- omitted -->

    <batch:job id="jobPointAddChunk" job-repository="jobRepository">
        <batch:step id="jobPointAddChunk.step01">
            <batch:tasklet transaction-manager="jobTransactionManager">
                <batch:chunk reader="reader"
                             processor="pointAddItemProcessor"
                             writer="writer" commit-interval="10"/>
            </batch:tasklet>
            <batch:listeners>
                <batch:listener ref="stepExitStatusChangeListener"/> <!-- (2) -->
            </batch:listeners>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobExitCodeChangeListener"/> <!-- (3) -->
        </batch:listeners>
    </batch:job>

</beans>
表 5. 説明
項番 説明

(1)

コンポーネントスキャン対象とするベースパッケージの設定を行う。
base-package属性に、StepExecutionListenerおよびJobExecutionListenerの実装クラスが格納されているパッケージを追加で指定する。

(2)

StepExecutionListenerの実装クラスを設定する。 なお、StepExecutionListenerStepListenerの拡張インタフェースである。
ここでは、StepExecutionListenerの実装クラスのBeanIDであるstepExitStatusChangeListenerを指定する。

(3)

JobExecutionListenerの実装クラスを設定する。
ここでは、JobExecutionListenerの実装クラスのBeanIDであるjobExitCodeChangeListenerを指定する。

StepExitStatusChangeListenerとJobExitCodeChangeListenerの設定箇所の違いについて

StepExitStatusChangeListenerはステップの実行前後に処理を割り込めるStepListenerに属するため、<batch:tasklet>要素内に<batch:listeners>.<batch:listener>要素によって設定する。
JobExitCodeChangeListenerはジョブの実行前後に処理を割り込めるJobListenerに属するため、<batch:job>要素内に<batch:listeners>.<batch:listener>要素によって設定する。

詳細はリスナーの設定を参照。

終了コードのマッピング定義

終了コードのマッピングを追加で設定する。

launch-context.xmlに以下のとおり、独自の終了コードを追加する。

src/main/resources/META-INF/spring/launch-context.xml
<!-- omitted -->

<bean id="exitCodeMapper" class="org.springframework.batch.core.launch.support.SimpleJvmExitCodeMapper">
    <property name="mapping">
        <util:map id="exitCodeMapper" key-type="java.lang.String"
                  value-type="java.lang.Integer">
            <!-- ExitStatus -->
            <entry key="NOOP" value="0" />
            <entry key="COMPLETED" value="0" />
            <entry key="STOPPED" value="255" />
            <entry key="FAILED" value="255" />
            <entry key="UNKNOWN" value="255" />
            <entry key="SKIPPED" value="200" /> <!-- (1) -->
        </util:map>
    </property>
</bean>

<!-- omitted -->
表 6. 説明
項番 説明

(1)

独自の終了コードを追加する。
マッピングするためのキーにSKIPPED、コード値として200を指定する。

例外ハンドリングの実装

ポイント加算処理を行うビジネスロジッククラスにtry-catch処理を実装する。
既に実装してあるPointAddItemProcessorクラスにtry-catch処理の実装を追加する。

前提のとおりデータベースアクセスするジョブの場合の説明となるため、 ファイルアクセスするジョブの場合の実装は以下の(1)~(5)のみ追加する。

com.example.batch.tutorial.dbaccess.chunk.PointAddItemProcessor
// Package and the other import are omitted.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.context.MessageSource;

import java.util.Locale;

@Component
public class PointAddItemProcessor implements ItemProcessor<MemberInfoDto, MemberInfoDto> {
    // Definition of constans are omitted.

    private static final Logger logger = LoggerFactory.getLogger(PointAddItemProcessor.class); // (1)

    @Inject
    Validator<MemberInfoDto> validator;

    @Inject
    MessageSource messageSource; // (2)

    @Override
    public MemberInfoDto process(MemberInfoDto item) throws Exception {
        try { // (3)
            validator.validate(item);
        } catch (ValidationException e) {
            logger.warn(messageSource
                    .getMessage("errors.maxInteger", new String[] { "point", "1000000" }, Locale.getDefault())); // (4)
            return null; // (5)
        }

        // The other codes of bussiness logic are omitted.
    }
}
表 7. 説明
項番 説明

(1)

ログを出力するためにLoggerFactoryのインスタンスを定義する。

(2)

MessageSourceのインスタンスをインジェクトする。

(3)

例外ハンドリングを実装する。
入力チェック処理をtry-catchで囲み、ValidationExceptionをハンドリングする。

(4)

プロパティファイルからメッセージIDがerrors.maxIntegerのメッセージを取得し、ログに出力している。

(5)

エラーレコードをスキップするためにnullを返却する。

ジョブの実行と結果の確認

作成したジョブをSTS上で実行し、結果を確認する。

実行構成からジョブを実行

既に作成してある実行構成からジョブを実行し、結果を確認する。

ここでは、異常系データを利用してジョブを実行する。
例外ハンドリングを実装したジョブが扱うリソース(データベース or ファイル)によって、 入力データの切替方法が異なるため、以下のとおり実行すること。

データベースアクセスでデータ入出力を行うジョブに対して例外ハンドリングを実装した場合

データベースアクセスでデータ入出力を行うジョブの実行構成からジョブを実行 で作成した実行構成を使ってジョブを実行する。

異常系データを利用するために、batch-application.proeprtiesのDatabase Initializeで 正常系データのスクリプトをコメントアウトし、異常系データのスクリプトのコメントアウトを解除する。

src/main/resources/batch-application.proeprties
# Database Initialize
tutorial.create-table.script=file:sqls/create-member-info-table.sql
#tutorial.insert-data.script=file:sqls/insert-member-info-data.sql
tutorial.insert-data.script=file:sqls/insert-member-info-error-data.sql
ファイルアクセスでデータ入出力を行うジョブに対して例外ハンドリングを実装した場合

ファイルアクセスでデータ入出力を行うジョブの実行構成からジョブを実行 で作成した実行構成を使ってジョブを実行する。

異常系データを利用するために、実行構成で設定する引数のうち、 入力ファイル(inputFile)のパスを正常系データ(insert-member-info-data.csv)から異常系データ(insert-member-info-error-data.csv)に変更する。

コンソールログの確認

Console Viewを開き、以下の内容のログが出力されていることを確認する。

  • 処理が完了(COMPLETED)し、例外が発生していないこと。

  • WARNログとして次のメッセージを出力していること。

    • 「The point exceeds 1000000.」

コンソールログ出力例
(.. omitted)

[2020/03/10 16:13:54] [main] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] launched with the following parameters: [{jsr_batch_run_id=148}]
[2020/03/10 16:13:54] [main] [o.s.b.c.j.SimpleStepHandler] [INFO ] Executing step: [jobPointAddChunk.step01]
[2020/03/10 16:13:54] [main] [c.e.b.t.d.c.PointAddItemProcessor] [WARN ] The point exceeds 1000000.
[2020/03/10 16:13:54] [main] [o.s.b.c.s.AbstractStep] [INFO ] Step: [jobPointAddChunk.step01] executed in 237ms
[2020/03/10 16:13:54] [main] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] completed with the following parameters: [{jsr_batch_run_id=148}] and the following status: [COMPLETED] in 304ms

終了コードの確認

終了コードにより、警告終了したことを確認する。
確認手順はジョブの実行と結果の確認を参照。 終了コード(exit value)が200(警告終了)となっていることを確認する。

Confirm the Exit Code of ExceptionHandlingWithTryCatchJob for ChunkModel
図 4. 終了コードの確認

出力リソースの確認

例外ハンドリングを実装したジョブによって出力リソース(データベース or ファイル)を確認する。

スキップを実装しているため、エラーレコード以外の更新対象レコードについては 正常に更新されていることを確認する。

会員情報テーブルの確認

更新前後の会員情報テーブルの内容を比較し、確認内容のとおりとなっていることを確認する。
確認手順はDBeaverを使用してデータベースを参照するを参照。

確認内容
  • エラーレコード(会員番号が"000000013")を除くすべてのレコードについて

    • statusカラム

      • "1"(処理対象)から"0"(初期状態)に更新されていること

    • pointカラム

      • ポイント加算対象について、会員種別に応じたポイントが加算されていること

        • typeカラムが"G"(ゴールド会員)の場合は100ポイント

        • typeカラムが"N"(一般会員)の場合は10ポイント

  • エラーレコード(会員番号が"000000013")について

    • 更新されていないこと(破線の赤枠で示した範囲)

更新前後の会員情報テーブルの内容は以下のとおり。

Table of member_info
図 5. 更新前後の会員情報テーブルの内容
会員情報ファイルの確認

会員情報ファイルの入出力内容を比較し、確認内容のとおりとなっていることを確認する。

確認内容
  • 出力ディレクトリに会員情報ファイルが出力されていること

    • 出力ファイル: files/output/output-member-info-data.csv

  • エラーレコード(会員番号が"00000013")を除くすべてのレコードについて

    • statusフィールド

      • "1"(処理対象)から"0"(初期状態)に更新されていること

    • pointフィールド

      • ポイント加算対象について、会員種別に応じたポイントが加算されていること

        • typeフィールドが"G"(ゴールド会員)の場合は100ポイント

        • typeフィールドが"N"(一般会員)の場合は10ポイント

  • エラーレコード(会員番号が"00000013")について

    • 出力されていないこと(破線の赤枠で示した範囲)

会員情報ファイルの入出力内容は以下のとおり。
ファイルのフィールドはid(会員番号)、type(会員種別)、status(商品購入フラグ)、point(ポイント)の順で出力される。

File of member_info
図 6. 会員情報ファイルの入出力内容

タスクレットモデルでの実装

タスクレットモデルで入力チェックを行うジョブの作成から実行までを以下の手順で実施する。

メッセージ定義の追加

コード体系のばらつき防止や、監視対象のキーワードとしての抽出を設計しやすくするため、 ログメッセージはメッセージ定義を使用し、ログ出力時に使用する。

チャンクモデル/タスクレットモデルで共通して利用するため、既に作成している場合は読み飛ばしてよい。

application-messages.propertiesおよびlaunch-context.xmlを以下のとおり設定する。
なお、launch-context.xmlの設定はブランクプロジェクトに設定済みである。

src/main/resources/i18n/application-messages.properties
# (1)
errors.maxInteger=The {0} exceeds {1}.
src/main/resources/META-INF/spring/launch-context.xml
<!-- omitted -->

<bean id="messageSource" class="org.springframework.context.support.ResourceBundleMessageSource"
      p:basenames="i18n/application-messages" /> <!-- (2) -->

<!-- omitted -->
表 8. 説明
項番 説明

(1)

ポイント上限超過時に出力するメッセージを設定する。
{0)に項目名、{1}に上限値を割り当てる。

(2)

プロパティファイルからメッセージを使用するために、MessageSourceを設定する。
basenamesに、プロパティファイルの格納場所を指定する。

終了コードのカスタマイズ

ジョブ終了時のjavaプロセスの終了コードをカスタマイズする。
詳細は終了コードのカスタマイズを参照。

以下の作業を実施する。

JobExecutionListenerの実装

JobExecutionListenerインタフェースを利用してジョブの終了コードを条件により変更する。
ここでは、JobExecutionListenerインタフェースの実装クラスとして、 最終的なジョブの終了コードを各ステップの終了コードに合わせて変更する処理を実装する。

com.example.batch.tutorial.common.listener.JobExitCodeChangeListener
package com.example.batch.tutorial.common.listener;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.stereotype.Component;

import java.util.Collection;

@Component
public class JobExitCodeChangeListener implements JobExecutionListener {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // do nothing.
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        Collection<StepExecution> stepExecutions = jobExecution.getStepExecutions();
        for (StepExecution stepExecution : stepExecutions) { // (1)
            if ("SKIPPED".equals(stepExecution.getExitStatus().getExitCode())) {
                jobExecution.setExitStatus(new ExitStatus("SKIPPED"));
                break;
            }
        }
    }
}
表 9. 説明
項番 説明

(1)

ジョブの実行結果に応じて、最終的なジョブの終了コードをJobExecutionに設定する。
ここではステップから返却された終了コードのいずれかにSKIPPEDが含まれている場合、 終了コードをSKIPPEDとしている。

ジョブBean定義ファイルの設定

作成したリスナーを利用するためのジョブBean定義ファイルの設定を以下に示す。

src/main/resources/META-INF/jobs/dbaccess/jobPointAddTasklet.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch https://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <!-- omitted -->

    <context:component-scan base-package="com.example.batch.tutorial.dbaccess.tasklet,
            com.example.batch.tutorial.common.listener"/> <!-- (1) -->

    <!-- omitted -->

    <batch:job id="jobPointAddTasklet" job-repository="jobRepository">
        <batch:step id="jobPointAddTasklet.step01">
            <batch:tasklet transaction-manager="jobTransactionManager"
                           ref="pointAddTasklet"/>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobExitCodeChangeListener"/> <!-- (2) -->
        </batch:listeners>
    </batch:job>

</beans>
表 10. 説明
項番 説明

(1)

コンポーネントスキャン対象とするベースパッケージの設定を行う。
base-package属性に、StepExecutionListenerおよびJobExecutionListenerの実装クラスが格納されているパッケージを追加で指定する。

(2)

JobExecutionListenerの実装クラスを設定する。 なお、JobExecutionListenerJobListenerの拡張インタフェースである。
ここでは、JobExecutionListenerの実装クラスのBeanIDであるjobExitCodeChangeListenerを指定する。

終了コードのマッピング定義

終了コードのマッピングを追加で設定する。

チャンクモデル/タスクレットモデルで共通して利用するため、既に実施している場合は読み飛ばしてよい。

launch-context.xmlに以下のとおり、独自の終了コードを追加する。

src/main/resources/META-INF/spring/launch-context.xml
<!-- omitted -->

<bean id="exitCodeMapper" class="org.springframework.batch.core.launch.support.SimpleJvmExitCodeMapper">
    <property name="mapping">
        <util:map id="exitCodeMapper" key-type="java.lang.String"
                  value-type="java.lang.Integer">
            <!-- ExitStatus -->
            <entry key="NOOP" value="0" />
            <entry key="COMPLETED" value="0" />
            <entry key="STOPPED" value="255" />
            <entry key="FAILED" value="255" />
            <entry key="UNKNOWN" value="255" />
            <entry key="SKIPPED" value="200" /> <!-- (1) -->
        </util:map>
    </property>
</bean>

<!-- omitted -->
表 11. 説明
項番 説明

(1)

独自の終了コードを追加する。
マッピングするためのキーにSKIPPED、コード値として200を指定する。

例外ハンドリングの実装

ポイント加算処理を行うビジネスロジッククラスにtry-catch処理を実装する。
既に実装してあるPointAddItemProcessorクラスにtry-catch処理の実装を追加する。

前提のとおりデータベースアクセスするジョブの場合の説明となるため、 ファイルアクセスするジョブの場合の実装は以下の(1)~(5)のみ追加する。

com.example.batch.tutorial.dbaccess.tasklet.PointAddTasklet
// Package and the other import are omitted.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.item.validator.ValidationException;
import org.springframework.context.MessageSource;

import java.util.Locale;

@Component
public class PointAddTasklet implements Tasklet {
    // Definition of constans, ItemStreamReader and ItemWriter are omitted.

    private static final Logger logger = LoggerFactory.getLogger(PointAddTasklet.class); // (1)

    @Inject
    Validator<MemberInfoDto> validator;

    @Inject
    MessageSource messageSource; // (2)

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        MemberInfoDto item = null;

        List<MemberInfoDto> items = new ArrayList<>(CHUNK_SIZE);
        int errorCount = 0; // (3)

        try {
            reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());
            while ((item = reader.read()) != null) {
                try { // (4)
                    validator.validate(item);
                } catch (ValidationException e) {
                    logger.warn(messageSource
                            .getMessage("errors.maxInteger", new String[] { "point", "1000000" }, Locale.getDefault()));  // (5)
                    errorCount++;
                    continue; // (6)
                }

                // The other codes of bussiness logic are omitted.
            }

            writer.write(items);
        } finally {
            reader.close();
        }
        if (errorCount > 0) {
            contribution.setExitStatus(new ExitStatus("SKIPPED")); // (7)
        }
        return RepeatStatus.FINISHED;
    }
}
表 12. 説明
項番 説明

(1)

ログを出力するためにLoggerFactoryのインスタンスを定義する。

(2)

MessageSourceのインスタンスをインジェクトする。

(3)

例外の発生を判定するためのカウンターを用意する。
ValidationExceptionをキャッチした際にインクリメントする。

(4)

例外ハンドリングを実装する。
入力チェック処理をtry-catchで囲み、ValidationExceptionをハンドリングする。

(5)

プロパティファイルからメッセージIDがerrors.maxIntegerのメッセージを取得し、ログに出力している。

(6)

エラーレコードをスキップするためにcontinueで処理を継続する。

(7)

独自の終了コードとしてSKIPPEDを設定する。

ジョブの実行と結果の確認

作成したジョブをSTS上で実行し、結果を確認する。

実行構成からジョブを実行

既に作成してある実行構成からジョブを実行し、結果を確認する。

ここでは、異常系データを利用してジョブを実行する。
例外ハンドリングを実装したジョブが扱うリソース(データベース or ファイル)によって、 入力データの切替方法が異なるため、以下のとおり実行すること。

データベースアクセスでデータ入出力を行うジョブに対して例外ハンドリングを実装した場合

データベースアクセスでデータ入出力を行うジョブの実行構成からジョブを実行 で作成した実行構成を使ってジョブを実行する。

異常系データを利用するために、batch-application.proeprtiesのDatabase Initializeで 正常系データのスクリプトをコメントアウトし、異常系データのスクリプトのコメントアウトを解除する。

src/main/resources/batch-application.proeprties
# Database Initialize
tutorial.create-table.script=file:sqls/create-member-info-table.sql
#tutorial.insert-data.script=file:sqls/insert-member-info-data.sql
tutorial.insert-data.script=file:sqls/insert-member-info-error-data.sql
ファイルアクセスでデータ入出力を行うジョブに対して例外ハンドリングを実装した場合

ファイルアクセスでデータ入出力を行うジョブの実行構成からジョブを実行 で作成した実行構成を使ってジョブを実行する。

異常系データを利用するために、実行構成で設定する引数のうち、 入力ファイル(inputFile)のパスを正常系データ(insert-member-info-data.csv)から異常系データ(insert-member-info-error-data.csv)に変更する。

コンソールログの確認

Console Viewを開き、以下の内容のログが出力されていることを確認する。

  • 処理が完了(COMPLETED)し、例外が発生していないこと。

  • WARNログとして次のメッセージを出力していること。

    • 「The point exceeds 1000000.」

コンソールログ出力例
(.. omitted)

[2020/03/10 16:15:14] [main] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddTasklet]] launched with the following parameters: [{jsr_batch_run_id=152}]
[2020/03/10 16:15:14] [main] [o.s.b.c.j.SimpleStepHandler] [INFO ] Executing step: [jobPointAddTasklet.step01]
[2020/03/10 16:15:14] [main] [c.e.b.t.d.t.PointAddTasklet] [WARN ] The point exceeds 1000000.
[2020/03/10 16:15:14] [main] [o.s.b.c.s.AbstractStep] [INFO ] Step: [jobPointAddTasklet.step01] executed in 204ms
[2020/03/10 16:15:14] [main] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddTasklet]] completed with the following parameters: [{jsr_batch_run_id=152}] and the following status: [COMPLETED] in 271ms

終了コードの確認

終了コードにより、警告終了したことを確認する。
確認手順はジョブの実行と結果の確認を参照。 終了コード(exit value)が200(警告終了)となっていることを確認する。

Confirm the Exit Code of ExceptionHandlingWithTryCatchJob for TaskletModel
図 7. 終了コードの確認

出力リソースの確認

例外ハンドリングを実装したジョブによって出力リソース(データベース or ファイル)を確認する。

スキップを実装しているため、エラーレコード以外の更新対象レコードについては 正常に更新されていることを確認する。

会員情報テーブルの確認

更新前後の会員情報テーブルの内容を比較し、確認内容のとおりとなっていることを確認する。
確認手順はDBeaverを使用してデータベースを参照するを参照。

確認内容
  • エラーレコード(会員番号が"000000013")を除くすべてのレコードについて

    • statusカラム

      • "1"(処理対象)から"0"(初期状態)に更新されていること

    • pointカラム

      • ポイント加算対象について、会員種別に応じたポイントが加算されていること

        • typeカラムが"G"(ゴールド会員)の場合は100ポイント

        • typeカラムが"N"(一般会員)の場合は10ポイント

  • エラーレコード(会員番号が"000000013")について

    • 更新されていないこと(破線の赤枠で示した範囲)

更新前後の会員情報テーブルの内容は以下のとおり。

Table of member_info
図 8. 更新前後の会員情報テーブルの内容
会員情報ファイルの確認

会員情報ファイルの入出力内容を比較し、確認内容のとおりとなっていることを確認する。

確認内容
  • 出力ディレクトリに会員情報ファイルが出力されていること

    • 出力ファイル: files/output/output-member-info-data.csv

  • エラーレコード(会員番号が"00000013")を除くすべてのレコードについて

    • statusフィールド

      • "1"(処理対象)から"0"(初期状態)に更新されていること

    • pointフィールド

      • ポイント加算対象について、会員種別に応じたポイントが加算されていること

        • typeフィールドが"G"(ゴールド会員)の場合は100ポイント

        • typeフィールドが"N"(一般会員)の場合は10ポイント

  • エラーレコード(会員番号が"00000013")について

    • 出力されていないこと(破線の赤枠で示した範囲)

会員情報ファイルの入出力内容は以下のとおり。
ファイルのフィールドはid(会員番号)、type(会員種別)、status(商品購入フラグ)、point(ポイント)の順で出力される。

File of member_info
図 9. 会員情報ファイルの入出力内容
Macchinetta Batch Framework (2.x) Development Guideline - version 2.2.0.RELEASE, 2020-6-29