feat(heli): 更新设备数据采集功能实现定时任务控制

This commit is contained in:
zxy 2026-03-27 14:35:47 +08:00
parent d40c3ceff7
commit 69d1184fa5
6 changed files with 235 additions and 93 deletions

View File

@ -1,35 +1,31 @@
package com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition; package com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition;
import org.apache.ibatis.annotations.Param; import com.chanko.yunxi.mes.framework.common.pojo.CommonResult;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import org.springframework.validation.annotation.Validated;
import org.springframework.security.access.prepost.PreAuthorize;
import io.swagger.v3.oas.annotations.tags.Tag;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.Operation;
import javax.validation.*;
import javax.servlet.http.*;
import java.time.LocalDateTime;
import java.util.*;
import java.io.IOException;
import com.chanko.yunxi.mes.framework.common.pojo.PageParam; import com.chanko.yunxi.mes.framework.common.pojo.PageParam;
import com.chanko.yunxi.mes.framework.common.pojo.PageResult; import com.chanko.yunxi.mes.framework.common.pojo.PageResult;
import com.chanko.yunxi.mes.framework.common.pojo.CommonResult;
import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils; import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils;
import static com.chanko.yunxi.mes.framework.common.pojo.CommonResult.success;
import com.chanko.yunxi.mes.framework.excel.core.util.ExcelUtils; import com.chanko.yunxi.mes.framework.excel.core.util.ExcelUtils;
import com.chanko.yunxi.mes.framework.operatelog.core.annotations.OperateLog; import com.chanko.yunxi.mes.framework.operatelog.core.annotations.OperateLog;
import static com.chanko.yunxi.mes.framework.operatelog.core.enums.OperateTypeEnum.*; import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionPageReqVO;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionRespVO;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.*; import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionSaveReqVO;
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO; import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO;
import com.chanko.yunxi.mes.module.heli.service.dataacquisition.DataAcquisitionService; import com.chanko.yunxi.mes.module.heli.service.dataacquisition.DataAcquisitionService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import javax.validation.Valid;
import java.io.IOException;
import java.util.List;
import static com.chanko.yunxi.mes.framework.common.pojo.CommonResult.success;
import static com.chanko.yunxi.mes.framework.operatelog.core.enums.OperateTypeEnum.EXPORT;
@Tag(name = "管理后台 - 设备数据采集") @Tag(name = "管理后台 - 设备数据采集")
@RestController @RestController
@ -114,12 +110,18 @@ public class DataAcquisitionController {
List<DataAcquisitionDO> dataAcquisition = dataAcquisitionService.getDataAcquisitionList(code); List<DataAcquisitionDO> dataAcquisition = dataAcquisitionService.getDataAcquisitionList(code);
return success(dataAcquisition); return success(dataAcquisition);
} }
@GetMapping("/updateTime") @GetMapping("/updateTime")
@Operation(summary = "更新设备数据采集") @Operation(summary = "更新设备数据采集")
@PreAuthorize("@ss.hasPermission('heli:data-acquisition:update')") @PreAuthorize("@ss.hasPermission('heli:data-acquisition:update')")
public CommonResult<Boolean> updateTime(@RequestParam("acquisitionTime") @org.springframework.format.annotation.DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") LocalDateTime acquisitionTime) { public CommonResult<Boolean> updateTime(@RequestParam("code") String code) {
dataAcquisitionService.updateTime(acquisitionTime); dataAcquisitionService.updateTime(code, null);
return success(true);
}
@GetMapping("/stopCode")
@Operation(summary = "停止设备数据采集")
public CommonResult<Boolean> stopCode(@RequestParam("code") String code) {
dataAcquisitionService.stopUpdateTask(code);
return success(true); return success(true);
} }
} }

View File

