Commit 8f00f421 by yangxianglong

MySQL InnoDB 行锁 + 嵌套事务优化

parent f3409bf3
...@@ -49,6 +49,9 @@ import java.util.concurrent.Executor; ...@@ -49,6 +49,9 @@ import java.util.concurrent.Executor;
@Service @Service
public class ItemSyncServiceImpl implements ItemSyncService { public class ItemSyncServiceImpl implements ItemSyncService {
/** 汉尼 receiveGoods/updateGoods 单次请求条数,避免单次 POST 过大导致读超时 */
private static final int HANNI_PUSH_BATCH_SIZE = 500;
@Resource @Resource
private LemonApiClient lemonApiClient; private LemonApiClient lemonApiClient;
...@@ -218,9 +221,9 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -218,9 +221,9 @@ public class ItemSyncServiceImpl implements ItemSyncService {
final int lemengTotalFinal = lemengTotal; final int lemengTotalFinal = lemengTotal;
transactionTemplate.execute(status -> { transactionTemplate.execute(status -> {
ItemSyncLog managed = itemSyncLogRepository.findById(logId) // 不在此事务内预先 findById(ItemSyncLog):长事务持有 item_sync_log 行会与
.orElseThrow(() -> new IllegalStateException("item_sync_log 不存在: " + logId)); // ItemSyncProgressService(REQUIRES_NEW 更新同一行)互相等待,导致 Lock wait timeout。
processAndSyncToHanniFromList(allRows, managed, lemengTotalFinal); processAndSyncToHanniFromList(allRows, logId, lemengTotalFinal);
return null; return null;
}); });
} }
...@@ -242,11 +245,11 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -242,11 +245,11 @@ public class ItemSyncServiceImpl implements ItemSyncService {
return new ItemSyncDetailPageDTO(p.getContent(), p.getTotalElements()); return new ItemSyncDetailPageDTO(p.getContent(), p.getTotalElements());
} }
private void processAndSyncToHanniFromList(List<LemonItemRow> rows, ItemSyncLog syncLog, int lemengTotalCount) { private void processAndSyncToHanniFromList(List<LemonItemRow> rows, Long logId, int lemengTotalCount) {
Long logId = syncLog.getId();
syncLog.setLemengCount(lemengTotalCount);
if (rows == null || rows.isEmpty()) { if (rows == null || rows.isEmpty()) {
ItemSyncLog syncLog = itemSyncLogRepository.findById(logId)
.orElseThrow(() -> new IllegalStateException("item_sync_log 不存在: " + logId));
syncLog.setLemengCount(lemengTotalCount);
syncLog.setHanniCount(0); syncLog.setHanniCount(0);
syncLog.setStatus("SKIP"); syncLog.setStatus("SKIP");
syncLog.setErrorMsg("转换后无有效商品数据"); syncLog.setErrorMsg("转换后无有效商品数据");
...@@ -254,7 +257,6 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -254,7 +257,6 @@ public class ItemSyncServiceImpl implements ItemSyncService {
return; return;
} }
syncLog.setHanniCount(rows.size());
log.info("待处理 {} 条商品(按乐檬最后修改时间跳过未变化)", rows.size()); log.info("待处理 {} 条商品(按乐檬最后修改时间跳过未变化)", rows.size());
List<ItemSyncDetail> details = new ArrayList<>(); List<ItemSyncDetail> details = new ArrayList<>();
...@@ -335,15 +337,55 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -335,15 +337,55 @@ public class ItemSyncServiceImpl implements ItemSyncService {
StringBuilder responseBuilder = new StringBuilder(); StringBuilder responseBuilder = new StringBuilder();
if (!toAdd.isEmpty()) { if (!toAdd.isEmpty()) {
log.info("新增 {} 条商品到汉尼(每批最多 {} 条)", toAdd.size(), HANNI_PUSH_BATCH_SIZE);
pushHanniAddsInBatches(logId, responseBuilder, toAdd, addRows, addDetailByBc);
}
if (!toUpdate.isEmpty()) {
log.info("更新 {} 条变更商品到汉尼(每批最多 {} 条)", toUpdate.size(), HANNI_PUSH_BATCH_SIZE);
pushHanniUpdatesInBatches(logId, responseBuilder, toUpdate, updateRows, updateDetailByBc);
}
if (toAdd.isEmpty() && toUpdate.isEmpty()) {
itemSyncProgressService.updateProgress(logId, "无需推送汉尼(均为跳过),正在写入明细", null);
}
itemSyncProgressService.updateProgress(logId,
String.format("正在写入同步明细(共 %d 条)…", details.size()), null);
saveDetailsInChunks(details);
ItemSyncLog syncLog = itemSyncLogRepository.findById(logId)
.orElseThrow(() -> new IllegalStateException("item_sync_log 不存在: " + logId));
syncLog.setLemengCount(lemengTotalCount);
syncLog.setHanniCount(rows.size());
syncLog.setStatus("SUCCESS");
syncLog.setHanniResponse(truncateStr(responseBuilder.toString(), 2000));
syncLog.setProgressMessage("已完成");
itemSyncLogRepository.save(syncLog);
}
private void pushHanniAddsInBatches(Long logId, StringBuilder responseBuilder,
List<HanniItemDTO> toAdd, List<LemonItemRow> addRows,
Map<String, ItemSyncDetail> addDetailByBc) {
int total = toAdd.size();
int batches = (total + HANNI_PUSH_BATCH_SIZE - 1) / HANNI_PUSH_BATCH_SIZE;
for (int i = 0; i < total; i += HANNI_PUSH_BATCH_SIZE) {
int end = Math.min(i + HANNI_PUSH_BATCH_SIZE, total);
List<HanniItemDTO> batchItems = new ArrayList<>(toAdd.subList(i, end));
List<LemonItemRow> batchRows = new ArrayList<>(addRows.subList(i, end));
int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1;
itemSyncProgressService.updateProgress(logId, itemSyncProgressService.updateProgress(logId,
String.format("正在调用汉尼新增接口…(%d 条)", toAdd.size()), null); String.format("正在调用汉尼新增接口…(总计 %d 条,第 %d/%d 批,本批 %d 条)",
log.info("新增 {} 条商品到汉尼", toAdd.size()); total, batchNo, batches, batchItems.size()), null);
String r = hanniApiClient.saveItems(toAdd); String r = hanniApiClient.saveItems(batchItems);
if (r != null) { if (r != null) {
responseBuilder.append("add:").append(r); if (responseBuilder.length() > 0) {
responseBuilder.append(';');
}
responseBuilder.append("add[").append(batchNo).append("]:").append(r);
} }
String shortR = truncateStr(r, 500); String shortR = truncateStr(r, 500);
for (LemonItemRow rr : addRows) { for (LemonItemRow rr : batchRows) {
ItemSyncDetail d = addDetailByBc.get(rr.getItem().getBarcode()); ItemSyncDetail d = addDetailByBc.get(rr.getItem().getBarcode());
if (d != null) { if (d != null) {
d.setLemonLastModified(rr.getLemonLastModified()); d.setLemonLastModified(rr.getLemonLastModified());
...@@ -353,19 +395,32 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -353,19 +395,32 @@ public class ItemSyncServiceImpl implements ItemSyncService {
} }
upsertSnapshot(rr); upsertSnapshot(rr);
} }
itemSyncProgressService.updateProgress(logId, "汉尼新增接口已完成,正在处理快照", null);
} }
itemSyncProgressService.updateProgress(logId, "汉尼新增接口已完成,正在处理快照", null);
}
if (!toUpdate.isEmpty()) { private void pushHanniUpdatesInBatches(Long logId, StringBuilder responseBuilder,
List<HanniItemDTO> toUpdate, List<LemonItemRow> updateRows,
Map<String, ItemSyncDetail> updateDetailByBc) {
int total = toUpdate.size();
int batches = (total + HANNI_PUSH_BATCH_SIZE - 1) / HANNI_PUSH_BATCH_SIZE;
for (int i = 0; i < total; i += HANNI_PUSH_BATCH_SIZE) {
int end = Math.min(i + HANNI_PUSH_BATCH_SIZE, total);
List<HanniItemDTO> batchItems = new ArrayList<>(toUpdate.subList(i, end));
List<LemonItemRow> batchRows = new ArrayList<>(updateRows.subList(i, end));
int batchNo = i / HANNI_PUSH_BATCH_SIZE + 1;
itemSyncProgressService.updateProgress(logId, itemSyncProgressService.updateProgress(logId,
String.format("正在调用汉尼更新接口…(%d 条)", toUpdate.size()), null); String.format("正在调用汉尼更新接口…(总计 %d 条,第 %d/%d 批,本批 %d 条)",
log.info("更新 {} 条变更商品到汉尼", toUpdate.size()); total, batchNo, batches, batchItems.size()), null);
String r = hanniApiClient.updateItems(toUpdate); String r = hanniApiClient.updateItems(batchItems);
if (r != null) { if (r != null) {
responseBuilder.append(" update:").append(r); if (responseBuilder.length() > 0) {
responseBuilder.append(';');
}
responseBuilder.append("update[").append(batchNo).append("]:").append(r);
} }
String shortR = truncateStr(r, 500); String shortR = truncateStr(r, 500);
for (LemonItemRow rr : updateRows) { for (LemonItemRow rr : batchRows) {
String bc = rr.getItem().getBarcode(); String bc = rr.getItem().getBarcode();
ItemSyncDetail d = updateDetailByBc.get(bc); ItemSyncDetail d = updateDetailByBc.get(bc);
if (d != null) { if (d != null) {
...@@ -377,21 +432,8 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -377,21 +432,8 @@ public class ItemSyncServiceImpl implements ItemSyncService {
} }
upsertSnapshot(rr); upsertSnapshot(rr);
} }
itemSyncProgressService.updateProgress(logId, "汉尼更新接口已完成,正在处理快照", null);
}
if (toAdd.isEmpty() && toUpdate.isEmpty()) {
itemSyncProgressService.updateProgress(logId, "无需推送汉尼(均为跳过),正在写入明细", null);
} }
itemSyncProgressService.updateProgress(logId, "汉尼更新接口已完成,正在处理快照", null);
itemSyncProgressService.updateProgress(logId,
String.format("正在写入同步明细(共 %d 条)…", details.size()), null);
saveDetailsInChunks(details, logId);
syncLog.setStatus("SUCCESS");
syncLog.setHanniResponse(truncateStr(responseBuilder.toString(), 2000));
syncLog.setProgressMessage("已完成");
itemSyncLogRepository.save(syncLog);
} }
private static String normalizeMod(String s) { private static String normalizeMod(String s) {
...@@ -418,15 +460,18 @@ public class ItemSyncServiceImpl implements ItemSyncService { ...@@ -418,15 +460,18 @@ public class ItemSyncServiceImpl implements ItemSyncService {
itemSyncSnapshotRepository.save(s); itemSyncSnapshotRepository.save(s);
} }
private void saveDetailsInChunks(List<ItemSyncDetail> details, Long logId) { /**
* 批量写入明细。不在 insert 子表(带 sync_log_id 外键)之后调用 {@link ItemSyncProgressService#updateProgress}:
* InnoDB 在插入子行时会对父表 item_sync_log 加共享锁,REQUIRES_NEW 的 UPDATE 同一父行会锁等待直至超时。
*/
private void saveDetailsInChunks(List<ItemSyncDetail> details) {
int batch = 200; int batch = 200;
int total = details.size(); int total = details.size();
for (int i = 0; i < total; i += batch) { for (int i = 0; i < total; i += batch) {
int end = Math.min(i + batch, total); int end = Math.min(i + batch, total);
itemSyncDetailRepository.saveAll(details.subList(i, end)); itemSyncDetailRepository.saveAll(details.subList(i, end));
if (logId != null && total > batch) { if (total > batch) {
itemSyncProgressService.updateProgress(logId, log.debug("已写入同步明细 {}/{}", end, total);
String.format("正在写入同步明细 %d/%d …", end, total), null);
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment