Overview
Macchinetta Batch 2.xでは、データベースアクセスの方法として、MyBatis3(以降、「MyBatis」と呼ぶ)を利用する。 MyBatisによるデータベースアクセスの基本的な利用方法は、Macchinetta Server 1.x 開発ガイドラインの以下を参照。
本節では、Macchinetta Batch 2.x特有の使い方を中心に説明する。
Linux環境でのOracle JDBC利用時の留意事項について
Linux環境でのOracle JDBCを利用時は、Oracle JDBCが使用するOSの乱数生成器によるロックが発生する。
そのため、ジョブを並列実行しようとしても逐次実行になる事象や片方のコネクションがタイムアウトする事象が発生する。
|
How to use
Macchinetta Batch 2.xでのデータベースアクセス方法を説明する。
なお、チャンクモデルとタスクレットモデルにおけるデータベースアクセス方法の違いに留意する。
Macchinetta Batch 2.xでのデータベースアクセスは、以下の2つの方法がある。
これらはデータベースアクセスするコンポーネントによって使い分ける。
-
MyBatis用のItemReaderおよびItemWriterを利用する。
-
チャンクモデルでのデータベースアクセスによる入出力で使用する。
-
org.mybatis.spring.batch.MyBatisCursorItemReader
-
org.mybatis.spring.batch.MyBatisBatchItemWriter
-
-
-
Mapperインタフェースを利用する
-
チャンクモデルでのビジネスロジック処理で使用する。
-
ItemProcessor実装で利用する。
-
-
タスクレットモデルでのデータベースアクセス全般で使用する。
-
Tasklet実装で利用する。
-
-
共通設定
データベースアクセスにおいて必要な共通設定について説明を行う。
データソースの設定
Macchinetta Batch 2.xでは、2つのデータソースを前提としている。
launch-context.xml
でデフォルト設定している2つのデータソースを示す。
データソース名 | 説明 |
---|---|
|
Spring BatchやMacchinetta Batch 2.xが利用するデータソース |
|
ジョブが利用するデータソース |
JobRepositoryのトランザクション
|
以下に、launch-context.xmlと接続情報のプロパティを示す。
これらをユーザの環境に合わせて設定すること。
<!-- (1) -->
<bean id="adminDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close"
p:driverClassName="${admin.h2.jdbc.driver}"
p:url="${admin.h2.jdbc.url}"
p:username="${admin.h2.jdbc.username}"
p:password="${admin.h2.jdbc.password}"
p:maxTotal="10"
p:minIdle="1"
p:maxWaitMillis="5000"
p:defaultAutoCommit="false"/>
<!-- (2) -->
<bean id="jobDataSource" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close"
p:driverClassName="${jdbc.driver}"
p:url="${jdbc.url}"
p:username="${jdbc.username}"
p:password="${jdbc.password}"
p:maxTotal="10"
p:minIdle="1"
p:maxWaitMillis="5000"
p:defaultAutoCommit="false" />
# (3)
# Admin DataSource settings.
admin.h2.jdbc.driver=org.h2.Driver
admin.h2.jdbc.url=jdbc:h2:mem:batch;DB_CLOSE_DELAY=-1
admin.h2.jdbc.username=sa
admin.h2.jdbc.password=
# (4)
# Job DataSource settings.
jdbc.driver=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/postgres
jdbc.username=postgres
jdbc.password=postgres
項番 | 説明 |
---|---|
(1) |
|
(2) |
|
(3) |
|
(4) |
|
MyBatisの設定
Macchinetta Batch 2.xで、MyBatisの設定をする上で重要な点について説明をする。
バッチ処理を実装する際の重要なポイントの1つとして「大量のデータを一定のリソースで効率よく処理する」が挙げられる。
これに関する設定を説明する。
-
fetchSize
-
一般的なバッチ処理では、大量のデータを処理する際の通信コストを低減するために、 JDBCドライバに適切な
fetchSize
を指定することが必須である。fetchSize
とは、JDBCドライバとデータベース間とで1回の通信で取得するデータ件数を設定するパラメータである。 この値は出来る限り大きい値を設定することが望ましいが、大きすぎるとメモリを圧迫するため、注意が必要である。 ユーザにてチューニングする必要がある箇所と言える。 -
MyBatisでは、全クエリ共通の設定として
defaultFetchSize
を設定することができ、さらにクエリごとのfetchSize
設定で上書きできる。
-
-
executorType
-
一般的なバッチ処理では、同一トランザクション内で同じSQLを
全データ件数/fetchSize
の回数分実行することになる。 この際、都度ステートメントを作成するのではなく再利用することで効率よく処理できる。 -
MyBatisの設定における、
defaultExecutorType
にREUSE
を設定することでステートメントの再利用ができ、 処理スループット向上に寄与する。 -
大量のデータを一度に更新する場合、JDBCのバッチ更新を利用することで性能向上が期待できる。
そのため、MyBatisBatchItemWriter
で利用するSqlSessionTemplate
には、
executorType
に(REUSE
ではなく)BATCH
が設定されている。
-
Macchinetta Batch 2.xでは、同時に2つの異なるExecutorType
が存在する。
一方のExecutorType
で実装する場合が多いと想定するが、併用時は特に注意が必要である。
この点は、Mapperインタフェース(入力)にて詳しく説明する。
MyBatisのその他のパラメータ
その他のパラメータに関しては以下リンクを参照し、 アプリケーションの特性にあった設定を行うこと。 |
以下にデフォルト提供されている設定を示す。
<bean id="jobSqlSessionFactory"
class="org.mybatis.spring.SqlSessionFactoryBean"
p:dataSource-ref="jobDataSource">
<!-- (1) -->
<property name="configuration">
<bean class="org.apache.ibatis.session.Configuration"
p:localCacheScope="STATEMENT"
p:lazyLoadingEnabled="true"
p:aggressiveLazyLoading="false"
p:defaultFetchSize="1000"
p:defaultExecutorType="REUSE"/>
</property>
</bean>
<!-- (2) -->
<bean id="batchModeSqlSessionTemplate"
class="org.mybatis.spring.SqlSessionTemplate"
c:sqlSessionFactory-ref="jobSqlSessionFactory"
c:executorType="BATCH"/>
項番 | 説明 |
---|---|
(1) |
MyBatisの各種設定を行う。 |
(2) |
|
adminDataSourceを利用したSqlSessionFactoryの定義箇所について
同期実行をする場合は、adminDataSourceを利用した META-INF/spring/async-batch-daemon.xml
|
Mapper XMLの定義
Macchinetta Batch 2.x特有の説明事項はないので、Macchinetta Server 1.x 開発ガイドラインの データベースアクセス処理の実装を参照。
MyBatis-Springの設定
MyBatis-Springが提供するItemReaderおよびItemWriterを使用する場合、MapperのConfigで使用するMapper XMLを設定する必要がある。
設定方法としては、以下の2つが考えられる。
-
共通設定として、すべてのジョブで使用するMapper XMLを登録する。
-
META-INF/spring/launch-context.xml
にすべてのMapper XMLを記述することになる。
-
-
個別設定として、ジョブ単位で利用するMapper XMLを登録する。
-
META-INF/jobs/
配下のBean定義に、個々のジョブごとに必要なMapper XMLを記述することになる。
-
基本的な設定方法については、Macchinetta Server 1.x 開発ガイドラインの MyBatis-Springの設定を参照。
共通設定をした場合の性能面での弊害
共通設定をしてしまうと、同期実行をする際に実行するジョブのMapper XMLだけでなく、その他のジョブが使用するMapper XMLも読み込んでしまうために以下に示す弊害が生じる。
これを回避するために、Macchinetta Batch 2.xでは、個別設定として、個々のジョブ定義でそのジョブが必要とするMapper XMLだけを指定する設定方法を採用する。
この方法においては、 |
Macchinetta Batch 2.xでは、複数のSqlSessionFactory
およびSqlSessionTemplate
が定義されているため、
どれを利用するか明示的に指定する必要がある。
基本的にはjobSqlSessionFactory
を指定すればよい。
以下に設定例を示す。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
factory-ref="jobSqlSessionFactory"/>
項番 | 説明 |
---|---|
(1) |
|
入力
データベースアクセスの入力について以下のとおり説明する。
MyBatisCursorItemReader
ここではItemReaderとして
MyBatis-Springが提供するMyBatisCursorItemReader
によるデータベースアクセスについて説明する。
- 機能概要
-
MyBatis-Springが提供するItemReaderとして下記の2つが存在する。
-
org.mybatis.spring.batch.MyBatisCursorItemReader
-
org.mybatis.spring.batch.MyBatisPagingItemReader
-
MyBatisPagingItemReader
は、Macchinetta Server 1.x 開発ガイドラインの
Entityのページネーション検索(SQL絞り込み方式)で
説明している仕組みを利用したItemReaderである。
一定件数を取得した後に再度SQLを発行するため、データの一貫性が保たれない可能性がある。
そのため、バッチ処理で利用するには危険であることから、Macchinetta Batch 2.xでは原則使用しない。
Macchinetta Batch 2.xではMyBatisと連携して、Cursorを利用し取得データを返却するMyBatisCursorItemReader
のみを利用する。
Macchinetta Batch 2.xでは、MyBatis-Springの設定で説明したとおり、
mybatis:scan
によって動的にMapper XMLを登録する方法を採用している。
そのため、Mapper XMLに対応するインタフェースを用意する必要がある。
詳細については、Macchinetta Server 1.x 開発ガイドラインの
データベースアクセス処理の実装を参照。
MyBatisCursorItemReaderのトランザクションについて
|
MyBatisCursorItemReader
を利用してデータベースを参照するための実装例を処理モデルごとに以下に示す。
- チャンクモデルにおける利用方法
-
チャンクモデルで
MyBatisCursorItemReader
を利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReader
の実装例と、実装したMyBatisCursorItemReader
を利用してデータベースから取得したデータを処理するItemProcessor
の実装例を説明する。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository.cursor"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<batch:job id="outputAllCustomerList01" job-repository="jobRepository">
<batch:step id="outputAllCustomerList01.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<!-- (5) -->>
<batch:chunk reader="reader"
processor="retrieveBranchFromContextItemProcessor"
writer="writer" commit-interval="10"/>
<!-- omitted -->
</batch:tasklet>
</batch:step>
</batch:job>
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.mst.CustomerRepository">
<!-- omitted -->
<!-- (7) -->
<select id="cursor" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.mst.Customer">
<![CDATA[
SELECT
customer_id AS customerId,
customer_name AS customerName,
customer_address AS customerAddress,
customer_tel AS customerTel,
charge_branch_id AS chargeBranchId,
create_date AS createDate,
update_date AS updateDate
FROM
customer_mst
ORDER by
charge_branch_id ASC, customer_id ASC
]]>
</select>
<!-- omitted -->
</mapper>
public interface CustomerRepository {
// (8)
Cursor<Customer> cursor();
// omitted.
}
@Component
@Scope("step")
public class RetrieveBranchFromContextItemProcessor implements ItemProcessor<Customer, CustomerWithBranch> {
// omitted.
@Override
public CustomerWithBranch process(Customer item) throws Exception { // (9)
CustomerWithBranch newItem = new CustomerWithBranch(item);
newItem.setBranch(branches.get(item.getChargeBranchId())); // (10)
return newItem;
}
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
(2)で定義した |
(6) |
Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。 |
(7) |
SQLを定義する。 |
(8) |
(7)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。 |
(9) |
引数として受け取るitemの型は、
このクラスで実装しているItemProcessorインタフェースの型引数で指定した入力オブジェクトの型である |
(10) |
引数に渡されたitemを利用して各カラムの値を取得する。 |
- タスクレットモデルにおける利用方法
-
タスクレットモデルで
MyBatisCursorItemReader
を利用してデータベースを参照する実装例を以下に示す。
ここでは、MyBatisCursorItemReader
の実装例と、実装したMyBatisCursorItemReader
を利用してデータベースから取得したデータを処理するTasklet
の実装例を説明する。
タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。
タスクレットモデルではチャンクモデルと異なり、Tasklet
実装においてリソースを明示的にオープン/クローズする必要がある。
また、入力データの読み込みも明示的に行う。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="summarizeDetails" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.summarizeDetails"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<batch:job id="customizedJobExitCodeTaskletJob" job-repository="jobRepository">
<batch:step id="customizedJobExitCodeTaskletJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager" ref="checkAmountTasklet"/>
</batch:step>
<!-- omitted -->
</batch:job>
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">
<!-- omitted -->
<!-- (6) -->
<select id="summarizeDetails" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanSummary">
<![CDATA[
SELECT
branch_id AS branchId, year, month, SUM(amount) AS amount
FROM
sales_plan_detail
GROUP BY
branch_id, year, month
ORDER BY
branch_id ASC, year ASC, month ASC
]]>
</select>
</mapper>
public interface SalesPlanDetailRepository {
// (7)
List<SalesPlanSummary> summarizeDetails();
// omitted.
}
@Component
@Scope("step")
public class CheckAmountTasklet implements Tasklet {
// (8)
@Inject
ItemStreamReader<SalesPlanSummary> reader;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
SalesPlanSummary item = null;
List<SalesPlanSummary> items = new ArrayList<>(CHUNK_SIZE);
int errorCount = 0;
try {
// (9)
reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());
while ((item = reader.read()) != null) { // (10)
if (item.getAmount().signum() == -1) {
logger.warn("amount is negative. skip item [item: {}]", item);
errorCount++;
continue;
}
// omitted.
}
// catch block is omitted.
} finally {
// (11)
reader.close();
}
}
// omitted.
return RepeatStatus.FINISHED;
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。 |
(6) |
SQLを定義する。 |
(7) |
(6)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。 |
(8) |
|
(9) |
入力リソースをオープンする。 |
(10) |
入力データを1件ずつ読み込む。 |
(11) |
入力リソースをクローズする。 |
- 検索条件の指定方法
-
データベースアクセスの際に検索条件を指定して検索を行いたい場合は、 Bean定義にてMap形式でジョブパラメータから値を取得し、キーを設定することで検索条件を指定することができる。 以下にジョブパラメータを指定したジョブ起動コマンドの例と実装例を示す。
$ java -cp ${CLASSPATH} org.springframework.batch.core.launch.support.CommandLineJobRunner /META-INF/job/job001 job001 year=2017 month=12
<!-- (1) -->
<select id="findByYearAndMonth"
resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
<![CDATA[
SELECT
branch_id AS branchId, year, month, amount
FROM
sales_performance_summary
WHERE
year = #{year} AND month = #{month}
ORDER BY
branch_id ASC
]]>
</select>
<!-- omitted -->
<!-- omitted -->
<!-- (2) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues"> <!-- (3) -->
<map>
<!-- (4) -->
<entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
<entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>
<!-- omitted -->
</map>
</property>
</bean>
<!-- omitted -->
項番 | 説明 |
---|---|
(1) |
検索条件を指定して取得するSQLを定義する。 |
(2) |
データベースからデータを取得するための |
(3) |
|
(4) |
検索条件にする値をジョブパラメータから取得し、キーに設定することで検索条件を指定する。
SQLの引数が数値型で定義されているため、 |
StepExectionContextによる検索指定方法について
@BeforeStepなどジョブの前処理で検索条件を指定する場合は、 |
Mapperインタフェース(入力)
ItemReader以外でデータベースの参照を行うにはMapperインタフェースを利用する。
ここではMapperインタフェースを利用したデータベースの参照について説明する。
- 機能概要
-
Mapperインタフェースを利用するにあたって、Macchinetta Batch 2.xでは以下の制約を設けている。
処理 | ItemProcessor | Tasklet | リスナー |
---|---|---|---|
参照 |
利用可 |
利用可 |
利用可 |
更新 |
条件付で利用可 |
利用可 |
利用不可 |
- チャンクモデルにおける利用方法
-
チャンクモデルでMapperインタフェースを利用してデータベースを参照する実装例を以下に示す。
@Component
public class UpdateItemFromDBProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
// (1)
@Inject
CustomerRepository customerRepository;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
// (2)
Customer customer = customerRepository.findOne(readItem.getCustomerId());
// omitted.
return writeItem;
}
}
<!-- (3) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
template-ref="batchModeSqlSessionTemplate"/>
<!-- (4) -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.performance.SalesPerformanceDetailRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- omitted job definition -->
MapperインタフェースとMapper XMLについてはMyBatisCursorItemReader で説明している内容以外に特筆すべきことがないため省略する。
項番 |
説明 |
(1) |
MapperインタフェースをInjectする。 |
(2) |
Mapperインタフェースで検索処理を実行する。 |
(3) |
Mapper XMLの登録を行う。 |
(4) |
|
MyBatisCursorItemReader設定の補足
以下に示す定義例のように、MyBatisCursorItemReaderとMyBatisBatchItemWriterで異なる
|
- タスクレットモデルにおける利用方法
-
タスクレットモデルでMapperインタフェースを利用してデータベースを参照する実装例を以下に示す。
大量データを処理する場合、Cursorを利用して取得データを1件ずつ処理することでメモリ容量をひっ迫せずに効率良く処理することができる。 Macchinetta Batch 2.xでは、タスクレットモデルでMapperインタフェースを利用してデータベースアクセスする場合はCursorを利用することを基本とする。
Cursor同様に大量データを処理するうえで、ResultHandlerを利用することも有効である。 ResultHandlerについては、Macchinetta Server 1.x 開発ガイドラインの ResultHandlerの実装を参照。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository;jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
factory-ref="jobSqlSessionFactory"/>
<batch:job id="jobSalesPlanCursorTasklet" job-repository="jobRepository">
<batch:step id="jobSalesPlanCursorTasklet.step01">
<batch:tasklet transaction-manager="jobTransactionManager" ref="salesPlanCursorTasklet"/>
</batch:step>
</batch:job>
<!-- (2) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">
<!-- omitted -->
<!-- (3) -->
<select id="cursor" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
SELECT
branch_id AS branchId, year, month, customer_id AS customerId, amount
FROM
sales_plan_detail
ORDER BY
branch_id ASC, year ASC, month ASC, customer_id ASC
]]>
</select>
</mapper>
public interface SalesRepository {
// (4)
Cursor<SalesPlanDetail> cursor();
}
@Component
public class SalesPlanCursorTasklet implements Tasklet {
// omitted.
// (5)
@Inject
SalesRepository repository;
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
try (Cursor<SalesPlanDetail> cursor = repository.cursor()) { // (6)
for (SalesPlanDetail salesPlan : cursor) { // (7)
// omitted.
}
}
return RepeatStatus.FINISHED;
}
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。 |
(3) |
SQLを定義する。 |
(4) |
(3)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。 |
(5) |
MapperインタフェースをInjectする。 |
(6) |
Cursorから1件ずつデータを取得する。 |
(7) |
拡張for文によりCursorをフェッチする。 |
Cursor利用時の注意点
Cursorで読み取り中のテーブルを他の処理で更新をかけると、 読み取り済みの古いデータを処理してしまうことになり、データの不整合が発生する可能性があるため注意すること。 これを防ぐために安易にロックをかけるとデッドロックを引き起こしかねないため、 排他制御を参照し適切にロックをかけるか、 テーブルアクセスが競合しないようにジョブ設計することを検討してほしい。 |
出力
データベースアクセスの出力について以下のとおり説明する。
MyBatisBatchItemWriter
ここではItemWriterとして
MyBatis-Springが提供するMyBatisBatchItemWriter
によるデータベースアクセスについて説明する。
- 機能概要
-
MyBatis-Springが提供するItemWriterは以下の1つのみである。
-
org.mybatis.spring.batch.MyBatisBatchItemWriter
-
MyBatisBatchItemWriter
はMyBatisと連携してJDBCのバッチ更新機能を利用するItemWriterであり、
大量のデータを一度に更新する場合に性能向上が期待できる。
基本的な設定については、MyBatisCursorItemReaderと同じである。
MyBatisBatchItemWriter
では、MyBatisの設定で説明した
batchModeSqlSessionTemplate
を指定する必要がある。
MyBatisBatchItemWriter
を利用してデータベースを更新するための実装例を以下に示す。
- チャンクモデルにおける利用方法
-
チャンクモデルで
MyBatisBatchItemWriter
を利用してデータベースを更新(登録)する実装例を以下に示す。
ここでは、MyBatisBatchItemWriter
の実装例と、実装したMyBatisBatchItemWriter
を利用するItemProcessor
の実装例を説明する。ItemProcessor
実装では取得したデータをMyBatisBatchItemWriter
を利用してデータベースの更新を行っている。
<!-- (1) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="writer"
class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="step"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository.branchExclusiveUpdate"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"
p:assertUpdates="#{new Boolean(jobParameters['assertUpdates'])}"/>
<batch:job id="chunkOptimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="chunkOptimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader" processor="branchEditItemProcessor"
writer="writer" commit-interval="10"/> <!-- (5) -->
</batch:tasklet>
</batch:step>
</batch:job>
<!-- (6) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository.ExclusiveControlRepository">
<!-- (7) -->
<update id="branchExclusiveUpdate" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.model.ExclusiveBranch">
<![CDATA[
UPDATE
branch_mst
SET
branch_name = #{branchName,jdbcType=VARCHAR},
branch_address = #{branchAddress,jdbcType=VARCHAR},
branch_tel = #{branchTel,jdbcType=VARCHAR},
update_date = #{updateDate,jdbcType=TIMESTAMP}
WHERE
branch_id = #{branchId}
AND
branch_name = #{oldBranchName}
]]>
</update>
<!-- omitted -->
</mapper>
public interface ExclusiveControlRepository {
// (8)
int branchExclusiveUpdate(ExclusiveBranch branch);
// omitted.
}
@Component
@Scope("step")
public class BranchEditItemProcessor implements ItemProcessor<Branch, ExclusiveBranch> {
// omitted.
@Override
public ExclusiveBranch process(Branch item) throws Exception { // (9)
ExclusiveBranch branch = new ExclusiveBranch();
branch.setBranchId(item.getBranchId());
branch.setBranchName(item.getBranchName() + " - " + identifier);
branch.setBranchAddress(item.getBranchAddress() + " - " + identifier);
branch.setBranchTel(item.getBranchTel());
branch.setCreateDate(item.getUpdateDate());
branch.setUpdateDate(new Timestamp(clock.millis()));
branch.setOldBranchName(item.getBranchName());
// (10)
return branch;
}
}
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
(2)で定義した |
(6) |
Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。 |
(7) |
SQLを定義する。 |
(8) |
(7)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。 |
(9) |
返り値の型は、このクラスで実装しているItemProcessorインタフェースの型引数で指定した 出力オブジェクトの型である |
(10) |
更新データを設定したDTOオブジェクトを返すことでデータベースへ出力する。 |
- タスクレットモデルにおける利用方法
-
タスクレットモデルで
MyBatisBatchItemWriter
を利用してデータベースを更新(登録)する実装例を以下に示す。
ここでは、MyBatisBatchItemWriter
の実装例と実装したMyBatisBatchItemWriter
を利用するTasklet
の実装例を説明する。 タスクレットモデルでチャンクモデルのコンポーネントを利用する際の留意点についてはチャンクモデルのコンポーネントを利用するTasklet実装を参照。
<!-- (1) -->
<mybatis:scan base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan"
factory-ref="jobSqlSessionFactory"/>
<!-- (2) (3) (4) -->
<bean id="writer" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository.create"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<batch:job id="taskletJobWithListenerWithinJobScope" job-repository="jobRepository">
<batch:step id="taskletJobWithListenerWithinJobScope.step01">
<batch:tasklet transaction-manager="jobTransactionManager" ref="salesPlanDetailRegisterTasklet"/>
</batch:step>
<!-- omitted. -->
</batch:job>
<!-- (5) -->
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository.plan.SalesPlanDetailRepository">
<!-- (6) -->
<insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
INSERT INTO
sales_plan_detail(branch_id, year, month, customer_id, amount)
VALUES (
#{branchId}, #{year}, #{month}, #{customerId}, #{amount}
)
]]>
</insert>
<!-- omitted -->
</mapper>
public interface SalesPlanDetailRepository {
// (7)
void create(SalesPlanDetail salesPlanDetail);
// omitted.
}
@Component
@Scope("step")
public class SalesPlanDetailRegisterTasklet implements Tasklet {
// omitted.
// (8)
@Inject
ItemWriter<SalesPlanDetail> writer;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
SalesPlanDetail item = null;
try {
reader.open(chunkContext.getStepContext().getStepExecution().getExecutionContext());
List<SalesPlanDetail> items = new ArrayList<>(); // (9)
while ((item = reader.read()) != null) {
items.add(processor.process(item)); // (10)
if (items.size() == 10) {
writer.write(items); // (11)
items.clear();
}
}
// omitted.
}
// omitted.
return RepeatStatus.FINISHED;
}
}
MapperインタフェースとMapper XMLについてはMyBatisBatchItemWriter で説明している内容以外に特筆すべきことがないため省略する。
項番 | 説明 |
---|---|
(1) |
Mapper XMLの登録を行う。 |
(2) |
|
(3) |
|
(4) |
|
(5) |
Mapper XMLを定義する。namespaceの値とインタフェースのFQCNを一致させること。 |
(6) |
SQLを定義する。 |
(7) |
(6)で定義したSQLのIDに対応するメソッドをインタフェースに定義する。 |
(8) |
|
(9) |
出力データを格納するリストを定義する。 |
(10) |
リストに更新データを設定する。 |
(11) |
更新データを設定したリストを引数に指定して、データベースへ出力する。 |
Mapperインタフェース(出力)
ItemWriter以外でデータベースの更新を行うにはMapperインタフェースを利用する。
ここではMapperインタフェースを利用したデータベースの更新について説明する。
- 機能概要
-
Mapperインタフェースを利用してデータベースアクセスするうえでのMacchinetta Batch 2.xとしての制約はMapperインタフェース(入力)を参照。
さらにItemProcessor、TaskletでMapperインタフェースを利用したデータベースの更新を行う際には以下の制約がある。 - ItemProcessorでの制約
-
ItemWriterに
MyBatisBatchItemWriter
を利用する場合、ItemProcessorではMapperインタフェースを利用してデータベースの更新処理を行うことができないという制約がある。
これは、MyBatisBatchItemWriter
がSQL実行後に自身が発行したSQLによる更新処理が行われたかをチェックしており、 その際に同一トランザクション内での他の更新処理を検知するとエラーを発生させることによるものである。
よって、ItemProcessorでMapperインタフェースを利用したデータベースの更新はできず、参照のみが可能となる。
ItemProcessor内で更新処理を行いたいケースとして、「特定の条件に当てはまる入力データについて、 ItemWriterとは異なる更新処理をするためにItemProcessorで個別に更新処理を行う」などが考えられるが、上記の制約によりこのような利用はできない。
代替手段として、後述のタスクレットモデルによる実装、もしくはItemProcessor、ItemWriterでの更新をステップやジョブで分離したバッチ処理の設計を検討されたい。
|
- Taskletでの制約
-
Taskletでは、Mapperインタフェースを利用することが基本であるため、ItemProcessorのような影響はない。
MyBatisBatchItemWriter
をInjectして利用することも考えられるが、その場合はMapperインタフェース自体をBATCH
設定で処理すればよい。つまり、Taskletでは、MyBatisBatchItemWriter
をInjectして使う必要は基本的にない。
- タスクレットモデルにおける利用方法
-
タスクレットモデルでMapperインタフェースを利用してデータベースを更新(登録)する実装例を以下に示す。
@Component
public class OptimisticLockTasklet implements Tasklet {
// (1)
@Inject
ExclusiveControlRepository repository;
// omitted.
@Override
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
Branch branch = repository.branchFindOne(branchId);
// (2)
ExclusiveBranch exclusiveBranch = new ExclusiveBranch();
exclusiveBranch.setBranchId(branch.getBranchId());
exclusiveBranch.setBranchName(branch.getBranchName() + " - " + identifier);
exclusiveBranch.setBranchAddress(branch.getBranchAddress() + " - " + identifier);
exclusiveBranch.setBranchTel(branch.getBranchTel());
exclusiveBranch.setCreateDate(branch.getUpdateDate());
exclusiveBranch.setUpdateDate(new Timestamp(clock.millis()));
exclusiveBranch.setOldBranchName(branch.getBranchName());
// (3)
int result = repository.branchExclusiveUpdate(exclusiveBranch);
// omitted.
return RepeatStatus.FINISHED;
}
}
<!-- (4) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.exclusivecontrol.repository"
factory-ref="jobSqlSessionFactory"/>
<batch:job id="taskletOptimisticLockCheckJob" job-repository="jobRepository">
<batch:step id="taskletOptimisticLockCheckJob.step01">
<batch:tasklet transaction-manager="jobTransactionManager"
ref="optimisticLockTasklet"> <!-- (5) -->
</batch:tasklet>
</batch:step>
</batch:job>
MapperインタフェースとMapper XMLは省略する。
項番 | 説明 |
---|---|
(1) |
MapperインタフェースをInjectする。 |
(2) |
DTOオブジェクトを生成して、更新データを設定する。 |
(3) |
更新データを設定したDTOオブジェクトを引数に指定して、Mapperインタフェースで更新処理を実行する。 |
(4) |
Mapper XMLの登録を行う。 |
(5) |
MapperインタフェースをInjectしTaskletを設定する。 |
リスナーでのデータベースアクセス
リスナーでのデータベースアクセスは他のコンポーネントと連携することが多い。 使用するリスナー及び実装方法によっては、Mapperインタフェースで取得したデータを、 他のコンポーネントへ引き渡す仕組みを追加で用意する必要がある。
リスナーでMapperインタフェースを利用してデータベースアクセスを実装するにあたり、以下の制約がある。
- リスナーでの制約
-
リスナーでもItemProcessorでの制約と同じ制約が成立する。 加えて、リスナーでは、更新を必要とするユースケースが考えにくい。よって、リスナーでは、更新系処理を行うことを推奨しない。
リスナーで想定される更新処理の代替
|
ここでは一例として、StepExecutionListenerで ステップ実行前にデータを取得して、ItemProcessorで取得したデータを利用する例を示す。
public class CacheSetListener extends StepExecutionListenerSupport {
// (1)
@Inject
CustomerRepository customerRepository;
// (2)
@Inject
CustomerCache cache;
@Override
public void beforeStep(StepExecution stepExecution) {
// (3)
for(Customer customer : customerRepository.findAll()) {
cache.addCustomer(customer.getCustomerId(), customer);
}
}
}
@Component
public class UpdateItemFromCacheProcessor implements
ItemProcessor<SalesPerformanceDetail, SalesPlanDetail> {
// (4)
@Inject
CustomerCache cache;
@Override
public SalesPlanDetail process(SalesPerformanceDetail readItem) throws Exception {
Customer customer = cache.getCustomer(readItem.getCustomerId()); // (5)
SalesPlanDetail writeItem = new SalesPlanDetail();
// omitted.
writerItem.setCustomerName(customer.getCustomerName); // (6)
return writeItem;
}
}
// (7)
@Component
public class CustomerCache {
Map<String, Customer> customerMap = new HashMap<>();
public Customer getCustomer(String customerId) {
return customerMap.get(customerId);
}
public void addCustomer(String id, Customer customer) {
customerMap.put(id, customer);
}
}
<!-- omitted -->
<!-- (8) -->
<mybatis:scan
base-package="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.repository"
template-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="cacheSetListener"
class="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.CacheSetListener"/>
<!-- omitted -->
<batch:job id="DBAccessByItemListener" job-repository="jobRepository">
<batch:step id="DBAccessByItemListener.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader"
processor="updateItemFromCacheProcessor"
writer="writer" commit-interval="10"/> <!-- (10) -->
<!-- (11) -->
<batch:listeners>
<batch:listener ref="cacheSetListener"/>
</batch:listeners>
</batch:tasklet>
</batch:step>
</batch:job>
項番 |
説明 |
(1) |
MapperインタフェースをInjectする。 |
(2) |
Mapperインタフェースから取得したデータをキャッシュするためのBeanをInjectする。 |
(3) |
リスナーにて、Mapperインタフェースからデータを取得してキャッシュする。 |
(4) |
(2)で設定したキャッシュと同じBeanをInjectする。 |
(5) |
キャッシュから該当するデータを取得する。 |
(6) |
更新データにキャッシュからのデータを反映する。 |
(7) |
キャッシュクラスをコンポーネントとして実装する。 |
(8) |
Mapper XMLの登録を行う。 |
(9) |
Mapperインタフェースを利用するリスナーを定義する。 |
(10) |
キャッシュを利用するItemProcessorを指定する。 |
(11) |
(9)で定義したリスナーを登録する。 |
リスナーでのSqlSessionFactoryの利用
上記の例では、 チャンクのスコープ外で動作するリスナーについては、トランザクション外で処理されるため、
|
How to extend
CompositeItemWriterにおける複数テーブルの更新
チャンクモデルで、1つの入力データに対して複数のテーブルへ更新を行いたい場合は、Spring Batchが提供するCompositeItemWriter
を利用し、
各テーブルに対応したMyBatisBatchItemWriter
を連結することで実現できる。
ここでは、売上計画と売上実績の2つのテーブルを更新する場合の実装例を示す。
ItemProcessor
の実装例@Component
public class SalesItemProcessor implements ItemProcessor<SalesPlanDetail, SalesDTO> {
@Override
public SalesDTO process(SalesPlanDetail item) throws Exception { // (1)
SalesDTO salesDTO = new SalesDTO();
// (2)
SalesPerformanceDetail spd = new SalesPerformanceDetail();
spd.setBranchId(item.getBranchId());
spd.setYear(item.getYear());
spd.setMonth(item.getMonth());
spd.setCustomerId(item.getCustomerId());
spd.setAmount(new BigDecimal(0L));
salesDTO.setSalesPerformanceDetail(spd);
// (3)
item.setAmount(item.getAmount().add(new BigDecimal(1L)));
salesDTO.setSalesPlanDetail(item);
return salesDTO;
}
}
public class SalesDTO implements Serializable {
// (4)
private SalesPlanDetail salesPlanDetail;
// (5)
private SalesPerformanceDetail salesPerformanceDetail;
// omitted
}
<mapper namespace="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository">
<select id="findAll" resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.plan.SalesPlanDetail">
<![CDATA[
SELECT
branch_id AS branchId, year, month, customer_id AS customerId, amount
FROM
sales_plan_detail
ORDER BY
branch_id ASC, year ASC, month ASC, customer_id ASC
]]>
</select>
<!-- (6) -->
<update id="update" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
<![CDATA[
UPDATE
sales_plan_detail
SET
amount = #{salesPlanDetail.amount}
WHERE
branch_id = #{salesPlanDetail.branchId}
AND
year = #{salesPlanDetail.year}
AND
month = #{salesPlanDetail.month}
AND
customer_id = #{salesPlanDetail.customerId}
]]>
</update>
<!-- (7) -->
<insert id="create" parameterType="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.SalesDTO">
<![CDATA[
INSERT INTO
sales_performance_detail(
branch_id,
year,
month,
customer_id,
amount
)
VALUES (
#{salesPerformanceDetail.branchId},
#{salesPerformanceDetail.year},
#{salesPerformanceDetail.month},
#{salesPerformanceDetail.customerId},
#{salesPerformanceDetail.amount}
)
]]>
</insert>
</mapper>
CompositeItemWriter
の適用例<!-- reader using MyBatisCursorItemReader -->
<bean id="reader" class="org.mybatis.spring.batch.MyBatisCursorItemReader"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.findAll"
p:sqlSessionFactory-ref="jobSqlSessionFactory"/>
<!-- writer MyBatisBatchItemWriter -->
<!-- (8) -->
<bean id="planWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.update"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<!-- (9) -->
<bean id="performanceWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter"
p:statementId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch05.dbaccess.repository.SalesRepository.create"
p:sqlSessionTemplate-ref="batchModeSqlSessionTemplate"/>
<!-- (10) -->
<bean id="writer" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<!-- (11)-->
<list>
<ref bean="performanceWriter"/>
<ref bean="planWriter"/>
</list>
</property>
</bean>
<!-- (12) -->
<batch:job id="useCompositeItemWriter" job-repository="jobRepository">
<batch:step id="useCompositeItemWriter.step01">
<batch:tasklet transaction-manager="jobTransactionManager">
<batch:chunk reader="reader"
processor="salesItemProcessor"
writer="writer" commit-interval="3"/>
</batch:tasklet>
</batch:step>
</batch:job>
項番 |
説明 |
(1) |
入力データに対して2つのテーブルを更新するための各エンティティを保持するDTOを出力とする |
(2) |
売上実績(SalesPerformanceDetail)を新規作成するためのエンティティを作成し、DTOに格納する。 |
(3) |
入力データでもある売上計画(SalesPlanDetail)を更新するため、入力データを更新してDTOに格納する。 |
(4) |
売上計画(SalesPlanDetail)を保持するようにDTOに定義する。 |
(5) |
売上実績(SalesPerformanceDetail)を保持するようにDTOに定義する。 |
(6) |
DTOから取得した売上計画(SalesPlanDetail)で、売上計画テーブル(sales_plan_detail)を更新するSQLを定義する。 |
(7) |
DTOから取得した売上実績(SalesPlanDetail)で、売上実績テーブル(sales_performance_detail)を新規作成するSQLを定義する。 |
(8) |
売上計画テーブル(sales_plan_detail)を更新する |
(9) |
売上実績テーブル(sales_performance_detail)を新規作成する |
(10) |
(8),(9)を順番に実行するために |
(11) |
|
(12) |
チャンクの |
複数データソースへの出力(1ステップ)で説明した
また、CompositeItemWriterは、ItemWriter実装であれば連結できるので、 MyBatisBatchItemWriterとFlatFileItemWriterを設定することで、データベース出力とファイル出力を同時に行うこともできる。 |
検索条件の指定方法
データベースアクセスの際に検索条件を指定して検索を行いたい場合は、 Bean定義にてMap形式でジョブパラメータから値を取得し、キーを設定することで検索条件を指定することができる。 以下にジョブパラメータを指定したジョブ起動コマンドの例と実装例を示す。
$ java -cp ${CLASSPATH} org.springframework.batch.core.launch.support.CommandLineJobRunner /META-INF/job/job001 job001 year=2017 month=12
<!-- (1) -->
<select id="findByYearAndMonth"
resultType="jp.co.ntt.fw.macchinetta.batch.functionaltest.app.model.performance.SalesPerformanceSummary">
<![CDATA[
SELECT
branch_id AS branchId, year, month, amount
FROM
sales_performance_summary
WHERE
year = #{year} AND month = #{month}
ORDER BY
branch_id ASC
]]>
</select>
<!-- omitted -->
<!-- omitted -->
<!-- (2) -->
<bean id="reader"
class="org.mybatis.spring.batch.MyBatisCursorItemReader" scope="step"
p:queryId="jp.co.ntt.fw.macchinetta.batch.functionaltest.ch08.parallelandmultiple.repository.SalesSummaryRepository.findByYearAndMonth"
p:sqlSessionFactory-ref="jobSqlSessionFactory">
<property name="parameterValues"> <!-- (3) -->
<map>
<!-- (4) -->
<entry key="year" value="#{jobParameters['year']}" value-type="java.lang.Integer"/>
<entry key="month" value="#{jobParameters['month']}" value-type="java.lang.Integer"/>
<!-- omitted -->
</map>
</property>
</bean>
<!-- omitted -->
項番 | 説明 |
---|---|
(1) |
検索条件を指定して取得するSQLを定義する。 |
(2) |
データベースからデータを取得するための |
(3) |
|
(4) |
検索条件にする値をジョブパラメータから取得し、キーに設定することで検索条件を指定する。
SQLの引数が数値型で定義されているため、 |
StepExectionContextによる検索指定方法について
@BeforeStepなどジョブの前処理で検索条件を指定する場合は、 |