@ -1,11 +1,11 @@
package com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition; package com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition;
import lombok.*; import com.baomidou.mybatisplus.annotation.KeySequence;
import java.util.*; import com.baomidou.mybatisplus.annotation.TableField;
import java.time.LocalDateTime; import com.baomidou.mybatisplus.annotation.TableId;
import java.time.LocalDateTime; import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.*;
import com.chanko.yunxi.mes.framework.mybatis.core.dataobject.BaseDO; import com.chanko.yunxi.mes.framework.mybatis.core.dataobject.BaseDO;
import lombok.*;
/** /**
* 设备数据采集 DO * 设备数据采集 DO

View File

@ -1,11 +1,13 @@
package com.chanko.yunxi.mes.module.heli.service.dataacquisition; package com.chanko.yunxi.mes.module.heli.service.dataacquisition;
import java.time.LocalDateTime;
import java.util.*;
import javax.validation.*;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.*;
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO;
import com.chanko.yunxi.mes.framework.common.pojo.PageResult; import com.chanko.yunxi.mes.framework.common.pojo.PageResult;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionPageReqVO;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionSaveReqVO;
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO;
import javax.validation.Valid;
import java.time.LocalDateTime;
import java.util.List;
/** /**
* 设备数据采集 Service 接口 * 设备数据采集 Service 接口
@ -58,6 +60,8 @@ public interface DataAcquisitionService {
List<DataAcquisitionDO> getDataAcquisitionList(String code); List<DataAcquisitionDO> getDataAcquisitionList(String code);
void updateTime(LocalDateTime acquisitionTime); void updateTime(String code,LocalDateTime acquisitionTime);
void stopUpdateTask(String code);
} }

View File

@ -3,19 +3,24 @@ package com.chanko.yunxi.mes.module.heli.service.dataacquisition;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.chanko.yunxi.mes.framework.common.exception.ErrorCode;
import com.chanko.yunxi.mes.framework.common.pojo.PageResult;
import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionPageReqVO;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.DataAcquisitionSaveReqVO;
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO;
import com.chanko.yunxi.mes.module.heli.dal.mysql.dataacquisition.DataAcquisitionMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import com.chanko.yunxi.mes.module.heli.controller.admin.dataacquisition.vo.*; import java.util.concurrent.ConcurrentHashMap;
import com.chanko.yunxi.mes.module.heli.dal.dataobject.dataacquisition.DataAcquisitionDO; import java.util.regex.Matcher;
import com.chanko.yunxi.mes.framework.common.pojo.PageResult; import java.util.regex.Pattern;
import com.chanko.yunxi.mes.framework.common.util.object.BeanUtils;
import com.chanko.yunxi.mes.module.heli.dal.mysql.dataacquisition.DataAcquisitionMapper;
import static com.chanko.yunxi.mes.framework.common.exception.util.ServiceExceptionUtil.exception; import static com.chanko.yunxi.mes.framework.common.exception.util.ServiceExceptionUtil.exception;
@ -26,11 +31,18 @@ import static com.chanko.yunxi.mes.framework.common.exception.util.ServiceExcept
*/ */
@Service @Service
@Validated @Validated
@Slf4j
public class DataAcquisitionServiceImpl implements DataAcquisitionService { public class DataAcquisitionServiceImpl implements DataAcquisitionService {
@Resource @Resource
private DataAcquisitionMapper dataAcquisitionMapper; private DataAcquisitionMapper dataAcquisitionMapper;
// 时间格式正则匹配 数字H 数字M 数字S
private static final Pattern TIME_PATTERN = Pattern.compile("(\\d+)H(\\d+)M(\\d+)S");
// 用于保存正在运行的线程防止重复启动
private final Map<String, Thread> runningThreads = new ConcurrentHashMap<>();
@Override @Override
public Long createDataAcquisition(DataAcquisitionSaveReqVO createReqVO) { public Long createDataAcquisition(DataAcquisitionSaveReqVO createReqVO) {
// 插入 // 插入
@ -78,10 +90,10 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
// 查找每个 code 分组中 status=1 的最大 createTime 之后的 status=0 的最小记录的 id // 查找每个 code 分组中 status=1 的最大 createTime 之后的 status=0 的最小记录的 id
Integer integer = dataAcquisitionMapper.selectStatus(); Integer integer = dataAcquisitionMapper.selectStatus();
System.out.println("=== equipment_job_config status: " + integer); System.out.println("=== equipment_job_config status: " + integer);
if (ObjectUtil.isNotEmpty( integer)&&integer==0){ if (ObjectUtil.isNotEmpty(integer) && integer == 0) {
List<Long> ids = dataAcquisitionMapper.selectNextStatusZeroIds(); List<Long> ids = dataAcquisitionMapper.selectNextStatusZeroIds();
System.out.println("=== selectNextStatusZeroIds returned " + (ids == null ? 0 : ids.size()) + " ids: " + ids); System.out.println("=== selectNextStatusZeroIds returned " + (ids == null ? 0 : ids.size()) + " ids: " + ids);
if (ObjectUtil.isNotEmpty(ids)){ if (ObjectUtil.isNotEmpty(ids)) {
LambdaUpdateWrapper<DataAcquisitionDO> updateWrapper = new LambdaUpdateWrapper<>(); LambdaUpdateWrapper<DataAcquisitionDO> updateWrapper = new LambdaUpdateWrapper<>();
updateWrapper.in(DataAcquisitionDO::getId, ids); updateWrapper.in(DataAcquisitionDO::getId, ids);
updateWrapper.set(DataAcquisitionDO::getStatus, 1); updateWrapper.set(DataAcquisitionDO::getStatus, 1);
@ -102,37 +114,141 @@ public class DataAcquisitionServiceImpl implements DataAcquisitionService {
@Override @Override
public List<DataAcquisitionDO> getDataAcquisitionList(String code) { public List<DataAcquisitionDO> getDataAcquisitionList(String code) {
return dataAcquisitionMapper.getDataAcquisitionList( code); return dataAcquisitionMapper.getDataAcquisitionList(code);
} }
@Override @Override
public void updateTime(LocalDateTime acquisitionTime) { public void updateTime(String code, LocalDateTime acquisitionTime) {
// 查询所有数据按 code 分组并按 id 排序id 代表插入顺序 // 1. 根据传入的 code 查询最新一条数据
List<DataAcquisitionDO> allData = dataAcquisitionMapper.selectList( LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>();
new LambdaQueryWrapper<DataAcquisitionDO>() queryWrapper.eq(DataAcquisitionDO::getCode, code);
.orderByAsc(DataAcquisitionDO::getId) queryWrapper.orderByDesc(DataAcquisitionDO::getId);
); queryWrapper.last("limit 1");
DataAcquisitionDO data = dataAcquisitionMapper.selectOne(queryWrapper);
if (data == null) {
throw exception(new ErrorCode(400, "设备信息不存在!"));
}
// 正在运行的线程,打印运行中的线程code
runningThreads.forEach((key, value) -> {
log.info("正在运行的线程,code:{}", key);
});
startUpdateTask(data);
if (allData == null || allData.isEmpty()) { // 查询所有数据按 code 分组并按 id 排序id 代表插入顺序
// List<DataAcquisitionDO> allData = dataAcquisitionMapper.selectList(
// new LambdaQueryWrapper<DataAcquisitionDO>()
// .orderByAsc(DataAcquisitionDO::getId)
// );
//
// if (allData == null || allData.isEmpty()) {
// return;
// }
//
// // code 分组
// Map<String, List<DataAcquisitionDO>> groupedByCode = allData.stream()
// .collect(java.util.stream.Collectors.groupingBy(DataAcquisitionDO::getCode, java.util.stream.Collectors.toList()));
//
// // 遍历每个分组更新时间
// for (Map.Entry<String, List<DataAcquisitionDO>> entry : groupedByCode.entrySet()) {
// List<DataAcquisitionDO> groupData = entry.getValue();
// for (int i = 0; i < groupData.size(); i++) {
// DataAcquisitionDO record = groupData.get(i);
// // 第一条记录使用 updateReqVO 的时间之后的每条增加5秒
// LocalDateTime newCreateTime = acquisitionTime.plusSeconds(i * 5L);
//
// // 更新该条记录的创建时间
// record.setCreateTime(newCreateTime);
// dataAcquisitionMapper.updateById(record);
// }
// }
}
/**
* 外部调用传入 code 自动每5秒执行一次时间+5秒
*
* @param dataAcquisitionDO 外部传入01#02#03#...
*/
public void startUpdateTask(DataAcquisitionDO dataAcquisitionDO) {
String code = dataAcquisitionDO.getCode();
// 如果已经在运行不再重复启动
if (runningThreads.containsKey(code)) {
return; return;
} }
// code 分组 Thread thread = new Thread(() -> {
Map<String, List<DataAcquisitionDO>> groupedByCode = allData.stream() while (!Thread.currentThread().isInterrupted()) {
.collect(java.util.stream.Collectors.groupingBy(DataAcquisitionDO::getCode, java.util.stream.Collectors.toList())); try {
// 遍历每个分组更新时间 LambdaQueryWrapper<DataAcquisitionDO> queryWrapper = new LambdaQueryWrapper<>();
for (Map.Entry<String, List<DataAcquisitionDO>> entry : groupedByCode.entrySet()) { queryWrapper.eq(DataAcquisitionDO::getCode, code);
List<DataAcquisitionDO> groupData = entry.getValue(); queryWrapper.orderByDesc(DataAcquisitionDO::getId);
for (int i = 0; i < groupData.size(); i++) { queryWrapper.last("limit 1");
DataAcquisitionDO record = groupData.get(i); DataAcquisitionDO data = dataAcquisitionMapper.selectOne(queryWrapper);
// 第一条记录使用 updateReqVO 的时间之后的每条增加5秒 if (data == null) {
LocalDateTime newCreateTime = acquisitionTime.plusSeconds(i * 5L); Thread.sleep(5000);
continue;
// 更新该条记录的创建时间
record.setCreateTime(newCreateTime);
dataAcquisitionMapper.updateById(record);
} }
// 2. 解析时间 28863H41M32S
String timeStr = data.getRunningDuration();
Matcher matcher = TIME_PATTERN.matcher(timeStr);
if (!matcher.matches()) {
Thread.sleep(5000);
continue;
}
int hour = Integer.parseInt(matcher.group(1));
int min = Integer.parseInt(matcher.group(2));
int sec = Integer.parseInt(matcher.group(3));
// 3. +5
sec += 5;
// 秒进位
if (sec >= 60) {
min++;
sec -= 60;
}
// 分钟进位
if (min >= 60) {
hour++;
min -= 60;
}
// 4. 拼接新时间自动补0
String newTime = String.format("%dH%02dM%02dS", hour, min, sec);
// 5. 更新数据库
data.setRunningDuration(newTime);
data.setUpdateTime(LocalDateTime.now());
dataAcquisitionMapper.updateById(data);
} catch (Exception e) {
e.printStackTrace();
}
// 每5秒执行一次
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
}
});
thread.start();
runningThreads.put(code, thread);
}
/**
* 停止某个 code 的任务
*/
@Override
public void stopUpdateTask(String code) {
Thread thread = runningThreads.get(code);
if (thread != null) {
thread.interrupt();
runningThreads.remove(code);
} }
} }

View File

@ -48,6 +48,10 @@ export const getDataAcquisitionList = async (code: string) => {
return await request.get({ url: `/heli/data-acquisition/getDataAcquisitionList`, params: { code } }) return await request.get({ url: `/heli/data-acquisition/getDataAcquisitionList`, params: { code } })
} }
// 新增客户新表 // 新增客户新表
export const updateTime = async (acquisitionTime) => { export const updateTime = async (code: string) => {
return await request.get({ url: `/heli/data-acquisition/updateTime`,params: { acquisitionTime } }) return await request.get({ url: `/heli/data-acquisition/updateTime`,params: {code } })
}
export const stopCode = async (code: string) => {
return await request.get({ url: `/heli/data-acquisition/updateTime`,params: {code } })
} }

View File

@ -12,17 +12,21 @@
:inline="true" :inline="true"
label-width="108px" label-width="108px"
> >
<el-form-item label="开始时间" prop="startTime"> <!-- <el-form-item label="开始时间" prop="startTime">-->
<el-date-picker <!-- <el-date-picker-->
v-model="queryParams.acquisitionTime" <!-- v-model="queryParams.acquisitionTime"-->
value-format="YYYY-MM-DD HH:mm:ss" <!-- value-format="YYYY-MM-DD HH:mm:ss"-->
type="datetime" <!-- type="datetime"-->
placeholder="选择时间" <!-- placeholder="选择时间"-->
class="!w-400px" <!-- class="!w-400px"-->
/> <!-- />-->
<!-- </el-form-item>-->
<el-form-item label="设备编号" prop="code">
<el-input v-model="queryParams.code" placeholder="请输入设备编号" clearable />
</el-form-item> </el-form-item>
<el-form-item> <el-form-item>
<el-button @click="handleQuery" type="primary"> 更新</el-button> <el-button @click="handleQuery" type="primary"> 启动</el-button>
<el-button @click="stopCode" type="primary"> 停止</el-button>
</el-form-item> </el-form-item>
</el-form> </el-form>
</ContentWrap> </ContentWrap>
@ -49,6 +53,7 @@ const queryParams = reactive({
pageNo: 1, pageNo: 1,
pageSize: 10, pageSize: 10,
acquisitionTime: undefined, acquisitionTime: undefined,
code: undefined,
}) })
const queryFormRef = ref() // const queryFormRef = ref() //
@ -56,10 +61,21 @@ const queryFormRef = ref() // 搜索的表单
/** 查询列表 */ /** 查询列表 */
/** 搜索按钮操作 */ /** 搜索按钮操作 */code: [{ required: true, message: '设备编号不能为空', trigger: 'blur' }]
const handleQuery = async () => { const handleQuery = async () => {
const data = await DataAcquisitionApi.updateTime(queryParams.acquisitionTime) if (!queryParams.code) {
return message.error('设备编号不能为空')
}
const data = await DataAcquisitionApi.updateTime(queryParams.code)
message.success("更新成功") message.success("更新成功")
} }
const stopCode = async () => {
if (!queryParams.code) {
return message.error('设备编号不能为空')
}
const data = await DataAcquisitionApi.stopCode(queryParams.code)
message.success("停止成功")
}
</script> </script>