refactor(heli): 重构数据采集服务接口和实现

This commit is contained in:
zxy 2026-03-27 15:54:23 +08:00
parent 69d1184fa5
commit aaedb814d0
5 changed files with 114 additions and 159 deletions

View File

@ -22,6 +22,7 @@ import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid; import javax.validation.Valid;
import java.io.IOException; import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import static com.chanko.yunxi.mes.framework.common.pojo.CommonResult.success; import static com.chanko.yunxi.mes.framework.common.pojo.CommonResult.success;
@ -114,14 +115,14 @@ public class DataAcquisitionController {
@GetMapping("/updateTime") @GetMapping("/updateTime")
@Operation(summary = "更新设备数据采集") @Operation(summary = "更新设备数据采集")
@PreAuthorize("@ss.hasPermission('heli:data-acquisition:update')") @PreAuthorize("@ss.hasPermission('heli:data-acquisition:update')")
public CommonResult<Boolean> updateTime(@RequestParam("code") String code) { public CommonResult<Boolean> updateTime(@RequestParam("acquisitionTime") @org.springframework.format.annotation.DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") LocalDateTime acquisitionTime) {
dataAcquisitionService.updateTime(code, null); dataAcquisitionService.updateTime(acquisitionTime);
return success(true); return success(true);
} }
@GetMapping("/stopCode") @GetMapping("/updateRunningDuration")
@Operation(summary = "停止设备数据采集") @Operation(summary = "停止设备数据采集")
public CommonResult<Boolean> stopCode(@RequestParam("code") String code) { public CommonResult<Boolean> updateRunningDuration(@RequestParam("code") String code) {
dataAcquisitionService.stopUpdateTask(code); dataAcquisitionService.updateRunningDuration(code);
return success(true); return success(true);
} }
} }

View File

@ -60,8 +60,8 @@ public interface DataAcquisitionService {
List<DataAcquisitionDO> getDataAcquisitionList(String code); List<DataAcquisitionDO> getDataAcquisitionList(String code);
void updateTime(String code,LocalDateTime acquisitionTime); void updateTime(LocalDateTime acquisitionTime);
void stopUpdateTask(String code); void updateRunningDuration(String code);
} }

View File

