Ver código fonte

添加Mq节点

caixiaofeng 6 meses atrás
pai
commit
e6691c402a

+ 37 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/behavior/MqActivityBehavior.java

@@ -0,0 +1,37 @@
+package com.flow.flowable.behavior;
+
+import com.flow.flowable.event.CustomFlowableEngineEvent;
+import com.flow.flowable.event.CustomFlowableEventType;
+import org.flowable.common.engine.api.delegate.Expression;
+import org.flowable.common.engine.api.delegate.event.FlowableEventDispatcher;
+import org.flowable.engine.delegate.DelegateExecution;
+import org.flowable.engine.impl.bpmn.behavior.AbstractBpmnActivityBehavior;
+import org.flowable.engine.impl.cfg.ProcessEngineConfigurationImpl;
+import org.flowable.engine.impl.persistence.entity.ExecutionEntityImpl;
+import org.flowable.engine.impl.util.CommandContextUtil;
+import org.flowable.http.common.impl.ExpressionUtils;
+
+import java.util.Objects;
+
+public class MqActivityBehavior extends AbstractBpmnActivityBehavior {
+    private static final long serialVersionUID = 1L;
+    protected Expression queue;
+    protected Expression params;
+
+    @Override
+    public void execute(DelegateExecution execution) {
+        ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration();
+        // 发送消息
+        String queue = ExpressionUtils.getStringFromField(this.queue, execution);
+        String params = ExpressionUtils.getStringFromField(this.params, execution);
+
+
+        // 发送事件
+        FlowableEventDispatcher eventDispatcher = processEngineConfiguration.getEventDispatcher();
+        if (Objects.nonNull(eventDispatcher) && eventDispatcher.isEnabled()) {
+            CustomFlowableEngineEvent customFlowableEngineEvent = new CustomFlowableEngineEvent((ExecutionEntityImpl) execution, CustomFlowableEventType.MQ_SEND);
+            eventDispatcher.dispatchEvent(customFlowableEngineEvent, processEngineConfiguration.getEngineCfgKey());
+        }
+        leave(execution);
+    }
+}

+ 4 - 4
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/configure/FlowableConfigure.java

@@ -1,16 +1,14 @@
 package com.flow.flowable.configure;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.flow.flowable.converter.CopyServiceTaskXMLConverter;
-import com.flow.flowable.converter.CustomServiceTaskXMLConverter;
-import com.flow.flowable.converter.JumpServiceTaskXMLConverter;
-import com.flow.flowable.converter.MicroServiceTaskXMLConverter;
+import com.flow.flowable.converter.*;
 import com.flow.flowable.expression.VariableBetweenExpressionFunction;
 import com.flow.flowable.expression.VariableContainsAllExpressionFunction;
 import com.flow.flowable.parser.factory.CustomDefaultActivityBehaviorFactory;
 import com.flow.flowable.parser.handler.CopyServiceTaskParseHandler;
 import com.flow.flowable.parser.handler.JumpServiceTaskParseHandler;
 import com.flow.flowable.parser.handler.MicroServiceTaskParseHandler;
+import com.flow.flowable.parser.handler.MqServiceTaskParseHandler;
 import com.flow.flowable.validator.CustomServiceTaskValidator;
 import org.flowable.bpmn.converter.BpmnXMLConverter;
 import org.flowable.common.engine.api.delegate.FlowableFunctionDelegate;
@@ -55,6 +53,7 @@ public class FlowableConfigure implements EngineConfigurationConfigurer<SpringPr
         bpmnParseHandlers.add(new CopyServiceTaskParseHandler());
         bpmnParseHandlers.add(new JumpServiceTaskParseHandler());
         bpmnParseHandlers.add(new MicroServiceTaskParseHandler());
+        bpmnParseHandlers.add(new MqServiceTaskParseHandler());
         engineConfiguration.setPostBpmnParseHandlers(bpmnParseHandlers);
         engineConfiguration.setCustomServiceTaskValidator(new CustomServiceTaskValidator());
         engineConfiguration.setActivityBehaviorFactory(new CustomDefaultActivityBehaviorFactory());
@@ -65,6 +64,7 @@ public class FlowableConfigure implements EngineConfigurationConfigurer<SpringPr
         BpmnXMLConverter.addConverter(new JumpServiceTaskXMLConverter());
         BpmnXMLConverter.addConverter(new CopyServiceTaskXMLConverter());
         BpmnXMLConverter.addConverter(new MicroServiceTaskXMLConverter());
