Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.2.RELEASE, 2019-4-19
> INDEX

Overview

ジョブの実行を管理する方法について説明する。

本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。

ジョブの実行管理とは

ジョブの起動状態や実行結果を記録しバッチシステムを維持することを指す。 特に、異常発生時の検知や次に行うべき行動(異常終了後のリラン・リスタート等)を判断するために、必要な情報を確保することが重要である。
バッチアプリケーションの特性上、起動直後にその結果をユーザインターフェースで確認できることは稀である。 よって、ジョブスケジューラ/RDBMS/アプリケーションログといった、ジョブの実行とは別に実行状態・結果の記録を行う仕組みが必要となる。

Spring Batch が提供する機能

Spring Batchは、ジョブの実行管理向けに以下のインターフェースを提供している。

ジョブの管理機能一覧
機能 対応するインターフェース

ジョブの実行状態・結果の記録

org.springframework.batch.core.repository.JobRepository

ジョブの終了コードとプロセス終了コードの変換

org.springframework.batch.core.launch.support.ExitCodeMapper

Spring Batch はジョブの起動状態・実行結果の記録にJobRepositoryを使用する。 Macchinetta Batch 2.xでは、以下のすべてに該当する場合は永続化は任意としてよい。

  • 同期型ジョブ実行のみでMacchinetta Batch 2.xを使用する。

  • ジョブの停止・リスタートを含め、ジョブの実行管理はすべてジョブスケジューラに委ねる。

    • Spring BatchがもつJobRepositoryを前提としたリスタートを利用しない。

これらに該当する場合はJobRepositoryが使用するRDBMSの選択肢として、インメモリ・組み込み型データベースであるH2を利用する。
一方で非同期実行を利用する場合や、Spring Batchの停止・リスタートを活用する場合は、ジョブの実行状態・結果を永続化可能なRDBMSが必要となる。

デフォルトのトランザクション分離レベル

Spring Batchが提供するxsdでは、JobRepositoryのトランザクション分離レベルはSERIALIZABLEをデフォルト値としている。 しかし、この場合、同期/非同期にかかわらず複数のジョブを同時に実行した際にJobRepositoryの更新で例外が発生してしまう。 そのため、Macchinetta Batch 2.xでは、あらかじめJobRepositoryのトランザクション分離レベルをREAD_COMMITTEDに設定している。

インメモリJobRepositoryの選択肢

Spring Batch にはインメモリでジョブの実行管理を行う org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean が用意されているが、本ガイドラインでは使用しない。
このクラスのJavadocにあるとおり、 This repository is only really intended for use in testing and rapid prototyping. とテスト用であることが示されており、また、 Not suited for use in multi-threaded jobs with splits と並列処理には不適切であると示されているためである。

ジョブスケジューラを使用したジョブの実行管理については各製品のマニュアルを参照のこと。
本ガイドラインではMacchinetta Batch 2.x内部でJobRepositoryを用いたジョブの状態を管理するうえで関連する、 以下の項目について説明する。

Macchinetta Batch内部での状態管理に関する項目

How to use

JobRepositoryはSpring BatchによりRDBMSへ自動的に新規登録・更新を行う。
ジョブの状態・実行結果の確認を行う場合は、意図しない変更処理がバッチアプリケーションの内外から行われることのないよう、以下のいずれかの方法を選択する。

  • ジョブの状態管理に関するテーブルに対しクエリを発行する

  • org.springframework.batch.core.explore.JobExplorerを使用する

ジョブの状態管理

JobRepositoryを用いたジョブの状態管理方法を説明する。
Spring Batchにより、以下のEntityがRDBMSのテーブルに登録される。

JobRepositoryで管理されるEntityクラスとテーブル名
項番 Entityクラス テーブル名 生成単位 説明

(1)

JobExecution

BATCH_JOB_EXECUTION

1回のジョブ実行

ジョブの状態・実行結果を保持する。

(2)

JobExecutionContext

BATCH_JOB_EXECUTION_CONTEXT