@ -1,9 +1,9 @@
package com.chanko.yunxi.mes.module.heli.service.dataacquisition; package com.chanko.yunxi.mes.module.heli.service.dataacquisition;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.chanko.yunxi.mes.framework.common.exception.ErrorCode;
import com.chanko.yunxi.mes.framework.common.pojo.PageResult; import com.chanko.yunxi.mes.framework.common.pojo.PageResult;
import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils; import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionPageReqVO; import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionPageReqVO;
@ -18,11 +18,9 @@ import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static com.chanko.yunxi.mes.framework.common.exception.util.ServiceExceptionUtil.exception;
/** /**
* 设备数据采集 Service 实现类 * 设备数据采集 Service 实现类
@ -40,9 +38,6 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
// 时间格式正则匹配 数字H 数字M 数字S // 时间格式正则匹配 数字H 数字M 数字S
private static final Pattern TIME_PATTERN = Pattern.compile("(\\d+)H(\\d+)M(\\d+)S"); private static final Pattern TIME_PATTERN = Pattern.compile("(\\d+)H(\\d+)M(\\d+)S");
// 用于保存正在运行的线程防止重复启动
private final Map<String, Thread> runningThreads = new ConcurrentHashMap<>();
@Override @Override
public Long createDataAcquisition(DataAcquisitionSaveReqVO createReqVO) { public Long createDataAcquisition(DataAcquisitionSaveReqVO createReqVO) {
// 插入 // 插入
@ -118,138 +113,87 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
} }
@Override @Override
public void updateTime(String code, LocalDateTime acquisitionTime) { public void updateTime(LocalDateTime acquisitionTime) {
// 1. 根据传入的 code 查询最新一条数据
LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DataAcquisitionDO::getCode, code);
queryWrapper.orderByDesc(DataAcquisitionDO::getId);
queryWrapper.last("limit 1");
DataAcquisitionDO data = dataAcquisitionMapper.selectOne(queryWrapper);
if (data == null) {
throw exception(new ErrorCode(400, "设备信息不存在!"));
}
// 正在运行的线程,打印运行中的线程code
runningThreads.forEach((key, value) -> {
log.info("正在运行的线程,code:{}", key);
});
startUpdateTask(data);
// 查询所有数据按 code 分组并按 id 排序id 代表插入顺序 // 查询所有数据按 code 分组并按 id 排序id 代表插入顺序
// List<DataAcquisitionDO> allData = dataAcquisitionMapper.selectList( List<DataAcquisitionDO> allData = dataAcquisitionMapper.selectList(
// new LambdaQueryWrapper<DataAcquisitionDO>() new LambdaQueryWrapper<DataAcquisitionDO>()
// .orderByAsc(DataAcquisitionDO::getId) .orderByAsc(DataAcquisitionDO::getId)
// ); );
//
// if (allData == null || allData.isEmpty()) {
// return;
// }
//
// // code 分组
// Map<String, List<DataAcquisitionDO>> groupedByCode = allData.stream()
// .collect(java.util.stream.Collectors.groupingBy(DataAcquisitionDO::getCode, java.util.stream.Collectors.toList()));
//
// // 遍历每个分组更新时间
// for (Map.Entry<String, List<DataAcquisitionDO>> entry : groupedByCode.entrySet()) {
// List<DataAcquisitionDO> groupData = entry.getValue();
// for (int i = 0; i < groupData.size(); i++) {
// DataAcquisitionDO record = groupData.get(i);
// // 第一条记录使用 updateReqVO 的时间之后的每条增加5秒
// LocalDateTime newCreateTime = acquisitionTime.plusSeconds(i * 5L);
//
// // 更新该条记录的创建时间
// record.setCreateTime(newCreateTime);
// dataAcquisitionMapper.updateById(record);
// }
// }
}
/** if (allData == null || allData.isEmpty()) {
* 外部调用传入 code 自动每5秒执行一次时间+5秒
*
* @param dataAcquisitionDO 外部传入01#02#03#...
*/
public void startUpdateTask(DataAcquisitionDO dataAcquisitionDO) {
String code = dataAcquisitionDO.getCode();
// 如果已经在运行不再重复启动
if (runningThreads.containsKey(code)) {
return; return;
} }
Thread thread = new Thread(() -> { // code 分组
while (!Thread.currentThread().isInterrupted()) { Map<String, List<DataAcquisitionDO>> groupedByCode = allData.stream()
try { .collect(Collectors.groupingBy(DataAcquisitionDO::getCode, Collectors.toList()));
LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>(); // 遍历每个分组更新时间
queryWrapper.eq(DataAcquisitionDO::getCode, code); for (Map.Entry<String, List<DataAcquisitionDO>> entry : groupedByCode.entrySet()) {
queryWrapper.orderByDesc(DataAcquisitionDO::getId); List<DataAcquisitionDO> groupData = entry.getValue();
queryWrapper.last("limit 1"); for (int i = 0; i < groupData.size(); i++) {
DataAcquisitionDO data = dataAcquisitionMapper.selectOne(queryWrapper); DataAcquisitionDO record = groupData.get(i);
if (data == null) { // 第一条记录使用 updateReqVO 的时间之后的每条增加5秒
Thread.sleep(5000); LocalDateTime newCreateTime = acquisitionTime.plusSeconds(i * 5L);
continue;
}
// 2. 解析时间 28863H41M32S // 更新该条记录的创建时间
String timeStr = data.getRunningDuration(); record.setCreateTime(newCreateTime);
Matcher matcher = TIME_PATTERN.matcher(timeStr); dataAcquisitionMapper.updateById(record);
if (!matcher.matches()) {
Thread.sleep(5000);
continue;
}
int hour = Integer.parseInt(matcher.group(1));
int min = Integer.parseInt(matcher.group(2));
int sec = Integer.parseInt(matcher.group(3));
// 3. +5
sec += 5;
// 秒进位
if (sec >= 60) {
min++;
sec -= 60;
}
// 分钟进位
if (min >= 60) {
hour++;
min -= 60;
}
// 4. 拼接新时间自动补0
String newTime = String.format("%dH%02dM%02dS", hour, min, sec);
// 5. 更新数据库
data.setRunningDuration(newTime);
data.setUpdateTime(LocalDateTime.now());
dataAcquisitionMapper.updateById(data);
} catch (Exception e) {
e.printStackTrace();
}
// 每5秒执行一次
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
} }
});
thread.start();
runningThreads.put(code, thread);
}
/**
* 停止某个 code 的任务
*/
@Override
public void stopUpdateTask(String code) {
Thread thread = runningThreads.get(code);
if (thread != null) {
thread.interrupt();
runningThreads.remove(code);
} }
} }
@Override
public void updateRunningDuration(String code) {
// 1. 根据传入的 code 查询最新一条数据
LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DataAcquisitionDO::getCode, code);
queryWrapper.orderByAsc(DataAcquisitionDO::getId);
List<DataAcquisitionDO> dataAcquisitionDOS = dataAcquisitionMapper.selectList(queryWrapper);
if (CollectionUtil.isEmpty(dataAcquisitionDOS)) {
return;
}
DataAcquisitionDO dataAcquisitionDO = dataAcquisitionDOS.get(0);
String runningDuration = dataAcquisitionDO.getRunningDuration();
dataAcquisitionDOS.remove(dataAcquisitionDO);
for (DataAcquisitionDO acquisitionDO : dataAcquisitionDOS) {
runningDuration = getString(runningDuration);
acquisitionDO.setRunningDuration(runningDuration);
dataAcquisitionMapper.updateById(acquisitionDO);
}
}
public static void main(String[] args) {
String duration = "28863H38M22S";
String string = getString(duration);
System.out.println( string);
}
private static String getString(String runningDuration) {
// 2. 解析时间 28863H41M32S
Matcher matcher = TIME_PATTERN.matcher(runningDuration);
if (!matcher.matches()) {
return runningDuration;
}
int hour = Integer.parseInt(matcher.group(1));
int min = Integer.parseInt(matcher.group(2));
int sec = Integer.parseInt(matcher.group(3));
// 3. +5
sec += 5;
// 秒进位
if (sec >= 60) {
min++;
sec -= 60;
}
// 分钟进位
if (min >= 60) {
hour++;
min -= 60;
}
// 4. 拼接新时间自动补0
return String.format("%dH%02dM%02dS", hour, min, sec);
}
} }