+        BpmnXMLConverter.addConverter(new MqServiceTaskXMLConverter());
         BpmnXMLConverter.addConverter(new CustomServiceTaskXMLConverter());
         if (Objects.nonNull(this.objectMapper)) {
             engineConfiguration.setObjectMapper(this.objectMapper);

+ 13 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/converter/CustomServiceTaskXMLConverter.java

@@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.flow.flowable.model.CopyServiceTask;
 import com.flow.flowable.model.JumpServiceTask;
 import com.flow.flowable.model.MicroServiceTask;
+import com.flow.flowable.model.MqServiceTask;
 import com.flow.flowable.utils.ProcessElementUtil;
 import org.flowable.bpmn.converter.ServiceTaskXMLConverter;
 import org.flowable.bpmn.model.BaseElement;
@@ -41,6 +42,10 @@ public class CustomServiceTaskXMLConverter extends ServiceTaskXMLConverter {
                 microServiceTask.setValues(serviceTask);
                 this.convertMicroServiceTaskXMLProperties(microServiceTask, model, xtr);
                 return microServiceTask;
+            } else if ("mq".equals(serviceTaskType)) {
+                MqServiceTask mqServiceTask = new MqServiceTask();
+                mqServiceTask.setValues(serviceTask);
+                this.convertMqServiceTaskXMLProperties(mqServiceTask, model, xtr);
             }
         }
         return baseElement;
@@ -73,4 +78,12 @@ public class CustomServiceTaskXMLConverter extends ServiceTaskXMLConverter {
         microServiceTask.setBody(objectMapper.readValue(body, new TypeReference<Map<String, Object>>() {}));
         microServiceTask.setResponseVariableName(responseVariableName);
     }
+
+    protected void convertMqServiceTaskXMLProperties(MqServiceTask mqServiceTask, BpmnModel bpmnModel, XMLStreamReader xtr) throws Exception {
+        String queue = ProcessElementUtil.getFieldExtensionValue(mqServiceTask, "queue");
+        String params = ProcessElementUtil.getFieldExtensionValue(mqServiceTask, "params");
+        mqServiceTask.setQueue(queue);
+        ObjectMapper objectMapper = CommandContextUtil.getProcessEngineConfiguration().getObjectMapper();
+        mqServiceTask.setParams(objectMapper.readValue(params, new TypeReference<Map<String, Object>>() {}));
+    }
 }

+ 15 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/converter/MqServiceTaskXMLConverter.java

@@ -0,0 +1,15 @@
+package com.flow.flowable.converter;
+
+import com.flow.flowable.model.MqServiceTask;
+import org.flowable.bpmn.converter.ServiceTaskXMLConverter;
+import org.flowable.bpmn.model.BaseElement;
+
+public class MqServiceTaskXMLConverter extends ServiceTaskXMLConverter {
+    public MqServiceTaskXMLConverter() {
+    }
+
+    public Class<? extends BaseElement> getBpmnElementType() {
+        return MqServiceTask.class;
+    }
+
+}

+ 2 - 1
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/event/CustomFlowableEventType.java

@@ -5,5 +5,6 @@ import org.flowable.common.engine.api.delegate.event.FlowableEventType;
 public enum CustomFlowableEventType implements FlowableEventType {
     ACTIVITY_JUMP,
     PROCESS_COPY,
-    MICRO_EXECUTE
+    MICRO_EXECUTE,
+    MQ_SEND
 }

+ 44 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/model/MqServiceTask.java

@@ -0,0 +1,44 @@
+package com.flow.flowable.model;
+
+import org.flowable.bpmn.model.ServiceTask;
+
+import java.util.Map;
+
+public class MqServiceTask extends ServiceTask {
+    protected String queue;
+    protected Map<String, Object> params;
+
+
+    public MqServiceTask() {
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public Map<String, Object> getParams() {
+        return params;
+    }
+
+    public void setParams(Map<String, Object> params) {
+        this.params = params;
+    }
+
+
+    public MqServiceTask clone() {
+        MqServiceTask clone = new MqServiceTask();
+        clone.setValues(this);
+        return clone;
+    }
+
+
+    public void setValues(MqServiceTask otherElement) {
+        super.setValues(otherElement);
+        this.setQueue(otherElement.getQueue());
+        this.setParams(otherElement.getParams());
+    }
+}

+ 7 - 1
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/parser/factory/CustomDefaultActivityBehaviorFactory.java

@@ -3,6 +3,7 @@ package com.flow.flowable.parser.factory;
 import com.flow.flowable.behavior.CopyActivityBehavior;
 import com.flow.flowable.behavior.JumpActivityBehavior;
 import com.flow.flowable.behavior.MicroActivityBehavior;
+import com.flow.flowable.behavior.MqActivityBehavior;
 import org.flowable.bpmn.model.ServiceTask;
 import org.flowable.engine.impl.bpmn.helper.ClassDelegate;
 import org.flowable.engine.impl.bpmn.parser.FieldDeclaration;
@@ -23,9 +24,14 @@ public class CustomDefaultActivityBehaviorFactory extends DefaultActivityBehavio
         return (JumpActivityBehavior) ClassDelegate.defaultInstantiateDelegate(JumpActivityBehavior.class, fieldDeclarations);
     }
 
