Overview
排他制御とは、複数のトランザクションから同じリソースに対して、同時に更新処理が行われる際に、データの整合性を保つために行う処理のことである。 複数のトランザクションから同じリソースに対して、同時に更新処理が行われる可能性がある場合は、基本的に排他制御を行う必要がある。
ここでの複数トランザクションとは以下のことを指す。
-
複数ジョブの同時実行時におけるトランザクション
-
オンライン処理との同時実行時におけるトランザクション
複数ジョブの排他制御
複数ジョブを同時実行する場合は、排他制御の必要がないようにジョブ設計を行うことが基本である。 これは、アクセスするリソースや処理対象をジョブごとに分割することが基本であることを意味する。 |
排他制御に関する概念は、オンライン処理と同様であるため、Macchinetta Server 1.x 開発ガイドラインにある 排他制御を参照。
ここでは、Macchinetta Server 1.xでは説明されていない部分を中心に説明をする。
本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。
排他制御の必要性
排他制御の必要性に関しては、Macchinetta Server 1.x 開発ガイドラインにある 排他制御の必要性を参照。
ファイルの排他制御
ファイルでの排他制御はファイルロックにより実現するのが一般的である。
- ファイルロックとは
-
ファイルロックとは、ファイルをあるプログラムで使用している間、ほかのプログラムからの読み書きを制限する仕組みである。 ファイルロックの実施イメージを以下に示す。
-
バッチ処理Aがファイルのロックを取得し、ファイルの更新処理を開始する。
-
バッチ処理Bが同一のファイルの更新を試みファイルのロック取得を試みるが失敗する。
-
バッチ処理Aが処理を終了し、ファイルのロックを解除する
-
バッチ処理A(Batch ProcessA)が排他対象ファイル(TargetFile)のロック取得を試みる。
-
バッチ処理Aが、排他対象ファイルのロック取得に成功する。
-
バッチ処理B(Batch ProcessB)が、排他対象ファイルのロック取得を試みる。
-
バッチ処理Aが、排他対象ファイルに書き込みを行う。
-
バッチ処理Bは、バッチ処理Aがロック中であるため、排他対象ファイルのロック取得に失敗する。
-
バッチ処理Bが、ファイル更新失敗の処理を行う。
-
バッチ処理Aが、排他対象ファイルのロックを開放する。
デッドロックの予防
ファイルにおいてもデータベースと同様に複数のファイルに対してロックを取得する場合、デッドロックとなる場合がある。
そのため、ファイルの更新順序をルール化することが重要である。 |
データベースの排他制御
データベースの排他制御に関しては、Macchinetta Server 1.x 開発ガイドラインにある データベースのロック機能による排他制御 で詳しく説明されているため、そちらを参照。
排他制御方式の使い分け
Macchinetta Batch 2.xでのロック方式と向いているシチュエーションを示す。
ロック方式 | 向いているシチュエーション |
---|---|
同時実行時におけるトランザクションで、別トランザクションの更新結果を処理対象外にして処理を継続できる場合 |
|
処理時間が長く、処理中に対象データの状況が変化したことによるやり直しが難しい処理 |
排他制御とコンポーネントの関係
Macchinetta Batch 2.xが提供する各コンポーネントと排他制御との関係は以下のとおり。
- 楽観ロック
処理モデル | コンポーネント | ファイル | データベース |
---|---|---|---|
チャンク |
ItemReader |
- |
Versionカラムなど取得時と更新時とで同じデータであることが確認できるカラムを含めてデータ取得を行う。 |
ItemProcessor |
- |
排他制御は不要である。 |
|
ItemWriter |
- |
取得時と更新時との差分を確認し、他の処理で更新されていないことを確認した上で更新を行う。 |
|
タスクレット |
Tasklet |
- |
データ取得時にはItemReader、データ更新時はItemWriterで説明した処理を実施する。 |
ファイルに対する楽観ロック
ファイルの特性上、ファイルに対して楽観ロックを適用することがない。 |
- 悲観ロック
処理モデル | コンポーネント | ファイル | データベース |
---|---|---|---|
チャンク |
ItemReader |
- |
悲観ロックを用いずにSELECT文を発行する。 |
ItemProcessor |
- |
Mapperインタフェースを利用して、ItemReaderで取得したデータ(キー情報)を条件とするSQL文でSELECT FOR UPDATEを発行する。 |
|
ItemWriter |
- |
悲観ロックを行ったItemProcessorと同トランザクションとなるため、ItemWriterでは排他制御を意識することなくデータを更新する。 |
|
タスクレット |
Tasklet |
ItemStreamReaderでファイルをオープンする直前にファイルロックを取得する。 |
データ取得時にはSELECT FOR UPDATE文を発行するItemReaderかMapperインタフェースを直接利用する。 |
チャンクモデルでのデータベースでの悲観ロックによる注意事項
ItemReaderで取得したデータ(キー情報)がItemProcessorへ渡される間は排他制御されず、他のトランザクションによりもとのデータが更新されている可能性がある。
そのため、ItemProcessorがデータを取得する条件は、ItemReaderと同じデータ(キー情報)を取得する条件を含む必要がある。 |
ファイルに対する悲観ロック
ファイルに対する悲観ロックはタスクレットモデルで実装すること。 チャンクモデルではその構造上、チャンク処理の隙間で排他できない期間が存在してしまうためである。 また、ファイルアクセスはItemStreamReader/ItemStreamWriterをInjectして利用することを前提とする。 |
データベースでの悲観ロックによる待ち時間
悲観ロックを行う場合、競合により処理が待たされる時間が長くなる可能性がある。 その場合、NO WAITオプションやタイムアウト時間を指定して、悲観ロックを使用するのが妥当である。 |
How to use
排他制御の使い方をリソース別に説明する。
ファイルの排他制御
Macchinetta Batch 2.xにおけるファイルの排他制御はタスクレットを実装することで実現する。
排他の実現手段としては、java.nio.channels.FileChannel
クラスを使用したファイルロック取得で排他制御を行う。
FileChannelクラスの詳細
|
ただし、Spring Batchにおいて標準的なファイルの入出力機能を提供するFlatFileItemReader/FlatFileItemWriterから、java.nio.channels.FileChannelクラスを利用することはできない。 そのため、排他対象のファイルと一対一に排他対象ファイルのロックを担当するファイル(以降、ロック用ファイル)を用意し、ロック用ファイルからのファイルロックを取得をもって、排他対象ファイルへの排他的な制御権を取得できたと見なすことで排他制御を実現する。 ロック用ファイルを用いたファイルロックの実施イメージを以下に示す。
-
バッチ処理A(Batch ProcessA)がロック用ファイル(LockFile)のロック取得を試みる。
-
バッチ処理Aが、ロック用ファイルのロック取得に成功する。
-
バッチ処理B(Batch ProcessB)が、ロック用ファイルのロック取得を試みる。
-
バッチ処理Aが、排他対象ファイルに書き込みを行う。
-
バッチ処理Bは、バッチ処理Aがロック中であるため、ロック用ファイルのロック取得に失敗する。
-
バッチ処理Bが、ファイル更新失敗の処理を行う。
-
バッチ処理Aが、ロック用ファイルのロックを開放する。
ロック用ファイルの削除タイミング
後述するTasklet実装の例では、ロック用ファイルの削除について表現していないが、そのタイミングについては注意が必要である。 特にLinux環境では、ジョブの内部でロック用ファイルの削除を行うべきではない。 Linux環境ではファイルロック中のファイルを削除できるため、複数のプロセスでバッチ処理が実行される場合、あるプロセスがファイルロックを取得しているロック用ファイルを、別プロセスが削除する可能性がある。 このようなロック用ファイルの削除が起こると、また別のプロセスはロック用ファイルを作成し、作成したロック用ファイルからファイルロックを取得できる。 結果として、排他対象ファイルに書き込めるプロセスが複数になり、排他制御として機能しなくなる。 これを防ぐため、ロック用ファイルの削除は、排他制御を行うプロセスが存在しなくなったタイミングで行うなどの方法が考えられる。 |
ItemStreamWriterのtransactional
プロパティはfalse
を設定する。
transactional
プロパティがデフォルトのtrue
の場合、ファイル出力のタイミングがTransactionManagerと同期し、排他制御がされていない状態でのファイル出力が行われる。
以下に、ItemStreamWriterの設定例を示す。
<bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step"
p:resource="file:#{jobParameters['outputFile']}"
p:transactional="false"> <!-- (1) -->
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
p:names="processName,plan.branchId,plan.year,plan.month,plan.customerId,plan.amount"/>
</property>
</bean>
</property>
</bean>
項番 | 説明 |
---|---|
(1) |
|
FileChannel
クラスを使用しファイルのロックを取得する例を示す。
@Component
@Scope("step")
public class FileExclusiveTasklet implements Tasklet {
private String targetPath = null; // (1)
@Inject
ItemStreamReader<SalesPlanDetail> reader;
@Inject
ItemStreamWriter<SalesPlanDetailWithProcessName> writer;
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// omitted.
File file = new File(targetPath);
try (FileChannel fc = FileChannel.open(file.toPath(),
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND); // (2)
FileLock fileLock = fc.tryLock()) { // (3)
if (fileLock == null) {
logger.error("Failed to acquire lock. [processName={}]", processName);
throw new FailedAcquireLockException("Failed to acquire lock");
}
reader.open(executionContext);
writer.open(executionContext); // (4)
// (5)
SalesPlanDetail item;
List<SalesPlanDetailWithProcessName> items = new ArrayList<>();
while ((item = reader.read()) != null) {
// omitted.
items.add(item);
if (items.size() >= 10) {
writer.write(items);
items.clear();
}
}
if (items.size() > 0) {
writer.write(items);
}
} catch (IOException e) {
logger.error("Failure other than lock acquisition", e);
throw new FailedOtherAcquireLockException("Failure other than lock acquisition", e);
} finally {
try {
writer.close(); // (6)
} catch (ItemStreamException e) {
// ignore
}
try {
reader.close();
} catch (ItemStreamException e) {
// ignore
}
}
return RepeatStatus.FINISHED;
}
// (7)
@Value("#{jobParameters['lockFile']}")
public void setTargetPath(String targetPath) {
this.targetPath = targetPath;
}
}
項番 | 説明 |
---|---|
(1) |
ロック用ファイルのパス。 |
(2) |
ロック用ファイルのファイルチャネルを取得する。 |
(3) |
ロック用ファイルのファイルロックを取得する。 |
(4) |
ロック用ファイルのファイルロック取得に成功した場合、排他対象のファイルをオープンする。 |
(5) |
ファイル出力を伴うビジネスロジックを実行する。 |
(6) |
排他対象のファイルをクローズする。 |
(7) |
ロック用ファイルのパスを設定する。 |
ロック取得に用いるFileChannelのメソッドについて
|
同一VMでのスレッド間の排他制御
同一VMにおけるスレッド間の排他制御は注意が必要である。
同一VMでのスレッド間でファイルに対する処理を行う場合、 |
データベースの排他制御
Macchinetta Batch 2.xにおけるデータベースの排他制御について説明する。
データベースの排他制御実装は、Macchinetta Server 1.x 開発ガイドラインにある MyBatis3使用時の実装方法が基本である。 本ガイドラインでは、 MyBatis3使用時の実装方法ができている前提で説明を行う。
排他制御とコンポーネントの関係にあるとおり、処理モデル・コンポーネントの組み合わせによるバリエーションがある。
排他方式 |
処理モデル |
コンポーネント |
楽観ロック |
チャンクモデル |
ItemReader/ItemWriter |
タスクレットモデル |
ItemReader/ItemWriter |
|
Mapperインタフェース |
||
悲観ロック |
チャンクモデル |
ItemReader/ItemProcessor/ItemWriter |
タスクレットモデル |
ItemReader/ItemWriter |
|
Mapperインタフェース |
タスクレットモデルでMapperインタフェースを使用する場合は、 MyBatis3使用時の実装方法のとおりであるため、説明を割愛する。
タスクレットモデルでItemReader/ItemWriterを使用する場合は、Mapperインタフェースでの呼び出し部分がItemReader/ItemWriterに代わるだけなので、これも説明を割愛する。
よって、ここではチャンクモデルの排他制御について説明する。
楽観ロック
チャンクモデルでの楽観ロックについて説明する。
MyBatisBatchItemWriterがもつassertUpdates
プロパティの設定により、ジョブの振る舞いが変化するので業務要件に合わせて、適切に設定をする必要がある。
楽観ロックを行うジョブ定義を以下に示す。
<!-- (1) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchFindOne"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<property name="parameterValues">
<map>
<entry key="branchId" value="#{jobParameters['branchId']}"/>
</map>
</property>
</bean>
<!-- (2) -->
<bean id="writer"
class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
p:assertUpdates="true" /> <!-- (3) -->
<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="chunkOptimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="branchEditItemProcessor"
writer="writer" commit-interval="10" />
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
楽観ロックによるデータ取得のSQLIDを設定する。 |
(2) |
楽観ロックによるデータ更新のSQLIDを設定する。 |
(3) |
バッチ更新の件数を検証有無を設定する。 |
悲観ロック
チャンクモデルでの悲観ロックについて説明する。
悲観ロックを行うジョブ定義とItemProcessorを以下に示す。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
template-ref="batchModeSqlSessionTemplate"/>
<!-- (2) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchIdFindByName"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues">
<map>
<entry key="branchName" value="#{jobParameters['branchName']}"/>
</map>
</property>
</bean>
<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchUpdate"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>
<batch:job id="chunkPessimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="chunkPessimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- (4) -->
<batch:chunk reader="reader" processor="branchEditWithkPessimisticLockItemProcessor"
writer="writer" commit-interval="3"/>
</batch:tasklet>
</batch:step>
</batch:job>
@Component
@Scope("step")
public class BranchEditWithkPessimisticLockItemProcessor implements ItemProcessor<String, ExclusiveBranch> {
// (5)
@Inject
ExclusiveControlRepository exclusiveControlRepository;
// (6)
@Value("#{jobParameters['branchName']}")
private String branchName;
// omitted.
@Override
public ExclusiveBranch process(String item) throws Exception {
// (7)
Branch branch = exclusiveControlRepository.branchFindOneByNameWithNowWaitLock(item, branchName);
if (branch != null) {
ExclusiveBranch updatedBranch = new ExclusiveBranch();
updatedBranch.setBranchId(branch.getBranchId());
updatedBranch.setBranchName(branch.getBranchName() + " - " + identifier);
updatedBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
updatedBranch.setBranchTel(branch.getBranchTel());
updatedBranch.setCreateDate(branch.getUpdateDate());
updatedBranch.setUpdateDate(new Timestamp(clock.millis()));
updatedBranch.setOldBranchName(branch.getBranchName());
return updatedBranch;
} else {
// (8)
logger.warn("An update by another user occurred. [branchId: {}]", item);
return null;
}
}
}
項番 | 説明 |
---|---|
(1) |
MapperインタフェースがItemWriterと同じ更新モードになるように、 |
(2) |
悲観ロックを用いないデータ取得のSQLIDを設定する。 |
(3) |
排他制御をしないデータ更新のSQLと同じSQLIDを設定する。 |
(4) |
悲観ロックによるデータ取得を行うItemProcessorを設定する。 |
(5) |
悲観ロックによるデータ取得を行うMapperインタフェースをインジェクションする。 |
(6) |
悲観ロックの抽出条件とするため、ジョブ起動パラメータから、 |
(7) |
悲観ロックによるデータ取得のメソッドを呼び出す。 |
(8) |
他のトランザクションにより対象データが先に更新されて対象データを取得できない場合、悲観ロックによるデータ取得のメソッドがnullを返却する。 |
タスクレットモデルでの悲観ロックを行うコンポーネントについて
タスクレットモデルで悲観ロックを行う場合は、悲観ロックを行うSQL発行するItemReaderを用いる。Mapperインタフェースを直接利用する場合も同様である。 |