Overview
ジョブ実行時に発生する例外のハンドリング方法について説明する。
本機能は、チャンクモデルとタスクレットモデルとで使い方が異なるため、それぞれについて説明する。
まず、例外の分類について説明し、例外の種類に応じたハンドリング方法を説明する。
例外の分類
ジョブ実行時に発生する例外は、以下の3つに分類される。
項番 |
分類 |
説明 |
|
(1) |
ジョブの再実行(パラメータ、入力データの変更/修正など)によって発生原因が解消できる例外 |
ジョブの再実行で発生原因が解消できる例外は、アプリケーションコードで例外をハンドリングし、例外処理を行う。 |
|
(2) |
ジョブの再実行によって発生原因が解消できない例外 |
ジョブの再実行で発生原因が解消できる例外は、以下のパターンにてハンドリングする。 1. StepListenerで例外の捕捉が可能な場合は、
アプリケーションコードで例外をハンドリングする。 2. StepListenerで例外の捕捉が不可能な場合は、 フレームワークで例外処理をハンドリングする。 |
|
(3) |
(非同期実行時に)ジョブ要求のリクエスト不正により発生する例外 |
ジョブ要求のリクエスト不正により発生する例外は、フレームワークで例外処理をハンドリングし、例外処理を行う。 非同期実行(DBポーリング)の場合は、 ポーリング処理ではジョブ要求に対する妥当性検証をしない。そのため、ジョブ要求を登録するアプリケーションで事前にリクエストに対する入力チェックが行われていることが望ましい。 非同期実行(Webコンテナ)の場合は、 Webアプリケーションにより事前にリクエストに対する入力チェックが行われていることを前提としている。 そのため、ジョブ要求やリクエストを受け付けるアプリケーションで例外ハンドリングを行う。 |
例外処理内でトランザクショナルな処理は避ける
例外処理内でデータベースへの書き込みを始めとするトランザクショナルな処理を行うと、 二次例外を引き起こしてしまう可能性がある。 例外処理は、解析用ログ出力と終了コード設定を基本とすること。 |
例外の種類
例外の種類について説明する。
ビジネス例外
ビジネス例外とは、ビジネスルールの違反を検知したことを通知する例外である。
本例外は、ステップのロジック内で発生させる。
アプリケーションとして想定される状態なので、システム運用者による対処は不要である。
-
在庫引当時に在庫切れの場合
-
予定日より日数が超過した場合
-
etc …
該当する例外クラス
|
正常稼働時に発生するライブラリ例外
正常稼働時に発生するライブラリ例外とは、フレームワーク、およびライブラリ内で発生する例外のうち、システムが正常稼働している時に発生する可能性のある例外のことを指す。
フレームワーク、およびライブラリ内で発生する例外とは、Spring Frameworkや、その他のライブラリ内で発生する例外クラスを対象とする。
アプリケーションとして想定される状態なので、システム運用者による対処は不要である。
-
オンライン処理との排他制御で発生する楽観ロック例外
-
複数ジョブやオンライン処理からの同一データを同時登録する際に発生する一意制約例外
-
etc …
該当する例外クラス
|
システム例外
システム例外とは、システムが正常稼働している時に、発生してはいけない状態を検知したことを通知する例外である。
本例外は、ステップのロジック内で発生させる。
システム運用者による対処が必要となる。
-
事前に存在しているはずのマスタデータ、ディレクトリ、ファイルなどが存在しない場合。
-
フレームワーク、ライブラリ内で発生する検査例外のうち、システム異常に分類される例外を捕捉した場合(ファイル操作時のIOExceptionなど)。
-
etc…
該当する例外クラス
|
予期しないシステム例外
予期しないシステム例外とは、システムが正常稼働している時には発生しない非検査例外である。
システム運用者による対処、またはシステム開発者による解析が必要となる。
予期しないシステム例外は、以下の処理をする以外はハンドリングしない。ハンドリングした場合は、例外を再度スローすること。
-
捕捉例外を解析用にログ出力を行い、該当する終了コードの設定する。
-
アプリケーション、フレームワーク、ライブラリにバグが潜んでいる場合。
-
データベースサーバなどがダウンしている場合。
-
etc…
該当する例外クラス
|
致命的なエラー
致命的なエラーとは、システム(アプリケーション)全体に影響を及ぼす、致命的な問題が発生している事を通知するエラーである。
システム運用者、またはシステム開発者による対処・リカバリが必要となる。
致命的なエラーは、以下の処理をする以外はハンドリングしない。ハンドリングした場合は、例外を再度スローすること。
-
捕捉例外を解析用にログ出力を行い、該当する終了コードの設定する。
-
Java仮想マシンで使用できるメモリが不足している場合。
-
etc…
該当する例外クラス
|
例外への対応方法
例外への対応方法について説明する。
例外への対応パターンは次のとおり。
-
例外発生時にジョブの継続可否を決める (3種類)
-
中断したジョブの再実行方法を決める (2種類)
項番 | 例外への対応方法 | 説明 |
---|---|---|
(1) |
エラーレコードをスキップし、処理を継続する。 |
|
(2) |
エラーレコードを指定した条件(回数、時間等)に達するまで再処理する。 |
|
(3) |
処理を中断する。 |
例外が発生していなくても、ジョブが想定以上の処理時間になったため処理途中で停止する場合がある。 |
項番 | 例外への対応方法 | 説明 |
---|---|---|
(1) |
中断したジョブを最初から再実行する。 |
|
(2) |
中断したジョブを中断した箇所から再実行する。 |
中断したジョブの再実行方法についての詳細は、処理の再実行を参照。
スキップ
スキップとは、バッチ処理を止めずにエラーデータを飛ばして処理を継続する方法である。
-
入力データ内に不正なレコードが存在する場合
-
ビジネス例外が発生した場合
-
etc …
スキップレコードの再処理
スキップを行う場合は、スキップした不正なレコードについてどのように対応するか設計すること。 不正なレコードを抽出して再処理する場合、次回実行時に含めて処理する場合、などといった方法が考えられる。 |
リトライ
リトライとは、特定の処理に失敗したレコードに対して指定した回数や時間に達するまで再試行を繰り返す対応方法である。
処理失敗の原因が実行環境に依存しており、かつ、時間の経過により解決される見込みのある場合にのみ用いる。
-
排他制御により、処理対象のレコードがロックされている場合
-
ネットワークの瞬断によりメッセージ送信が失敗する場合
-
etc …
リトライの適用
リトライをあらゆる場面で適用してしまうと、異常発生時に処理時間がむやみに伸びてしまい、異常の検出が遅れる危険がある。 |
How to use
例外ハンドリングの実現方法について説明をする。
バッチアプリケーション運用時のユーザインタフェースはログが主体である。よって、例外発生の監視もログを通じて行うことになる。
Spring Batch では、ステップ実行時に例外が発生した場合はログを出力し異常終了するため、ユーザにて追加実装をせずとも要件を満たせる可能性がある。 以降の説明は、ユーザにてシステムに応じたログ出力を行う必要があるときのみ、ピンポイントに実装するとよい。 すべての処理を実装しなくてはならないケースは基本的にはない。
例外ハンドリングの共通であるログ設定については、ロギングを参照。
ステップ単位の例外ハンドリング
ステップ単位での例外ハンドリング方法について説明する。
- ChunkListenerインタフェースによる例外ハンドリング
-
処理モデルによらず、発生した例外を統一的にハンドリングしたい場合は、 ChunkListenerインタフェースを利用する。
チャンクよりスコープの広い、ステップやジョブのリスナーを利用しても実現できるが、 出来る限り発生した直後にハンドリングすることを重視し、ChunkListener
を採用する。
各処理モデルごとの例外ハンドリング方法は以下のとおり。
- チャンクモデルにおける例外ハンドリング
-
Spring Batch提供の各種Listenerインタフェースを使用して機能を実現する。
- タスクレットモデルにおける例外ハンドリング
-
タスクレット実装内にて独自に例外ハンドリングを実装する。
ChunkListenerで統一的にハンドリングできるのはなぜか
この点は
|
ChunkListenerインタフェースによる例外ハンドリング
ChunkListenerインタフェースのafterChunkError
メソッドを実装する。
afterChunkError
メソッドの引数であるChunkContext
からChunkListener.ROLLBACK_EXCEPTION_KEY
をキーにしてエラー情報を取得する。
リスナーの設定方法については、リスナーの設定を参照。
@Component
public class ChunkAroundListener implements ChunkListener {
private static final Logger logger =
LoggerFactory.getLogger(ChunkAroundListener.class);
@Override
public void beforeChunk(ChunkContext context) {
logger.info("before chunk. [context:{}]", context);
}
@Override
public void afterChunk(ChunkContext context) {
logger.info("after chunk. [context:{}]", context);
}
// (1)
@Override
public void afterChunkError(ChunkContext context) {
logger.error("Exception occurred while chunk. [context:{}]",
context.getAttribute(ChunkListener.ROLLBACK_EXCEPTION_KEY)); // (2)
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
処理モデルの違いによるChunkListenerの挙動の違い
チャンクモデルでは、リソースのオープン・クローズで発生した例外は、ChunkListenerインタフェースが捕捉するスコープ外となる。 そのため、afterChunkErrorメソッドでハンドリングが行われない。 概略図を以下に示す。 図 1. チャンクモデルでの例外ハンドリング概略図
タスクレットモデルでは、リソースのオープン・クローズで発生した例外は、ChunkListenerインタフェースが捕捉するスコープ内となる。 そのため、afterChunkErrorメソッドでハンドリングが行わる。 概略図を以下に示す。 図 2. タスクレットモデルでの例外ハンドリング概略図
この挙動の差を吸収して統一的に例外をハンドリングしたい場合は、
StepExecutionListenerインタフェースで例外の発生有無をチェックすることで実現できる。
ただし、 StepExecutionListenerの実装例
|
チャンクモデルにおける例外ハンドリング
チャンクモデルでは、 StepListenerを継承したListenerで例外ハンドリングする。
リスナーの設定方法については、リスナーの設定を参照。
コーディングポイント(ItemReader編)
ItemReadListenerインタフェースの
onReadError
メソッドを実装することで、ItemReader内で発生した例外をハンドリングする。
@Component
public class ChunkComponentListener implements ItemReadListener<Object> {
private static final Logger logger =
LoggerFactory.getLogger(ChunkComponentListener.class);
// omitted.
// (1)
@Override
public void onReadError(Exception ex) {
logger.error("Exception occurred while reading.", ex); // (2)
}
// omitted.
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
コーディングポイント(ItemProcessor編)
ItemProcessorでの例外ハンドリングには、2つの方法があり、要件に応じて使い分ける。
-
ItemProcessor 内でtry~catchをする方法
-
ItemProcessListenerインタフェースを使用する方法
使い分ける理由について説明する。
ItemProcessorの処理内で例外発生時に実行されるonProcessError
メソッドの引数は、処理対処のアイテムと例外の2つである。
システムの要件によっては、ItemProcessListener
インタフェース内でログ出力等の例外をハンドリングする際に、この2つの引数で要件を満たせない場合が出てくる。
その場合は、ItemProcessor内でtry~catchにて例外をcatchし例外ハンドリング処理を行うことを推奨する。
注意点として、ItemProcessor内でtry~catchを実装した上で、ItemProcessListener
インタフェースを実装すると二重処理になる場合があるため、注意が必要である。
きめ細かい例外ハンドリングを行いたい場合は、ItemProcessor内でtry~catchをする方法を採用すること。
それぞれの方法について説明する。
- ItemProcessor 内でtry~catchする方法
-
きめ細かい例外ハンドリングが必要になる場合はこちらを使用する。
後述するスキップの項で説明するが、エラーレコードのスキップを行う際にはこちらを使用することとなる。
@Component
public class AmountCheckProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {
// omitted.
@Override
public SalesPerformanceDetail process(SalesPerformanceDetail item)
throws Exception {
// (1)
try {
checkAmount(item.getAmount(), amountLimit);
} catch (ArithmeticException ae) {
// (2)
logger.error(
"Exception occurred while processing. [item:{}]", item, ae);
// (3)
throw new IllegalStateException("check error at processor.", ae);
}
return item;
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
(3) |
トランザクションのロールバック例外をスローする。 |
ItemProcessListener
インタフェースを使用する方法-
業務例外に対するハンドリングが共通化できる場合はこちらを使用する。
@Component
public class ChunkComponentListener implements ItemProcessListener<Object, Object> {
private static final Logger logger =
LoggerFactory.getLogger(ChunkComponentListener.class);
// omitted.
// (1)
@Override
public void onProcessError(Object item, Exception e) {
// (2)
logger.error("Exception occurred while processing. [item:{}]", item, e);
}
// omitted.
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
コーディングポイント(ItemWriter編)
ItemWriteListenerインタフェースの
onWriteError
メソッドを実装することで、ItemWriter内で発生した例外をハンドリングする。
@Component
public class ChunkComponentListener implements ItemWriteListener<Object> {
private static final Logger logger =
LoggerFactory.getLogger(ChunkComponentListener.class);
// omitted.
// (1)
@Override
public void onWriteError(Exception ex, Chunk item) {
// (2)
logger.error("Exception occurred while processing. [items:{}]", item, ex);
}
// omitted.
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
タスクレットモデルにおける例外ハンドリング
タスクレットモデルの例外ハンドリングはタスクレット内で独自に実装する。
トランザクション処理を行う場合は、ロールバックさせるために必ず例外を再度スローすること。
@Component
public class SalesPerformanceTasklet implements Tasklet {
private static final Logger logger =
LoggerFactory.getLogger(SalesPerformanceTasklet.class);
// omitted.
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// (1)
try {
reader.open(chunkContext.getStepContext().getStepExecution()
.getExecutionContext());
List<SalesPerformanceDetail> items = new ArrayList<>(10);
SalesPerformanceDetail item = null;
do {
// Pseudo operation of ItemReader
// omitted.
// Pseudo operation of ItemProcessor
checkAmount(item.getAmount(), amountLimit);
// Pseudo operation of ItemWriter
// omitted.
} while (item != null);
} catch (Exception e) {
logger.error("exception in tasklet.", e); // (2)
throw e; // (3)
} finally {
try {
reader.close();
} catch (Exception e) {
// do nothing.
}
}
return RepeatStatus.FINISHED;
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
(3) |
トランザクションをロールバックするため、例外を再度スローする。 |
ジョブ単位の例外ハンドリング
ジョブ単位に例外ハンドリング方法を説明する。
チャンクモデルとタスクレットモデルとで共通のハンドリング方法となる。
システム例外や致命的エラー等エラーはジョブ単位に JobExecutionListenerインタフェースの実装を行う。
例外ハンドリング処理を集約して定義するために、ステップごとにハンドリング処理を定義はせずジョブ単位でハンドリングを行う。
ここでの例外ハンドリングは、ログ出力、およびExitCodeの設定を行い、トランザクション処理は実装しないこと。
トランザクション処理の禁止
|
ここでは、ItemProcessorで例外が発生したときのハンドリング例を示す。 リスナーの設定方法については、リスナーの設定を参照。
@Component
public class AmountCheckProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {
// omitted.
private StepExecution stepExecution;
// (1)
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public SalesPerformanceDetail process(SalesPerformanceDetail item)
throws Exception {
// (2)
try {
checkAmount(item.getAmount(), amountLimit);
} catch (ArithmeticException ae) {
// (3)
stepExecution.getExecutionContext().put("ERROR_ITEM", item);
// (4)
throw new IllegalStateException("check error at processor.", ae);
}
return item;
}
}
@Component
public class JobErrorLoggingListener implements JobExecutionListener {
private static final Logger logger =
LoggerFactory.getLogger(JobErrorLoggingListener.class);
@Override
public void beforeJob(JobExecution jobExecution) {
// do nothing.
}
// (5)
@Override
public void afterJob(JobExecution jobExecution) {
// whole job execution
List<Throwable> exceptions = jobExecution.getAllFailureExceptions(); // (6)
// (7)
if (exceptions.isEmpty()) {
return;
}
// (8)
logger.info("This job has occurred some exceptions as follow. " +
"[job-name:{}] [size:{}]",
jobExecution.getJobInstance().getJobName(), exceptions.size());
for (Throwable th : exceptions) {
logger.error("exception has occurred in job.", th);
}
// (9)
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
Object errorItem = stepExecution.getExecutionContext()
.get("ERROR_ITEM"); // (10)
if (errorItem != null) {
logger.error("detected error on this item processing. " +
"[step:{}] [item:{}]", stepExecution.getStepName(),
errorItem);
}
}
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
例外ハンドリングを実装する |
(4) |
|
(5) |
|
(6) |
引数の |
(7) |
エラー情報がない場合は、正常終了とする。 |
(8) |
エラー情報がある場合は、例外ハンドリングを行う。 |
(9) |
この例では、エラーデータがある場合はログ出力を行うようにしている。 |
ExecutionContextへ格納するオブジェクト
|
処理継続可否の決定
例外発生時にジョブの処理継続可否を決定する実装方法を説明する。
スキップ
エラーレコードをスキップして、処理を継続する方法を説明する。
チャンクモデル
チャンクモデルでは、各処理のコンポーネントで実装方法が異なる。
ここで説明する内容を適用する前に、必ず<skippable-exception-classes>を使わない理由についてを一読すること。 |
- ItemReaderでのスキップ
-
StepBuilderのskipPolicyメソッド/<batch:chunk>のskip-policy属性
にスキップ方法を指定する。skipメソッド/<batch:skippable-exception-classes>
に、スキップ対象とするItemReaderで発生する例外クラスを指定する。
skipPolicyメソッドの引数skipPolicy/skip-policy属性
には、Spring Batchが提供している下記に示すいづれかのクラスを使用する。
クラス名 | 説明 |
---|---|
常にスキップをする。 |
|
スキップをしない。 |
|
指定したスキップ数の上限に達するまでスキップをする。
|
|
例外ごとに適用する |
スキップの実装例を説明する。
FlatFileItemReader
でCSVファイルを読み込む際、不正なレコードが存在するケースを扱う。
なお、この時以下の例外が発生する。
-
org.springframework.batch.item.ItemReaderException
(ベースとなる例外クラス)-
org.springframework.batch.item.file.FlatFileParseException
(発生する例外クラス)
-
skip-policy
別に定義方法を示す。
@Bean
@StepScope
public FlatFileItemReader<SalesPerformanceDetail> detailCSVReader(
@Value("#{jobParameters['inputFile']}") File inputFile) {
final DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("branchId", "year", "month", "customerId", "amount");
final BeanWrapperFieldSetMapper<SalesPerformanceDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<SalesPerformanceDetail>();
fieldSetMapper.setTargetType(SalesPerformanceDetail.class);
final DefaultLineMapper<SalesPerformanceDetail> lineMapper = new DefaultLineMapper<SalesPerformanceDetail>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return new FlatFileItemReaderBuilder<SalesPerformanceDetail>()
.name(ClassUtils.getShortName(FlatFileItemReader.class))
.resource(new FileSystemResource(inputFile))
.lineMapper(lineMapper)
.build();
}
<bean id="detailCSVReader"
class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="file:#{jobParameters['inputFile']}"/>
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
p:names="branchId,year,month,customerId,amount"/>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail"/>
</property>
</bean>
</property>
</bean>
- AlwaysSkipItemSkipPolicy
-
.AlwaysSkipItemSkipPolicyの指定例
// (1)
@Bean
public AlwaysSkipItemSkipPolicy skipPolicy() {
AlwaysSkipItemSkipPolicy skipPolicy = new AlwaysSkipItemSkipPolicy();
return skipPolicy;
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
AlwaysSkipItemSkipPolicy skipPolicy,
SkipLoggingListener listener) {
return new StepBuilder("jobSalesPerfAtSkipAllReadError.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(skipPolicy) // (2)
.listener(listener)
.build();
}
@Bean
public Job jobSalesPerfAtSkipAllReadError(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfAtSkipAllReadError", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- (1) -->
<bean id="skipPolicy"
class="org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy"/>
<batch:job id="jobSalesPerfAtSkipAllReadError" job-repository="jobRepository">
<batch:step id="jobSalesPerfAtSkipAllReadError.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"
skip-policy="skipPolicy"> <!-- (2) -->
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
- NeverSkipItemSkipPolicy
-
.NeverSkipItemSkipPolicyの指定例
// (1)
@Bean
public NeverSkipItemSkipPolicy skipPolicy() {
return new NeverSkipItemSkipPolicy();
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
SkipLoggingListener skipLoggingListener,
NeverSkipItemSkipPolicy skipPolicy) {
return new StepBuilder("jobSalesPerfAtSkipNeverReadError.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.listener(skipLoggingListener)
.skipPolicy(skipPolicy) // (2)
.build();
}
@Bean
public Job jobSalesPerfAtSkipNeverReadError(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfAtSkipNeverReadError", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- (1) -->
<bean id="skipPolicy"
class="org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy"/>
<batch:job id="jobSalesPerfAtSkipNeverReadError" job-repository="jobRepository">
<batch:step id="jobSalesPerfAtSkipNeverReadError.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"
skip-policy="skipPolicy"> <!-- (2) -->
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
- LimitCheckingItemSkipPolicy
-
.LimitCheckingItemSkipPolicyの指定例
// (1)
// @Bean
// public LimitCheckingItemSkipPolicy skipPolicy() {
// return new LimitCheckingItemSkipPolicy();
// }
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
SkipLoggingListener listener) {
return new StepBuilder("jobSalesPerfAtValidSkipReadError.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(2) // (2)
.skip(ItemReaderException.class) // (3) (4)
.listener(listener)
.build();
}
@Bean
public Job jobSalesPerfAtValidSkipReadError(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfAtValidSkipReadError", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- (1) -->
<!--
<bean id="skipPolicy"
class="org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy"/>
-->
<batch:job id="jobSalesPerfAtValidSkipReadError" job-repository="jobRepository">
<batch:step id="jobSalesPerfAtValidSkipReadError.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"
skip-limit="2"> <!-- (2) -->
<!-- (3) -->
<batch:skippable-exception-classes>
<!-- (4) -->
<batch:include
class="org.springframework.batch.item.ItemReaderException"/>
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
|
(4) |
|
- ExceptionClassifierSkipPolicy
-
.ExceptionClassifierSkipPolicyの指定例
// (1)
@Bean
public ExceptionClassifierSkipPolicy skipPolicy(
AlwaysSkipItemSkipPolicy alwaysSkip) {
ExceptionClassifierSkipPolicy skipPolicy = new ExceptionClassifierSkipPolicy();
Map<Class<? extends Throwable>, SkipPolicy> policyMap = new HashMap<>();
policyMap.put(ItemReaderException.class, alwaysSkip); // (2)
skipPolicy.setPolicyMap(policyMap);
return skipPolicy;
}
// (3)
@Bean
public AlwaysSkipItemSkipPolicy alwaysSkip() {
return new AlwaysSkipItemSkipPolicy();
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
SkipLoggingListener skipLoggingListener,
ExceptionClassifierSkipPolicy skipPolicy) {
return new StepBuilder("jobSalesPerfAtValidNolimitSkipReadError.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipPolicy(skipPolicy) // (4)
.listener(skipLoggingListener)
.build();
}
@Bean
public Job jobSalesPerfAtValidNolimitSkipReadError(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfAtValidNolimitSkipReadError", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- (1) -->
<bean id="skipPolicy"
class="org.springframework.batch.core.step.skip.ExceptionClassifierSkipPolicy">
<property name="policyMap">
<map>
<!-- (2) -->
<entry key="org.springframework.batch.item.ItemReaderException"
value-ref="alwaysSkip"/>
</map>
</property>
</bean>
<!-- (3) -->
<bean id="alwaysSkip"
class="org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy"/>
<batch:job id="jobSalesPerfAtValidNolimitSkipReadError"
job-repository="jobRepository">
<batch:step id="jobSalesPerfAtValidNolimitSkipReadError.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- skip-limit value is dummy. -->
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"
skip-policy="skipPolicy"> <!-- (4) -->
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
例外別に実行したいスキップ方法を定義する。 |
(4) |
|
- ItemProcessorでのスキップ
-
ItemProcessor内でtry~catchをして、nullを返却する。
skip-policy
によるスキップは、ItemProcessorで再処理が発生するため利用しない。
詳細は、<skippable-exception-classes>を使わない理由についてを参照。
ItemProcessorにおける例外ハンドリンクの制約
<skippable-exception-classes>を使わない理由についてにあるように、
ItemProcessorでは、 |
スキップの実装例を説明する。
コーディングポイント(ItemProcessor編)の ItemProcessor内でtry~catchする実装例を スキップに対応させる。
@Component
public class AmountCheckProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {
// omitted.
@Override
public SalesPerformanceDetail process(SalesPerformanceDetail item) throws Exception {
// (1)
try {
checkAmount(item.getAmount(), amountLimit);
} catch (ArithmeticException ae) {
logger.warn("Exception occurred while processing. Skipped. [item:{}]",
item, ae); // (2)
return null; // (3)
}
return item;
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
(3) |
nullを返却することでエラーデータをスキップする。 |
- ItemWriterでのスキップ
-
ItemWriterにおいてスキップ処理は原則として行わない。
スキップが必要な場合でも、skip-policy
によるスキップは、チャンクサイズが変動するので利用しない。
詳細は、<skippable-exception-classes>を使わない理由についてを参照。
タスクレットモデル
ビジネスロジック内で例外をハンドリングし、独自にエラーレコードをスキップする処理を実装する。
タスクレットモデルにおける例外ハンドリングの 実装例を スキップ対応させる。
@Component
public class SalesPerformanceTasklet implements Tasklet {
private static final Logger logger =
LoggerFactory.getLogger(SalesPerformanceTasklet.class);
// omitted.
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// (1)
try {
reader.open(chunkContext.getStepContext().getStepExecution()
.getExecutionContext());
List<SalesPerformanceDetail> items = new ArrayList<>(10);
SalesPerformanceDetail item = null;
do {
// Pseudo operation of ItemReader
// omitted.
// Pseudo operation of ItemProcessor
checkAmount(item.getAmount(), amountLimit);
// Pseudo operation of ItemWriter
// omitted.
} while (item != null);
} catch (Exception e) {
logger.warn("exception in tasklet. Skipped.", e); // (2)
continue; // (3)
} finally {
try {
reader.close();
} catch (Exception e) {
// do nothing.
}
}
return RepeatStatus.FINISHED;
}
}
項番 | 説明 |
---|---|
(1) |
|
(2) |
例外ハンドリングを実装する |
(3) |
continueにより、エラーデータの処理をスキップする。 |
リトライ
例外を検知した場合に、規定回数に達するまで再処理する方法を説明する。
リトライには、状態管理の有無やリトライが発生するシチュエーションなどさまざまな要素を考慮する必要があり、 確実な方法は存在しないうえに、むやみにリトライするとかえって状況を悪化させてしまう。
そのため、本ガイドラインでは、局所的なリトライを実現するorg.springframework.retry.support.RetryTemplate
を利用する方法を説明する。
スキップと同様に |
public class RetryableAmountCheckProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> {
// omitted.
// (1)
private RetryPolicy retryPolicy;
@Override
public SalesPerformanceDetail process(SalesPerformanceDetail item)
throws Exception {
// (2)
RetryTemplate rt = new RetryTemplate();
if (retryPolicy != null) {
rt.setRetryPolicy(retryPolicy);
}
try {
// (3)
rt.execute(new RetryCallback<SalesPerformanceDetail, Exception>() {
@Override
public SalesPerformanceDetail doWithRetry(RetryContext context) throws Exception {
logger.info("execute with retry. [retry-count:{}]", context.getRetryCount());
// retry mocking
if (context.getRetryCount() == adjustTimes) {
item.setAmount(item.getAmount().divide(new BigDecimal(10)));
}
checkAmount(item.getAmount(), amountLimit);
return null;
}
});
} catch (ArithmeticException ae) {
// (4)
throw new IllegalStateException("check error at processor.", ae);
}
return item;
}
public void setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
}
}
@Bean
@StepScope
public RetryableAmountCheckProcessor amountCheckProcessor(
@Value("#{jobParameters['adjust-times']}") int adjustTimes,
SimpleRetryPolicy retryPolicy) {
RetryableAmountCheckProcessor processor = new RetryableAmountCheckProcessor();
processor.setAmountLimit(BigDecimal.valueOf(10000));
processor.setAdjustTimes(adjustTimes);
processor.setRetryPolicy(retryPolicy); // (5)
return processor;
}
// (6)
@Bean
public SimpleRetryPolicy retryPolicy(
Map<Class<? extends Throwable>, Boolean> exceptionMap) {
return new SimpleRetryPolicy(3, exceptionMap); // (7) (8)
}
// (9)
@Bean
public Map<Class<? extends Throwable>, Boolean> exceptionMap() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(UnsupportedOperationException.class, true);
return exceptionMap;
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer) {
return new StepBuilder("jobSalesPerfWithRetryPolicy.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job jobSalesPerfWithRetryPolicy(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfWithRetryPolicy", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- omitted -->
<bean id="amountCheckProcessor"
class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch06.exceptionhandling.RetryableAmountCheckProcessor"
scope="step"
p:retryPolicy-ref="retryPolicy"/> <!-- (5) -->
<!-- (6) (7) (8)-->
<bean id="retryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy"
c:maxAttempts="3"
c:retryableExceptions-ref="exceptionMap"/>
<!-- (9) -->
<util:map id="exceptionMap">
<entry key="java.lang.ArithmeticException" value="true"/>
</util:map>
<batch:job id="jobSalesPerfWithRetryPolicy" job-repository="jobRepository">
<batch:step id="jobSalesPerfWithRetryPolicy.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
リトライ条件を格納する |
(2) |
RetryTemplateのインスタンスを作成する。 |
(3) |
|
(4) |
リトライ回数が規定回数を超えた場合の例外ハンドリング。 |
(5) |
(6)で定義するリトライ条件を指定する。 |
(6) |
リトライ条件を、 |
(7) |
コンストラクタ引数の |
(8) |
コンストラクタ引数の |
(9) |
キーにリトライ対象の例外クラス、値に真偽値を設定したマップを定義する。 |
処理中断
ステップ実行を打ち切りたい場合、スキップ・リトライ対象以外のRuntimeExceptionもしくはそのサブクラスをスローする。
LimitCheckingItemSkipPolicyをもとに、スキップの実装例を示す。
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer,
SkipLoggingListener listener) {
return new StepBuilder("jobSalesPerfAtValidSkipReadError.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant()
.skipLimit(2)
.skip(ValidationException.class) // (1)
.listener(listener)
.build();
}
@Bean
public Job jobSalesPerfAtValidSkipReadError(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfAtValidSkipReadError", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<batch:job id="jobSalesPerfAtValidSkipReadError" job-repository="jobRepository">
<batch:step id="jobSalesPerfAtValidSkipReadError.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"
skip-limit="2">
<batch:skippable-exception-classes>
<!-- (1) -->
<batch:include class="org.springframework.batch.item.validator.ValidationException"/>
</batch:skippable-exception-classes>
</batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
リトライをもとに、リトライの実装例を示す。
@Bean
public SimpleRetryPolicy retryPolicy(
Map<Class<? extends Throwable>, Boolean> exceptionMap) {
return new SimpleRetryPolicy(3, exceptionMap);
}
@Bean
public Map<Class<? extends Throwable>, Boolean> exceptionMap() {
Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
exceptionMap.put(UnsupportedOperationException.class, true); // (1)
return exceptionMap;
}
@Bean
public Step step01(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
@Qualifier("detailCSVReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("amountCheckProcessor") ItemProcessor<SalesPerformanceDetail, SalesPerformanceDetail> processor,
@Qualifier("detailWriter") ItemWriter<SalesPerformanceDetail> writer) {
return new StepBuilder("jobSalesPerfWithRetryPolicy.step01",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(10,
transactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.build();
}
@Bean
public Job jobSalesPerfWithRetryPolicy(JobRepository jobRepository,
Step step01,
JobExecutionLoggingListener listener) {
return new JobBuilder("jobSalesPerfWithRetryPolicy", jobRepository)
.start(step01)
.listener(listener)
.build();
}
<!-- omitted -->
<bean id="retryPolicy" class="org.springframework.retry.policy.SimpleRetryPolicy"
c:maxAttempts="3"
c:retryableExceptions-ref="exceptionMap"/>
<util:map id="exceptionMap">
<!-- (1) -->
<entry key="java.lang.UnsupportedOperationException" value="true"/>
</util:map>
<batch:job id="jobSalesPerfWithRetryPolicy" job-repository="jobRepository">
<batch:step id="jobSalesPerfWithRetryPolicy.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="detailCSVReader"
processor="amountCheckProcessor"
writer="detailWriter" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
Appendix
<skippable-exception-classes>を使わない理由について
Spring Batchでは、ジョブ全体を対象としてスキップする例外を指定し、例外が発生したアイテムへの処理をスキップして処理を継続させる機能を提供している。
その機能は、以下のようにStepBuilderのskipメソッド/<chunk>要素配下に<skippable-exception-classes>要素
を設定し、スキップ対象の例外を指定する形で実装する。
@Bean
public Step retryStep(JobRepository jobRepository,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
ItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter) {
return new StepBuilder("flowJob.retryStep",
jobRepository)
.chunk(20, transactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(itemWriter)
.faultTolerant()
.skipLimit(10)
.skip(Exception.class)
.noSkip(NullPointerException.class)
.build();
}
@Bean
public Job flowJob(JobRepository jobRepository,
Step retryStep,
JobExecutionLoggingListener listener) {
return new JobBuilder("flowJob", jobRepository)
.start(retryStep)
.listener(listener)
.build();
}
<job id="flowJob">
<step id="retryStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter"
processor="itemProcessor" commit-interval="20"
skip-limit="10">
<skippable-exception-classes>
<!-- specify exceptions to the skipped -->
<include class="java.lang.Exception"/>
<exclude class="java.lang.NullPointerException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
</job>
この機能を利用することによって、入力チェックエラーが発生したレコードをスキップして後続データの処理を継続することは可能だが、 Macchinetta Batch 2.xでは以下の理由により使用しない。
-
skipメソッド/<skippable-exception-classes>要素
を利用して例外をスキップした場合、 1つのチャンクに含まれるデータ件数が変動するため、性能劣化を引き起こす可能性がある。-
これは、例外の発生箇所(
ItemReader
/ItemProcessor
/ItemWriter
)によって変わる。詳細は後述する。
-
skip/<skippable-exception-classes>を定義せずにSkipPolicyを利用することは必ず避ける
暗黙的にすべての例外が登録された状況になり、性能劣化の可能性が飛躍的に高まる。 |
例外発生箇所(ItemReader
/ItemProcessor
/ItemWriter
)ごとの挙動についてそれぞれ説明する。
なお、トランザクションの動作は例外の発生箇所によらず、例外が発生した場合は必ずロールバックした後、再度処理される。
- ItemReaderで例外が発生した場合
-
-
ItemReader
の処理内で例外が発生した場合は、次のitemへ処理対象が移る。 -
これによる副作用はない
-
- ItemProcessorで例外が発生した場合
-
-
ItemProcessor
の処理内で例外が発生した場合は、チャンクの最初に戻り1件目から再処理する。 -
再処理の対象にスキップされるitemは含まれない。
-
1度目の処理と再処理時のチャンクサイズは変わらない。
-
- ItemWriterで例外が発生した場合
-
-
ItemWriter
の処理内で例外が発生した場合は、チャンクの最初に戻り1件目から再処理する。 -
再処理は
ChunkSize=1
に固定し、1件ずつ実行される。 -
再処理対象にスキップされるitemも含まれる。
-
ItemProcessor
にて例外が発生した場合、ChunkSize=1000
の場合を例に考えると、
1000件目で例外が発生すると1件目から再処理が行われ、合計で1999件分の処理が実行されてしまう。
ItemWriter
にて例外が発生した場合、ChunkSize=1
に固定し再処理される。
仮にChunkSize=1000
の場合を例に考えると、
本来1回のトランザクションにも関わらず1000回のトランザクションに分割し処理されてしまう。
これらはジョブ全体の処理時間が長期化することを意味し、異常時に状況を悪化させる可能性が高い。 また、二重処理すること自体が問題化する可能性を秘めており、設計製造に追加の考慮事項を生む。
よって、skipメソッド/<skippable-exception-classes>
を使用することは推奨しない。
ItemReader
でエラーになったデータをスキップすることはこれらの問題を引き起こさないが、
事故を未然に防ぐためには基本的に避けるようにし、どうしても必要な場合に限定的に適用すること。