Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.1.RELEASE, 2018-3-9
> INDEX

Overview

排他制御とは、複数のトランザクションから同じリソースに対して、同時に更新処理が行われる際に、データの整合性を保つために行う処理のことである。 複数のトランザクションから同じリソースに対して、同時に更新処理が行われる可能性がある場合は、基本的に排他制御を行う必要がある。

ここでの複数トランザクションとは以下のことを指す。

  • 複数ジョブの同時実行時におけるトランザクション

  • オンライン処理との同時実行時におけるトランザクション

複数ジョブの排他制御

複数ジョブを同時実行する場合は、排他制御の必要がないようにジョブ設計を行うことが基本である。 これは、アクセスするリソースや処理対象をジョブごとに分割することが基本であることを意味する。

排他制御に関する概念は、オンライン処理と同様であるため、Macchinetta Server 1.x 開発ガイドラインにある 排他制御を参照してほしい。

ここでは、Macchinetta Server 1.xでは説明されていない部分を中心に説明をする。

本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。

排他制御の必要性

排他制御の必要性に関しては、Macchinetta Server 1.x 開発ガイドラインにある 排他制御の必要性を参照。

ファイルの排他制御

ファイルでの排他制御はファイルロックにより実現するのが一般的である。

ファイルロックとは

ファイルロックとは、ファイルをあるプログラムで使用している間、ほかのプログラムからの読み書きを制限する仕組みである。 ファイルロックの処理概要を以下に示す。

シナリオ
  • バッチ処理Aがファイルのロックを取得し、ファイルの更新処理を開始する。

  • バッチ処理Bが同一のファイルの更新を試みファイルのロック取得を試みるが失敗する。

  • バッチ処理Aが処理を終了し、ファイルのロックを解除する

ExclusiveControl_File_Senario
ファイルロックの処理概要
  1. バッチ処理A(Batch ProcessA)が対象ファイル(TargetFile)のロック取得を試みる。

  2. バッチ処理Aが、対象ファイルのロック取得に成功する

  3. バッチ処理B(Batch ProcessB)が、対象ファイルのロック取得を試みる

  4. バッチ処理Aが、対象ファイルに書き込みを行う

  5. バッチ処理Bは、バッチ処理Aがロック中であるため、対象ファイルのロック取得に失敗する

  6. バッチ処理Bが、ファイル更新失敗の処理を行う。

  7. バッチ処理Aが、対象ファイルのロックを開放する。

デッドロックの予防

ファイルにおいてもデータベースと同様に複数のファイルに対してロックを取得する場合、デッドロックとなる場合がある。 そのため、ファイルの更新順序をルール化することが重要である。
デッドロックの予防に関してはデータベースのテーブル間でのデッドロック防止と同様である。 詳細については、Macchinetta Server 1.x 開発ガイドラインの デッドロックの予防を参照。

データベースの排他制御

データベースの排他制御に関しては、Macchinetta Server 1.x 開発ガイドラインにある データベースのロック機能による排他制御 で詳しく説明されているため、そちらを参照のこと。

排他制御方式の使い分け

Macchinetta Batch 2.xでのロック方式と向いているシチュエーションを示す。

排他制御方式の使い分け
ロック方式 向いているシチュエーション

楽観ロック

同時実行時におけるトランザクションで、別トランザクションの更新結果を処理対象外にして処理を継続できる場合

悲観ロック

処理時間が長く、処理中に対象データの状況が変化したことによるやり直しが難しい処理
ファイルに対する排他制御が必要な処理

排他制御とコンポーネントの関係

Macchinetta Batch 2.xが提供する各コンポーネントと排他制御との関係は以下のとおり。

楽観ロック
排他制御とコンポーネントの関係
処理モデル コンポーネント ファイル データベース

チャンク

ItemReader

-

Versionカラムなど取得時と更新時とで同じデータであることが確認できるカラムを含めてデータ取得を行う。

ItemProcessor

-

排他制御は不要である。

ItemWriter

-

取得時と更新時との差分を確認し、他の処理で更新されていないことを確認した上で更新を行う。

タスクレット

Tasklet

-

データ取得時にはItemReader、データ更新時はItemWriterで説明した処理を実施する。
Mapperインターフェースを直接利用する場合も考え方は同じである。

ファイルに対する楽観ロック

ファイルの特性上、ファイルに対して楽観ロックを適用することがない。

悲観ロック
排他制御とコンポーネントの関係
処理モデル コンポーネント ファイル データベース

チャンク

ItemReader

-

悲観ロックを用いずにSELECT文を発行する。
ItemProcessorやItemWriterとは別コネクションになるため、ItemReaderでは排他制御は行わない。
SELECTで取得するデータは、ItemProcessorでデータを取得する条件とする必要最低限のデータ(キー情報)とすることで性能が向上する。

ItemProcessor

-

Mapperインターフェースを利用して、ItemReaderで取得したデータ(キー情報)を条件とするSQL文でSELECT FOR UPDATEを発行する。

ItemWriter

-

悲観ロックを行ったItemProcessorと同トランザクションとなるため、ItemWriterでは排他制御を意識することなくデータを更新する。

タスクレット

Tasklet

ItemStreamReaderでファイルをオープンした直後にファイルロックを取得する。
ItemStreamWriterをクローズする直前にファイルロックを開放する。

データ取得時にはSELECT FOR UPDATE文を発行するItemReaderかMapperインターフェースを直接利用する。
データ更新時はItemWriterで説明した処理を実施する。Mapperインターフェースを直接利用する場合も考え方は同じである。

チャンクモデルでのデータベースでの悲観ロックによる注意事項

ItemReaderで取得したデータ(キー情報)がItemProcessorへ渡される間は排他制御されず、他のトランザクションにより元のデータが更新されている可能性がある。 そのため、ItemProcessorがデータを取得する条件は、ItemReaderと同じデータ(キー情報)を取得する条件を含む必要がある。
ItemProcessorでデータが取得できない場合は、他のトランザクションにより更新されている可能性を考慮して、処理の継続または中断を検討し実装する必要がある。

ファイルに対する悲観ロック

ファイルに対する悲観ロックはタスクレットモデルで実装すること。 チャンクモデルではその構造上、チャンク処理の隙間で排他できない期間が存在してしまうためである。 また、ファイルアクセスはItemStreamReader/ItemStreamWriterをInjectして利用することを前提とする。

データベースでの悲観ロックによる待ち時間

悲観ロックを行う場合、競合により処理が待たされる時間が長くなる可能性がある。 その場合、NO WAITオプションやタイムアウト時間を指定して、悲観ロックを使用するのが妥当である。

How to use

排他制御の使い方をリソース別に説明する。

ファイルの排他制御

Macchinetta Batch 2.xにおけるファイルの排他制御はタスクレットを実装することで実現する。 排他の実現手段としては、java.nio.channels.FileChannelクラスを使用したファイルロック取得で排他制御を行う。

FileChannelクラスの詳細

FileChannelクラスの詳細、使用方法については Javadocを参照。

FileChannelクラスを使用しファイルのロックを取得する例を示す。

Tasklet実装
@Component
@Scope("step")
public class FileExclusiveTasklet implements Tasklet {

    private String targetPath = null; // (1)

    @Inject
    ItemStreamReader<SalesPlanDetail> reader;

    @Inject
    ItemStreamWriter<SalesPlanDetailWithProcessName> writer;

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        // omitted.

        FileChannel fc = null;
        FileLock fileLock = null;

        try {
            try {
                File file = new File(targetPath);
                fc = FileChannel.open(file.toPath(), StandardOpenOption.WRITE,
                        StandardOpenOption.CREATE,
                        StandardOpenOption.APPEND); // (2)
                fileLock = fc.tryLock();  // (3)
            } catch (IOException e) {
                logger.error("Failure other than lock acquisition", e);
                throw new FailedOtherAcquireLockException(
                        "Failure other than lock acquisition", e);
            }
            if (fileLock == null) {
                logger.error("Failed to acquire lock. [processName={}]", processName);
                throw new FailedAcquireLockException("Failed to acquire lock");
            }

            reader.open(executionConetxt);
            writer.open(executionConetxt);  // (4)

            // (5)
            SalesPlanDetail item;
            List<SalesPlanDetailWithProcessName> items = new ArrayList<>();
            while ((item = reader.read()) != null) {

                // omitted.

                items.add(item);
                if (items.size() >= 10) {
                    writer.write(items);
                    items.clear();
                }
            }
            if (items.size() > 0) {
                writer.write(items);
            }

        } finally {
            if (fileLock != null) {
                try {
                    fileLock.release();  // (6)
                } catch (IOException e) {
                    logger.warn("Lock release failed.", e);
                }
            }
            if (fc != null) {
                try {
                    fc.close();
                } catch (IOException e) {
                    // do nothing.
                }
            }
            try {
                writer.close(); // (7)
            } catch (ItemStreamException e) {
                // ignore
            }
            try {
                reader.close();
            } catch (ItemStreamException e) {
                // ignore
            }
        }
        return RepeatStatus.FINISHED;
    }

    // (8)
    @Value("#{jobParameters['outputFile']}")
    public void setTargetPath(String targetPath) {
        this.targetPath = targetPath;
    }
}
説明
項番 説明

(1)

排他対象のファイルパス。

(2)

ファイルチャネルを取得する。
この例では、ファイルの新規作成・追記・書き込みに対するチャネルを取得している。

(3)

ファイルロックを取得する。

(4)

ファイロックの取得に成功した場合、排他対象のファイルをオープンする。

(5)

ファイル出力を伴うビジネスロジックを実行する。

(6)

ファイルロックを開放する。

(7)

排他対象のファイルをクローズする。

(8)

ファイルパスを設定する。
この例では、ジョブパラメータから受け取るようにしている。

ロック取得に用いるFileChannelのメソッドについて

lock()メソッドは対象ファイルがロック済みの場合ロックが解除されるまで待機するため、待機されないtryLock()メソッドを使用することを推奨する。 なおtrylock()は共有ロックと排他ロックが選択できるが、バッチ処理においては、通常は排他ロックを用いる。

同一VMでのスレッド間の排他制御

同一VMにおけるスレッド間の排他制御は注意が必要である。 同一VMでのスレッド間でファイルに対する処理を行う場合、FileChannelクラスを用いたのロック機能では、ファイルが別スレッドの処理にてロックされているかの判定ができない。
そのため、スレッド間での排他制御は機能しない。これを回避するには、ファイルへの書き込みを行う部分で同期化処理をすることでスレッド間の排他制御が行える。
しかし、同期化を行うことで並列処理のメリットが薄れてしまい、単一スレッドで処理することと差異がなくなってしまう。 結果、同一のファイルに対して異なるスレッドで排他制御をして処理することは適していないため、そのような処理設計・実装を行わないこと。

FlatFileItemWriterのappendAllowedプロパティについて

ファイルを新規作成(上書き)する場合は、appendAllowedプロパティをfalse(デフォルト)にすることで、排他制御が実現できる。 これは、FlatFileItemWriterの内部でFileChannelを使って制御しているためである。 しかし、ファイルの追記(appendAllowedプロパティがtrue)の場合は、開発者がFileChannelによる排他制御を実装する必要がある。

データベースの排他制御

Macchinetta Batch 2.xにおけるデータベースの排他制御について説明する。

データベースの排他制御実装は、Macchinetta Server 1.x 開発ガイドラインにある MyBatis3使用時の実装方法が基本である。 本ガイドラインでは、 MyBatis3使用時の実装方法ができている前提で説明を行う。

排他制御とコンポーネントの関係にあるとおり、処理モデル・コンポーネントの組み合わせによるバリエーションがある。

データベースの排他制御のバリエーション

排他方式

処理モデル

コンポーネント

楽観ロック

チャンクモデル

ItemReader/ItemWriter

タスクレットモデル

ItemReader/ItemWriter

Mapperインターフェース

悲観ロック

チャンクモデル

ItemReader/ItemProcessor/ItemWriter

タスクレットモデル

ItemReader/ItemWriter

Mapperインターフェース

タスクレットモデルでMapperインターフェースを使用する場合は、 MyBatis3使用時の実装方法のとおりであるため、説明を割愛する。

タスクレットモデルでItemReader/ItemWriterを使用する場合は、Mapperインターフェースでの呼び出し部分がItemReader/ItemWriterに代わるだけなので、これも説明を割愛する。

よって、ここではチャンクモデルの排他制御について説明する。

楽観ロック

チャンクモデルでの楽観ロックについて説明する。

MyBatisBatchItemWriterがもつassertUpdatesプロパティの設定により、ジョブの振る舞いが変化するので業務要件に合わせて、適切に設定をする必要がある。

楽観ロックを行うジョブ定義を以下に示す。

ジョブ定義
<!-- (1) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchFindOne"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
    <property name="parameterValues">
        <map>
            <entry key="branchId" value="#{jobParameters['branchId']}"/>
        </map>
    </property>
</bean>

