Overview
一般的に、バッチウィンドウ(バッチ処理のために使用できる時間)がシビアなバッチシステムでは、
複数ジョブを並列に動作させる(以降、並列処理と呼ぶ)ことで全体の処理時間を可能な限り短くするように設計する。
しかし、1ジョブの処理データが大量であるために処理時間がバッチウィンドウに収まらない場合がある。
その際は、1ジョブの処理データを分割して多重走行させる(以降、多重処理と呼ぶ)ことで処理時間を短縮させる手法が用いられる。
この、並列処理と多重処理は同じような意味合いで扱われることもあるが、ここでは以下の定義とする。
- 並列処理
-
複数の異なるジョブを、同時に実行する。
- 多重処理
-
1ジョブの処理対象を分割して、同時に実行する。
並列処理と多重処理ともにジョブスケジューラで行う方法とMacchinetta Batch 2.xで行う方法がある。
なお、Macchinetta Batch 2.xでの並列処理および多重処理は
フロー制御の上に成り立っている。
実現方法 | 並列処理 | 多重処理 |
---|---|---|
ジョブスケジューラ |
依存関係がない複数の異なるジョブを同時に実行するように定義する。 |
複数の同じジョブを異なるデータ範囲で実行するように定義する。各ジョブに処理対象のデータを絞るための情報を引数などで渡す。 |
Macchinetta Batch 2.x |
Parallel Step (並列処理) |
Partitioning Step (多重処理) |
- ジョブスケジューラを使用する場合
-
1ジョブに1プロセスが割り当てられるため複数プロセスで起動される。 そのため、1つのジョブを設計・実装する難易度は低い。
しかし、複数プロセスで起動するため、同時実行数が増えるとマシンリソースへの負荷が高くなる。
よって、同時実行数が3、4程度であれば、ジョブスケジューラを利用するとよい。
もちろん、この数値は絶対的なものではない。実行環境やジョブの実装に依存するため目安としてほしい。 - Macchinetta Batch 2.xを使用する場合
-
各ステップがスレッドに割り当てられるため、1プロセス複数スレッドで動作する。そのため、1つのジョブへの設計・実装の難易度はジョブスケジューラを使用する場合より高くなる。
しかし、複数スレッドで起動するため、同時実行数が増えてもマシンリソースへの負荷がジョブスケジューラを使用する場合ほど高くはならない。 よって、同時実行数が多い(5以上の)場合であれば、Macchinetta Batch 2.xを利用するのがよい。
もちろん、この数値は絶対的なものではない。実行環境やシステム特性に依存するため目安としてほしい。
Spring Batchで実行可能な並列処理方法の1つに
|
並列処理・多重処理で1つのデータベースに対して更新する場合は、リソース競合とデッドロックが発生する可能性がある。 ジョブ設計の段階から潜在的な競合発生を排除すること。 マルチプロセスや複数筐体への分散処理は、Spring Batchに機能があるが、Macchinetta Batch 2.xとしては障害設計が困難になるため扱わないこととする。 |
本機能は、チャンクモデルとタスクレットモデルとで同じ使い方になる
How to use
Macchinetta Batch 2.xでの並列処理および多重処理を行う方法を説明する。
Parallel Step (並列処理)
Parallel Step (並列処理)の方法を説明する。
各ステップに別々な処理を定義することができ、それらを並列に実行することができる。 各ステップごとにスレッドが割り当てられる。
Parallel Stepの概要図を例にしたParallel Stepの定義方法を以下に示す。
// (1)
@Bean
public TaskExecutor parallelTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setQueueCapacity(200);
return executor;
}
// (4)
@Bean
public Step stepChunkDb(JobRepository jobRepository,
ItemReader fileReader,
ItemWriter databaseWriter,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelStepJob.step.chunk.db", jobRepository)
.chunk(100, transactionManager)
.reader(fileReader)
.writer(databaseWriter)
.build();
}
// (5)
@Bean
public Step stepTaskletChunk(JobRepository jobRepository,
Tasklet chunkTransactionTasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelStepJob.step.tasklet.chunk", jobRepository)
.tasklet(chunkTransactionTasklet, transactionManager)
.build();
}
// (6)
@Bean
public Step stepTaskletSingle(JobRepository jobRepository,
Tasklet singleTransactionTasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelStepJob.step.tasklet.single", jobRepository)
.tasklet(singleTransactionTasklet, transactionManager)
.build();
}
// (7)
@Bean
public Step stepChunkDb(JobRepository jobRepository,
ItemReader databaseReader,
ItemWriter fileWriter,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelStepJob.step.chunk.file", jobRepository)
.chunk(200, transactionManager)
.reader(databaseReader)
.writer(fileWriter)
.build();
}
// (3)
@Bean
public Flow flow1(Step stepChunkDb) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(stepChunkDb)
.build();
}
// (3)
@Bean
public Flow flow2(Step stepTaskletChunk) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(stepTaskletChunk)
.build();
}
// (3)
@Bean
public Flow flow3(Step stepTaskletSingle) {
return new FlowBuilder<SimpleFlow>("flow3")
.start(stepTaskletSingle)
.build();
}
// (3)
@Bean
public Flow flow4(Step stepChunkDb) {
return new FlowBuilder<SimpleFlow>("flow4")
.start(stepChunkDb)
.build();
}
// (2)
@Bean
public Flow splitFlow(
@Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
Flow flow1, Flow flow2, Flow flow3, Flow flow4) {
return new FlowBuilder<SimpleFlow>("parallelStepJob.split")
.split(taskExecutor)
.add(flow1, flow2, flow3, flow4)
.build();
}
@Bean
public Job parallelSummarizeJob(JobRepository jobRepository,
Flow splitFlow) {
return new JobBuilder("parallelStepJob", jobRepository)
.start(splitFlow)
.end()
.build();
}
<!-- Task Executor -->
<!-- (1) -->
<task:executor id="parallelTaskExecutor" pool-size="10" queue-capacity="200"/>
<!-- Job Definition -->
<!-- (2) -->
<batch:job id="parallelStepJob" job-repository="jobRepository">
<batch:split id="parallelStepJob.split" task-executor="parallelTaskExecutor">
<batch:flow> <!-- (3) -->
<batch:step id="parallelStepJob.step.chunk.db">
<!-- (4) -->
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="fileReader" writer="databaseWriter"
commit-interval="100"/>
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow> <!-- (3) -->
<batch:step id="parallelStepJob.step.tasklet.chunk">
<!-- (5) -->
<batch:tasklet transaction-manager="jobTransactionManager"
ref="chunkTransactionTasklet"/>
</batch:step>
</batch:flow>
<batch:flow> <!-- (3) -->
<batch:step id="parallelStepJob.step.tasklet.single">
<!-- (6) -->
<batch:tasklet transaction-manager="jobTransactionManager"
ref="singleTransactionTasklet"/>
</batch:step>
</batch:flow>
<batch:flow> <!-- (3) -->
<batch:step id="parallelStepJob.step.chunk.file">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- (7) -->
<batch:chunk reader="databaseReader" writer="fileWriter"
commit-interval="200"/>
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
</batch:job>
項番 | 説明 |
---|---|
(1) |
並列処理のために、各スレッドに割り当てるためのスレッドプールを定義する。 |
(2) |
|
(3) |
|
(4) |
概要図のStep1:チャンクモデルの中間コミット方式処理を定義する。 |
(5) |
概要図のStep2:タスクレットモデルの中間コミット方式処理を定義する。 |
(6) |
概要図のStep3:タスクレットモデルの一括コミット方式処理を定義する。 |
(7) |
概要図のStep4:チャンクモデルの非トランザクショナルなリソースに対する中間コミット方式処理を定義する。 |
並列処理したために処理性能が低下するケース
並列処理では多重処理同様にデータ範囲を変えて同じ処理を並列走行させることが可能である。この場合、データ範囲はパラメータなどで与える。 フットプリントの例
|
また、Parallel Stepの前後に共通処理のステップを定義することも可能である。
@Bean
public Step stepPreprocess(JobRepository jobRepository,
StepExecutionLoggingListener listener,
DeleteDetailTasklet deleteDetailTasklet,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelRegisterJob.step.preprocess", jobRepository)
.tasklet(deleteDetailTasklet, transactionManager)
.listener(listener)
.build();
}
@Bean
public Step stepPlan(JobRepository jobRepository,
StepExecutionLoggingListener listener,
@Qualifier("planReader") ItemReader<SalesPlanDetail> reader,
@Qualifier("planWriter") ItemWriter<SalesPlanDetail> writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelRegisterJob.step.plan", jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(1,
transactionManager)
.listener(listener)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Step stepPerformance(JobRepository jobRepository,
StepExecutionLoggingListener listener,
@Qualifier("performanceReader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("performanceWriter") ItemWriter<SalesPerformanceDetail> writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("parallelRegisterJob.step.performance",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(1,
transactionManager)
.listener(listener)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public Flow flow1(Step stepPlan) {
return new FlowBuilder<SimpleFlow>("flow1")
.start(stepPlan)
.build();
}
@Bean
public Flow flow2(Step stepPerformance) {
return new FlowBuilder<SimpleFlow>("flow2")
.start(stepPerformance)
.build();
}
// (2)
@Bean
public Flow splitFlow(
@Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
Flow flow1, Flow flow2) {
return new FlowBuilder<SimpleFlow>("parallelRegisterJob.split")
.split(taskExecutor) // (1)
.add(flow1, flow2)
.build();
}
@Bean
public Job parallelRegisterJob(JobRepository jobRepository,
Flow splitFlow,
Step stepPreprocess) {
return new JobBuilder("parallelRegisterJob", jobRepository)
.start(stepPreprocess)
.on("*")
.to(splitFlow)
.end()
.build();
}
<batch:job id="parallelRegisterJob" job-repository="jobRepository">
<!-- (1) -->
<batch:step id="parallelRegisterJob.step.preprocess"
next="parallelRegisterJob.split">
<batch:tasklet transaction-manager="jobTransactionManager"
ref="deleteDetailTasklet" />
</batch:step>
<!--(2) -->
<batch:split id="parallelRegisterJob.split" task-executor="parallelTaskExecutor">
<batch:flow>
<batch:step id="parallelRegisterJob.step.plan">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="planReader" writer="planWriter"
commit-interval="1000" />
</batch:tasklet>
</batch:step>
</batch:flow>
<batch:flow>
<batch:step id="parallelRegisterJob.step.performance">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="performanceReader" writer="performanceWriter"
commit-interval="1000" />
</batch:tasklet>
</batch:step>
</batch:flow>
</batch:split>
</batch:job>
項番 | 説明 |
---|---|
(1) |
前処理として処理するステップを定義する。 |
(2) |
Parallel Stepを定義する。 |
Partitioning Step (多重処理)
Partitioning Step(多重処理)の方法を説明する
Partitioning Stepでは、ManagerステップとWorkerステップの処理フェーズに分割される。
-
Managerステップでは、
Partitioner
により各Workerステップが処理するデータ範囲を特定するためのParition Key
を生成する。Parition Key
はステップコンテキストに格納される。 -
Workerステップでは、ステップコンテキストから自身に割り当てられた
Parition Key
を取得し、それを使い処理対象データを特定する。 特定した処理対象データに対して定義したステップの処理を実行する。
Partitioning Stepでは処理データを分割必要があるが、分割数については可変数と固定数のどちらにも対応できる。
- 可変数の場合
-
部門別で分割や、特定のディレクトリに存在するファイル単位での処理
- 固定数の場合
-
全データを個定数で分割してデータを処理
Spring Batchでは、固定数のことをgrid-size
といい、Partitioner
でgrid-size
になるようにデータ分割範囲を決定する。
Partitioning Stepでは、分割数をスレッドサイズより大きくすることができる。 この場合、スレッド数分で多重実行され、スレッドに空きが出るまで、処理が未実行のまま保留となるステップが発生する。
以下にPartitioning Stepのユースケースを示す。
ユースケース | Manager(Patitioner) | Worker | 分割数 |
---|---|---|---|
マスタ情報からトランザクション情報を分割・多重化するケース |
DB(マスタ情報) |
DB(トランザクション情報) |
可変 |
複数ファイルのリストから1ファイル単位に多重化するケース |
複数ファイル |
単一ファイル |
可変 |
大量データを一定数で分割・多重化するケース 障害発生時にリラン以外のリカバリ設計が難しくなるため、実運用では利用されることはあまりないケース。 |
|
DB(トランザクション情報) |
固定 |
分割数が可変の場合
Partitioning Stepで分割数を可変とする方法を説明する。
下記に処理イメージ図を示す。
処理イメージを例とした実装方法を示す。
<!-- (1) -->
<select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.mst.Branch">
<![CDATA[
SELECT
branch_id AS branchId,
branch_name AS branchName,
branch_address AS branchAddrss,
branch_tel AS branchTel,
create_date AS createDate,
update_date AS updateDate
FROM
branch_mst
]]>
</select>
<!-- (2) -->
<select id="summarizeInvoice"
resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceDetail">
<![CDATA[
SELECT
branchId, year, month, customerId, SUM(amount) AS amount
FROM (
SELECT
t2.charge_branch_id AS branchId,
date_part('year', t1.invoice_date) AS year,
date_part('month', t1.invoice_date) AS month,
t1.customer_id AS customerId,
t1.invoice_amount AS amount
FROM invoice t1
INNER JOIN customer_mst t2 ON t1.customer_id = t2.customer_id
WHERE
t2.charge_branch_id = #{branchId}
) t3
GROUP BY branchId, year, month, customerId
ORDER BY branchId ASC, year ASC, month ASC, customerId ASC
]]>
</select>
<!-- omitted -->
@Component
public class BranchPartitioner implements Partitioner {
@Inject
BranchRepository branchRepository; // (3)
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>();
List<Branch> branches = branchRepository.findAll();
int index = 0;
for (Branch branch : branches) {
ExecutionContext context = new ExecutionContext();
context.putString("branchId", branch.getBranchId()); // (4)
map.put("partition" + index, context); // (5)
index++;
}
return map;
}
}
// (6)
@Bean
public TaskExecutor parallelTaskExecutor(
@Value("${thread.size}") int threadSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadSize);
executor.setQueueCapacity(10);
return executor;
}
// (7)
@Bean
@StepScope
public MyBatisCursorItemReader<SalesPerformanceDetail> reader(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
@Value("#{stepExecutionContext['branchId']}") String branchId) { // (8)
final Map<String, Object> parameterValues = new LinkedHashMap<>();
parameterValues.put("branchId", branchId);
return new MyBatisCursorItemReaderBuilder<SalesPerformanceDetail>()
.sqlSessionFactory(jobSqlSessionFactory)
.queryId(
"org.terasoluna.batch.functionaltest.app.repository.performance.InvoiceRepository.summarizeInvoice")
.parameterValues(parameterValues)
.build();
}
// omitted
// (12)
@Bean
public Step step1(JobRepository jobRepository,
StepExecutionLoggingListener listener,
@Qualifier("reader") ItemReader<SalesPerformanceDetail> reader,
@Qualifier("writer") ItemWriter<SalesPerformanceDetail> writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("multipleInvoiceSummarizeJob.worker",
jobRepository)
.<SalesPerformanceDetail, SalesPerformanceDetail> chunk(1,
transactionManager)
.listener(listener)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public PartitionHandler partitionHandler(
@Qualifier("parallelTaskExecutor") TaskExecutor taskExecutor,
@Value("${grid.size}") int gridSize, Step step1) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setTaskExecutor(taskExecutor);
handler.setStep(step1);
handler.setGridSize(gridSize); // (11)
return handler;
}
// (9)
@Bean
public Step step1Manager(JobRepository jobRepository,
BranchPartitioner partitioner,
PartitionHandler partitionHandlerandler) {
return new StepBuilder("multipleInvoiceSummarizeJob.manager",
jobRepository)
.partitioner("multipleInvoiceSummarizeJob.worker", partitioner) // (10)
.partitionHandler(partitionHandlerandler)
.build();
}
@Bean
public Job multipleInvoiceSummarizeJob(JobRepository jobRepository,
JobExecutionLoggingListener listener,
Step step1Manager) {
return new JobBuilder("multipleInvoiceSummarizeJob", jobRepository)
.start(step1Manager)
.listener(listener)
.build();
}
<!-- (6) -->
<task:executor id="parallelTaskExecutor"
pool-size="${thread.size}" queue-capacity="10"/>
<!-- (7) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.InvoiceRepository.summarizeInvoice"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues">
<map>
<!-- (8) -->
<entry key="branchId" value="#{stepExecutionContext['branchId']}" />
</map>
</property>
</bean>
<!-- omitted -->
<batch:job id="multipleInvoiceSummarizeJob" job-repository="jobRepository">
<!-- (9) -->
<batch:step id="multipleInvoiceSummarizeJob.manager">
<!-- (10) -->
<batch:partition partitioner="branchPartitioner"
step="multipleInvoiceSummarizeJob.worker">
<!-- (11) -->
<batch:handler grid-size="0" task-executor="parallelTaskExecutor" />
</batch:partition>
</batch:step>
</batch:job>
<!-- (12) -->
<batch:step id="multipleInvoiceSummarizeJob.worker">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" writer="writer" commit-interval="10"/>
</batch:tasklet>
</batch:step>
項番 | 説明 |
---|---|
(1) |
マスタデータから処理対象を取得するSQLを定義する。 |
(2) |
マスタデータからの取得値を検索条件とするSQLを定義する。 |
(3) |
定義したRepository(SQLMapper)をInjectする。 |
(4) |
1つのWorkerステップが処理するマスタ値をステップコンテキストに格納する。 |
(5) |
各Workerが該当するコンテキストを取得できるようMapに格納する。 |
(6) |
多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。 |
(7) |
マスタ値によるデータ取得のItemReaderを定義する。 |
(8) |
(4)で設定したマスタ値をステップコンテキストから取得し、検索条件に追加する。 |
(9) |
Managerステップを定義する。 |
(10) |
データの分割条件を生成する処理を定義する。 |
(11) |
|
(12) |
Workerステップを定義する。 |
複数ファイルのリストから1ファイル単位に多重化する場合は、Spring Batchが提供している以下のPartitioner
を利用することができる。
-
org.springframework.batch.core.partition.support.MultiResourcePartitioner
MultiResourcePartitioner
の利用例を以下に示す。
// (1)
@Bean
public TaskExecutor parallelTaskExecutor(
@Value("${thread.size}") int threadSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadSize);
executor.setQueueCapacity(200);
return executor;
}
// (2)
@Bean
@StepScope
public FlatFileItemReader<SalesPlanDetail> reader(
@Value("#{stepExecutionContext['fileName']}") File fileName) {
final DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames("branchId", "year", "month", "customerId", "amount");
final BeanWrapperFieldSetMapper<SalesPlanDetail> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
fieldSetMapper.setTargetType(SalesPlanDetail.class);
final DefaultLineMapper<SalesPlanDetail> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(fieldSetMapper);
return new FlatFileItemReaderBuilder<SalesPlanDetail>()
.name(ClassUtils.getShortName(FlatFileItemReader.class))
.resource(new FileSystemResource(fileName)) // (3)
.lineMapper(lineMapper)
.build();
}
// (4)
@Bean
@StepScope
public MultiResourcePartitioner partitioner(
@Value("#{jobParameters['inputdir']}") File inputdir) throws Exception { // (5)
MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
Resource[] resources = resolver.getResources(
new FileSystemResource(inputdir).getURL() + File.separator + "salesPlanDetail_*.csv");
partitioner.setResources(resources);
return partitioner;
}
// (7)
@Bean
public Step stepWorker(JobRepository jobRepository,
ItemReader<SalesPlanDetail> reader,
LoggingItemWriter writer,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("jobEvaluationReport.worker", jobRepository)
.<SalesPlanDetail, SalesPlanDetail> chunk(20,
transactionManager)
.reader(reader)
.writer(writer)
.build();
}
@Bean
public PartitionHandler partitionHandler(
TaskExecutor parallelTaskExecutor,
Step stepWorker) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setTaskExecutor(parallelTaskExecutor);
handler.setStep(stepWorker);
handler.setGridSize(0);
return handler;
}
// (6)
@Bean
public Step stepManager(JobRepository jobRepository,
MultiResourcePartitioner partitioner,
PartitionHandler partitionHandler) {
return new StepBuilder("multiplePartitioninglStepFileJob.manager", jobRepository)
.partitioner("multiplePartitioninglStepFileJob.worker", partitioner)
.partitionHandler(partitionHandler)
.build();
}
@Bean
public Job multiplePartitioninglStepFileJob(JobRepository jobRepository,
Step stepManager) {
return new JobBuilder("multiplePartitioninglStepFileJob", jobRepository)
.start(stepManager)
.build();
}
<!-- (1) -->
<task:executor id="parallelTaskExecutor" pool-size="10" queue-capacity="200"/>
<!-- (2) -->
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step"
p:resource="#{stepExecutionContext['fileName']}"> <!-- (3) -->
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"
p:names="branchId,year,month,customerId,amount"/>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
p:targetType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail"/>
</property>
</bean>
</property>
</bean>
<!-- (4) -->
<bean id="partitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
scope="step"
p:resources="file:#{jobParameters['inputdir']}/salesPlanDetail_*.csv"/> <!-- (5) -->
<!--(6) -->
<batch:job id="multiplePartitioninglStepFileJob" job-repository="jobRepository">
<batch:step id="multiplePartitioninglStepFileJob.step.manager">
<batch:partition partitioner="partitioner"
step="multiplePartitioninglStepFileJob.step.worker">
<batch:handler grid-size="0" task-executor="parallelTaskExecutor"/>
</batch:partition>
</batch:step>
</batch:job>
<!-- (7) -->
<batch:step id="multiplePartitioninglStepFileJob.step.worker">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" writer="writer" commit-interval="20"/>
</batch:tasklet>
</batch:step>
項番 | 説明 |
---|---|
(1) |
多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。 |
(2) |
1つのファイルを読み込むためのItemReaderを定義する。 |
(3) |
|
(4) |
|
(5) |
|
(6) |
Managerステップを定義する。 |
(7) |
Workerステップを定義する。 |
分割数が固定の場合
Partitioning Stepで分割数を固定する方法を説明する。
下記に処理イメージ図を示す。
処理イメージを例とした実装方法を示す。
<!-- (1) -->
<select id="findByYearAndMonth"
resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
<![CDATA[
SELECT
branch_id AS branchId, year, month, amount
FROM
sales_performance_summary
WHERE
year = #{year} AND month = #{month}
ORDER BY
branch_id ASC
LIMIT
#{dataSize}
OFFSET
#{offset}
]]>
</select>
<!-- (2) -->
<select id="countByYearAndMonth" resultType="_int">
<![CDATA[
SELECT
count(*)
FROM
sales_performance_summary
WHERE
year = #{year} AND month = #{month}
]]>
</select>
<!-- omitted -->
@Component
public class SalesDataPartitioner implements Partitioner {
@Inject
SalesSummaryRepository repository; // (3)
// omitted.
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>();
int count = repository.countByYearAndMonth(year, month);
int dataSize = (count / gridSize) + 1; // (4)
int offset = 0;
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("dataSize", dataSize); // (5)
context.putInt("offset", offset); // (6)
offset += dataSize;
map.put("partition:" + i, context); // (7)
}
return map;
}
}
// (8)
@Bean
public TaskExecutor parallelTaskExecutor(
@Value("${thread.size}") int threadSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threadSize);
executor.setQueueCapacity(10);
return executor;
}
// (9)
@Bean
@StepScope
public MyBatisCursorItemReader<SalesPerformanceSummary> reader(
@Qualifier("jobSqlSessionFactory") SqlSessionFactory jobSqlSessionFactory,
@Value("#{jobParameters['year']}") Integer year,
@Value("#{jobParameters['month']}") Integer month,
@Value("#{stepExecutionContext['dataSize']}") Integer dataSize, // (10)
@Value("#{stepExecutionContext['offset']}") Integer offset) { // (11)
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("year", year);
parameterValues.put("month", month);
parameterValues.put("dataSize", dataSize);
parameterValues.put("offset", offset);
return new MyBatisCursorItemReaderBuilder<SalesPerformanceSummary>()
.sqlSessionFactory(jobSqlSessionFactory)
.queryId(
"org.terasoluna.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth")
.parameterValues(parameterValues)
.build();
}
// omitted
// (15)
@Bean
public Step stepWorker(JobRepository jobRepository,
StepExecutionLoggingListener listener,
ItemReader<SalesPerformanceSummary> reader,
ItemWriter<SalesPlanSummary> writer,
AddProfitsItemProcessor addProfitsItemProcessor,
@Qualifier("jobTransactionManager") PlatformTransactionManager transactionManager) {
return new StepBuilder("multipleCreateSalesPlanSummaryJob.worker", jobRepository)
.<SalesPerformanceSummary, SalesPlanSummary> chunk(10,
transactionManager)
.listener(listener)
.reader(reader)
.processor(addProfitsItemProcessor)
.writer(writer)
.build();
}
@Bean
public PartitionHandler partitionHandler(
TaskExecutor parallelTaskExecutor,
@Value("${grid.size}") int gridSize,
Step stepWorker) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setTaskExecutor(parallelTaskExecutor); // (14)
handler.setStep(stepWorker);
handler.setGridSize(gridSize); // (14)
return handler;
}
// (12)
@Bean
public Step stepManager(JobRepository jobRepository,
SalesDataPartitioner salesDataPartitioner,
PartitionHandler partitionHandler) {
return new StepBuilder("multipleCreateSalesPlanSummaryJob.manager", jobRepository)
.partitioner("multipleCreateSalesPlanSummaryJob.worker", salesDataPartitioner) // (13)
.partitionHandler(partitionHandler)
.build();
}
@Bean
public Job multipleCreateSalesPlanSummaryJob(JobRepository jobRepository,
JobExecutionLoggingListener listener,
Step stepManager) {
return new JobBuilder("multipleCreateSalesPlanSummaryJob", jobRepository)
.start(stepManager)
.listener(listener)
.build();
}
<!-- (8) -->
<task:executor id="parallelTaskExecutor"
pool-size="${thread.size}" queue-capacity="10"/>
<!-- (9) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues">
<map>
<entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
<entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>
<!-- (10) -->
<entry key="dataSize" value="#{stepExecutionContext['dataSize']}" />
<!-- (11) -->
<entry key="offset" value="#{stepExecutionContext['offset']}" />
</map>
</property>
</bean>
<!-- omitted -->
<batch:job id="multipleCreateSalesPlanSummaryJob" job-repository="jobRepository">
<!-- (12) -->
<batch:step id="multipleCreateSalesPlanSummaryJob.manager">
<!-- (13) -->
<batch:partition partitioner="salesDataPartitioner"
step="multipleCreateSalesPlanSummaryJob.worker">
<!-- (14) -->
<batch:handler grid-size="4" task-executor="parallelTaskExecutor" />
</batch:partition>
</batch:step>
</batch:job>
<!-- (15) -->
<batch:step id="multipleCreateSalesPlanSummaryJob.worker">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="addProfitsItemProcessor"
writer="writer" commit-interval="10"/>
</batch:tasklet>
</batch:step>
項番 | 説明 |
---|---|
(1) |
特定のデータ範囲を取得するためにページネーション検索(SQL絞り込み方式)を定義する。 |
(2) |
処理対象の全件数を取得するSQLを定義する。 |
(3) |
定義したRepository(SQLMapper)をInjectする。 |
(4) |
1つのWorkerステップが処理するデータ件数を算出する。 |
(5) |
(4)のデータ件数をステップコンテキストに格納する。 |
(6) |
各Workerステップの検索開始位置をステップコンテキストに格納する。 |
(7) |
各Workerが該当するコンテキストを取得できるようMapに格納する。 |
(8) |
多重処理でWorkerステップの各スレッドに割り当てるためのスレッドプールを定義する。 |
(9) |
ページネーション検索(SQL絞り込み方式)によるデータ取得のItemReaderを定義する。 |
(10) |
(5)で設定したデータ件数をステップコンテキストから取得し、検索条件に追加する。 |
(11) |
(6)で設定した検索開始位置をステップコンテキストから取得し、検索条件に追加する。 |
(12) |
Managerステップを定義する。 |
(13) |
データの分割条件を生成する処理を定義する。 |
(14) |
|
(15) |
Workerステップを定義する。 |