Overview
障害発生などに起因してジョブが異常終了した後に、ジョブを再実行することで回復する手段について説明する。
本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。
ジョブの再実行には、以下の方法がある。
-
ジョブのリラン
-
ジョブのリスタート
-
ステートレスリスタート
-
件数ベースリスタート
-
-
ステートフルリスタート
-
処理状態を判断し、未処理のデータを抽出して処理するリスタート
-
処理状態を識別するための処理を別途実装する必要がある
-
-
-
以下に用語を定義する。
- リラン
-
ジョブを最初からやり直すこと。
事前作業として、データ初期化など障害発生前のジョブ開始時点に状態を回復する必要がある。 - リスタート
-
ジョブが中断した箇所から処理を再開すること。
処理再開位置の保持・取得方法、再開位置までのデータスキップ方法などをあらかじめ設計/実装する必要がある。
リスタートには、ステートレスとステートフルの2種類がある。 - ステートレスリスタート
-
個々の入力データに対する状態(未処理/処理済)を考慮しないリスタート方法。
- 件数ベースリスタート
-
ステートレスリスタートの1つ。
処理した入力データ件数を保持し、リスタート時にその件数分入力データをスキップする方法。
出力が非トランザクショナルなリソースの場合は、出力位置を保持し、リスタート時にその位置まで書き込み位置を移動することも必要になる。 - ステートフルリスタート
-
個々の入力データに対する状態(未処理/処理済)を判断し、未処理のデータのみを取得条件とするリスタート方法。
出力が非トランザクショナルなリソースの場合は、リソースを追記可能にして、リスタート時には前回の結果へ追記していくようにする。
一般的に、再実行の方法はリランがもっとも簡単である。 リラン < ステートレスリスタート < ステートフルリスタートの順に、設計や実装が難しくなる。 無論、可能であれば常にリランとすることが好ましいが、 ユーザが実装するジョブ1つ1つに対して、許容するバッチウィンドウや処理特性に応じてどの方法を適用するか検討してほしい。
How to use
リランとリスタートの実現方法について説明する。
ジョブのリラン
ジョブのリランを実現する方法を説明する。
-
リラン前にデータの初期化などデータ回復の事前作業を実施する。
-
失敗したジョブを同じ条件(同じパラメータ)で再度実行する。
-
Spring Batchでは同じパラメータでジョブを実行すると二重実行と扱われるが、Macchinetta Batch 2.xでは別ジョブとして扱う。
詳細は、"パラメータ変換クラスについて"を参照。
-
ジョブのリスタート
ジョブのリスタート方法を説明する。
ジョブのリスタートを行う場合は、同期実行したジョブに対して行うことを基本とする。
非同期実行したジョブは、リスタートではなくリランで対応するジョブ設計にすることを推奨する。 これは、「意図したリスタート実行」なのか「意図しない重複実行」であるかの判断が難しく、 運用で混乱をきたす可能性があるからである。
非同期実行ジョブでリスタート要件がどうしても外せない場合は、 「意図したリスタート実行」を明確にするために、以下の方法を利用できる。
-
CommandLineJobRunner
の-restart
によるリスタート-
非同期実行したジョブを別途同期実行によりリスタートする。逐次で回復処理を進めていく際に有効となる。
-
-
JobOperator#restart(JobExecutionId)
によるリスタート-
非同期実行したジョブを、再度非同期実行の仕組み上でリスタートする。一括で回復処理を進めていく際に有効となる。
-
非同期実行(DBポーリング)はリスタートをサポートしていない。そのため、別途ユーザにて実装する必要がある。
-
非同期実行(Webコンテナ)はリスタートの実現方法をガイドしている。この記述にしたがって、ユーザにて実装すること。
-
-
入力チェックがある場合のリスタートについて
入力チェックエラーは、チェックエラーの原因となる入力リソースを修正しない限り回復不可能である。 参考までに、入力チェックエラーが発生した際の入力リソース修正例を以下に示す。
|
多重処理(Partition Step)の場合について
"多重処理(Partition Step)"でリスタートする場合、
再び分割処理から処理が実施される。
データを分割した結果、すべて処理済みであった場合、無駄な分割処理が行われ |
ステートレスリスタート
ステートレスリスタートを実現する方法を説明する。
Macchinetta Batch 2.xでのステートレスリスタートは、件数ベースのリスタートを指す。これは、Spring Batchの仕組みをそのまま利用することで実現する。
件数ベースのリスタートは、チャンクモデルのジョブ実行で使用できる。
また、件数ベースのリスタートは、JobRepository
に登録される入出力に関するコンテキスト情報を利用する。
よって、件数ベースのリスタートでは、JobRepository
はインメモリデータベースではなく、永続性が担保されているデータベースを使用することを前提とする。
JobRepositoryの障害発生時について
データソースの設定で説明したとおり、
|
- リスタート時の入力
-
Spring Batchが提供しているItemReaderのほとんどが件数ベースのリスタートに対応しているため、特別な対応は不要である。
件数ベースのリスタート可能なItemReaderを自作する場合は、リスタート処理が実装されている抽象クラスorg.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader
を拡張すればよい。件数ベースリスタートは、あくまで件数のみを基準としてリスタート開始点を決定するため、処理済みの入力データの変更/追加/削除を検出することができない。
新たに入力データを追加する際に、以下のようなケースに留意すること。-
データの取得順を変更する
-
リスタート時に、重複処理や未処理となるデータが発生してしまい、リランした結果と異なる回復結果になるため、決して行ってはいけない。
-
-
処理済みデータを更新する
-
リスタート時に更新したデータは読み飛ばされるので、リランした結果とリスタートした結果で回復結果が変わるため好ましくない場合がある。
-
-
未処理データを更新または追加する
-
リランした結果と同じ回復結果になるため許容する。ただし、初回実行で正常終了した結果とは異なる。これは異常なデータを緊急対処的にパッチする場合や、実行時点で受領したデータを可能な限り多く処理する際に限定して使うとよい。
-
-
- リスタート時の出力
-
非トランザクショナルなリソースへの出力には注意が必要である。たとえば、ファイルではどの位置まで出力していたかを把握し、その位置から出力を行わなければいけない。
Spring Batchが提供しているFlatFileItemWriter
は、コンテキストから前回の出力位置を取得して、リスタート時にはその位置から出力を行ため、特別な対応は不要である。
トランザクショナルなリソースについては、失敗時にロールバックが行われているため、リスタート時には特に対処することなく処理を行うことができる。
上記の条件を満たしていれば、失敗したジョブに-restart
のオプションを付加して再度実行すればよい。
以下にジョブのリスタート例を示す。
$ # (1)
$ java -cp dependency/* org.springframework.batch.core.launch.support.CommandLineJobRunner <jobPath> <jobName> -restart
項番 |
説明 |
(1) |
|
実運用時の考慮
リスタート時に限らず、本番運用においては上記のように直にコマンドを打つような運用は避けること。 これは、誤ったコマンドの実行を防ぐためである。 誤ったコマンドの実行は、監視に不必要な情報をログに残したり、監視ツールが不要なアラートを発する原因になる可能性がある。 例えば、実行するべきコマンドをスクリプト化し、これを実行する運用とするなど、出来る限り直にコマンドを入力しない運用を検討してほしい。 なお、ジョブのリスタートを防止する場合には、ステートフルリスタートで後述するように、
ジョブのBean定義で、 |
非同期実行(DBポーリング)で実行したジョブのリスタート例を以下に示す。
$ # (1)
$ java -cp dependency/* org.springframework.batch.core.launch.support.CommandLineJobRunner <jobPath> <jobExecutionId> -restart
項番 |
説明 |
(1) |
ジョブ実行IDは、ジョブ要求テーブルから取得することができる。 ジョブ要求テーブルについては、"ポーリングするテーブルについて"を参照。 |
ジョブ実行IDのログ出力
異常終了したジョブのジョブ実行IDを迅速に特定するため、 ジョブ終了時や例外発生時にジョブ実行IDをログ出力するリスナーや例外ハンドリングクラスを実装することを推奨する。 |
非同期実行(Webコンテナ)でのリスタート例を以下に示す。
public long restart(long JobExecutionId) throws Execption {
return jobOperator.restart(JobExecutionId); // (1)
}
項番 |
説明 |
(1) |
ジョブ実行IDは、WebAPでジョブ実行した際に取得したIDを利用するか、 |
ステートフルリスタート
ステートフルリスタートを実現する方法を説明する。
ステートフルリスタートとは、実行時に入出力結果を付きあわせて未処理データだけ取得することで再処理する方法である。 この方法は、状態保持・未処理判定など設計が難しいが、データの変更に強い特徴があるため、時々用いられることがある。
ステートフルリスタートでは、リスタート条件を入出力リソースから判定するため、JobRepository
の永続化は不要となる。
- リスタート時の入力
-
入出力結果を付きあわせて未処理データだけ取得するロジックを実装したItemReaderを用意する。
- リスタート時の出力
-
ステートレスリスタートと同様に非トランザクショナルなリソースへ出力には注意が必要になる。
ファイルの場合、コンテキストを使用しないことを前提にすると、ファイルの追記を許可するような設計が必要になる。
ステートフルリスタートは、ジョブのリランと同様に失敗時のジョブと同じ条件でジョブを再実行する。
ステートレスリスタートとは異なり、-restart
のオプションは使用しない。
簡単ステートフルなリスタートの実現例を下記に示す。
-
入力対象のテーブルに処理済カラムを定義し、処理が成功したらNULL以外の値で更新する。
-
未処理データの抽出条件は、処理済カラムの値がNULLとなる。
-
-
処理結果をファイルに出力する。
<!-- (1) -->
<select id="findByProcessedIsNull"
resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
SELECT
branch_id AS branchId, year, month, customer_id AS customerId, amount
FROM
sales_plan_detail
WHERE
processed IS NULL
ORDER BY
branch_id ASC, year ASC, month ASC, customer_id ASC
]]>
</select>
<!-- (2) -->
<update id="update" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
UPDATE
sales_plan_detail
SET
processed = '1'
WHERE
branch_id = #{branchId}
AND
year = #{year}
AND
month = #{month}
AND
customer_id = #{customerId}
]]>
</update>
// (3)
@Bean
public MyBatisCursorItemReader<SalesPlanDetail> reader(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory) {
return new MyBatisCursorItemReaderBuilder<SalesPlanDetail>()
.sqlSessionFactory(jobSqlSessionFactory)
.queryId(
"org.terasoluna.batch.functionaltest.ch06.reprocessing.repository.RestartOnConditionRepository.findByZeroOrLessAmount")
.build();
}
// (4)
@Bean
public MyBatisBatchItemWriter<SalesPlanDetail> dbWriter(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
SqlSessionTemplate batchModeSqlSessionTemplate) {
return new MyBatisBatchItemWriterBuilder<SalesPlanDetail>()
.sqlSessionFactory(jobSqlSessionFactory)
.statementId(
"org.terasoluna.batch.functionaltest.ch06.reprocessing.repository.RestartOnConditionRepository.update")
.sqlSessionTemplate(batchModeSqlSessionTemplate)
.build();
}
@Bean
@StepScope
public FlatFileItemWriter<SalesPlanDetail> fileWriter(
@Value("#{jobParameters['outputFile']}") File outputFile) {
DelimitedLineAggregator<SalesPlanDetail> lineAggregator = new DelimitedLineAggregator<>();
BeanWrapperFieldExtractor<SalesPlanDetail> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"branchId", "year", "month", "customerId", "amount"});
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<SalesPlanDetail>()
.name(ClassUtils.getShortName(FlatFileItemWriter.class))
.resource(new FileSystemResource(outputFile))
.lineAggregator(lineAggregator)
.append(true) // (5)
.build();
}
// (6)
@Bean(destroyMethod="")
public CompositeItemWriter<SalesPlanDetail> compositeWriter(
@Qualifier("fileWriter") FlatFileItemWriter<SalesPlanDetail> fileWriter,
@Qualifier("dbWriter") MyBatisBatchItemWriter<SalesPlanDetail> dbWriter) throws Exception {
List<ItemWriter<? super SalesPlanDetail>> list = new ArrayList<>();
list.add(fileWriter);
list.add(dbWriter);
return new CompositeItemWriterBuilder<SalesPlanDetail>()
.delegates(list)
.build();
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader<SalesPlanDetail> reader,
@Qualifier("amountUpdateItemProcessor") ItemProcessor<SalesPlanDetail, SalesPlanDetail> processor,
@Qualifier("compositeWriter") ItemWriter<SalesPlanDetail> compositeWriter,
LoggingItemReaderListener listener) {
return new StepBuilder("restartOnConditionBasisJob.step01",
jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(10, transactionManager)
.reader(reader)
.processor(processor)
.writer(compositeWriter)
.listener(listener)
.build();
}
@Bean
public Job restartOnConditionBasisJob(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("restartOnConditionBasisJob",
jobRepository)
.preventRestart() // (7)
.start(step01)
.listener(listener)
.build();
}
<!-- (3) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.reprocessing.repository.RestartOnConditionRepository.findByZeroOrLessAmount"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- (4) -->
<bean id="dbWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.reprocessing.repository.RestartOnConditionRepository.update"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<bean id="fileWriter"
class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"
p:resource="file:#{jobParameters['outputFile']}"
p:appendAllowed="true"> <!-- (5) -->
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
p:names="branchId,year,month,customerId,amount"/>
</property>
</bean>
</property>
</bean>
<!-- (6) -->
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="fileWriter"/>
<ref bean="dbWriter"/>
</list>
</property>
</bean>
<batch:job id="restartOnConditionBasisJob"
job-repository="jobRepository" restartable="false"> <!-- (7) -->
<batch:step id="restartOnConditionBasisJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="amountUpdateItemProcessor"
writer="compositeWriter" commit-interval="10" />
</batch:tasklet>
</batch:step>
</batch:job>
$ # (8)
$ java -cp dependency/* org.springframework.batch.core.launch.support.CommandLineJobRunner <jobPath> <jobName> <jobParameters> ...
項番 | 説明 |
---|---|
(1) |
処理済カラムがNULLのデータのみ抽出するようにSQLを定義する。 |
(2) |
処理済カラムをNULL以外で更新するSQLを定義する。 |
(3) |
ItemReaderには、(1)で定義したSQLIDを設定する。 |
(4) |
データベースへ更新は、(2)で定義したSQLIDを設定する。 |
(5) |
リスタート時に前回中断箇所から書き込み可能にするため、ファイルの追記を許可する。 |
(6) |
ファイル出力 → データベース更新の順序で処理されるように |
(7) |
必須ではないが、誤って |
(8) |
失敗したジョブの実行条件で再度実行を行う。 |
ジョブのrestartable属性について
|