-        public MicroActivityBehavior createMicroActivityBehavior(ServiceTask serviceTask) {
+    public MicroActivityBehavior createMicroActivityBehavior(ServiceTask serviceTask) {
         List<FieldDeclaration> fieldDeclarations = super.createFieldDeclarations(serviceTask.getFieldExtensions());
         return (MicroActivityBehavior) ClassDelegate.defaultInstantiateDelegate(MicroActivityBehavior.class, fieldDeclarations);
     }
 
+    public MqActivityBehavior createMqActivityBehavior(ServiceTask serviceTask) {
+        List<FieldDeclaration> fieldDeclarations = super.createFieldDeclarations(serviceTask.getFieldExtensions());
+        return (MqActivityBehavior) ClassDelegate.defaultInstantiateDelegate(MqActivityBehavior.class, fieldDeclarations);
+    }
+
 }

+ 22 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/parser/handler/MqServiceTaskParseHandler.java

@@ -0,0 +1,22 @@
+package com.flow.flowable.parser.handler;
+
+import com.flow.flowable.model.MqServiceTask;
+import com.flow.flowable.parser.factory.CustomDefaultActivityBehaviorFactory;
+import org.flowable.bpmn.model.BaseElement;
+import org.flowable.bpmn.model.ServiceTask;
+import org.flowable.engine.impl.bpmn.parser.BpmnParse;
+import org.flowable.engine.impl.bpmn.parser.handler.AbstractActivityBpmnParseHandler;
+
+public class MqServiceTaskParseHandler extends AbstractActivityBpmnParseHandler<ServiceTask> {
+    @Override
+    public Class<? extends BaseElement> getHandledType() {
+        return MqServiceTask.class;
+    }
+
+    @Override
+    protected void executeParse(BpmnParse bpmnParse, ServiceTask serviceTask) {
+        CustomDefaultActivityBehaviorFactory activityBehaviorFactory = (CustomDefaultActivityBehaviorFactory) bpmnParse.getActivityBehaviorFactory();
+        serviceTask.setBehavior(activityBehaviorFactory.createMqActivityBehavior(serviceTask));
+    }
+
+}

+ 8 - 0
flow-common/flow-common-flowable-starter/src/main/java/com/flow/flowable/validator/CustomServiceTaskValidator.java

@@ -23,6 +23,8 @@ public class CustomServiceTaskValidator extends ServiceTaskValidator {
                 this.validateFieldDeclarationsForJump(process, serviceTask, serviceTask.getFieldExtensions(), errors);
             } else if (type.equalsIgnoreCase("micro")) {
                 this.validateFieldDeclarationsForMicro(process, serviceTask, serviceTask.getFieldExtensions(), errors);
+            } else if (type.equalsIgnoreCase("mq")) {
+                this.validateFieldDeclarationsForMq(process, serviceTask, serviceTask.getFieldExtensions(), errors);
             } else {
                 super.verifyType(process, serviceTask, errors);
             }
@@ -52,7 +54,13 @@ public class CustomServiceTaskValidator extends ServiceTaskValidator {
         if (StringUtils.isBlank(method)) {
             addError(errors, "flowable-microTask-no-method", process, task, "微服务节点缺少method属性");
         }
+    }
 
+    private void validateFieldDeclarationsForMq(Process process, TaskWithFieldExtensions task, List<FieldExtension> fieldExtensions, List<ValidationError> errors) {
+        String queue = ProcessElementUtil.getFieldExtensionValue(task, "queue");
+        if (StringUtils.isBlank(queue)) {
+            addError(errors, "flowable-mqTask-no-queue", process, task, "MQ节点缺少queue属性");
+        }
     }
 
 }