Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28
> INDEX

Overview

排他制御とは、複数のトランザクションから同じリソースに対して、同時に更新処理が行われる際に、データの整合性を保つために行う処理のことである。 複数のトランザクションから同じリソースに対して、同時に更新処理が行われる可能性がある場合は、基本的に排他制御を行う必要がある。

ここでの複数トランザクションとは以下のことを指す。

  • 複数ジョブの同時実行時におけるトランザクション

  • オンライン処理との同時実行時におけるトランザクション

複数ジョブの排他制御

複数ジョブを同時実行する場合は、排他制御の必要がないようにジョブ設計を行うことが基本である。 これは、アクセスするリソースや処理対象をジョブごとに分割することが基本であることを意味する。

排他制御に関する概念は、オンライン処理と同様であるため、Macchinetta Server 1.x 開発ガイドラインにある 排他制御を参照。

ここでは、Macchinetta Server 1.xでは説明されていない部分を中心に説明をする。

本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。

排他制御の必要性

排他制御の必要性に関しては、Macchinetta Server 1.x 開発ガイドラインにある 排他制御の必要性を参照。

ファイルの排他制御

ファイルでの排他制御はファイルロックにより実現するのが一般的である。

ファイルロックとは

ファイルロックとは、ファイルをあるプログラムで使用している間、ほかのプログラムからの読み書きを制限する仕組みである。 ファイルロックの実施イメージを以下に示す。

シナリオ
  • バッチ処理Aがファイルのロックを取得し、ファイルの更新処理を開始する。

  • バッチ処理Bが同一のファイルの更新を試みファイルのロック取得を試みるが失敗する。

  • バッチ処理Aが処理を終了し、ファイルのロックを解除する

ExclusiveControl_File_Senario
図 1. ファイルロックの実施イメージ
  1. バッチ処理A(Batch ProcessA)が排他対象ファイル(TargetFile)のロック取得を試みる。

  2. バッチ処理Aが、排他対象ファイルのロック取得に成功する。

  3. バッチ処理B(Batch ProcessB)が、排他対象ファイルのロック取得を試みる。

  4. バッチ処理Aが、排他対象ファイルに書き込みを行う。

  5. バッチ処理Bは、バッチ処理Aがロック中であるため、排他対象ファイルのロック取得に失敗する。

  6. バッチ処理Bが、ファイル更新失敗の処理を行う。

  7. バッチ処理Aが、排他対象ファイルのロックを開放する。

デッドロックの予防

ファイルにおいてもデータベースと同様に複数のファイルに対してロックを取得する場合、デッドロックとなる場合がある。 そのため、ファイルの更新順序をルール化することが重要である。
デッドロックの予防に関してはデータベースのテーブル間でのデッドロック防止と同様である。 詳細については、Macchinetta Server 1.x 開発ガイドラインの デッドロックの予防を参照。

データベースの排他制御

データベースの排他制御に関しては、Macchinetta Server 1.x 開発ガイドラインにある データベースのロック機能による排他制御 で詳しく説明されているため、そちらを参照。

排他制御方式の使い分け

Macchinetta Batch 2.xでのロック方式と向いているシチュエーションを示す。

表 1. 排他制御方式の使い分け
ロック方式 向いているシチュエーション

楽観ロック

同時実行時におけるトランザクションで、別トランザクションの更新結果を処理対象外にして処理を継続できる場合

悲観ロック

処理時間が長く、処理中に対象データの状況が変化したことによるやり直しが難しい処理
ファイルに対する排他制御が必要な処理

排他制御とコンポーネントの関係

Macchinetta Batch 2.xが提供する各コンポーネントと排他制御との関係は以下のとおり。

楽観ロック
表 2. 排他制御とコンポーネントの関係
処理モデル コンポーネント ファイル データベース

チャンク

ItemReader

-

Versionカラムなど取得時と更新時とで同じデータであることが確認できるカラムを含めてデータ取得を行う。

ItemProcessor

-

排他制御は不要である。

ItemWriter

-

取得時と更新時との差分を確認し、他の処理で更新されていないことを確認した上で更新を行う。

タスクレット

Tasklet

-

データ取得時にはItemReader、データ更新時はItemWriterで説明した処理を実施する。
Mapperインタフェースを直接利用する場合も考え方は同じである。

ファイルに対する楽観ロック

ファイルの特性上、ファイルに対して楽観ロックを適用することがない。

悲観ロック
表 3. 排他制御とコンポーネントの関係
処理モデル コンポーネント ファイル データベース

チャンク

ItemReader

-

悲観ロックを用いずにSELECT文を発行する。
ItemProcessorやItemWriterとは別コネクションになるため、ItemReaderでは排他制御は行わない。
SELECTで取得するデータは、ItemProcessorでデータを取得する条件とする必要最低限のデータ(キー情報)とすることで性能が向上する。

ItemProcessor

-

Mapperインタフェースを利用して、ItemReaderで取得したデータ(キー情報)を条件とするSQL文でSELECT FOR UPDATEを発行する。

ItemWriter

-

悲観ロックを行ったItemProcessorと同トランザクションとなるため、ItemWriterでは排他制御を意識することなくデータを更新する。

タスクレット

Tasklet

ItemStreamReaderでファイルをオープンする直前にファイルロックを取得する。
ItemStreamWriterをクローズした直後にファイルロックを開放する。

データ取得時にはSELECT FOR UPDATE文を発行するItemReaderかMapperインタフェースを直接利用する。
データ更新時はItemWriterで説明した処理を実施する。Mapperインタフェースを直接利用する場合も考え方は同じである。

チャンクモデルでのデータベースでの悲観ロックによる注意事項

ItemReaderで取得したデータ(キー情報)がItemProcessorへ渡される間は排他制御されず、他のトランザクションによりもとのデータが更新されている可能性がある。 そのため、ItemProcessorがデータを取得する条件は、ItemReaderと同じデータ(キー情報)を取得する条件を含む必要がある。
ItemProcessorでデータが取得できない場合は、他のトランザクションにより更新されている可能性を考慮して、処理の継続または中断を検討し実装する必要がある。

ファイルに対する悲観ロック

ファイルに対する悲観ロックはタスクレットモデルで実装すること。 チャンクモデルではその構造上、チャンク処理の隙間で排他できない期間が存在してしまうためである。 また、ファイルアクセスはItemStreamReader/ItemStreamWriterをInjectして利用することを前提とする。

データベースでの悲観ロックによる待ち時間

悲観ロックを行う場合、競合により処理が待たされる時間が長くなる可能性がある。 その場合、NO WAITオプションやタイムアウト時間を指定して、悲観ロックを使用するのが妥当である。

How to use

排他制御の使い方をリソース別に説明する。

ファイルの排他制御

Macchinetta Batch 2.xにおけるファイルの排他制御はタスクレットを実装することで実現する。 排他の実現手段としては、java.nio.channels.FileChannelクラスを使用したファイルロック取得で排他制御を行う。

FileChannelクラスの詳細

FileChannelクラスの詳細、使用方法については Javadocを参照。

ただし、Spring Batchにおいて標準的なファイルの入出力機能を提供するFlatFileItemReader/FlatFileItemWriterから、java.nio.channels.FileChannelクラスを利用することはできない。 そのため、排他対象のファイルと一対一に排他対象ファイルのロックを担当するファイル(以降、ロック用ファイル)を用意し、ロック用ファイルからのファイルロックを取得をもって、排他対象ファイルへの排他的な制御権を取得できたと見なすことで排他制御を実現する。 ロック用ファイルを用いたファイルロックの実施イメージを以下に示す。

ExclusiveControl_LockFile_Senario
図 2. ロック用ファイルを用いたファイルロックの実施イメージ
  1. バッチ処理A(Batch ProcessA)がロック用ファイル(LockFile)のロック取得を試みる。

  2. バッチ処理Aが、ロック用ファイルのロック取得に成功する。

  3. バッチ処理B(Batch ProcessB)が、ロック用ファイルのロック取得を試みる。

  4. バッチ処理Aが、排他対象ファイルに書き込みを行う。

  5. バッチ処理Bは、バッチ処理Aがロック中であるため、ロック用ファイルのロック取得に失敗する。

  6. バッチ処理Bが、ファイル更新失敗の処理を行う。

  7. バッチ処理Aが、ロック用ファイルのロックを開放する。

ロック用ファイルの削除タイミング

後述するTasklet実装の例では、ロック用ファイルの削除について表現していないが、そのタイミングについては注意が必要である。 特にLinux環境では、ジョブの内部でロック用ファイルの削除を行うべきではない。

Linux環境ではファイルロック中のファイルを削除できるため、複数のプロセスでバッチ処理が実行される場合、あるプロセスがファイルロックを取得しているロック用ファイルを、別プロセスが削除する可能性がある。 このようなロック用ファイルの削除が起こると、また別のプロセスはロック用ファイルを作成し、作成したロック用ファイルからファイルロックを取得できる。

結果として、排他対象ファイルに書き込めるプロセスが複数になり、排他制御として機能しなくなる。

これを防ぐため、ロック用ファイルの削除は、排他制御を行うプロセスが存在しなくなったタイミングで行うなどの方法が考えられる。

ItemStreamWriterのtransactionalプロパティはfalseを設定する。 transactionalプロパティがデフォルトのtrueの場合、ファイル出力のタイミングがTransactionManagerと同期し、排他制御がされていない状態でのファイル出力が行われる。 以下に、ItemStreamWriterの設定例を示す。

@Bean
@StepScope
public FlatFileItemWriter<SalesPlanDetailWithProcessName> writer(
        @Value("#{jobParameters['outputFile']}") File outputFile) {
    DelimitedLineAggregator<SalesPlanDetailWithProcessName> lineAggregator = new DelimitedLineAggregator<>();
    BeanWrapperFieldExtractor<SalesPlanDetailWithProcessName> fieldExtractor = new BeanWrapperFieldExtractor<>();
    fieldExtractor.setNames(new String[] {"processName", "plan.branchId", "plan.year", "plan.month", "plan.customerId", "plan.amount"});
    lineAggregator.setFieldExtractor(fieldExtractor);
    return new FlatFileItemWriterBuilder<SalesPlanDetailWithProcessName>()
            .name(ClassUtils.getShortName(FlatFileItemWriter.class))
            .resource(new FileSystemResource(outputFile))
            .lineAggregator(lineAggregator)
            .transactional(false) // (1)
            .build();
}
<bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"
      p:resource="file:#{jobParameters['outputFile']}"
      p:transactional="false">  <!-- (1) -->
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                      p:names="processName,plan.branchId,plan.year,plan.month,plan.customerId,plan.amount"/>
            </property>
        </bean>
    </property>
</bean>
表 4. 説明
項番 説明

(1)

FlatFileItemWriterBuilderのtransactionalメソッド/<bean>要素のtransactional属性falseを設定する。

FileChannelクラスを使用しファイルのロックを取得する例を示す。

Tasklet実装
@Component
@Scope("step")
public class FileExclusiveTasklet implements Tasklet {

    private String targetPath = null; // (1)

    @Inject
    ItemStreamReader<SalesPlanDetail> reader;

    @Inject
    ItemStreamWriter<SalesPlanDetailWithProcessName> writer;

    @Override
    public RepeatStatus execute(StepContribution contribution,
            ChunkContext chunkContext) throws Exception {

        // omitted.

        File file = new File(targetPath);
        try (FileChannel fc = FileChannel.open(file.toPath(),
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND); // (2)
                FileLock fileLock = fc.tryLock()) {  // (3)

            if (fileLock == null) {
                logger.error("Failed to acquire lock. [processName={}]", processName);
                throw new FailedAcquireLockException("Failed to acquire lock");
            }

            reader.open(executionContext);
            writer.open(executionContext);  // (4)

            // (5)
            SalesPlanDetail item;
            List<SalesPlanDetailWithProcessName> items = new ArrayList<>();
            while ((item = reader.read()) != null) {

                // omitted.

                items.add(item);
                if (items.size() >= 10) {
                    writer.write(new Chunk(items));
                    items.clear();
                }
            }
            if (items.size() > 0) {
                writer.write(new Chunk(items));
            }

        } catch (IOException e) {
            logger.error("Failure other than lock acquisition", e);
            throw new FailedOtherAcquireLockException("Failure other than lock acquisition", e);

        } finally {
            try {
                writer.close(); // (6)
            } catch (ItemStreamException e) {
                // ignore
            }
            try {
                reader.close();
            } catch (ItemStreamException e) {
                // ignore
            }
        }

        return RepeatStatus.FINISHED;
    }

    // (7)
    @Value("#{jobParameters['lockFile']}")
    public void setTargetPath(String targetPath) {
        this.targetPath = targetPath;
    }
}
表 5. 説明
項番 説明

(1)

ロック用ファイルのパス。

(2)

ロック用ファイルのファイルチャネルを取得する。
この例では、ファイルの新規作成・追記・書き込みに対するチャネルを取得している。

(3)

ロック用ファイルのファイルロックを取得する。

(4)

ロック用ファイルのファイルロック取得に成功した場合、排他対象のファイルをオープンする。
この例では、排他対象のファイルはジョブ定義ファイルで設定している。
設定方法は、ファイルアクセスを参照。

(5)

ファイル出力を伴うビジネスロジックを実行する。

(6)

排他対象のファイルをクローズする。

(7)

ロック用ファイルのパスを設定する。
この例では、ジョブパラメータから受け取るようにしている。

ロック取得に用いるFileChannelのメソッドについて

lock()メソッドは排他対象ファイルがロック済みの場合ロックが解除されるまで待機するため、待機されないtryLock()メソッドを使用することを推奨する。 なおtrylock()は共有ロックと排他ロックが選択できるが、バッチ処理においては、通常は排他ロックを用いる。

同一VMでのスレッド間の排他制御

同一VMにおけるスレッド間の排他制御は注意が必要である。 同一VMでのスレッド間でファイルに対する処理を行う場合、FileChannelクラスを用いたロック機能では、ファイルが別スレッドの処理にてロックされているかの判定ができない。
そのため、スレッド間での排他制御は機能しない。これを回避するには、ファイルへの書き込みを行う部分で同期化処理をすることでスレッド間の排他制御が行える。
しかし、同期化を行うことで並列処理のメリットが薄れてしまい、単一スレッドで処理することと差異がなくなってしまう。 結果、同一のファイルに対して異なるスレッドで排他制御をして処理することは適していないため、そのような処理設計・実装を行わないこと。

データベースの排他制御

Macchinetta Batch 2.xにおけるデータベースの排他制御について説明する。

データベースの排他制御実装は、Macchinetta Server 1.x 開発ガイドラインにある MyBatis3使用時の実装方法が基本である。 本ガイドラインでは、 MyBatis3使用時の実装方法ができている前提で説明を行う。

排他制御とコンポーネントの関係にあるとおり、処理モデル・コンポーネントの組み合わせによるバリエーションがある。

表 6. データベースの排他制御のバリエーション

排他方式

処理モデル

コンポーネント

楽観ロック

チャンクモデル

ItemReader/ItemWriter

タスクレットモデル

ItemReader/ItemWriter

Mapperインタフェース

悲観ロック

チャンクモデル

ItemReader/ItemProcessor/ItemWriter

タスクレットモデル

ItemReader/ItemWriter

Mapperインタフェース

タスクレットモデルでMapperインタフェースを使用する場合は、 MyBatis3使用時の実装方法のとおりであるため、説明を割愛する。

タスクレットモデルでItemReader/ItemWriterを使用する場合は、Mapperインタフェースでの呼び出し部分がItemReader/ItemWriterに代わるだけなので、これも説明を割愛する。

よって、ここではチャンクモデルの排他制御について説明する。

楽観ロック

チャンクモデルでの楽観ロックについて説明する。

MyBatisBatchItemWriterがもつassertUpdatesプロパティの設定により、ジョブの振る舞いが変化するので業務要件に合わせて、適切に設定をする必要がある。