View File

@ -48,10 +48,10 @@ export const getDataAcquisitionList = async (code: string) => {
return await request.get({ url: `/heli/data-acquisition/getDataAcquisitionList`, params: { code } }) return await request.get({ url: `/heli/data-acquisition/getDataAcquisitionList`, params: { code } })
} }
// 新增客户新表 // 新增客户新表
export const updateTime = async (code: string) => { export const updateTime = async (acquisitionTime) => {
return await request.get({ url: `/heli/data-acquisition/updateTime`,params: {code } }) return await request.get({ url: `/heli/data-acquisition/updateTime`,params: { acquisitionTime } })
} }
export const stopCode = async (code: string) => { export const updateRunningDuration = async (code: string) => {
return await request.get({ url: `/heli/data-acquisition/updateTime`,params: {code } }) return await request.get({ url: `/heli/data-acquisition/updateRunningDuration`,params: {code } })
} }

View File

@ -12,21 +12,34 @@
:inline="true" :inline="true"
label-width="108px" label-width="108px"
> >
<!-- <el-form-item label="开始时间" prop="startTime">--> <el-form-item label="开始时间" prop="startTime">
<!-- <el-date-picker--> <el-date-picker
<!-- v-model="queryParams.acquisitionTime"--> v-model="queryParams.acquisitionTime"
<!-- value-format="YYYY-MM-DD HH:mm:ss"--> value-format="YYYY-MM-DD HH:mm:ss"
<!-- type="datetime"--> type="datetime"
<!-- placeholder="选择时间"--> placeholder="选择时间"
<!-- class="!w-400px"--> class="!w300px"
<!-- />--> />
<!-- </el-form-item>--> </el-form-item>
<el-form-item>
<el-button @click="handleQuery" type="primary"> 更新</el-button>
</el-form-item>
</el-form>
<!-- 搜索工作栏 -->
<el-form
class="-mb-15px"
:model="queryParams"
ref="queryFormRef"
:inline="true"
label-width="108px"
>
<el-form-item label="设备编号" prop="code"> <el-form-item label="设备编号" prop="code">
<el-input v-model="queryParams.code" placeholder="请输入设备编号" clearable /> <el-input v-model="queryParams.code" placeholder="请输入设备编号" clearable class="!w300px"
/>
</el-form-item> </el-form-item>
<el-form-item> <el-form-item>
<el-button @click="handleQuery" type="primary"> 启动</el-button> <el-button @click="updateRunningDuration" type="primary"> 更新</el-button>
<el-button @click="stopCode" type="primary"> 停止</el-button>
</el-form-item> </el-form-item>
</el-form> </el-form>
</ContentWrap> </ContentWrap>
@ -64,17 +77,14 @@ const queryFormRef = ref() // 搜索的表单
/** 搜索按钮操作 */code: [{ required: true, message: '设备编号不能为空', trigger: 'blur' }] /** 搜索按钮操作 */code: [{ required: true, message: '设备编号不能为空', trigger: 'blur' }]
const handleQuery = async () => { const handleQuery = async () => {
if (!queryParams.code) { const data = await DataAcquisitionApi.updateTime(queryParams.acquisitionTime)
return message.error('设备编号不能为空')
}
const data = await DataAcquisitionApi.updateTime(queryParams.code)
message.success("更新成功") message.success("更新成功")
} }
const stopCode = async () => { const updateRunningDuration = async () => {
if (!queryParams.code) { if (!queryParams.code) {
return message.error('设备编号不能为空') return message.error('设备编号不能为空')
} }
const data = await DataAcquisitionApi.stopCode(queryParams.code) const data = await DataAcquisitionApi.updateRunningDuration(queryParams.code)
message.success("停止成功") message.success("停止成功")
} }