Overview
1つの業務処理を実装する方法として、1つのジョブに集約して実装するのではなく、 複数のジョブに分割し組み合わせることで実装することがある。 このとき、ジョブ間の依存関係を定義したものをジョブネットと呼ぶ。
ジョブネットを定義することのメリットを下記に挙げる。
-
処理の進行状況が可視化しやすくなる
-
ジョブの部分再実行、実行保留、実行中止が可能になる
-
ジョブの並列実行が容易になる
以上より、バッチ処理を設計する場合はジョブネットも併せてジョブ設計を行うことが一般的である。
処理内容とジョブネットの適性
分割するまでもないシンプルな業務処理やオンライン処理と連携する処理に対して、ジョブネットは適さないことが多い。 |
本ガイドラインでは、ジョブネットでジョブ同士の流れを制御することをフロー制御と呼ぶ。 また処理の流れにおける前のジョブを先行ジョブ、後のジョブを後続ジョブと呼び、 先行ジョブと後続ジョブの依存関係を、先行後続関係と呼ぶ。
フロー制御の概念図を以下に示す。
上図のとおり、フロー制御はジョブスケジューラ、Macchinetta Batch 2.xのどちらでも実施可能である。 しかし、以下の理由によりできる限りジョブスケジューラを活用することが望ましい。
-
1ジョブの処理や状態が多岐に渡る傾向が強まり、ブラックボックス化しやすい。
-
ジョブスケジューラとジョブの境界があいまいになってしまう
-
ジョブスケジューラ上から異常時の状況がみえにくくなってしまう
ただし、ジョブスケジューラに定義するジョブ数が多くなると、以下の様なデメリットが生じることも一般に知られている。
-
ジョブスケジューラによる以下のようなコストが累積し、システム全体の処理時間が伸びる
-
ジョブスケジューラ製品固有の通信、実行ノードの制御、など
-
ジョブごとのJavaプロセス起動に伴うオーバーヘッド
-
-
ジョブ登録数の限界
このため、以下を方針とする。
-
基本的にはジョブスケジューラによりフロー制御を行う。
-
ジョブ数が多いことによる弊害がある場合に限り、以下のとおり対処する。
-
Macchinetta Batch 2.xにてシーケンシャルな複数の処理を1ジョブにまとめる。
-
シンプルな先行後続関係を1ジョブに集約するのみとする。
-
ステップ終了コードの変更と、この終了コードに基づく後続ステップ起動の条件分岐は機能上利用可能だが、 ジョブの実行管理が複雑化するため、ジョブ終了時のプロセス終了コード決定に限り原則利用する。
どうしても条件分岐を使わないと問題を解消できない場合に限り使用を許容するが、 シンプルな先行後続関係を維持するよう配慮すること。
-
-
ジョブの終了コードの決定について、詳細は"終了コードのカスタマイズ"を参照。 |
また、以下に先行後続を実現する上で意識すべきポイントを示す。
-
ジョブスケジューラがシェル等を介してjavaプロセスを起動する。
-
1ジョブが1javaプロセスとなる。
-
処理全体では、4つのjavaプロセスが起動する。
-
-
ジョブスケジューラが各処理の起動順序を制御する。ぞれぞれのjavaプロセスは独立している。
-
後続ジョブの起動判定として、先行ジョブのプロセス終了コードが用いられる。
-
ジョブ間のデータ受け渡しは、ファイルやデータベースなど外部リソースを使用する必要がある。
-
ジョブスケジューラがシェル等を介してjavaプロセスを起動する。
-
1ジョブが1javaプロセスとなる。
-
処理全体では、1つのjavaプロセスしか使わない。
-
-
1javaプロセス内で各ステップの起動順序を制御する。それぞれのステップは独立している。
-
後続ステップの起動判定として、先行ステップの終了コードが用いられる。
-
ステップ間のデータはインメモリで受け渡しが可能である。
以降、Macchinetta Batch 2.xによるフロー制御の実現方法について説明する。
ジョブスケジューラでのフロー制御は製品仕様に強く依存するためここでは割愛する。
フロー制御の応用例
複数ジョブの並列化・多重化は、一般的にジョブスケジューラとジョブネットによって実現することが多い。 |
本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。
How to use
Macchinetta Batch 2.xでのフロー制御方法を説明する。
シーケンシャルフロー
シーケンシャルフローとは先行ステップと後続ステップを直列に連結したフローである。
何らかの業務処理がシーケンシャルフロー内のステップで異常終了した場合、後続ステップは実行されずにジョブが中断する。
このとき、JobRepository
によりジョブ実行IDに紐付けられる当該のステップとジョブのステータス・終了コードは
FAILED
として記録される。
失敗原因の回復後にリスタートを実施することで、異常終了したステップから処理をやり直すことができる。
ジョブのリスタート方法についてはジョブのリスタートを参照。 |
ここでは3つのステップからなるジョブのシーケンシャルフローを設定する。
// tasklet definition is omitted.
TaskletStepBuilder parentStepBuilder(String stepName,
JobRepository jobRepository,
SequentialFlowTasklet tasklet,
PlatformTransactionManager transactionManager) {
return new StepBuilder(stepName, jobRepository)
.tasklet(tasklet, transactionManager);
}
@Bean
public Step step01(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobSequentialFlow.step1", jobRepository,
tasklet, transactionManager)
.build();
}
@Bean
public Step step02(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobSequentialFlow.step2", jobRepository,
tasklet, transactionManager)
.build();
}
@Bean
public Step step03(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobSequentialFlow.step3", jobRepository,
tasklet, transactionManager)
.build();
}
@Bean
public Job jobSequentialFlow(JobRepository jobRepository,
Step step01, Step step02, Step step03) {
Flow flow = new FlowBuilder<Flow>("jobSequentialFlow")
.from(step01)
.next(step02) // (1)
.next(step03) // (1)
.build(); // (2)
return new JobBuilder("jobSequentialFlow", jobRepository)
.start(flow)
.end()
.build();
}
<!-- tasklet definition is omitted. -->
<batch:step id="parentStep">
<batch:tasklet ref="sequentialFlowTasklet"
transaction-manager="jobTransactionManager"/>
</batch:step>
<batch:job id="jobSequentialFlow" job-repository="jobRepository">
<batch:step id="jobSequentialFlow.step1"
next="jobSequentialFlow.step2" parent="parentStep"/> <!-- (1) -->
<batch:step id="jobSequentialFlow.step2"
next="jobSequentialFlow.step3" parent="parentStep"/> <!-- (1) -->
<batch:step id="jobSequentialFlow.step3" parent="parentStep"/> <!-- (2) -->
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
フローの末端になるステップには、 |
これにより、 以下の順でステップが直列に起動する。
jobSequentialFlow.step1
→ jobSequentialFlow.step2
→ jobSequentialFlow.step3
Flow/<batch:flow>を使った定義方法
前述の例では
|
ステップ間のデータの受け渡し
Spring Batchには、ステップ、ジョブそれぞれのスコープで利用できる実行コンテキストのExecutionContext
が用意されている。
ステップ実行コンテキストを利用することでステップ内のコンポーネント間でデータを共有できる。
このとき、ステップ実行コンテキストはステップ間で共有できないため、先行のステップ実行コンテキストは後続のステップ実行コンテキストからは参照できない。
ジョブ実行コンテキストを利用すれば実現可能だが、すべてのステップから参照可能になるため、慎重に扱う必要がある。
ステップ間の情報を引き継ぐ必要があるときは、以下の手順により対応できる。
-
先行ステップの後処理で、ステップ実行コンテキストに格納した情報をジョブ実行コンテキストに移す。
-
後続ステップがジョブ実行コンテキストから情報を取得する。
最初の手順は、Spring Batchから提供されているExecutionContextPromotionListener
を利用することで、
実装をせずとも、引き継ぎたい情報をリスナーに指定するだけ実現できる。
ExecutionContextを使用する上での注意点
データの受け渡しに使用する
また、実行コンテキストを経由せず、SingletonやJobスコープのBeanを共有することでも情報のやり取りは可能だが、 この方法もサイズが大きすぎるとメモリリソースを圧迫する可能性があるので注意すること。 |
以下、タスクレットモデルとチャンクモデルについて、それぞれステップ間のデータ受け渡しについて説明する。
タスクレットモデルを用いたステップ間のデータ受け渡し
受け渡しデータの保存・取得に、ChunkContext
からExecutionContext
を取得し、ステップ間のデータ受け渡しを行う。
// package, imports are omitted.
@Component
public class SavePromotionalTasklet implements Tasklet {
// omitted.
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// (1)
chunkContext.getStepContext().getStepExecution().getExecutionContext()
.put("promotion", "value1");
// omitted.
return RepeatStatus.FINISHED;
}
}
// package and imports are omitted.
@Component
public class ConfirmPromotionalTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) {
// (2)
Object promotion = chunkContext.getStepContext().getJobExecutionContext()
.get("promotion");
// omitted.
return RepeatStatus.FINISHED;
}
}
// import,annotation,component-scan definitions are omitted
// (3)
@Bean
ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] { "promotion" });
listener.setStrict(true);
return listener;
}
@Bean
public Step step1(JobRepository jobRepository,
@Qualifier("savePromotionalTasklet") SavePromotionalTasklet tasklet,
@Qualifier("executionContextPromotionListener") ExecutionContextPromotionListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("jobPromotionalFlow.step1", jobRepository)
.tasklet(tasklet, transactionManager)
.listener(listener)
.build();
}
@Bean
public Step step2(JobRepository jobRepository,
@Qualifier("confirmPromotionalTasklet") ConfirmPromotionalTasklet tasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("jobPromotionalFlow.step2", jobRepository)
.tasklet(tasklet, transactionManager)
.build();
}
@Bean
public Job jobPromotionalFlow(JobRepository jobRepository,
Step step1, Step step2) {
return new JobBuilder("jobPromotionalFlow", jobRepository)
.start(step1)
.next(step2)
.build();
}
<!-- import,annotation,component-scan definitions are omitted -->
<batch:job id="jobPromotionalFlow" job-repository="jobRepository">
<batch:step id="jobPromotionalFlow.step1" next="jobPromotionalFlow.step2">
<batch:tasklet ref="savePromotionalTasklet"
transaction-manager="jobTransactionManager"/>
<batch:listeners>
<batch:listener>
<!-- (3) -->
<bean class="org.springframework.batch.core.listener.ExecutionContextPromotionListener"
p:keys="promotion"
p:strict="true"/>
</batch:listener>
</batch:listeners>
</batch:step>
<batch:step id="jobPromotionalFlow.step2">
<batch:tasklet ref="confirmPromotionalTasklet"
transaction-manager="jobTransactionManager"/>
</batch:step>
</batch:job>
<!-- omitted -->
項番 | 説明 |
---|---|
(1) |
ステップ実行コンテキストの |
(2) |
先行ステップの(1)で設定された受け渡しデータを |
(3) |
|
ExecutionContextPromotionListenerとステップ終了コードについて
|
チャンクモデルを用いたステップ間のデータ受け渡し
ItemProcessor
に@AfterStep
、@BeforeStep
アノテーションを付与したメソッドを使用する。
データ受け渡しに使用するリスナーと、ExecutionContext
の使用方法はタスクレットと同様である。
// package and imports are omitted.
@Component
@Scope("step")
public class PromotionSourceItemProcessor implements ItemProcessor<String, String> {
@Override
public String process(String item) {
// omitted.
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
// (1)
stepExecution.getExecutionContext().put("promotion", "value2");
return null;
}
}
// package and imports are omitted.
@Component
@Scope("step")
public class PromotionTargetItemProcessor implements ItemProcessor<String, String> {
@Override
public String process(String item) {
// omitted.
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
// (2)
Object promotion = stepExecution.getJobExecution().getExecutionContext()
.get("promotion");
// omitted.
}
}
// (3)
@Bean
ExecutionContextPromotionListener executionContextPromotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] { "promotion" });
listener.setStrict(true);
return listener;
}
@Bean
public Step step1(JobRepository jobRepository,
@Qualifier("executionContextPromotionListener") ExecutionContextPromotionListener listener,
@Qualifier("listItemReader") ListItemReader<String> reader,
@Qualifier("promotionSourceItemProcessor") ItemProcessor<String, String> processor,
@Qualifier("promotionLogItemWriter") ItemWriter<String> writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
SimpleStepBuilder<String, String> builder
= parentStepBuilder("jobChunkPromotionalFlow.step1",
jobRepository, reader, processor, writer, transactionManager);
builder.listener(listener);
return builder.build();
}
@Bean
public Step step2(JobRepository jobRepository,
@Qualifier("listItemReader") ListItemReader<String> reader,
@Qualifier("promotionTargetItemProcessor") ItemProcessor<String, String> processor,
@Qualifier("promotionLogItemWriter") ItemWriter<String> writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
SimpleStepBuilder<String, String> builder
= parentStepBuilder("jobChunkPromotionalFlow.step2",
jobRepository, reader, processor, writer, transactionManager);
return builder.build();
}
@Bean
public Job jobChunkPromotionalFlow(JobRepository jobRepository,
Step step1, Step step2) {
return new JobBuilder("jobChunkPromotionalFlow", jobRepository)
.start(step1)
.next(step2)
.build();
}
SimpleStepBuilder<String, String> parentStepBuilder(String stepName,
JobRepository jobRepository,
ListItemReader<String> reader,
ItemProcessor<String, String> processor,
ItemWriter<String> writer,
PlatformTransactionManager transactionManager) {
return new StepBuilder(stepName, jobRepository)
.<String, String> chunk(1, transactionManager)
.reader(reader)
.processor(processor)
.writer(writer);
}
<!-- import,annotation,component-scan definitions are omitted -->
<batch:job id="jobChunkPromotionalFlow" job-repository="jobRepository">
<batch:step id="jobChunkPromotionalFlow.step1" parent="sourceStep"
next="jobChunkPromotionalFlow.step2">
<batch:listeners>
<batch:listener>
<!-- (3) -->
<bean class="org.springframework.batch.core.listener.ExecutionContextPromotionListener"
p:keys="promotion"
p:strict="true" />
</batch:listener>
</batch:listeners>
</batch:step>
<batch:step id="jobChunkPromotionalFlow.step2" parent="targetStep"/>
</batch:job>
<!-- step definitions are omitted. -->
項番 | 説明 |
---|---|
(1) |
ステップ実行コンテキストの |
(2) |
先行ステップの(1)で設定された受け渡しデータを |
(3) |
|
How to extend
ここでは後続ステップの条件分岐と、条件により後続ステップ実行前にジョブを停止させる停止条件について説明する。
ジョブ・ステップの終了コードとステータスの違い。
以降の説明では「ステータス」と「終了コード」という言葉が頻繁に登場する。 |
条件分岐
条件分岐は先行ステップの実行結果となる終了コードを受けて、複数の後続ステップから1つを選択して継続実行させることを言う。
いずれの後続ステップを実行させずにジョブを停止させる場合は後述の"停止条件"を参照。
@Bean
public Step stepA(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobConditionalFlow.stepA", jobRepository,
tasklet, listener, transactionManager)
.build();
}
@Bean
public Step stepB(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobConditionalFlow.stepB", jobRepository,
tasklet, listener, transactionManager)
.build();
}
@Bean
public Step stepC(JobRepository jobRepository,
SequentialFlowTasklet tasklet,
ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobConditionalFlow.stepC", jobRepository,
tasklet, listener, transactionManager)
.build();
}
@Bean
public Job jobConditionalFlow(JobRepository jobRepository, Step stepA,
Step stepB, Step stepC) {
return new JobBuilder("jobConditionalFlow", jobRepository)
.start(stepA)
.on(FlowExecutionStatus.COMPLETED.getName()).to(stepB) // (1) (2)
.from(stepA)
.on(FlowExecutionStatus.FAILED.getName()).to(stepC) // (1) (3)
.end()
.build();
}
<batch:job id="jobConditionalFlow" job-repository="jobRepository">
<batch:step id="jobConditionalFlow.stepA" parent="conditionalFlow.parentStep">
<!-- (1) -->
<batch:next on="COMPLETED" to="jobConditionalFlow.stepB" />
<batch:next on="FAILED" to="jobConditionalFlow.stepC"/>
</batch:step>
<!-- (2) -->
<batch:step id="jobConditionalFlow.stepB" parent="conditionalFlow.parentStep"/>
<!-- (3) -->
<batch:step id="jobConditionalFlow.stepC" parent="conditionalFlow.parentStep"/>
</batch:job>
項番 | 説明 |
---|---|
(1) |
シーケンシャルフローのように |
(2) |
(1)のステップ終了コードが |
(3) |
(1)のステップ終了コードが |
後続ステップによる回復処理の注意点
先行ステップの処理失敗(終了コードが 後続ステップの回復処理が失敗した場合にジョブをリスタートすると、回復処理のみが再実行される。 |
停止条件
先行ステップの終了コードに応じ、ジョブを停止させる方法を説明する。
停止の手段として、以下の3つの要素を指定する方法がある。
-
end
-
fail
-
stop
これらの終了コードが先行ステップに該当する場合は後続ステップが実行されない。
また、同一ステップ内にそれぞれ複数指定が可能である。
TaskletStepBuilder parentStepBuilder(String stepName,
JobRepository jobRepository,
SequentialFlowTasklet tasklet,
ChangeExitCodeReturnListener listener,
PlatformTransactionManager transactionManager) {
return new StepBuilder(stepName, jobRepository)
.listener(listener)
.tasklet(tasklet, transactionManager);
}
@Bean
public Step step1(JobRepository jobRepository,
@Qualifier("stopFlowTasklet") SequentialFlowTasklet tasklet,
@Qualifier("changeExitCodeReturnListener") ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobStopFlow.step1", jobRepository, tasklet,
listener, transactionManager)
.build();
}
@Bean
public Step step2(JobRepository jobRepository,
@Qualifier("stopFlowTasklet") SequentialFlowTasklet tasklet,
@Qualifier("changeExitCodeReturnListener") ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobStopFlow.step2", jobRepository, tasklet,
listener, transactionManager)
.build();
}
@Bean
public Step step3(JobRepository jobRepository,
@Qualifier("stopFlowTasklet") SequentialFlowTasklet tasklet,
@Qualifier("changeExitCodeReturnListener") ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobStopFlow.step3", jobRepository, tasklet,
listener, transactionManager)
.build();
}
@Bean
public Step step4(JobRepository jobRepository,
@Qualifier("stopFlowTasklet") SequentialFlowTasklet tasklet,
@Qualifier("changeExitCodeReturnListener") ChangeExitCodeReturnListener listener,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return parentStepBuilder("jobStopFlow.step4", jobRepository, tasklet,
listener, transactionManager)
.build();
}
@Bean
public Job jobStopFlow(JobRepository jobRepository,
@Qualifier("jobExitCodeChangeListener") JobExitCodeChangeListener listener,
Step step1, Step step2, Step step3, Step step4) {
return new JobBuilder("jobStopFlow", jobRepository)
.start(step1)
.listener(listener)
.on("END_WITH_NO_EXIT_CODE").end()
.from(step1).on("END_WITH_EXIT_CODE").end("COMPLETED_CUSTOM")
.from(step1).on("*").to(step2)
.from(step2).on("FORCE_FAIL_WITH_NO_EXIT_CODE").fail()
.from(step2).on("FORCE_FAIL_WITH_EXIT_CODE").fail()
.from(step2).on("*").to(step3)
.from(step3).on("FORCE_STOP").stopAndRestart(step4)
.from(step3).on("FORCE_STOP_WITH_EXIT_CODE")
.stopAndRestart(step4)
.from(step3).on("*").to(step4)
.end()
.build();
}
<batch:job id="jobStopFlow" job-repository="jobRepository">
<batch:step id="jobStopFlow.step1" parent="stopFlow.parentStep">
<!-- (1) -->
<batch:end on="END_WITH_NO_EXIT_CODE"/>
<batch:end on="END_WITH_EXIT_CODE" exit-code="COMPLETED_CUSTOM"/>
<!-- (2) -->
<batch:next on="*" to="jobStopFlow.step2"/>
</batch:step>
<batch:step id="jobStopFlow.step2" parent="stopFlow.parentStep">
<!-- (3) -->
<batch:fail on="FORCE_FAIL_WITH_NO_EXIT_CODE"/>
<batch:fail on="FORCE_FAIL_WITH_EXIT_CODE" exit-code="FAILED_CUSTOM"/>
<!-- (2) -->
<batch:next on="*" to="jobStopFlow.step3"/>
</batch:step>
<batch:step id="jobStopFlow.step3" parent="stopFlow.parentStep">
<!-- (4) -->
<batch:stop on="FORCE_STOP" restart="jobStopFlow.step4" exit-code=""/>
<!-- (2) -->
<batch:next on="*" to="jobStopFlow.step4"/>
</batch:step>
<batch:step id="jobStopFlow.step4" parent="stopFlow.parentStep"/>
</batch:job>
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
|
(4) |
|
exit-code属性による終了コードのカスタマイズ時は漏れなくプロセス終了コードにマッピングさせること。
詳細は"終了コードのカスタマイズ"を参照。 |
XMLConfigを使用する場合は、<batch:stop>でexit-codeに空文字列を指定すること。
上記はstep1が正常終了した際ジョブは停止状態となり、再度リスタート実行時にstep2を実行させることを意図したフロー制御になっている。 これを回避するためには上述で示したように 不具合の詳細は Spring Batch/BATCH-2315 を参照。 |