Overview
排他制御とは、複数のトランザクションから同じリソースに対して、同時に更新処理が行われる際に、データの整合性を保つために行う処理のことである。 複数のトランザクションから同じリソースに対して、同時に更新処理が行われる可能性がある場合は、基本的に排他制御を行う必要がある。
ここでの複数トランザクションとは以下のことを指す。
-
複数ジョブの同時実行時におけるトランザクション
-
オンライン処理との同時実行時におけるトランザクション
複数ジョブの排他制御
複数ジョブを同時実行する場合は、排他制御の必要がないようにジョブ設計を行うことが基本である。 これは、アクセスするリソースや処理対象をジョブごとに分割することが基本であることを意味する。 |
排他制御に関する概念は、オンライン処理と同様であるため、Macchinetta Server 1.x 開発ガイドラインにある 排他制御を参照してほしい。
ここでは、Macchinetta Server 1.xでは説明されていない部分を中心に説明をする。
本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる。
排他制御の必要性
排他制御の必要性に関しては、Macchinetta Server 1.x 開発ガイドラインにある 排他制御の必要性を参照。
ファイルの排他制御
ファイルでの排他制御はファイルロックにより実現するのが一般的である。
- ファイルロックとは
-
ファイルロックとは、ファイルをあるプログラムで使用している間、ほかのプログラムからの読み書きを制限する仕組みである。 ファイルロックの処理概要を以下に示す。
-
バッチ処理Aがファイルのロックを取得し、ファイルの更新処理を開始する。
-
バッチ処理Bが同一のファイルの更新を試みファイルのロック取得を試みるが失敗する。
-
バッチ処理Aが処理を終了し、ファイルのロックを解除する
-
バッチ処理A(Batch ProcessA)が対象ファイル(TargetFile)のロック取得を試みる。
-
バッチ処理Aが、対象ファイルのロック取得に成功する
-
バッチ処理B(Batch ProcessB)が、対象ファイルのロック取得を試みる
-
バッチ処理Aが、対象ファイルに書き込みを行う
-
バッチ処理Bは、バッチ処理Aがロック中であるため、対象ファイルのロック取得に失敗する
-
バッチ処理Bが、ファイル更新失敗の処理を行う。
-
バッチ処理Aが、対象ファイルのロックを開放する。
デッドロックの予防
ファイルにおいてもデータベースと同様に複数のファイルに対してロックを取得する場合、デッドロックとなる場合がある。
そのため、ファイルの更新順序をルール化することが重要である。 |
データベースの排他制御
データベースの排他制御に関しては、Macchinetta Server 1.x 開発ガイドラインにある データベースのロック機能による排他制御 で詳しく説明されているため、そちらを参照のこと。
排他制御方式の使い分け
Macchinetta Batch 2.xでのロック方式と向いているシチュエーションを示す。
ロック方式 | 向いているシチュエーション |
---|---|
同時実行時におけるトランザクションで、別トランザクションの更新結果を処理対象外にして処理を継続できる場合 |
|
処理時間が長く、処理中に対象データの状況が変化したことによるやり直しが難しい処理 |
排他制御とコンポーネントの関係
Macchinetta Batch 2.xが提供する各コンポーネントと排他制御との関係は以下のとおり。
- 楽観ロック
処理モデル | コンポーネント | ファイル | データベース |
---|---|---|---|
チャンク |
ItemReader |
- |
Versionカラムなど取得時と更新時とで同じデータであることが確認できるカラムを含めてデータ取得を行う。 |
ItemProcessor |
- |
排他制御は不要である。 |
|
ItemWriter |
- |
取得時と更新時との差分を確認し、他の処理で更新されていないことを確認した上で更新を行う。 |
|
タスクレット |
Tasklet |
- |
データ取得時にはItemReader、データ更新時はItemWriterで説明した処理を実施する。 |
ファイルに対する楽観ロック
ファイルの特性上、ファイルに対して楽観ロックを適用することがない。 |
- 悲観ロック
処理モデル | コンポーネント | ファイル | データベース |
---|---|---|---|
チャンク |
ItemReader |
- |
悲観ロックを用いずにSELECT文を発行する。 |
ItemProcessor |
- |
Mapperインターフェースを利用して、ItemReaderで取得したデータ(キー情報)を条件とするSQL文でSELECT FOR UPDATEを発行する。 |
|
ItemWriter |
- |
悲観ロックを行ったItemProcessorと同トランザクションとなるため、ItemWriterでは排他制御を意識することなくデータを更新する。 |
|
タスクレット |
Tasklet |
ItemStreamReaderでファイルをオープンした直後にファイルロックを取得する。 |
データ取得時にはSELECT FOR UPDATE文を発行するItemReaderかMapperインターフェースを直接利用する。 |
チャンクモデルでのデータベースでの悲観ロックによる注意事項
ItemReaderで取得したデータ(キー情報)がItemProcessorへ渡される間は排他制御されず、他のトランザクションにより元のデータが更新されている可能性がある。
そのため、ItemProcessorがデータを取得する条件は、ItemReaderと同じデータ(キー情報)を取得する条件を含む必要がある。 |
ファイルに対する悲観ロック
ファイルに対する悲観ロックはタスクレットモデルで実装すること。 チャンクモデルではその構造上、チャンク処理の隙間で排他できない期間が存在してしまうためである。 また、ファイルアクセスはItemStreamReader/ItemStreamWriterをInjectして利用することを前提とする。 |
データベースでの悲観ロックによる待ち時間
悲観ロックを行う場合、競合により処理が待たされる時間が長くなる可能性がある。 その場合、NO WAITオプションやタイムアウト時間を指定して、悲観ロックを使用するのが妥当である。 |
How to use
排他制御の使い方をリソース別に説明する。
ファイルの排他制御
Macchinetta Batch 2.xにおけるファイルの排他制御はタスクレットを実装することで実現する。
排他の実現手段としては、java.nio.channels.FileChannel
クラスを使用したファイルロック取得で排他制御を行う。
FileChannelクラスの詳細
|
FileChannel
クラスを使用しファイルのロックを取得する例を示す。
@Component
@Scope("step")
public class FileExclusiveTasklet implements Tasklet {
private String targetPath = null; // (1)
@Inject
ItemStreamReader<SalesPlanDetail> reader;
@Inject
ItemStreamWriter<SalesPlanDetailWithProcessName> writer;
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
// omitted.
FileChannel fc = null;
FileLock fileLock = null;
try {
try {
File file = new File(targetPath);
fc = FileChannel.open(file.toPath(), StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND); // (2)
fileLock = fc.tryLock(); // (3)
} catch (IOException e) {
logger.error("Failure other than lock acquisition", e);
throw new FailedOtherAcquireLockException(
"Failure other than lock acquisition", e);
}
if (fileLock == null) {
logger.error("Failed to acquire lock. [processName={}]", processName);
throw new FailedAcquireLockException("Failed to acquire lock");
}
reader.open(executionConetxt);
writer.open(executionConetxt); // (4)
// (5)
SalesPlanDetail item;
List<SalesPlanDetailWithProcessName> items = new ArrayList<>();
while ((item = reader.read()) != null) {
// omitted.
items.add(item);
if (items.size() >= 10) {
writer.write(items);
items.clear();
}
}
if (items.size() > 0) {
writer.write(items);
}
} finally {
if (fileLock != null) {
try {
fileLock.release(); // (6)
} catch (IOException e) {
logger.warn("Lock release failed.", e);
}
}
if (fc != null) {
try {
fc.close();
} catch (IOException e) {
// do nothing.
}
}
try {
writer.close(); // (7)
} catch (ItemStreamException e) {
// ignore
}
try {
reader.close();
} catch (ItemStreamException e) {
// ignore
}
}
return RepeatStatus.FINISHED;
}
// (8)
@Value("#{jobParameters['outputFile']}")
public void setTargetPath(String targetPath) {
this.targetPath = targetPath;
}
}
項番 | 説明 |
---|---|
(1) |
排他対象のファイルパス。 |
(2) |
ファイルチャネルを取得する。 |
(3) |
ファイルロックを取得する。 |
(4) |
ファイロックの取得に成功した場合、排他対象のファイルをオープンする。 |
(5) |
ファイル出力を伴うビジネスロジックを実行する。 |
(6) |
ファイルロックを開放する。 |
(7) |
排他対象のファイルをクローズする。 |
(8) |
ファイルパスを設定する。 |
ロック取得に用いるFileChannelのメソッドについて
|
同一VMでのスレッド間の排他制御
同一VMにおけるスレッド間の排他制御は注意が必要である。
同一VMでのスレッド間でファイルに対する処理を行う場合、 |
FlatFileItemWriterのappendAllowedプロパティについて
ファイルを新規作成(上書き)する場合は、 |
データベースの排他制御
Macchinetta Batch 2.xにおけるデータベースの排他制御について説明する。
データベースの排他制御実装は、Macchinetta Server 1.x 開発ガイドラインにある MyBatis3使用時の実装方法が基本である。 本ガイドラインでは、 MyBatis3使用時の実装方法ができている前提で説明を行う。
排他制御とコンポーネントの関係にあるとおり、処理モデル・コンポーネントの組み合わせによるバリエーションがある。
排他方式 |
処理モデル |
コンポーネント |
楽観ロック |
チャンクモデル |
ItemReader/ItemWriter |
タスクレットモデル |
ItemReader/ItemWriter |
|
Mapperインターフェース |
||
悲観ロック |
チャンクモデル |
ItemReader/ItemProcessor/ItemWriter |
タスクレットモデル |
ItemReader/ItemWriter |
|
Mapperインターフェース |
タスクレットモデルでMapperインターフェースを使用する場合は、 MyBatis3使用時の実装方法のとおりであるため、説明を割愛する。
タスクレットモデルでItemReader/ItemWriterを使用する場合は、Mapperインターフェースでの呼び出し部分がItemReader/ItemWriterに代わるだけなので、これも説明を割愛する。
よって、ここではチャンクモデルの排他制御について説明する。
楽観ロック
チャンクモデルでの楽観ロックについて説明する。
MyBatisBatchItemWriterがもつassertUpdates
プロパティの設定により、ジョブの振る舞いが変化するので業務要件に合わせて、適切に設定をする必要がある。
楽観ロックを行うジョブ定義を以下に示す。
<!-- (1) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchFindOne"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<property name="parameterValues">
<map>
<entry key="branchId" value="#{jobParameters['branchId']}"/>
</map>
</property>
</bean>
<!-- (2) --->
<bean id="writer"
class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
p:assertUpdates="true" /> <!-- (3) -->
<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="chunkOptimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="branchEditItemProcessor"
writer="writer" commit-interval="10" />
</batch:tasklet>
</batch:step>
</batch:job>
項番 | 説明 |
---|---|
(1) |
楽観ロックによるデータ取得のSQLIDを設定する。 |
(2) |
楽観ロックによるデータ更新のSQLIDを設定する。 |
(3) |
バッチ更新の件数を検証有無を設定する。 |
悲観ロック
チャンクモデルでの悲観ロックについて説明する。
悲観ロックを行うジョブ定義とItemProcessorを以下に示す。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
template-ref="batchModeSqlSessionTemplate"/>
<!-- (2) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchIdFindByName"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues">
<map>
<entry key="branchName" value="#{jobParameters['branchName']}"/>
</map>
</property>
</bean>
<!-- (3) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchUpdate"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>
<batch:job id="chunkPessimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="chunkPessimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- (4) -->
<batch:chunk reader="reader" processor="branchEditWithkPessimisticLockItemProcessor"
writer="writer" commit-interval="3"/>
</batch:tasklet>
</batch:step>
</batch:job>
@Component
@Scope("step")
public class BranchEditWithkPessimisticLockItemProcessor implements ItemProcessor<String, ExclusiveBranch> {
// (5)
@Inject
ExclusiveControlRepository exclusiveControlRepository;
// (6)
@Value("#{jobParameters['branchName']}")
private String branchName;
// omitted.
@Override
public ExclusiveBranch process(String item) throws Exception {
// (7)
Branch branch = exclusiveControlRepository.branchFindOneByNameWithNowWaitLock(item, branchName);
if (branch != null) {
ExclusiveBranch updatedBranch = new ExclusiveBranch();
updatedBranch.setBranchId(branch.getBranchId());
updatedBranch.setBranchName(branch.getBranchName() + " - " + identifier);
updatedBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
updatedBranch.setBranchTel(branch.getBranchTel());
updatedBranch.setCreateDate(branch.getUpdateDate());
updatedBranch.setUpdateDate(new Timestamp(clock.millis()));
updatedBranch.setOldBranchName(branch.getBranchName());
return updatedBranch;
} else {
// (8)
logger.warn("An update by another user occurred. [branchId: {}]", item);
return null;
}
}
}
項番 | 説明 |
---|---|
(1) |
MapperインターフェースがItemWriterと同じ更新モードになるように、 |
(2) |
悲観ロックを用いないデータ取得のSQLIDを設定する。 |
(3) |
排他制御をしないデータ更新のSQLと同じSQLIDを設定する。 |
(4) |
悲観ロックによるデータ取得を行うItemProcessorを設定する。 |
(5) |
悲観ロックによるデータ取得を行うMapperインターフェースをインジェクションする。 |
(6) |
悲観ロックの抽出条件とするため、ジョブ起動パラメータから、 |
(7) |
悲観ロックによるデータ取得のメソッドを呼び出す。 |
(8) |
他のトランザクションにより対象データが先に更新されて対象データを取得できない場合、悲観ロックによるデータ取得のメソッドがnullを返却する。 |
タスクレットモデルでの悲観ロックを行うコンポーネントについて
タスクレットモデルで悲観ロックを行う場合は、悲観ロックを行うSQL発行するItemReaderを用いる。Mapperインターフェースを直接利用する場合も同様である。 |