<!-- (2) --->
<bean id="writer"
      class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
      p:assertUpdates="true" />  <!-- (3) -->

<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="chunkOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader" processor="branchEditItemProcessor"
                         writer="writer" commit-interval="10" />
        </batch:tasklet>
    </batch:step>
</batch:job>
説明
項番 説明

(1)

楽観ロックによるデータ取得のSQLIDを設定する。

(2)

楽観ロックによるデータ更新のSQLIDを設定する。

(3)

バッチ更新の件数を検証有無を設定する。
true(デフォルト)に設定すると、更新件数が0件の場合に例外をスローする。
falseに設定すると、更新件数が0件の場合でも正常処理とする。

悲観ロック

チャンクモデルでの悲観ロックについて説明する。

悲観ロックを行うジョブ定義とItemProcessorを以下に示す。

ジョブ定義
<!-- (1) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
        template-ref="batchModeSqlSessionTemplate"/>

<!-- (2) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchIdFindByName"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues">
        <map>
            <entry key="branchName" value="#{jobParameters['branchName']}"/>
        </map>
    </property>
</bean>

<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchUpdate"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
      p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>

<batch:job id="chunkPessimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="chunkPessimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- (4) -->
            <batch:chunk reader="reader" processor="branchEditWithkPessimisticLockItemProcessor"
                         writer="writer" commit-interval="3"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
悲観ロックを行うItemProcessor
@Component
@Scope("step")
public class BranchEditWithkPessimisticLockItemProcessor implements ItemProcessor<String, ExclusiveBranch> {

    // (5)
    @Inject
    ExclusiveControlRepository exclusiveControlRepository;

    // (6)
    @Value("#{jobParameters['branchName']}")
    private String branchName;

    // omitted.

    @Override
    public ExclusiveBranch process(String item) throws Exception {

      // (7)
        Branch branch = exclusiveControlRepository.branchFindOneByNameWithNowWaitLock(item, branchName);

        if (branch != null) {
            ExclusiveBranch updatedBranch = new ExclusiveBranch();

            updatedBranch.setBranchId(branch.getBranchId());
            updatedBranch.setBranchName(branch.getBranchName() + " - " + identifier);
            updatedBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
            updatedBranch.setBranchTel(branch.getBranchTel());
            updatedBranch.setCreateDate(branch.getUpdateDate());
            updatedBranch.setUpdateDate(new Timestamp(clock.millis()));
            updatedBranch.setOldBranchName(branch.getBranchName());

            return updatedBranch;
        } else {
            // (8)
            logger.warn("An update by another user occurred. [branchId: {}]", item);
            return null;
        }
    }
}
説明
項番 説明

(1)

MapperインターフェースがItemWriterと同じ更新モードになるように、batchModeSqlSessionTemplateを設定する。

(2)

悲観ロックを用いないデータ取得のSQLIDを設定する。
抽出条件として、ジョブ起動パラメータから、branchNameを設定する。 このSQLによる取得項目は、(6)でデータを一意に特定するのに必要最低限に絞り込むことで性能を向上させることができる。

(3)

排他制御をしないデータ更新のSQLと同じSQLIDを設定する。

(4)

悲観ロックによるデータ取得を行うItemProcessorを設定する。

(5)

悲観ロックによるデータ取得を行うMapperインターフェースをインジェクションする。

(6)

悲観ロックの抽出条件とするため、ジョブ起動パラメータから、branchNameを設定する。

(7)

悲観ロックによるデータ取得のメソッドを呼び出す。
(2)の抽出条件と同じ条件を設定しているため、キー情報(id)の他にジョブ起動パラメータ``branchName`を引数に渡している。
NO WAITやタイムアウトを設定して悲観ロックを行い、他のトランザクションにより排他された時は、ここで例外が発生する。

(8)

他のトランザクションにより対象データが先に更新されて対象データを取得できない場合、悲観ロックによるデータ取得のメソッドがnullを返却する。
悲観ロックによるデータ取得のメソッドがnullを返却した場合、例外を発生させて処理を中断するなど業務要件に合せた処理が必要となる。
ここでは、WARNログを出力してnullを返却することで後続の処理を継続させる。

タスクレットモデルでの悲観ロックを行うコンポーネントについて

タスクレットモデルで悲観ロックを行う場合は、悲観ロックを行うSQL発行するItemReaderを用いる。Mapperインターフェースを直接利用する場合も同様である。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.0.1.RELEASE, 2018-3-9