Commit 5c09468c by yangxianglong

MySQL InnoDB 行锁 + 嵌套事务优化

parent 8f00f421
...@@ -215,14 +215,15 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -215,14 +215,15 @@ public class ItemSyncServiceImpl implements ItemSyncService {
pageNo++; pageNo++;
} }
// 进入长事务前最后一次写进度:事务内禁止再 updateProgress(REQUIRES_NEW),否则与外层长事务争用 item_sync_log 行锁(Lock wait timeout)
itemSyncProgressService.updateProgress(logId, itemSyncProgressService.updateProgress(logId,
String.format("拉取完成,乐檬返回约 %d 条,有效转换 %d 条;开始处理并推送汉尼", lemengTotal, allRows.size()), String.format("拉取完成,有效 %d 条;接下来将分批推送汉尼并写库(耗时可能数分钟,此阶段进度不再刷新直至完成)",
allRows.size()),
lemengTotal); lemengTotal);
final int lemengTotalFinal = lemengTotal; final int lemengTotalFinal = lemengTotal;
transactionTemplate.execute(status -> { transactionTemplate.execute(status -> {
// 不在此事务内预先 findById(ItemSyncLog):长事务持有 item_sync_log 行会与 // 不在此事务内预先 findById(ItemSyncLog);且不在此事务内调用 updateProgress:见类注释与 push 批次逻辑。
// ItemSyncProgressService(REQUIRES_NEW 更新同一行)互相等待,导致 Lock wait timeout。
processAndSyncToHanniFromList(allRows, logId, lemengTotalFinal); processAndSyncToHanniFromList(allRows, logId, lemengTotalFinal);
return null; return null;
}); });
...@@ -330,9 +331,7 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -330,9 +331,7 @@ public class ItemSyncServiceImpl implements ItemSyncService {
details.add(d); details.add(d);
} }
itemSyncProgressService.updateProgress(logId, log.info("分类完成:待推送汉尼新增 {} 条、更新 {} 条", toAdd.size(), toUpdate.size());
String.format("分类完成:待推送汉尼新增 %d 条、更新 %d 条", toAdd.size(), toUpdate.size()),
null);
StringBuilder responseBuilder = new StringBuilder(); StringBuilder responseBuilder = new StringBuilder();
...@@ -347,11 +346,10 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -347,11 +346,10 @@ public class ItemSyncServiceImpl implements ItemSyncService {
} }
if (toAdd.isEmpty() && toUpdate.isEmpty()) { if (toAdd.isEmpty() && toUpdate.isEmpty()) {
itemSyncProgressService.updateProgress(logId, "无需推送汉尼(均为跳过),正在写入明细", null); log.info("无需推送汉尼(均为跳过),写入明细");
} }
itemSyncProgressService.updateProgress(logId, log.info("写入同步明细,共 {} 条", details.size());
String.format("正在写入同步明细(共 %d 条)…", details.size()), null);
saveDetailsInChunks(details); saveDetailsInChunks(details);
ItemSyncLog syncLog = itemSyncLogRepository.findById(logId) ItemSyncLog syncLog = itemSyncLogRepository.findById(logId)
...@@ -374,9 +372,7 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -374,9 +372,7 @@ public class ItemSyncServiceImpl implements ItemSyncService {
List<HanniItemDTO> batchItems = new ArrayList<>(toAdd.subList(i, end)); List<HanniItemDTO> batchItems = new ArrayList<>(toAdd.subList(i, end));
List<LemonItemRow> batchRows = new ArrayList<>(addRows.subList(i, end)); List<LemonItemRow> batchRows = new ArrayList<>(addRows.subList(i, end));
int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1; int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1;
itemSyncProgressService.updateProgress(logId, log.info("汉尼新增 第 {}/{} 批,本批 {} 条(共 {} 条)", batchNo, batches, batchItems.size(), total);
String.format("正在调用汉尼新增接口…(总计 %d 条,第 %d/%d 批,本批 %d 条)",
total, batchNo, batches, batchItems.size()), null);
String r = hanniApiClient.saveItems(batchItems); String r = hanniApiClient.saveItems(batchItems);
if (r != null) { if (r != null) {
if (responseBuilder.length() > 0) { if (responseBuilder.length() > 0) {
...@@ -396,7 +392,6 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -396,7 +392,6 @@ public class ItemSyncServiceImpl implements ItemSyncService {
upsertSnapshot(rr); upsertSnapshot(rr);
} }
} }
itemSyncProgressService.updateProgress(logId, "汉尼新增接口已完成,正在处理快照", null);
} }
private void pushHanniUpdatesInBatches(Long logId, StringBuilder responseBuilder, private void pushHanniUpdatesInBatches(Long logId, StringBuilder responseBuilder,
...@@ -409,9 +404,7 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -409,9 +404,7 @@ public class ItemSyncServiceImpl implements ItemSyncService {
List<HanniItemDTO> batchItems = new ArrayList<>(toUpdate.subList(i, end)); List<HanniItemDTO> batchItems = new ArrayList<>(toUpdate.subList(i, end));
List<LemonItemRow> batchRows = new ArrayList<>(updateRows.subList(i, end)); List<LemonItemRow> batchRows = new ArrayList<>(updateRows.subList(i, end));
int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1; int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1;
itemSyncProgressService.updateProgress(logId, log.info("汉尼更新 第 {}/{} 批,本批 {} 条(共 {} 条)", batchNo, batches, batchItems.size(), total);
String.format("正在调用汉尼更新接口…(总计 %d 条,第 %d/%d 批,本批 %d 条)",
total, batchNo, batches, batchItems.size()), null);
String r = hanniApiClient.updateItems(batchItems); String r = hanniApiClient.updateItems(batchItems);
if (r != null) { if (r != null) {
if (responseBuilder.length() > 0) { if (responseBuilder.length() > 0) {
...@@ -433,7 +426,6 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -433,7 +426,6 @@ public class ItemSyncServiceImpl implements ItemSyncService {
upsertSnapshot(rr); upsertSnapshot(rr);
} }
} }
itemSyncProgressService.updateProgress(logId, "汉尼更新接口已完成,正在处理快照", null);
} }
private static String normalizeMod(String s) { private static String normalizeMod(String s) {
...@@ -461,8 +453,8 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -461,8 +453,8 @@ public class ItemSyncServiceImpl implements ItemSyncService {
} }
/** /**
* 批量写入明细。不在 insert 子表(带 sync_log_id 外键)之后调用 {@link ItemSyncProgressService#updateProgress}: * 批量写入明细。长事务内不得调用 {@link ItemSyncProgressService#updateProgress}:
* InnoDB 在插入子行时会对父表 item_sync_log 加共享锁,REQUIRES_NEW 的 UPDATE 同一父行会锁等待直至超时。 * 插入子表(sync_log_id 外键)后父行有共享锁;且整段长事务与 REQUIRES_NEW 更新同一父行会锁等待超时。
*/ */
private void saveDetailsInChunks(List<ItemSyncDetail> details) { private void saveDetailsInChunks(List<ItemSyncDetail> details) {
int batch = 200; int batch = 200;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment