Overview
ジョブの実行を管理する方法について説明する。
本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。
ジョブの実行管理とは
ジョブの起動状態や実行結果を記録しバッチシステムを維持することを指す。
特に、異常発生時の検知や次に行うべき行動(異常終了後のリラン・リスタート等)を判断するために、必要な情報を確保することが重要である。
バッチアプリケーションの特性上、起動直後にその結果をユーザインタフェースで確認できることは稀である。
よって、ジョブスケジューラ/RDBMS/アプリケーションログといった、ジョブの実行とは別に実行状態・結果の記録を行う仕組みが必要となる。
本ガイドラインではジョブスケジューラを使用したジョブ管理については説明しないが、ジョブスケジューラを使用するのに適したジョブについては以下に記す。
- ジョブスケジューラを使用するのに適したジョブ
-
Macchinetta Batch 2.xの同期型ジョブで
JobRepository
による状態管理を必要としない(非永続化のRDBMSを使用)
JobRepository
を前提としたリスタート機能を使わないジョブ
上記の条件を満たす場合、ジョブの停止・リスタートを含めた、ジョブの実行管理をすべてジョブスケジューラで行うことが適している。
ジョブスケジューラを使用したジョブの実行管理については各製品のマニュアルを参照。
以降、Spring Batchが提供する機能を利用したジョブの実行管理について説明をする。
Spring Batch が提供する機能
Spring Batchは、ジョブの実行管理向けに以下のインタフェースを提供している。
機能 | 対応するインタフェース |
---|---|
ジョブの実行状態・結果の記録 |
|
ジョブの終了コードとプロセス終了コードの変換 |
|
Spring Batch はジョブの起動状態・実行結果の記録にJobRepository
を使用する。
Macchinetta Batch 2.xでは、以下のすべてに該当する場合は永続化は任意としてよい。
-
同期型ジョブ実行のみでMacchinetta Batch 2.xを使用する。
-
ジョブの停止・リスタートを含め、ジョブの実行管理はすべてジョブスケジューラに委ねる。
-
Spring Batchがもつ
JobRepository
を前提としたリスタートを利用しない。
-
これらに該当する場合はJobRepository
が使用するRDBMSの選択肢として、インメモリ・組み込み型データベースであるH2
を利用する。
一方で非同期実行を利用する場合や、Spring Batchの停止・リスタートを活用する場合は、ジョブの実行状態・結果を永続化可能なRDBMSが必要となる。
デフォルトのトランザクション分離レベル
Spring Batchが提供するxsdでは、 |
IndexによるJobRepositoryの性能改善
Indexを作成することで、
|
Spring BatchのバージョンアップによるJobRepositoryの性能問題の修正
Macchinetta Batch 2.2.0では、Spring Batch 4.2.xから加わった変更により、ジョブの起動時にSpring Batchが発行する
本件の対策として、Macchinetta Batch 2.2.0では「Spring Batchで対処された変更を実装する」ことを推奨しているが、 Macchinetta Batch 2.5.0.RELEASEではこの問題は発生しないため、対策は不要となる。 |
ジョブスケジューラを使用したジョブの実行管理については各製品のマニュアルを参照。
本ガイドラインではMacchinetta Batch 2.x内部でJobRepository
を用いたジョブの状態を管理するうえで関連する、
以下の項目について説明する。
-
-
状態を永続化する方法
-
状態を確認する方法
-
ジョブを手動停止する方法
-
How to use
JobRepository
はSpring BatchによりRDBMSへ自動的に新規登録・更新を行う。
ジョブの状態・実行結果の確認を行う場合は、意図しない変更処理がバッチアプリケーションの内外から行われることのないよう、以下のいずれかの方法を選択する。
-
ジョブの状態管理に関するテーブルに対しクエリを発行する
-
org.springframework.batch.core.explore.JobExplorer
を使用する
ジョブの状態管理
JobRepository
を用いたジョブの状態管理方法を説明する。
Spring Batchにより、以下のEntityがRDBMSのテーブルに登録される。
項番 | Entityクラス | テーブル名 | 生成単位 | 説明 |
---|---|---|---|---|
(1) |
|
|
1回のジョブ実行 |
ジョブの状態・実行結果を保持する。 |
(2) |
|
|
1回のジョブ実行 |
ジョブ内部のコンテキストを保持する。 |
(3) |
|
|
1回のジョブ実行 |
起動時に与えられたジョブパラメータを保持する。 |
(4) |
|
|
1回のステップ実行 |
ステップの状態・実行結果、コミット・ロールバック件数を保持する。 |
(5) |
|
|
1回のステップ実行 |
ステップ内部のコンテキストを保持する。 |
(6) |
|
|
ジョブ名とジョブパラメータの組み合わせ |
ジョブ名、およびジョブパラメータをシリアライズした文字列を保持する。 |
たとえば、1回のジョブ起動で3つのステップを実行した場合、以下の差が生じる
-
JobExecution
、JobExecutionContext
、JobExecutionParams
は1レコード登録される -
StepExecution
、StepExecutionContext
は3レコード登録される
また、JobInstance
は過去に起動した同名ジョブ・同一パラメータよる二重実行を抑止するために使用されるが、
Macchinetta Batch 2.xではこのチェックを行わない。詳細は二重起動防止を参照。
|
チャンク方式におけるStepExecutionの件数項目について
以下のように、不整合が発生しているように見えるが、仕様上妥当なケースがある。
|
状態の永続化
外部RDBMSを使用することでJobRepository
によるジョブの実行管理情報を永続化させることができる。
batch-application.propertiesの以下項目を外部RDBMS向けのデータソース、スキーマ設定となるよう修正する。
# (1)
# Admin DataSource settings.
admin.jdbc.driver=org.postgresql.Driver
admin.jdbc.url=jdbc:postgresql://serverhost:5432/admin
admin.jdbc.username=postgres
admin.jdbc.password=postgres
# (2)
spring-batch.schema.script=classpath:org/springframework/batch/core/schema-postgresql.sql
項番 | 説明 |
---|---|
(1) |
接頭辞 |
(2) |
アプリケーション起動時に |
管理用/業務用データソースの補足
|
ジョブの状態・実行結果の確認
JobRepository
からジョブの実行状態を確認する方法について説明する。
いずれの方法も、あらかじめ確認対象のジョブ実行IDが既知であること。
クエリを直接発行する
RDBMSコンソールを用い、JobRepository
が永続化されたテーブルに対して直接クエリを発行する。
admin=# select JOB_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE from BATCH_JOB_EXECUTION where JOB_EXECUTION_ID = 1;
job_execution_id | start_time | end_time | status | exit_code
------------------+-------------------------+-------------------------+-----------+-----------
1 | 2017-02-14 17:57:38.486 | 2017-02-14 18:19:45.421 | COMPLETED | COMPLETED
(1 row)
admin=# select JOB_EXECUTION_ID, STEP_EXECUTION_ID, START_TIME, END_TIME, STATUS, EXIT_CODE from BATCH_STEP_EXECUTION where JOB_EXECUTION_ID = 1;
job_execution_id | step_execution_id | start_time | end_time | status | exit_code
------------------+-------------------+-------------------------+------------------------+-----------+-----------
1 | 1 | 2017-02-14 17:57:38.524 | 2017-02-14 18:19:45.41 | COMPLETED | COMPLETED
(1 row)
JobExplorer
を利用する
バッチアプリケーションと同じアプリケーションコンテキストを共有可能な環境下で、JobExplorer
をインジェクションすることでジョブの実行状態を確認する。
// omitted.
@Inject
private JobExplorer jobExplorer;
private void monitor(long jobExecutionId) {
// (1)
JobExecution jobExecution = jobExplorer.getJobExecution(jobExecutionId);
// (2)
String jobName = jobExecution.getJobInstance().getJobName();
LocalDateTime jobStartTime = jobExecution.getStartTime();
LocalDateTime jobEndTime = jobExecution.getEndTime();
BatchStatus jobBatchStatus = jobExecution.getStatus();
String jobExitCode = jobExecution.getExitStatus().getExitCode();
// omitted.
// (3)
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
String stepName = stepExecution.getStepName();
LocalDateTime stepStartTime = stepExecution.getStartTime();
LocalDateTime stepEndTime = stepExecution.getEndTime();
BatchStatus stepStatus = stepExecution.getStatus();
String stepExitCode = stepExecution.getExitStatus().getExitCode();
// omitted.
}
}
項番 | 説明 |
---|---|
(1) |
インジェクションされた |
(2) |
|
(3) |
|
ジョブの停止
ジョブの停止とはJobRepository
の実行中ステータスを停止中ステータスに更新し、ステップの境界や
チャンク方式によるチャンクコミット時にジョブを停止させる機能である。
リスタートと組み合わせることで、停止された位置からの処理を再開させることができる。
リスタートの詳細は"ジョブのリスタート"を参照。 |
「ジョブの停止」は仕掛かり中のジョブを直ちに中止する機能ではなく、 このため、ジョブの停止は「チャンクの切れ目など、節目となる処理が完了した際に停止するよう予約する」ことともいえる。 たとえば以下の状況下でジョブ停止を行っても、期待する動作とはならない。
|
以下、ジョブの停止方法を説明する。
-
コマンドラインからの停止
-
同期型ジョブ・非同期型ジョブのどちらでも利用できる
-
CommandLineJobRunner
の-stop
を利用する
-
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
com.example.batch.jobs.Job01Config job01 -stop
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
classpath:/META-INF/jobs/job01.xml job01 -stop
-
ジョブ名指定によるジョブ停止は同名のジョブが並列で起動することが少ない同期バッチ実行時に適している。
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
com.example.batch.jobs.Job01Config 3 -stop
$ java org.springframework.batch.core.launch.support.CommandLineJobRunner \
classpath:/META-INF/jobs/job01.xml 3 -stop
-
ジョブ実行ID指定によるジョブ停止は同名のジョブが並列で起動することの多い非同期バッチ実行時に適している。
|
終了コードのカスタマイズ
同期実行によりジョブ終了時、javaプロセスの終了コードをジョブやステップの終了コードに合わせてカスタマイズできる。 javaプロセスの終了コードをカスタマイズするのに必要な作業を以下に示す。
-
ステップの終了コードを変更する。
-
ステップの終了コードに合わせて、ジョブの終了コードを変更する。
-
ジョブの終了コードとjavaプロセスの終了コードをマッピングする。
終了コードの意味合いについて
本節では、終了コードは2つの意味合いで扱われており、それぞれの説明を以下に示す。
|
ステップの終了コードの変更
処理モデルごとにステップの終了コードを変更する方法を以下に示す。
- チャンクモデルにおけるステップの終了コードの変更
-
ステップ終了時の処理として、StepExecutionListenerのafterStepメソッドもしくは@AfterStepアノテーションを付与したメソッドを実装し、 任意のステップの終了コードを返却する。
@Component
@Scope("step")
public class CheckAmountItemProcessor implements ItemProcessor<SalesPlanSummary, SalesPlanSummary> {
// omitted.
@Override
public SalesPlanSummary process(SalesPlanSummary item) throws Exception {
if (item.getAmount().signum() == -1) {
logger.warn("amount is negative. skip item [item: {}]", item);
if (!stepExecution.getExecutionContext().containsKey(ERROR_ITEMS_KEY)) {
stepExecution.getExecutionContext().put(ERROR_ITEMS_KEY, new ArrayList<SalesPlanSummary>());
}
@SuppressWarnings("unchecked")
List<SalesPlanSummary> errorItems = (List<SalesPlanSummary>) stepExecution.getExecutionContext().get(ERROR_ITEMS_KEY);
errorItems.add(item);
return null;
}
return item;
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getExecutionContext().containsKey(ERROR_ITEMS_KEY)) {
logger.info("Change status 'STEP COMPLETED WITH SKIPS'");
// (1)
return new ExitStatus("STEP COMPLETED WITH SKIPS");
}
return stepExecution.getExitStatus();
}
}
項番 | 説明 |
---|---|
(1) |
ステップの実行結果に応じて独自の終了コードを設定する。 |
- タスクレットモデルにおけるステップの終了コードの変更
-
Taskletのexecuteメソッドの引数であるStepContributionに任意のステップの終了コードを設定する。
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
// omitted.
if (errorCount > 0) {
contribution.setExitStatus(new ExitStatus("STEP COMPLETED WITH SKIPS")); // (1)
}
return RepeatStatus.FINISHED;
}
項番 | 説明 |
---|---|
(1) |
タスクレットの実行結果に応じて独自の終了コードを設定する。 |
ジョブの終了コードの変更
ジョブ終了時の処理としてJobExecutionListenerのafterJobメソッドを実装し、最終的なジョブの終了コードを各ステップの終了コードによって設定する。
@Component
public class JobExitCodeChangeListener implements JobExecutionListener {
@Override
public void afterJob(JobExecution jobExecution) {
// (1)
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if ("STEP COMPLETED WITH SKIPS".equals(stepExecution.getExitStatus().getExitCode())) {
jobExecution.setExitStatus(new ExitStatus("JOB COMPLETED WITH SKIPS"));
logger.info("Change status 'JOB COMPLETED WITH SKIPS'");
break;
}
}
}
}
@Bean
public Job exitstatusjob(JobRepository jobRepository,
Step step,
jobExitCodeChangeListener listener) {
return new JobBuilder("exitstatusjob", jobRepository)
.start(step)
.listener(listener)
.build();
}
<batch:job id="exitstatusjob" job-repository="jobRepository">
<batch:step id="exitstatusjob.step">
<!-- omitted -->
</batch:step>
<batch:listeners>
<batch:listener ref="jobExitCodeChangeListener"/>
</batch:listeners>
</batch:job>
項番 | 説明 |
---|---|
(1) |
ジョブの実行結果に応じて、最終的なジョブの終了コードを |
終了コードのマッピング
ジョブの終了コードとプロセスの終了コードをマッピング定義を行う。
// exitCodeMapper
@Bean
public ExitCodeMapper exitCodeMapper() {
final SimpleJvmExitCodeMapper simpleJvmExitCodeMapper = new SimpleJvmExitCodeMapper();
final Map<String, Integer> exitCodeMapper = new HashMap<>();
// ExitStatus
exitCodeMapper.put("NOOP", 0);
exitCodeMapper.put("COMPLETED", 0);
exitCodeMapper.put("STOPPED", 255);
exitCodeMapper.put("FAILED", 255);
exitCodeMapper.put("UNKNOWN", 255);
exitCodeMapper.put("JOB COMPLETED WITH SKIPS", 100);
simpleJvmExitCodeMapper.setMapping(exitCodeMapper);
return simpleJvmExitCodeMapper;
}
<!-- exitCodeMapper -->
<bean id="exitCodeMapper"
class="org.springframework.batch.core.launch.support.SimpleJvmExitCodeMapper">
<property name="mapping">
<util:map id="exitCodeMapper" key-type="java.lang.String"
value-type="java.lang.Integer">
<!-- ExitStatus -->
<entry key="NOOP" value="0" />
<entry key="COMPLETED" value="0" />
<entry key="STOPPED" value="255" />
<entry key="FAILED" value="255" />
<entry key="UNKNOWN" value="255" />
<entry key="JOB COMPLETED WITH SKIPS" value="100" />
</util:map>
</property>
</bean>
プロセスの終了コードに1は厳禁
一般的にJavaプロセスはVMクラッシュやSIGKILLシグナル受信などによりプロセスが強制終了した際、 終了コードとして1を返却することがある。 正常・異常を問わずバッチアプリケーションの終了コードとは明確に区別すべきであるため、 アプリケーション内ではプロセスの終了コードとして1を定義しないこと。 |
終了ステータスと終了コードの違いについて
|
二重起動防止
Spring Batchではジョブを起動する際、JobRepositry
からJobInstance(BATCH_JOB_INSTANCEテーブル)
に対して
以下の組み合わせが存在するか確認する。
-
起動対象となるジョブ名
-
ジョブパラメータ
Macchinetta Batch 2.xではジョブ・ジョブパラメータの組み合わせが一致しても複数回起動可能としている。
つまり、二重起動を許容する。
詳細は、ジョブの起動パラメータを参照。
二重起動を防止する場合は、ジョブスケジューラやアプリケーション内で実施する必要がある。
詳細な手段については、ジョブスケジューラ製品や業務要件に強く依存するため割愛する。
個々のジョブについて、二重起動を抑止する必要があるかについて、検討すること。
ロギング
ログの設定方法について説明する。
ログの出力、設定、考慮事項はMacchinetta Server 1.xと共通点が多い。まずは、 ロギングを参照。
ここでは、Macchinetta Batch 2.x特有の考慮点について説明する。
ログ出力元の明確化
バッチ実行時のログは出力元のジョブやジョブ実行を明確に特定できるようにしておく必要がある。 そのため、スレッド名、ジョブ名、実行ジョブIDを出力するとよい。 特に非同期実行時は同名のジョブが異なるスレッドで並列に動作することになるため、 ジョブ名のみの記録はログ出力元を特定しにくくなる恐れがある。
それぞれの要素は、以下の要領で実現できる。
- スレッド名
-
logback.xml
の出力パターンである%thread
を指定する - ジョブ名・実行ジョブID
-
JobExecutionListener
を実装したコンポーネントを作成し、ジョブの開始・終了時に記録する
// package and import omitted.
@Component
public class JobExecutionLoggingListener implements JobExecutionListener {
private static final Logger logger =
LoggerFactory.getLogger(JobExecutionLoggingListener.class);
@Override
public void beforeJob(JobExecution jobExecution) {
// (1)
logger.info("job started. [JobName:{}][jobExecutionId:{}]",
jobExecution.getJobInstance().getJobName(), jobExecution.getId());
}
@Override
public void afterJob(JobExecution jobExecution) {
// (2)
logger.info("job finished.[JobName:{}][jobExecutionId:{}][ExitStatus:{}]"
, jobExecution.getJobInstance().getJobName(),
, jobExecution.getId(), jobExecution.getExitStatus().getExitCode());
}
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
Tasklet tasklet) {
return new StepBuilder("loggingJob.step01",
jobRepository)
.tasklet(tasklet)
.build();
}
@Bean
public Job loggingJob(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("loggingJob", jobRepository)
.start(step01)
.listener(listener) // (3)
.build();
}
<!-- omitted. -->
<batch:job id="loggingJob" job-repository="jobRepository">
<batch:step id="loggingJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- omitted. -->
</batch:tasklet>
</batch:step>
<batch:listeners>
<!-- (3) -->
<batch:listener ref="jobExecutionLoggingListener"/>
</batch:listeners>
</batch:job>
<!-- omitted. -->
項番 | 説明 |
---|---|
(1) |
ジョブの開始前にジョブ名とジョブ実行IDをINFOログに出力している。 |
(2) |
ジョブの終了時は(1)に加えて終了コードも出力している。 |
(3) |
コンポーネントとして登録されている |
ログ監視
バッチアプリケーションは運用時のユーザインタフェースはログが主体となる。 監視対象と発生時のアクションを明確に設計しておかないと、 フィルタリングが困難となり、対処に必要なログが埋もれてしまう危険がある。 このため、ログの監視対象としてキーワードとなるメッセージやコード体系をあらかじめ決めておくとよい。 ログに出力するメッセージ管理については、後述の"メッセージ管理"を参照。
ログ出力先
バッチアプリケーションにおけるログの出力先について、どの単位でログを分散/集約するのかを設計するとよい。 たとえばフラットファイルにログを出力する場合でも以下のように複数パターンが考えられる。
-
1ジョブあたり1ファイルに出力する
-
複数ジョブを1グループにまとめた単位で1ファイルに出力する
-
1サーバあたり1ファイルに出力する
-
複数サーバをまとめて1ファイルに出力する
いずれも対象システムにおける、ジョブ総数/ログ総量/発生する入出力レートなどによって、 どの単位でまとめるのが最適かが分かれる。 また、ログを確認する方法にも依存する。ジョブスケジューラ上から参照することが多いか、コンソールから参照することが多いか、 といった活用方法によっても選択肢が変わると想定する。
重要なことは、運用設計にてログ出力を十分検討し、試験にてログの有用性を確認することに尽きる。
メッセージ管理
メッセージ管理について説明する。
コード体系のばらつき防止や、監視対象のキーワードとしての抽出を設計しやすくするため、 一定のルールに従ってメッセージを付与することが望ましい。
なお、ログと同様、メッセージ管理についても基本的にはMacchinetta Server 1.xと同様である。
MessageSourceの活用について
プロパティファイルからメッセージを使用するには
jp.co.ntt.fw.macchinetta.batch.functionaltest.config.LaunchContextConfig.java
META-INF/spring/launch-context.xml
|