楽観ロックを行うジョブ定義を以下に示す。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = { "org.terasoluna.batch.functionaltest.app.common",
"org.terasoluna.batch.functionaltest.ch05.exclusivecontrol"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository", sqlSessionFactoryRef = "jobSqlSessionFactory")
public class ChunkOptimisticLockCheckJobConfig {

    @Bean
    @StepScope
    public MyBatisCursorItemReader<Branch> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            @Value("#{jobParameters['branchId']}") String branchId) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("branchId", branchId);
        return new MyBatisCursorItemReaderBuilder<Branch>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .parameterValues(parameterValues)
                .queryId(
                        "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchFindOne") // (1)
                .build();
    }

    @Bean
    @StepScope
    public MyBatisBatchItemWriter<ExclusiveBranch> writer(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            SqlSessionTemplate batchModeSqlSessionTemplate,
            @Value("#{new Boolean(jobParameters['assertUpdates'])}") boolean assertUpdates) {
        return new MyBatisBatchItemWriterBuilder<ExclusiveBranch>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .statementId(
                        "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate") // (2)
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .assertUpdates(assertUpdates) // (3)
                .build();
    }

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       MyBatisCursorItemReader<Branch> reader,
                       MyBatisBatchItemWriter<ExclusiveBranch> writer,
                       BranchEditItemProcessor branchEditItemProcessor) {
        return new StepBuilder("chunkOptimisticLockCheckJob.step01",
                jobRepository)
                .<Branch, ExclusiveBranch> chunk(10, transactionManager)
                .reader(reader)
                .processor(branchEditItemProcessor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job chunkOptimisticLockCheckJob(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener jobExecutionLoggingListener) {
        return new JobBuilder("chunkOptimisticLockCheckJob",jobRepository)
                .start(step01)
                .listener(jobExecutionLoggingListener)
                .build();
    }
}
<!-- (1) -->
<bean id="reader"
      class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchFindOne"
      p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
    <property name="parameterValues">
        <map>
            <entry key="branchId" value="#{jobParameters['branchId']}"/>
        </map>
    </property>
</bean>

<!-- (2) -->
<bean id="writer"
      class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
      p:assertUpdates="true" />  <!-- (3) -->

<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="chunkOptimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <batch:chunk reader="reader" processor="branchEditItemProcessor"
                         writer="writer" commit-interval="10" />
        </batch:tasklet>
    </batch:step>
</batch:job>
表 7. 説明
項番 説明

(1)

楽観ロックによるデータ取得のSQLIDを設定する。

(2)

楽観ロックによるデータ更新のSQLIDを設定する。

(3)

バッチ更新の件数を検証有無を設定する。
true(デフォルト)に設定すると、更新件数が0件の場合に例外をスローする。
falseに設定すると、更新件数が0件の場合でも正常処理とする。

悲観ロック

チャンクモデルでの悲観ロックについて説明する。

悲観ロックを行うジョブ定義とItemProcessorを以下に示す。

@Configuration
@Import(JobBaseContextConfig.class)
@ComponentScan(value = { "org.terasoluna.batch.functionaltest.app.common",
"org.terasoluna.batch.functionaltest.ch05.exclusivecontrol"}, scopedProxy = ScopedProxyMode.TARGET_CLASS)
@MapperScan(basePackages = "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository", sqlSessionTemplateRef = "batchModeSqlSessionTemplate") // (1)
public class ChunkPessimisticLockCheckJobConfig {

    @Bean
    @StepScope
    public MyBatisCursorItemReader<String> reader(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            @Value("#{jobParameters['branchName']}") String branchName) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("branchName", branchName);
        return new MyBatisCursorItemReaderBuilder<String>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .parameterValues(parameterValues)
                .queryId(
                        "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchIdFindByName") // (2)
                .build();
    }

    @Bean
    @StepScope
    public MyBatisBatchItemWriter<ExclusiveBranch> writer(
            @Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
            SqlSessionTemplate batchModeSqlSessionTemplate,
            @Value("#{new Boolean(jobParameters['assertUpdates'])}") boolean assertUpdates) {
        return new MyBatisBatchItemWriterBuilder<ExclusiveBranch>()
                .sqlSessionFactory(jobSqlSessionFactory)
                .statementId(
                        "org.terasoluna.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchUpdate") // (3)
                .sqlSessionTemplate(batchModeSqlSessionTemplate)
                .assertUpdates(assertUpdates)
                .build();
    }

    @Bean
    public Step step01(JobRepository jobRepository,
                       @Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager,
                       MyBatisCursorItemReader<String> reader,
                       MyBatisBatchItemWriter<ExclusiveBranch> writer,
                       BranchEditWithkPessimisticLockItemProcessor processor) {
        return new StepBuilder("chunkPessimisticLockCheckJob.step01",
                jobRepository)
                .<String, ExclusiveBranch> chunk(10, transactionManager)
                .reader(reader)
                .processor(processor) // (4)
                .writer(writer)
                .build();
    }

    @Bean
    public Job chunkPessimisticLockCheckJob(JobRepository jobRepository,
                                            Step step01,
                                            JobExecutionLoggingListener jobExecutionLoggingListener) {
        return new JobBuilder("chunkPessimisticLockCheckJob",jobRepository)
                .start(step01)
                .listener(jobExecutionLoggingListener)
                .build();
    }
}
<!-- (1) -->
<mybatis:scan
        base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
        template-ref="batchModeSqlSessionTemplate"/>