1回のジョブ実行

ジョブ内部のコンテキストを保持する。

(3)

JobExecutionParams

BATCH_JOB_EXECUTION_PARAMS

1回のジョブ実行

起動時に与えられたジョブパラメータを保持する。

(4)

StepExecution

BATCH_STEP_EXECUTION

1回のステップ実行

ステップの状態・実行結果、コミット・ロールバック件数を保持する。

(5)

StepExecutionContext

BATCH_STEP_EXECUTION_CONTEXT

1回のステップ実行

ステップ内部のコンテキストを保持する。

(6)

JobInstance

BATCH_JOB_INSTANCE

ジョブ名とジョブパラメータの組み合わせ

ジョブ名、およびジョブパラメータをシリアライズした文字列を保持する。

たとえば、1回のジョブ起動で3つのステップを実行した場合、以下の差が生じる

  • JobExecutionJobExecutionContextJobExecutionParamsは1レコード登録される

  • StepExecutionStepExecutionContextは3レコード登録される

また、JobInstanceは過去に起動した同名ジョブ・同一パラメータよる二重実行を抑止するために使用されるが、 Macchinetta Batch 2.xではこのチェックを行わない。詳細は二重起動防止を参照。

JobRepositoryによる各テーブルの構成は、 Spring Batchのアーキテクチャにて説明している。

チャンク方式におけるStepExecutionの件数項目について

以下のように、不整合が発生しているように見えるが、仕様上妥当なケースがある。

  • StepExecution(BATCH_STEP_EXECUTIONテーブル)のトランザクション発行回数が入力データ件数と一致しない場合がある。

    • トランザクション発行回数はBATCH_STEP_EXECUTIONCOMMIT_COUNTROLLBACK_COUNTの総和を指す。
      ただし、入力データ件数がチャンクサイズで割り切れる場合COMMIT_COUNTが+1となる。
      これは入力データ件数分を読み込んだ後、終端を表すnullも入力データとカウントされて空処理されるためである。

  • BATCH_STEP_EXECUTIONBATCH_STEP_EXECUTION_CONTEXTの処理件数が異なることがある。

    • BATCH_STEP_EXECUTIONテーブルのREAD_COUNTWRITE_COUNTはそれぞれItemReaderItemWriterによる読み込み・書き込みを行った件数が記録される。

    • BATCH_STEP_EXECUTION_CONTEXTテーブルのSHORT_CONTEXTカラムはJSON形式で ItemReaderによる読み込み処理件数が記録される。しかし、必ずしもBATCH_STEP_EXECUTIONによる処理件数と一致しない。

    • これはチャンク方式によるBATCH_STEP_EXECUTIONテーブルが成功・失敗を問わず読み込み・書き込み件数を記録するのに対し、 BATCH_STEP_EXECUTION_CONTEXTテーブルは処理途中で失敗した場合のリスタートで再開される位置として記録するためである。

状態の永続化

外部RDBMSを使用することでJobRepositoryによるジョブの実行管理情報を永続化させることができる。 batch-application.propertiesの以下項目を外部RDBMS向けのデータソース、スキーマ設定となるよう修正する。

batch-application.properties
# (1)
# Admin DataSource settings.
admin.jdbc.driver=org.postgresql.Driver
admin.jdbc.url=jdbc:postgresql://serverhost:5432/admin
admin.jdbc.username=postgres
admin.jdbc.password=postgres

# (2)
spring-batch.schema.script=classpath:org/springframework/batch/core/schema-postgresql.sql
設定内容の項目一覧(PostgreSQL)
項番 説明

(1)

接頭辞adminが付与されているプロパティの値として、接続する外部RDBMSの設定をそれぞれ記述する。

(2)

アプリケーション起動時にJobRepositoryとしてスキーマの自動生成を行うスクリプトファイルを指定する。

