refactor(dataacquisition): 优化设备运行时长更新逻辑并添加分布式锁

This commit is contained in:
zxy 2026-04-24 11:46:38 +08:00
parent ea0bb9732a
commit 6e92b0dd0e

View File

@ -11,6 +11,8 @@ import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.Data
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO;
import com.chanko.yunxi.mes.module.heli.dal.mysql.dataacquisition.DataAcquisitionMapper;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
@ -18,6 +20,7 @@ import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -34,6 +37,9 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
@Resource
private DataAcquisitionMapper dataAcquisitionMapper;
@Resource
private RedissonClient redissonClient;
// 时间格式正则匹配 数字H 数字M 数字S
private static final Pattern TIME_PATTERN = Pattern.compile("(\\d+)H(\\d+)M(\\d+)S");
@ -143,54 +149,138 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
}
}
private static String addFiveSeconds(String runningDuration) {
if (runningDuration == null || runningDuration.trim().isEmpty()) {
log.warn("运行时长为空");
return null;
}
Matcher matcher = TIME_PATTERN.matcher(runningDuration);
if (!matcher.matches()) {
log.error("运行时长格式错误: {},期望格式: XHXMXS例如: 28863H38M22S", runningDuration);
return null;
}
try {
int hour = Integer.parseInt(matcher.group(1));
int min = Integer.parseInt(matcher.group(2));
int sec = Integer.parseInt(matcher.group(3));
sec += 5;
if (sec >= 60) {
min++;
sec -= 60;
}
if (min >= 60) {
hour++;
min -= 60;
}
return String.format("%dH%02dM%02dS", hour, min, sec);
} catch (NumberFormatException e) {
log.error("解析运行时长数值失败: {}", runningDuration, e);
return null;
}
}
@Override
public void updateRunningDuration(String code) {
// 1. 根据传入的 code 查询最新一条数据
LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DataAcquisitionDO::getCode, code);
queryWrapper.orderByAsc(DataAcquisitionDO::getId);
List<DataAcquisitionDO> dataAcquisitionDOS = dataAcquisitionMapper.selectList(queryWrapper);
if (CollectionUtil.isEmpty(dataAcquisitionDOS)) {
if (code == null || code.trim().isEmpty()) {
log.warn("设备编码为空,跳过更新");
return;
}
DataAcquisitionDO dataAcquisitionDO = dataAcquisitionDOS.get(0);
String runningDuration = dataAcquisitionDO.getRunningDuration();
dataAcquisitionDOS.remove(dataAcquisitionDO);
for (DataAcquisitionDO acquisitionDO : dataAcquisitionDOS) {
runningDuration = getString(runningDuration);
acquisitionDO.setRunningDuration(runningDuration);
dataAcquisitionMapper.updateById(acquisitionDO);
String lockKey = "dataAcquisition:updateRunningDuration:" + code;
RLock lock = redissonClient.getLock(lockKey);
boolean locked = false;
try {
locked = lock.tryLock(0, 300, TimeUnit.SECONDS);
if (!locked) {
log.warn("设备 [{}] 的运行时长更新任务正在执行中,请勿重复提交", code);
return;
}
log.info("开始更新设备 [{}] 的运行时长", code);
LambdaQueryWrapper<DataAcquisitionDO> firstQuery = new LambdaQueryWrapper<>();
firstQuery.eq(DataAcquisitionDO::getCode, code);
firstQuery.orderByAsc(DataAcquisitionDO::getId);
firstQuery.last("LIMIT 1");
DataAcquisitionDO firstData = dataAcquisitionMapper.selectOne(firstQuery);
if (firstData == null || firstData.getRunningDuration() == null) {
log.warn("未找到设备 [{}] 的数据", code);
return;
}
String baseRunningDuration = firstData.getRunningDuration();
log.info("设备 [{}] 的基准运行时长: {}", code, baseRunningDuration);
int batchSize = 1000;
Long lastId = firstData.getId();
List<DataAcquisitionDO> batchList;
int totalUpdated = 0;
do {
LambdaQueryWrapper<DataAcquisitionDO> batchQuery = new LambdaQueryWrapper<>();
batchQuery.eq(DataAcquisitionDO::getCode, code);
batchQuery.gt(DataAcquisitionDO::getId, lastId);
batchQuery.orderByAsc(DataAcquisitionDO::getId);
batchQuery.last("LIMIT " + batchSize);
batchList = dataAcquisitionMapper.selectList(batchQuery);
if (CollectionUtil.isNotEmpty(batchList)) {
List<DataAcquisitionDO> toUpdate = new java.util.ArrayList<>();
String currentDuration = baseRunningDuration;
for (DataAcquisitionDO acquisitionDO : batchList) {
String newDuration = addFiveSeconds(currentDuration);
if (newDuration != null) {
acquisitionDO.setRunningDuration(newDuration);
toUpdate.add(acquisitionDO);
currentDuration = newDuration;
} else {
log.error("设备 [{}] 计算运行时长失败,当前值: {}", code, currentDuration);
}
}
if (CollectionUtil.isNotEmpty(toUpdate)) {
dataAcquisitionMapper.updateBatch(toUpdate);
totalUpdated += toUpdate.size();
log.info("设备 [{}] 已更新批次数据 {} 条,最后一条时长: {}", code, toUpdate.size(), currentDuration);
baseRunningDuration = currentDuration;
}
lastId = batchList.get(batchList.size() - 1).getId();
}
} while (CollectionUtil.isNotEmpty(batchList));
log.info("设备 [{}] 运行时长更新完成,共更新 {} 条记录", code, totalUpdated);
} catch (InterruptedException e) {
log.error("设备 [{}] 运行时长更新被中断", code, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("设备 [{}] 运行时长更新失败", code, e);
throw e;
} finally {
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug("设备 [{}] 的锁已释放", code);
}
}
}
public static void main(String[] args) {
String duration = "28863H38M22S";
String string = getString(duration);
String string = addFiveSeconds(duration);
System.out.println( string);
}
private static String getString(String runningDuration) {
// 2. 解析时间 28863H41M32S
Matcher matcher = TIME_PATTERN.matcher(runningDuration);
int hour = Integer.parseInt(matcher.group(1));
int min = Integer.parseInt(matcher.group(2));
int sec = Integer.parseInt(matcher.group(3));
// 3. +5
sec += 5;
// 秒进位
if (sec >= 60) {
min++;
sec -= 60;
}
// 分钟进位
if (min >= 60) {
hour++;
min -= 60;
}
// 4. 拼接新时间自动补0
return String.format("%dH%02dM%02dS", hour, min, sec);
}
}