Macchinetta Batch Framework (2.x) Development Guideline - version 2.1.1.RELEASE, 2019-3-26
> INDEX
前提

チュートリアルの進め方で説明しているとおり、 既に作成済みのジョブに対して、非同期実行していく形式とする。 なお、非同期実行方式にはDBポーリングを利用する方式Webコンテナを利用する方式がある。
ただし、記述はDBポーリングを利用してジョブを非同期実行する場合の説明としているため留意すること。

概要

DBポーリングを利用してジョブを非同期実行する。

なお、本節はMacchinetta Batch 2.x 開発ガイドラインを元に説明しているため、 詳細については非同期実行(DBポーリング)を参照のこと。
前提のとおり既に作成済みのジョブがあるため、作成するアプリケーションの背景、処理概要、業務仕様は説明は割愛する。

以降では、DBポーリングによるジョブの非同期実行方法を以下の手順で説明する。

準備

非同期実行(DBポーリング)を行うための準備作業を実施する。

実施する作業は以下のとおり。

ポーリング処理の設定

非同期実行に必要な設定は、batch-application.propertiesで行う。
Macchinetta Batch 2.xでは既に設定済みであるため、詳細な説明は割愛する。 各項目の説明は各種設定のポーリング処理の設定を参照のこと。

src/main/resources/batch-application.properties
# TERASOLUNA AsyncBatchDaemon settings.
async-batch-daemon.scheduler.size=1
async-batch-daemon.schema.script=classpath:org/terasoluna/batch/async/db/schema-h2.sql
async-batch-daemon.job-concurrency-num=3
# (1)
async-batch-daemon.polling-interval=5000
async-batch-daemon.polling-initial-delay=1000
# (2)
async-batch-daemon.polling-stop-file-path=/tmp/stop-async-batch-daemon
説明
項番 説明

(1)

ポーリング周期(ミリ秒単位)を設定する。
ここでは、5000ミリ秒(5秒)を指定する。

(2)

非同期バッチデーモンを停止させるための終了ファイルパスを設定する。
本チュートリアルは、Windows環境で実施することを前提としているため、 この設定の場合はC:\tmp配下にstop-async-batch-daemonファイルを置くことになる。

ジョブの設定

非同期実行する対象のジョブは、async-batch-daemon.xmlautomaticJobRegistrarに設定する。

例としてデータベースアクセスでデータ入出力を行うジョブ(チャンクモデル)を指定した設定を以下に示す。

src/main/resources/META-INF/spring/async-batch-daemon.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:c="http://www.springframework.org/schema/c"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- omitted -->

    <bean id="automaticJobRegistrar" class="org.springframework.batch.core.configuration.support.AutomaticJobRegistrar">
        <property name="applicationContextFactories">
            <bean class="org.springframework.batch.core.configuration.support.ClasspathXmlApplicationContextsFactoryBean"
                p:resources="classpath:/META-INF/jobs/dbaccess/jobPointAddChunk.xml" /> <!-- (1) -->
        </property>
        <property name="jobLoader">
            <bean class="org.springframework.batch.core.configuration.support.DefaultJobLoader"
                p:jobRegistry-ref="jobRegistry" />
        </property>
    </bean>

    <!-- omitted -->

</beans>
説明
項番 説明

(1)

非同期実行する対象ジョブのBean定義ファイルを指定する。
ワイルドカード(**/*)の利用も可能である。ジョブの指定に際しては ジョブの設定に 注意事項等の記載があるため参照してほしい。

ジョブ設計上の留意点

非同期実行(DBポーリング)の特性上、同一ジョブの並列実行が可能になっているので、並列実行した場合に同一ジョブが影響を与えないようにする必要がある。

本チュートリアルでは、データベースアクセスのジョブとファイルアクセスのジョブで同じジョブIDを用いている。 チュートリアルの中で、これらのジョブを並列実行することはないが、同じジョブIDのジョブを複数指定する場合はエラーとなってしまうため、 ジョブの設計時に留意する必要がある。

入力リソースの設定

非同期実行でジョブを実行する際の入力リソース(データベース or ファイル)の設定を行う。
ここでは、正常系データを利用するジョブを実行する。

データベースアクセスするジョブとファイルアクセスするジョブの場合の入力リソースの設定を以下に示す。

データベースアクセスするジョブの場合

batch-application.propertiesのDatabase Initializeのスクリプトを以下のとおり設定する。

src/main/resources/batch-application.properties
# Database Initialize
tutorial.create-table.script=file:sqls/create-member-info-table.sql
tutorial.insert-data.script=file:sqls/insert-member-info-data.sql
#tutorial.insert-data.script=file:sqls/insert-member-info-error-data.sql
ファイルアクセスするジョブの場合

事前に、入力ファイルが配備されていること、および出力ディレクトリが存在することを確認しておくこと。

  • 入力ファイル

    • files/input/input-member-info-data.csv

  • 出力ディレクトリ

    • files/output/

本チュートリアルにおける入力リソースのデータ準備について

データベースアクセスするジョブの場合、非同期バッチデーモン起動時(ApplicationContext生成時)にINSERTのSQLを実行し、 データベースにデータを準備している。

ファイルアクセスするジョブの場合、入力ファイルをディレクトリに配置し、 ジョブ要求テーブルへジョブ情報の登録時にそのジョブ情報のパラメータ部として入出力ファイルのパスを指定する。

非同期バッチデーモンを起動

Macchinetta Batch 2.xが提供するAsyncBatchDaemonを起動する。

実行構成を以下のとおり作成し、非同期バッチデーモンを起動する。
作成手順はRun Configuration(実行構成)の作成を参照。

Run ConfigurationsのMainタブで設定する値
項目名

Name

Run Job With AsyncBatchDaemon
(任意の値を設定する)

Project

macchinetta-batch-tutorial

Main class

org.terasoluna.batch.async.db.AsyncBatchDaemon

非同期バッチデーモンを起動すると、ポーリングプロセスが5秒間隔(batch-application.propertiesasync-batch-daemon.polling-intervalに指定したミリ秒)で実行される。
ログの出力例を以下に示す。
このログではポーリングプロセスが3回実行されたことがわかる。

コンソールログの出力例
[2017/09/06 18:53:23][main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon start.

(.. omitted)

[2017/09/06 18:53:27] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon will start watching the creation of a polling stop file. [Path:\tmp\stop-async-batch-daemon]
[2017/09/06 18:53:27] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:53:33] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:53:38] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.

ジョブ要求テーブルにジョブ情報を登録

ジョブ要求テーブル(batch_job_request)にジョブを実行するための情報を登録するため、 SQL(INSERT文)を発行する。

ジョブ要求テーブルのテーブル仕様はポーリングするテーブルについてのジョブ要求テーブルの構造を参照のこと。

STS上でSQLを実行する方法を以下に記す。

SQL実行手順
  1. Data Source Explorer Viewを表示
    Viewの表示手順はSTSからデータベースを参照する準備を参照。

Data Source Explorer View
Data Source Explorer View
  1. SQL Scrapbookを開く
    データソースを右クリックし、「Open SQL Scrapbook」を押下する。

Open SQL Scrapbook
SQL Scrapbook
  1. SQLを記述する
    データベースアクセスするジョブとファイルアクセスするジョブを実行するためのSQLをチャンクモデルの例で以下に示す。

データベースアクセスするジョブの場合

記述するSQLを以下に示す。

データベースアクセスするジョブの実行要求用SQL
INSERT INTO batch_job_request(job_name,job_parameter,polling_status,create_date)
VALUES ('jobPointAddChunk', '', 'INIT', current_timestamp);
ファイルアクセスするジョブの場合

記述するSQLを以下に示す。

ファイルアクセスするジョブの実行要求用SQL
INSERT INTO batch_job_request(job_name,job_parameter,polling_status,create_date)
VALUES ('jobPointAddChunk', 'inputFile=files/input/input-member-info-data.csv,outputFile=files/output/output-member_info_out.csv', 'INIT', current_timestamp);

SQL記述後のイメージを以下に記す。
ここでは、データベースアクセスするジョブの実行要求用SQLを記述している。

Entering SQL
SQL入力
  1. SQLを実行する
    下図のとおり、SQL ScrapbookのConnection Profileを設定し、 余白部分を右クリック → [Execute All]を押下する。
    Connection Profileの内容は、 STSからデータベースを参照する準備で設定した内容を元にしている。

Execute SQL
SQL実行
  1. SQLの実行結果を確認する
    下図のとおり、SQL Results Viewで実行したSQLのStatusSucceededとなっていることを確認する。

Confirm SQL Results
SQL実行結果確認
  1. ジョブ要求テーブルを確認する
    下図のとおり、ジョブ要求テーブルにジョブを実行するための情報が登録されていることを確認する。
    POLLING_STATUSINITで登録したが、既にポーリングが行われた場合は、POLLING_STATUSPOLLEDもしくはEXECUTEDとなっている。
    POLLING_STATUSの詳細についてはポーリングステータス(polling_status)の遷移パターンを参照。

Confrim batch_job_Request Table
ジョブ要求テーブルの確認

ジョブの実行と結果の確認

非同期実行対象ジョブの実行結果を確認する。

コンソールログの確認

Console Viewを開き、以下の内容のログが出力されていることを確認する。
ここでは、処理が完了(COMPLETED)し、例外が発生しないことを確認する。

コンソールログ出力例
(.. omitted)

[2017/09/06 18:59:50] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:59:55] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobOperator] [INFO ] Checking status of job with name=jobPointAddChunk
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobOperator] [INFO ] Attempting to launch job with name=jobPointAddChunk and parameters=
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] launched with the following parameters: [{jsr_batch_run_id=117}]
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.j.SimpleStepHandler] [INFO ] Executing step: [jobPointAddChunk.step01]
[2017/09/06 18:59:55] [daemonTaskExecutor-1] [o.s.b.c.l.s.SimpleJobLauncher] [INFO ] Job: [FlowJob: [name=jobPointAddChunk]] completed with the following parameters: [{jsr_batch_run_id=117}] and the following status: [COMPLETED]
[2017/09/06 19:00:00] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/06 19:00:05] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing

終了コードの確認

非同期実行の場合、STS上で実行対象ジョブの終了コードを確認することはできない。
ジョブの実行状態はジョブ実行状態の確認で確認する。

出力リソースの確認

実行したジョブによって出力リソース(データベース or ファイル)を確認する。

データベースアクセスするジョブの場合

更新前後の会員情報テーブルの内容を比較し、確認内容のとおりとなっていることを確認する。
確認手順はData Source Explorerを使用してデータベースを参照するを参照。

確認内容
  • statusカラム

    • "0"(初期状態)のレコードが存在しないこと

  • pointカラム

    • ポイント加算対象について、会員種別に応じたポイントが加算されていること

      • typeカラムが"G"(ゴールド会員)の場合は100ポイント

      • typeカラムが"N"(一般会員)の場合は10ポイント

    • 1,000,000(上限値)を超えたレコードが存在しないこと

更新前後の会員情報テーブルの内容を以下に示す。

Table of member_info
更新前後の会員情報テーブルの内容

ファイルアクセスするジョブの場合

会員情報ファイルの入出力内容を比較し、確認内容のとおりとなっていることを確認する。

確認内容
  • 出力ディレクトリに会員情報ファイルが出力されていること

    • 出力ファイル: files/output/output-member-info-data.csv

  • statusフィールド

    • "0"(初期状態)のレコードが存在しないこと

  • pointフィールド

    • ポイント加算対象について、会員種別に応じたポイントが加算されていること

      • typeフィールドが"G"(ゴールド会員)の場合は100ポイント

      • typeフィールドが"N"(一般会員)の場合は10ポイント

    • 1,000,000(上限値)を超えたレコードが存在しないこと

会員情報ファイルの入出力内容を以下に示す。
ファイルのフィールドはid(会員番号)、type(会員種別)、status(商品購入フラグ)、point(ポイント)の順で出力される。

File of member_info
会員情報ファイルの入出力内容

非同期バッチデーモンの停止

終了ファイルを作成し、非同期バッチデーモンを停止する。

ポーリング処理の設定で設定したとおり、 C:\tmpにstop-async-batch-daemonファイル(空ファイル)を作成する。

Stop AsyncBatchDaemon
終了ファイル作成

STSのコンソールで以下のとおり非同期バッチデーモンが停止していることを確認する。

非同期バッチデーモンの停止を確認
(.. omitted)

[2017/09/08 21:41:41] [daemonTaskScheduler-1] [o.t.b.a.d.JobRequestPollTask] [INFO ] Polling processing.
[2017/09/08 21:41:44] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon has detected the polling stop file, and then shutdown now!
[2017/09/08 21:41:44] [main] [o.s.c.s.ClassPathXmlApplicationContext] [INFO ] Closing org.springframework.context.support.ClassPathXmlApplicationContext@6b2fad11: startup date [Fri Sep 08 21:41:01 JST 2017]; root of context hierarchy
[2017/09/08 21:41:44] [main] [o.s.b.c.c.s.GenericApplicationContextFactory$ResourceXmlApplicationContext] [INFO ] Closing ResourceXmlApplicationContext:file:/C:/dev/workspace/tutorial/macchinetta-batch-tutorial/target/classes/META-INF/jobs/dbaccess/jobPointAddChunk.xml
[2017/09/08 21:41:44] [main] [o.s.c.s.DefaultLifecycleProcessor] [INFO ] Stopping beans in phase 0
[2017/09/08 21:41:44] [main] [o.t.b.a.d.JobRequestPollTask] [INFO ] JobRequestPollTask is called shutdown.
[2017/09/08 21:41:44] [main] [o.s.s.c.ThreadPoolTaskScheduler] [INFO ] Shutting down ExecutorService 'daemonTaskScheduler'
[2017/09/08 21:41:44] [main] [o.s.s.c.ThreadPoolTaskExecutor] [INFO ] Shutting down ExecutorService
[2017/09/08 21:41:44] [main] [o.t.b.a.d.AsyncBatchDaemon] [INFO ] Async Batch Daemon stopped after all jobs completed.

ジョブ実行状態の確認

JobRepositoryのメタデータテーブルでジョブの状態・実行結果を確認する。ここでは、batch_job_executionを参照する。

ジョブの状態を確認するためのSQLを以下に示す。

ジョブの状態確認用SQL
SELECT job_execution_id,start_time,end_time,exit_code FROM batch_job_execution WHERE job_execution_id =
(SELECT max(job_execution_id) FROM batch_job_request WHERE job_execution_id IS NOT NULL);

このSQLでは、最後に実行されたジョブの実行状態を取得するようにしている。

SQLの実行結果は、STS上でSQL実行後に表示されるSQL Results Viewにて確認できる。
下図のとおり、EXIT_CODECOMPLETEDとなっていることを確認する。

SQL Results View
ジョブの状態確認
Macchinetta Batch Framework (2.x) Development Guideline - version 2.1.1.RELEASE, 2019-3-26