103 lines
3.9 KiB
Plaintext
103 lines
3.9 KiB
Plaintext
package com.cmeim.biz.service;
|
||
|
||
import com.alibaba.fastjson.JSON;
|
||
import com.cmeim.activiti6.api.enums.BizKey;
|
||
import com.cmeim.biz.dto.IntelligentInputDto;
|
||
import com.cmeim.common.core.exception.ServiceException;
|
||
import com.cmeim.kafka.api.Topic;
|
||
import com.cmeim.kafka.api.dto.Activiti6Dto;
|
||
import com.cmeim.kafka.api.dto.input.MaterialBarOPDto;
|
||
import lombok.extern.slf4j.Slf4j;
|
||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||
import org.springframework.beans.factory.annotation.Autowired;
|
||
import org.springframework.kafka.annotation.KafkaListener;
|
||
import org.springframework.stereotype.Service;
|
||
import org.springframework.transaction.annotation.Transactional;
|
||
|
||
@Slf4j
|
||
@Service
|
||
@Transactional(rollbackFor = {ServiceException.class, RuntimeException.class, Exception.class})
|
||
public class KafKaListenerService {
|
||
@Autowired
|
||
private KafKaService kafKaService;
|
||
|
||
@Autowired
|
||
private InputBillService inputBillService;
|
||
|
||
/**
|
||
* 电子料仓新增条码申请
|
||
*
|
||
*/
|
||
@KafkaListener(topics = {Topic.Input.MATERIAL_BAR_ADD_APPLY})
|
||
public void materialBarAddApply(ConsumerRecord<?, String> record) {
|
||
try {
|
||
log.info("电子料仓条码新增申请:topic:{}, partition:{}, record:{}", record.topic(), record.partition(),
|
||
record.value());
|
||
MaterialBarOPDto opdto = JSON.parseObject(record.value(), MaterialBarOPDto.class);
|
||
kafKaService.addMaterial(opdto.getBarDto());
|
||
} catch (ServiceException e) {
|
||
log.error("电子料仓条码新增申请失败:topic:{}, partition:{}, record:{}, errorInfo:{}", record.topic(),
|
||
record.partition(), record.value(), e.getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 电子料仓删除条码
|
||
*
|
||
*/
|
||
@KafkaListener(topics = {Topic.Input.MATERIAL_BAR_DEL})
|
||
public void materialBarDel(ConsumerRecord<?, String> record) {
|
||
log.info("电子料仓删除条码:topic:{}, partition:{}, record:{}", record.topic(), record.partition(), record.value());
|
||
MaterialBarOPDto opdto = JSON.parseObject(record.value(), MaterialBarOPDto.class);
|
||
kafKaService.delMaterial(opdto.getBarDto().getMaterialBar());
|
||
}
|
||
|
||
/**
|
||
* 审批流程监听
|
||
*
|
||
*/
|
||
@KafkaListener(topics = {Topic.Act6.ACT_TASK_COMPLETE})
|
||
public void processInListener(ConsumerRecord<?, String> record) {
|
||
try {
|
||
log.info("topic:{}, partition:{}, record:{}", record.topic(), record.partition(), record.value());
|
||
Activiti6Dto actDto = JSON.parseObject(record.value(), Activiti6Dto.class);
|
||
if (BizKey.CGRK_INSTRUCTION.getKey().equals(actDto.getBizKey())) {
|
||
kafKaService.processInstanceComplete(actDto);
|
||
}
|
||
} catch (Exception e) {
|
||
log.error(e.getMessage());
|
||
}
|
||
|
||
}
|
||
|
||
/**
|
||
* 审批完毕监听
|
||
*
|
||
*/
|
||
@KafkaListener(topics = {Topic.Act6.ACT_PROCESS_INSTANCE_FINISH})
|
||
public void processListener(ConsumerRecord<?, String> record) {
|
||
log.info("topic:{}, partition:{}, record:{}", record.topic(), record.partition(), record.value());
|
||
Activiti6Dto actDto = JSON.parseObject(record.value(), Activiti6Dto.class);
|
||
if (BizKey.PROD_IN.getKey().equals(actDto.getBizKey())) {
|
||
try {
|
||
kafKaService.processInstanceFinishProdIn(record.value(), 1);
|
||
} catch (Exception e) {
|
||
log.error(e.getMessage());
|
||
}
|
||
}
|
||
kafKaService.processInstanceFinish(actDto);
|
||
}
|
||
|
||
|
||
/**
|
||
* 新增入库接口提供给智能仓储柜调用
|
||
*
|
||
*/
|
||
@KafkaListener(topics = {Topic.Input.STOCKIN_INPUT})
|
||
public void stockIn(ConsumerRecord<?, IntelligentInputDto> record) {
|
||
log.info("topic:{}, partition:{}, record:{}", record.topic(), record.partition(), record.value());
|
||
IntelligentInputDto value = record.value();
|
||
// inputBillService.stockIn(value);
|
||
}
|
||
}
|