<!-- (2) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
      p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchIdFindByName"
      p:sqlSessionFactory-ref="jobSqlSessionFactory">
    <property name="parameterValues">
        <map>
            <entry key="branchName" value="#{jobParameters['branchName']}"/>
        </map>
    </property>
</bean>

<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
      p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchUpdate"
      p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
      p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>

<batch:job id="chunkPessimisticLockCheckJob" job-repository="jobRepository">
    <batch:step id="chunkPessimisticLockCheckJob.step01">
        <batch:tasklet transaction-manager="jobTransactionManager">
            <!-- (4) -->
            <batch:chunk reader="reader" processor="branchEditWithkPessimisticLockItemProcessor"
                         writer="writer" commit-interval="3"/>
        </batch:tasklet>
    </batch:step>
</batch:job>
悲観ロックを行うItemProcessor
@Component
@Scope("step")
public class BranchEditWithkPessimisticLockItemProcessor implements ItemProcessor<String, ExclusiveBranch> {

    // (5)
    @Inject
    ExclusiveControlRepository exclusiveControlRepository;

    // (6)
    @Value("#{jobParameters['branchName']}")
    private String branchName;

    // omitted.

    @Override
    public ExclusiveBranch process(String item) throws Exception {

      // (7)
        Branch branch = exclusiveControlRepository.branchFindOneByNameWithNowWaitLock(item, branchName);

        if (branch != null) {
            ExclusiveBranch updatedBranch = new ExclusiveBranch();

            updatedBranch.setBranchId(branch.getBranchId());
            updatedBranch.setBranchName(branch.getBranchName() + " - " + identifier);
            updatedBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
            updatedBranch.setBranchTel(branch.getBranchTel());
            updatedBranch.setCreateDate(branch.getUpdateDate());
            updatedBranch.setUpdateDate(new Timestamp(clock.millis()));
            updatedBranch.setOldBranchName(branch.getBranchName());

            return updatedBranch;
        } else {
            // (8)
            logger.warn("An update by another user occurred. [branchId: {}]", item);
            return null;
        }
    }
}
表 8. 説明
項番 説明

(1)

MapperインタフェースがItemWriterと同じ更新モードになるように、batchModeSqlSessionTemplateを設定する。

(2)

悲観ロックを用いないデータ取得のSQLIDを設定する。
抽出条件として、ジョブ起動パラメータから、branchNameを設定する。 このSQLによる取得項目は、(6)でデータを一意に特定するのに必要最低限に絞り込むことで性能を向上させることができる。

(3)

排他制御をしないデータ更新のSQLと同じSQLIDを設定する。

(4)

悲観ロックによるデータ取得を行うItemProcessorを設定する。

(5)

悲観ロックによるデータ取得を行うMapperインタフェースをインジェクションする。

(6)

悲観ロックの抽出条件とするため、ジョブ起動パラメータから、branchNameを設定する。

(7)

悲観ロックによるデータ取得のメソッドを呼び出す。
(2)の抽出条件と同じ条件を設定しているため、キー情報(id)の他にジョブ起動パラメータbranchNameを引数に渡している。
NO WAITやタイムアウトを設定して悲観ロックを行い、他のトランザクションにより排他された時は、ここで例外が発生する。

(8)

他のトランザクションにより対象データが先に更新されて対象データを取得できない場合、悲観ロックによるデータ取得のメソッドがnullを返却する。
悲観ロックによるデータ取得のメソッドがnullを返却した場合、例外を発生させて処理を中断するなど業務要件に合せた処理が必要となる。
ここでは、WARNログを出力してnullを返却することで後続の処理を継続させる。

タスクレットモデルでの悲観ロックを行うコンポーネントについて

タスクレットモデルで悲観ロックを行う場合は、悲観ロックを行うSQL発行するItemReaderを用いる。Mapperインタフェースを直接利用する場合も同様である。

Macchinetta Batch Framework (2.x) Development Guideline - version 2.5.0.RELEASE, 2024-3-28