管理用/業務用データソースの補足
  • データベースへの接続設定は、管理用と業務用データソースとして別々に定義する。
    Macchinetta Batch 2.xでは別々に定義した上で、 JobRepositoryは、プロパティ接頭辞にadminが付与された管理用データソースを使用するよう設定済みである。

  • 非同期実行(DBポーリング)を使用する場合は、ジョブ要求テーブルも同じ管理用データソース、スキーマ生成スクリプトを指定すること。
    詳細は非同期実行(DBポーリング)を参照。

ジョブの状態・実行結果の確認

JobRepositoryからジョブの実行状態を確認する方法について説明する。
いずれの方法も、あらかじめ確認対象のジョブ実行IDが既知であること。

クエリを直接発行する

RDBMSコンソールを用い、JobRepositoryが永続化されたテーブルに対して直接クエリを発行する。

SQLサンプル
admin=# select JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE from BATCH_JOB_EXECUTION where JOB_EXECUTION_ID = 1;
 job_execution_id |       start_time        |        end_time         |  status   | exit_code
------------------+-------------------------+-------------------------+-----------+-----------
                1 | 2017-02-14 17:57:38.486 | 2017-02-14 18:19:45.421 | COMPLETED | COMPLETED
(1 row)
admin=# select JOB_EXECUTION_ID, STEP_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE from BATCH_STEP_EXECUTION where JOB_EXECUTION_ID = 1;
 job_execution_id | step_execution_id |       start_time        |        end_time        |  status   | exit_code
------------------+-------------------+-------------------------+------------------------+-----------+-----------
                1 |                 1 | 2017-02-14 17:57:38.524 | 2017-02-14 18:19:45.41 | COMPLETED | COMPLETED
(1 row)
JobExplorerを利用する

バッチアプリケーションと同じアプリケーションコンテキストを共有可能な環境下で、JobExplorerをインジェクションすることでジョブの実行状態を確認する。

APIコールサンプル
// omitted.

@Inject
private JobExplorer jobExplorer;

private void monitor(long jobExecutionId) {

    // (1)
    JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId);

    // (2)
    String jobName = jobExecution.getJobInstance().getJobName();
    Date jobStartTime = jobExecution.getStartTime();
    Date jobEndTime = jobExecution.getEndTime();
    BatchStatus jobBatchStatus = jobExecution.getStatus();
    String jobExitCode = jobExecution.getExitStatus().getExitCode();

    // omitted.

    // (3)
    for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
        String stepName = stepExecution.getStepName();
        Date stepStartTime = stepExecution.getStartTime();
        Date stepEndTime = stepExecution.getEndTime();
        BatchStatus stepStatus = stepExecution.getStatus();
        String stepExitCode = stepExecution.getExitStatus().getExitCode();

        // omitted.
    }
}
設定内容の項目一覧(PostgreSQL)
項番 説明

(1)

インジェクションされたJobExplorerからジョブ実行IDを指定しJobExecutionを取得する。

(2)

JobExecutionによるジョブの実行結果を取得する。

(3)

JobExecutionから、ジョブ内で実行されたステップのコレクションを取得し、個々の実行結果を取得する。

ジョブの停止

ジョブの停止とはJobRepositoryの実行中ステータスを停止中ステータスに更新し、ステップの境界や チャンク方式によるチャンクコミット時にジョブを停止させる機能である。
リスタートと組み合わせることで、停止された位置からの処理を再開させることができる。

リスタートの詳細は"ジョブのリスタート"を参照。

「ジョブの停止」は仕掛かり中のジョブを直ちに中止する機能ではなく、JobRepositoryの実行中ステータスを停止中に更新する機能である。
ジョブに対して即座に仕掛かり中スレッドに対して割り込みするといったような、何らかの停止処理を行うわけではない。

このため、ジョブの停止は「チャンクの切れ目など、節目となる処理が完了した際に停止するよう予約する」ことともいえる。 たとえば以下の状況下でジョブ停止を行っても、期待する動作とはならない。

  • 単一ステップでTaskletにより構成されたジョブ実行。

  • チャンク方式で、データ入力件数 < commit-intervalのとき。

  • 処理内で無限ループが発生している場合。

