前提
チュートリアルの進め方で説明しているとおり、
既に作成済みのジョブに対して、非同期実行していく形式とする。
なお、非同期実行方式にはDBポーリングを利用する方式と
Webコンテナを利用する方式がある。 |
概要
DBポーリングを利用してジョブを非同期実行する。
なお、本節はMacchinetta Batch 2.x 開発ガイドラインを元に説明しているため、
詳細については非同期実行(DBポーリング)を参照のこと。
前提のとおり既に作成済みのジョブがあるため、作成するアプリケーションの背景、処理概要、業務仕様は説明は割愛する。
以降では、DBポーリングによるジョブの非同期実行方法を以下の手順で説明する。
準備
非同期実行(DBポーリング)を行うための準備作業を実施する。
実施する作業は以下のとおり。
ポーリング処理の設定
非同期実行に必要な設定は、batch-application.properties
で行う。
Macchinetta Batch 2.xでは既に設定済みであるため、詳細な説明は割愛する。
各項目の説明は各種設定のポーリング処理の設定を参照のこと。
# TERASOLUNA AsyncBatchDaemon settings.
async-batch-daemon.scheduler.size=1
async-batch-daemon.schema.script=classpath:org/terasoluna/batch/async/db/schema-h2.sql
async-batch-daemon.job-concurrency-num=3
# (1)
async-batch-daemon.polling-interval=5000
async-batch-daemon.polling-initial-delay=1000
# (2)
async-batch-daemon.polling-stop-file-path=/tmp/stop-async-batch-daemon
項番 | 説明 |
---|---|
(1) |
ポーリング周期(ミリ秒単位)を設定する。 |
(2) |
非同期バッチデーモンを停止させるための終了ファイルパスを設定する。 |
ジョブの設定
非同期実行する対象のジョブは、async-batch-daemon.xml
のautomaticJobRegistrar
に設定する。
例としてデータベースアクセスでデータ入出力を行うジョブ(チャンクモデル)を指定した設定を以下に示す。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:c="http://www.springframework.org/schema/c"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<!-- omitted -->
<bean id="automaticJobRegistrar" class="org.springframework.batch.core.configuration.support.AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.springframework.batch.core.configuration.support.ClasspathXmlApplicationContextsFactoryBean"
p:resources="classpath:/META-INF/jobs/dbaccess/jobPointAddChunk.xml" /> <!-- (1) -->
</property>
<property name="jobLoader">
<bean class="org.springframework.batch.core.configuration.support.DefaultJobLoader"
p:jobRegistry-ref="jobRegistry" />
</property>
</bean>
<!-- omitted -->
</beans>
項番 | 説明 |
---|---|
(1) |
非同期実行する対象ジョブのBean定義ファイルを指定する。 |
ジョブ設計上の留意点
非同期実行(DBポーリング)の特性上、同一ジョブの並列実行が可能になっているので、並列実行した場合に同一ジョブが影響を与えないようにする必要がある。 本チュートリアルでは、データベースアクセスのジョブとファイルアクセスのジョブで同じジョブIDを用いている。 チュートリアルの中で、これらのジョブを並列実行することはないが、同じジョブIDのジョブを複数指定する場合はエラーとなってしまうため、 ジョブの設計時に留意する必要がある。 |
入力リソースの設定
非同期実行でジョブを実行する際の入力リソース(データベース or ファイル)の設定を行う。
ここでは、正常系データを利用するジョブを実行する。
データベースアクセスするジョブとファイルアクセスするジョブの場合の入力リソースの設定を以下に示す。
- データベースアクセスするジョブの場合
-
batch-application.proeprties
のDatabase Initializeのスクリプトを以下のとおり設定する。
# Database Initialize
tutorial.create-table.script=file:sqls/create-member-info-table.sql
tutorial.insert-data.script=file:sqls/insert-member-info-data.sql
#tutorial.insert-data.script=file:sqls/insert-member-info-error-data.sql
- ファイルアクセスするジョブの場合
-
事前に、入力ファイルが配備されていること、および出力ディレクトリが存在することを確認しておくこと。
-
入力ファイル
-
files/input/input-member-info-data.csv
-
-
出力ディレクトリ
-
files/output/
-
-
本チュートリアルにおける入力リソースのデータ準備について
データベースアクセスするジョブの場合、非同期バッチデーモン起動時(ApplicationContext生成時)にINSERTのSQLを実行し、 データベースにデータを準備している。 ファイルアクセスするジョブの場合、入力ファイルをディレクトリに配置し、 ジョブ要求テーブルへジョブ情報の登録時にそのジョブ情報のパラメータ部として入出力ファイルのパスを指定する。 |
非同期バッチデーモンを起動
Macchinetta Batch 2.xが提供するAsyncBatchDaemon
を起動する。
実行構成を以下のとおり作成し、非同期バッチデーモンを起動する。
作成手順はRun Configuration(実行構成)の作成を参照。
項目名 | 値 |
---|---|
Name |
Run Job With AsyncBatchDaemon |
Project |
macchinetta-batch-tutorial |
Main class |
org.terasoluna.batch.async.db.AsyncBatchDaemon |
非同期バッチデーモンを起動すると、ポーリングプロセスが5秒間隔(batch-application.properties
のasync-batch-daemon.polling-interval
に指定したミリ秒)で実行される。
ログの出力例を以下に示す。
このログではポーリングプロセスが3回実行されたことがわかる。
[2017/09/06 18:53:23][main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon start.
(.. omitted)
[2017/09/06 18:53:27] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon will start watching the creation of a polling stop file. [Path:\tmp\stop-async-batch-daemon]
[2017/09/06 18:53:27] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:53:33] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:53:38] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
ジョブ要求テーブルにジョブ情報を登録
ジョブ要求テーブル(batch_job_request)にジョブを実行するための情報を登録するため、 SQL(INSERT文)を発行する。
ジョブ要求テーブルのテーブル仕様はポーリングするテーブルについてのジョブ要求テーブルの構造を参照のこと。
STS上でSQLを実行する方法を以下に記す。
-
Data Source Explorer Viewを表示
Viewの表示手順はSTSからデータベースを参照する準備を参照。
-
SQL Scrapbookを開く
データソースを右クリックし、「Open SQL Scrapbook」を押下する。
-
SQLを記述する
データベースアクセスするジョブとファイルアクセスするジョブを実行するためのSQLをチャンクモデルの例で以下に示す。
- データベースアクセスするジョブの場合
-
記述するSQLを以下に示す。
INSERT INTO batch_job_request(job_name,job_parameter,polling_status,create_date)
VALUES ('jobPointAddChunk', '', 'INIT', current_timestamp);
- ファイルアクセスするジョブの場合
-
記述するSQLを以下に示す。
INSERT INTO batch_job_request(job_name,job_parameter,polling_status,create_date)
VALUES ('jobPointAddChunk', 'inputFile=files/input/input-member-info-data.csv,outputFile=files/output/output-member_info_out.csv', 'INIT', current_timestamp);
SQL記述後のイメージを以下に記す。
ここでは、データベースアクセスするジョブの実行要求用SQLを記述している。
-
SQLを実行する
下図のとおり、SQL ScrapbookのConnection Profileを設定し、 余白部分を右クリック → [Execute All]を押下する。
Connection Profileの内容は、 STSからデータベースを参照する準備で設定した内容を元にしている。
-
SQLの実行結果を確認する
下図のとおり、SQL Results Viewで実行したSQLのStatus
がSucceeded
となっていることを確認する。
-
ジョブ要求テーブルを確認する
下図のとおり、ジョブ要求テーブルにジョブを実行するための情報が登録されていることを確認する。
POLLING_STATUS
はINIT
で登録したが、既にポーリングが行われた場合は、POLLING_STATUS
がPOLLED
もしくはEXECUTED
となっている。
POLLING_STATUS
の詳細についてはポーリングステータス(polling_status)の遷移パターンを参照。
ジョブの実行と結果の確認
非同期実行対象ジョブの実行結果を確認する。
コンソールログの確認
Console Viewを開き、以下の内容のログが出力されていることを確認する。
ここでは、処理が完了(COMPLETED)し、例外が発生しないことを確認する。
(.. omitted)
[2017/09/06 18:59:50] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:59:55] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobOperator] [INFO ] Checking status of job with name=jobPointAddChunk
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobOperator] [INFO ] Attempting to launch job with name=jobPointAddChunk and parameters=
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] launched with the following parameters: [{jsr_batch_run_id=117}]
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.j.SimpleStepHandler] [INFO ] Executing step: [jobPointAddChunk.step01]
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] completed with the following parameters: [{jsr_batch_run_id=117}] and the following status: [COMPLETED]
[2017/09/06 19:00:00] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 19:00:05] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing
終了コードの確認
非同期実行の場合、STS上で実行対象ジョブの終了コードを確認することはできない。
ジョブの実行状態はジョブ実行状態の確認で確認する。
出力リソースの確認
実行したジョブによって出力リソース(データベース or ファイル)を確認する。
データベースアクセスするジョブの場合
更新前後の会員情報テーブルの内容を比較し、確認内容のとおりとなっていることを確認する。
確認手順はData Source Explorerを使用してデータベースを参照するを参照。
- 確認内容
-
-
statusカラム
-
"0"(初期状態)のレコードが存在しないこと
-
-
pointカラム
-
ポイント加算対象について、会員種別に応じたポイントが加算されていること
-
typeカラムが"G"(ゴールド会員)の場合は100ポイント
-
typeカラムが"N"(一般会員)の場合は10ポイント
-
-
1,000,000(上限値)を超えたレコードが存在しないこと
-
-
更新前後の会員情報テーブルの内容を以下に示す。
ファイルアクセスするジョブの場合
会員情報ファイルの入出力内容を比較し、確認内容のとおりとなっていることを確認する。
- 確認内容
-
-
出力ディレクトリに会員情報ファイルが出力されていること
-
出力ファイル: files/output/output-member-info-data.csv
-
-
statusフィールド
-
"0"(初期状態)のレコードが存在しないこと
-
-
pointフィールド
-
ポイント加算対象について、会員種別に応じたポイントが加算されていること
-
typeフィールドが"G"(ゴールド会員)の場合は100ポイント
-
typeフィールドが"N"(一般会員)の場合は10ポイント
-
-
1,000,000(上限値)を超えたレコードが存在しないこと
-
-
会員情報ファイルの入出力内容を以下に示す。
ファイルのフィールドはid(会員番号)、type(会員種別)、status(商品購入フラグ)、point(ポイント)の順で出力される。
非同期バッチデーモンの停止
終了ファイルを作成し、非同期バッチデーモンを停止する。
ポーリング処理の設定で設定したとおり、 C:\tmpにstop-async-batch-daemonファイル(空ファイル)を作成する。
STSのコンソールで以下のとおり非同期バッチデーモンが停止していることを確認する。
(.. omitted)
[2017/09/08 21:41:41] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/08 21:41:44] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon has detected the polling stop file, and then shutdown now!
[2017/09/08 21:41:44] [main] [o.s.c.s.ClassPathXmlApplicationContext] [INFO ] Closing org.springframework.context.support.ClassPathXmlApplicationContext@6b2fad11: startup date [Fri Sep 08 21:41:01 JST 2017]; root of context hierarchy
[2017/09/08 21:41:44] [main] [o.s.b.c.c.s.GenericApplicationContextFactory$ResourceXmlApplicationContext] [INFO ] Closing ResourceXmlApplicationContext:file:/C:/dev/workspace/tutorial/Maccinetta-batch-tutorial/target/classes/META-INF/jobs/dbaccess/jobPointAddChunk.xml
[2017/09/08 21:41:44] [main] [o.s.c.s.DefaultLifecycleProcessor] [INFO ] Stopping beans in phase 0
[2017/09/08 21:41:44] [main] [o.t.b.a.d.JobRequestPollTask] [INFO ] JobRequestPollTask is called shutdown.
[2017/09/08 21:41:44] [main] [o.s.s.c.ThreadPoolTaskScheduler] [INFO ] Shutting down ExecutorService 'daemonTaskScheduler'
[2017/09/08 21:41:44] [main] [o.s.s.c.ThreadPoolTaskExecutor] [INFO ] Shutting down ExecutorService
[2017/09/08 21:41:44] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon stopped after all jobs completed.
ジョブ実行状態の確認
JobRepositoryのメタデータテーブルでジョブの状態・実行結果を確認する。ここでは、batch_job_execution
を参照する。
ジョブの状態を確認するためのSQLを以下に示す。
SELECT job_execution_id,start_time,end_time,exit_code FROM batch_job_execution WHERE job_execution_id =
(SELECT max(job_execution_id) FROM batch_job_request WHERE job_execution_id IS NOT NULL);
このSQLでは、最後に実行されたジョブの実行状態を取得するようにしている。
SQLの実行結果は、STS上でSQL実行後に表示されるSQL Results Viewにて確認できる。
下図のとおり、EXIT_CODE
がCOMPLETED
となっていることを確認する。