以下、ジョブの停止方法を説明する。

  • コマンドラインからの停止

    • 同期型ジョブ・非同期型ジョブのどちらでも利用できる

    • CommandLineJobRunner-stopを利用する

起動時のジョブ名を指定する方法
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
    classpath:/META-INF/jobs/job01.xml job01 -stop
  • ジョブ名指定によるジョブ停止は同名のジョブが並列で起動することが少ない同期バッチ実行時に適している。

ジョブ実行ID(jobExecutionId)を指定する方法
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
    classpath:/META-INF/jobs/job01.xml 3 -stop
  • ジョブ実行ID指定によるジョブ停止は同名のジョブが並列で起動することの多い非同期バッチ実行時に適している。

終了コードのカスタマイズ

同期実行によりジョブ終了時、javaプロセスの終了コードをジョブやステップの終了コードに合わせてカスタマイズできる。 javaプロセスの終了コードをカスタマイズするのに必要な作業を以下に示す。

  1. ステップの終了コードを変更する。

  2. ステップの終了コードに合わせて、ジョブの終了コードを変更する。

  3. ジョブの終了コードとjavaプロセスの終了コードをマッピングする。

終了コードの意味合いについて

本節では、終了コードは2つの意味合いで扱われており、それぞれの説明を以下に示す。

  • COMPLETED、FAILEDなどの文字列の終了コードは、ジョブやステップの終了コードである。

  • 0、255などの数値の終了コードは、Javaプロセスの終了コードである。

ステップの終了コードの変更

処理モデルごとにステップの終了コードを変更する方法を以下に示す。

チャンクモデルにおけるステップの終了コードの変更

ステップ終了時の処理としてStepExecutionListenerのafterStepメソッドを実装し、任意のステップの終了コードを返却する。

StepExecutionListener実装例
@Component
public class ExitStatusChangeListener implements StepExecutionListener {

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {

        ExitStatus exitStatus = stepExecution.getExitStatus();
        if (conditionalCheck(stepExecution)) {
            // (1)
            exitStatus = new ExitStatus("CUSTOM STEP FAILED");
        }
        return exitStatus;
    }

    private boolean conditionalCheck(StepExecution stepExecution) {
        // omitted.
    }
}
ジョブ定義
<batch:step id="exitstatusjob.step">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="reader" writer="writer" commit-interval="10" />
    </batch:tasklet>
    <batch:listeners>
        <batch:listener ref="exitStatusChangeListener"/>
    </batch:listeners>
</batch:step>
実装内容の一覧
項番 説明

(1)

ステップの実行結果に応じて独自の終了コードを設定する。

タスクレットモデルにおけるステップの終了コードの変更

Taskletのexecuteメソッドの引数であるStepContributionに任意のステップの終了コードを設定する。

Tasklet実装例
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

    // omitted.
    if (errorCount > 0) {
        contribution.setExitStatus(new ExitStatus("STEP COMPLETED WITH SKIPS"));  // (1)
    }
    return RepeatStatus.FINISHED;
}
実装内容の一覧
項番 説明

(1)

タスクレットの実行結果に応じて独自の終了コードを設定する。

ジョブの終了コードの変更

ジョブ終了時の処理としてJobExecutionListenerのafterJobメソッドを実装し、最終的なジョブの終了コードを各ステップの終了コードによって設定する。

JobExecutionListener実装例
@Component
public class JobExitCodeChangeListener extends JobExecutionListenerSupport {

    @Override
    public void afterJob(JobExecution jobExecution) {
        // (1)
        for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
            if ("STEP COMPLETED WITH SKIPS".equals(stepExecution.getExitStatus().getExitCode())) {
                jobExecution.setExitStatus(new ExitStatus("JOB COMPLETED WITH SKIPS"));
                logger.info("Change status 'JOB COMPLETED WITH SKIPS'");
                break;
            }
        }
    }
}
ジョブ定義
<batch:job id="exitstatusjob" job-repository="jobRepository">
    <batch:step id="exitstatusjob.step">
        <!-- omitted -->
    </batch:step>
    <batch:listeners>
        <batch:listener ref="jobExitCodeChangeListener"/>
    </batch:listeners>
</batch:job>
実装内容の一覧
項番 説明

(1)

ジョブの実行結果に応じて、最終的なジョブの終了コードをJobExecutionに設定する。
ここではステップから返却された終了コードのいずれかにCUSTOM STEP FAILEDが含まれている場合、 終了コードCUSTOM FAILEDとしている。

終了コードのマッピング

ジョブの終了コードとプロセスの終了コードをマッピング定義を行う。

launch-context.xml
<!-- exitCodeMapper -->
<bean id="exitCodeMapper"
      class="org.springframework.batch.core.launch.support.SimpleJvmExitCodeMapper">
    <property name="mapping">
        <util:map id="exitCodeMapper" key-type="java.lang.String"
                  value-type="java.lang.Integer">
            <!-- ExitStatus -->
            <entry key="NOOP" value="0" />
            <entry key="COMPLETED" value="0" />
            <entry key="STOPPED" value="255" />
            <entry key="FAILED" value="255" />
            <entry key="UNKNOWN" value="255" />
            <entry key="CUSTOM FAILED" value="100" /> <!-- Custom Exit Status -->
        </util:map>
    </property>
</bean>
プロセスの終了コードに1は厳禁

一般的にJavaプロセスはVMクラッシュやSIGKILLシグナル受信などによりプロセスが強制終了した際、 終了コードとして1を返却することがある。 正常・異常を問わずバッチアプリケーションの終了コードとは明確に区別すべきであるため、 アプリケーション内ではプロセスの終了コードとして1を定義しないこと。

終了ステータスと終了コードの違いについて

JobRepositoryで管理されるジョブとステップの状態として、「ステータス(STATUS)」と「終了コード(EXIT_CODE)」があるが、以下の点で異なる。

  • ステータスはSpring Batchの内部制御で用いられ enum型のBatchStatusによる具体値が定義されているためカスタマイズできない。

  • 終了コードはジョブのフロー制御やプロセス終了コードの変更で使用することができ、カスタマイズできる。

二重起動防止

Spring Batchではジョブを起動する際、JobRepositryからJobInstance(BATCH_JOB_INSTANCEテーブル)に対して 以下の組み合わせが存在するか確認する。

  • 起動対象となるジョブ名

  • ジョブパラメータ

Macchinetta Batch 2.xではジョブ・ジョブパラメータの組み合わせが一致しても複数回起動可能としている。
つまり、二重起動を許容する。 詳細は、ジョブの起動パラメータを参照のこと。

二重起動を防止する場合は、ジョブスケジューラやアプリケーション内で実施する必要がある。
詳細な手段については、ジョブスケジューラ製品や業務要件に強く依存するため割愛する。
個々のジョブについて、二重起動を抑止する必要があるかについて、検討すること。

ロギング

ログの設定方法について説明する。

ログの出力、設定、考慮事項はMacchinetta Server 1.xと共通点が多い。まずは、 ロギングを参照のこと。

ここでは、Macchinetta Batch 2.x特有の考慮点について説明する。

ログ出力元の明確化

バッチ実行時のログは出力元のジョブやジョブ実行を明確に特定できるようにしておく必要がある。 そのため、スレッド名、ジョブ名、実行ジョブIDを出力するとよい。 特に非同期実行時は同名のジョブが異なるスレッドで並列に動作することになるため、 ジョブ名のみの記録はログ出力元を特定しにくくなる恐れがある。

それぞれの要素は、以下の要領で実現できる。

スレッド名

logback.xmlの出力パターンである%threadを指定する

ジョブ名・実行ジョブID

JobExecutionListenerを実装したコンポーネントを作成し、ジョブの開始・終了時に記録する

JobExecutionListener実装例
// package and import omitted.

@Component
public class JobExecutionLoggingListener implements JobExecutionListener {
    private static final Logger logger =
            LoggerFactory.getLogger(JobExecutionLoggingListener.class);

    @Override
    public void beforeJob(JobExecution jobExecution) {
        // (1)
        logger.info("job started. [JobName:{}][jobExecutionId:{}]",
            jobExecution.getJobInstance().getJobName(), jobExecution.getId());
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        // (2)
        logger.info("job finished.[JobName:{}][jobExecutionId:{}][ExitStatus:{}]"
                , jobExecution.getJobInstance().getJobName(),
                , jobExecution.getId(), jobExecution.getExitStatus().getExitCode());
    }

}
ジョブBean定義ファイル
<!-- omitted. -->
<batch:job id="loggingJob" job-repository="jobRepository">
    <batch:step id="loggingJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- omitted. -->
        </batch:tasklet>
    </batch:step>
    <batch:listeners>
        <!-- (3) -->
        <batch:listener ref="jobExecutionLoggingListener"/>
    </batch:listeners>
</batch:job>
<!-- omitted. -->
ジョブ名、ジョブ実行IDのログ出力実装例
項番 説明

(1)

ジョブの開始前にジョブ名とジョブ実行IDをINFOログに出力している。

(2)

ジョブの終了時は(1)に加えて終了コードも出力している。

(3)

コンポーネントとして登録されているJobExecutionLoggingListenerを特定ジョブのBean定義に関連づけている。

ログ監視

バッチアプリケーションは運用時のユーザインターフェースはログが主体となる。 監視対象と発生時のアクションを明確に設計しておかないと、 フィルタリングが困難となり、対処に必要なログが埋もれてしまう危険がある。 このため、ログの監視対象としてキーワードとなるメッセージやコード体系をあらかじめ決めておくとよい。 ログに出力するメッセージ管理については、後述の"メッセージ管理"を参照。

ログ出力先

バッチアプリケーションにおけるログの出力先について、どの単位でログを分散/集約するのかを設計するとよい。 たとえばフラットファイルにログを出力する場合でも以下のように複数パターンが考えられる。

  • 1ジョブあたり1ファイルに出力する

  • 複数ジョブを1グループにまとめた単位で1ファイルに出力する

  • 1サーバあたり1ファイルに出力する

  • 複数サーバをまとめて1ファイルに出力する

いずれも対象システムにおける、ジョブ総数/ログ総量/発生するI/Oレートなどによって、 どの単位でまとめるのが最適かが分かれる。 また、ログを確認する方法にも依存する。ジョブスケジューラ上から参照することが多いか、コンソールから参照することが多いか、 といった活用方法によっても選択肢が変わると想定する。

重要なことは、運用設計にてログ出力を十分検討し、試験にてログの有用性を確認することに尽きる。

メッセージ管理

メッセージ管理について説明する。

コード体系のばらつき防止や、監視対象のキーワードとしての抽出を設計しやすくするため、 一定のルールに従ってメッセージを付与することが望ましい。

なお、ログと同様、メッセージ管理についても基本的にはMacchinetta Server 1.xと同様である。

MessageSourceの活用について

プロパティファイルからメッセージを使用するにはMessageSourceを使用することができる。

  • 具体的な設定・実装例については ログメッセージの一元管理 を参照のこと。

    • ここではログ出力のサンプルとして Spring MVC のコントローラー のケースにそって例示されているが、 Spring Batchの任意のコンポーネントに読み換えてほしい。

    • ここではMessageSourceのインスタンスを独自に生成しているが、Macchinetta Batch 2.xではその必要はない。 ApplicationContextが生成された後でのみ、各コンポーネントにアクセスされるためである。 なお、Macchinetta Batch 2.xでは以下のとおり設定済みである。

launch-context.xml
<bean id="messageSource"
      class="org.springframework.context.support.ResourceBundleMessageSource"
      p:basenames="i18n/application-messages" />
Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.2.RELEASE, 2019-4-19