Browse Source

init dmp: 初始化task

yaoll 4 years ago
parent
commit
4753439f4b
26 changed files with 1491 additions and 0 deletions
  1. 82 0
      dmp-task/doc/config.sql
  2. 121 0
      dmp-task/pom.xml
  3. 50 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/TaskApp.java
  4. 46 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/config/DmpMessageAspect.java
  5. 116 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/config/RequestLogAspect.java
  6. 46 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/config/WebConfig.java
  7. 19 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/constant/TaskConstant.java
  8. 81 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/controller/TaskCfgController.java
  9. 53 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/controller/TaskRecordController.java
  10. 89 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/entity/TaskCfg.java
  11. 53 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/entity/TaskRecord.java
  12. 55 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/jms/JmsConfig.java
  13. 14 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/jms/MessageExecutor.java
  14. 74 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/jms/MessageProcesser.java
  15. 29 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/model/TaskCfgModel.java
  16. 19 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/model/TaskRecordModel.java
  17. 9 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/repository/TaskCfgRepository.java
  18. 9 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/repository/TaskRecordRepository.java
  19. 56 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/rmq/config/TaskRabbitConfig.java
  20. 29 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/rmq/listener/TaskListener.java
  21. 73 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskBaseService.java
  22. 158 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskCfgService.java
  23. 147 0
      dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskRecordService.java
  24. 7 0
      dmp-task/src/main/resources/bootstrap.yml
  25. 55 0
      dmp-task/src/main/resources/logback-spring.xml
  26. 1 0
      pom.xml

+ 82 - 0
dmp-task/doc/config.sql

@@ -0,0 +1,82 @@
+# rwd config
+delete from `dmp_config`.`integrated_config` where application = 'dmp-task';
+INSERT INTO `dmp_config`.`integrated_config`
+(`APPLICATION`,`PPROFILE`,`LABLE`,`BELONGTO`,`ISUPDATE`,`PKEY`,`PVALUE`,`DESCRIPTION`)
+VALUES
+('dmp-task','default','master','DMP',1,'spring.jackson.time-zone', 'Asia/Shanghai', null),
+('dmp-task','default','master','DMP',1,'spring.jpa.show-sql', 'false', null),
+('dmp-task','default','master','DMP',1,'spring.jpa.hibernate.ddl-auto', 'none', null),
+('dmp-task','default','master','DMP',1,'management.endpoints.web.exposure.include', '*', null),
+('dmp-task','default','master','DMP',1,'management.endpoint.health.show-details', 'ALWAYS', null),
+('dmp-task','default','master','DMP',1,'eureka.client.fetch-registry','true',null),
+('dmp-task','default','master','DMP',1,'eureka.client.register-with-eureka','true',null),
+('dmp-task','default','master','DMP',1,'eureka.instance.lease-renewal-interval-in-seconds','10',null),
+('dmp-task','default','master','DMP',1,'eureka.instance.lease-expiration-duration-in-seconds','30',null),
+('dmp-task','default','master','DMP',1,'eureka.instance.hostname','${spring.application.name}',null),
+('dmp-task','default','master','DMP',1,'eureka.instance.prefer-ip-address','true',null),
+('dmp-task','default','master','DMP',1,'eureka.instance.instance-id','${spring.cloud.client.ip-address}:${server.port}',null),
+
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.props.sql.show', 'false', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.names', 'ds0', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.type', 'com.alibaba.druid.pool.DruidDataSource', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.driverClassName', 'com.mysql.cj.jdbc.Driver', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.url', 'jdbc:mysql://${persagy.mysql.host}:${persagy.mysql.port}/${persagy.mysql.database}?useUnicode=true&characterEncoding=utf-8&mysqlEncoding=utf8&useSSL=false', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.username', '${persagy.mysql.username}', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.password', '${persagy.mysql.password}', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.initialSize', '${persagy.mysql.initialSize}', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.datasource.ds0.maxActive', '${persagy.mysql.maxActive}', null),
+('dmp-task','default','master','DMP',1,'spring.shardingsphere.sharding.default-data-source-name', 'ds0', null),
+
+('dmp-task','default','master','DMP',1,'spring.rabbitmq.host', '${persagy.rabbitmq.host}', null),
+('dmp-task','default','master','DMP',1,'spring.rabbitmq.port', '${persagy.rabbitmq.port}', null),
+('dmp-task','default','master','DMP',1,'spring.rabbitmq.username', '${persagy.rabbitmq.username}', null),
+('dmp-task','default','master','DMP',1,'spring.rabbitmq.password', '${persagy.rabbitmq.password}', null),
+('dmp-task','default','master','DMP',1,'spring.rabbitmq.virtual-host', '${persagy.rabbitmq.virtual-host}', null),
+
+('dmp-task','default','master','DMP',1,'spring.redis.host', '${persagy.redis.host}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.port', '${persagy.redis.port}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.password', '${persagy.redis.password}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.database', '${persagy.redis.database}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.pool.max-active', '${persagy.redis.max-active}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.pool.max-idle', '${persagy.redis.max-idle}', null),
+('dmp-task','default','master','DMP',1,'spring.redis.pool.min-idle', '${persagy.redis.min-idle}', null),
+
+('dmp-task','default','master','DMP',1,'persagy.dmp.exchange', 'exchange-dmp', null),
+('dmp-task','default','master','DMP',1,'persagy.dmp.dic.routingKey', 'routing-dmp-dic', null),
+('dmp-task','default','master','DMP',1,'persagy.dmp.org.routingKey', 'routing-dmp-org', null),
+('dmp-task','default','master','DMP',1,'persagy.dmp.rwd.routingKey', 'routing-dmp-rwd', null),
+('dmp-task','default','master','DMP',1,'persagy.dmp.alarm.routingKey', 'routing-dmp-alarm', null),
+('dmp-task','default','master','DMP',1,'persagy.dmp.task.routingKey', 'routing-dmp-task', null),
+
+('dmp-task','dev','master','DMP',1,'server.port', '8084', null),
+
+('dmp-task','dev','master','DMP',1,'eureka.client.service-url.defaultZone','http://frame:123456@localhost:9931/eureka/','注册中心地址'),
+('dmp-task','dev','master','DMP',1,'eureka.instance.ip-address','localhost',null),
+('dmp-task','dev','master','DMP',1,'persagy.iot.data.server', 'http://localhost:9981', 'IOT数据查询服务'),
+('dmp-task','dev','master','DMP',1,'persagy.iot.setting.server', 'http://localhost:9983', 'IOT控制服务'),
+
+('dmp-task','dev','master','DMP',1,'persagy.mysql.host', 'localhost', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.port', '3301', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.database', 'dmp_dev_rwd', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.username', 'root', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.password', '123456', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.initialSize', '10', null),
+('dmp-task','dev','master','DMP',1,'persagy.mysql.maxActive', '300', null),
+
+('dmp-task','dev','master','DMP',1,'persagy.redis.host', 'localhost', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.port', '5672', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.password', '123456', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.database', '0', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.max-active', '500', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.max-idle', '50', null),
+('dmp-task','dev','master','DMP',1,'persagy.redis.min-idle', '10', null),
+
+('dmp-task','dev','master','DMP',1,'persagy.rabbitmq.host', 'localhost', null),
+('dmp-task','dev','master','DMP',1,'persagy.rabbitmq.port', '5672', null),
+('dmp-task','dev','master','DMP',1,'persagy.rabbitmq.username', 'admin', null),
+('dmp-task','dev','master','DMP',1,'persagy.rabbitmq.password', '123456', null),
+('dmp-task','dev','master','DMP',1,'persagy.rabbitmq.virtual-host', '/test', null),
+('dmp-task','dev','master','DMP',1,'persagy.dmp.task.queue', 'queue-dmp-rwd', null),
+
+('dmp-task','dev','master','DMP',1,'persagy.log.path', '.', '日志文件所在路径');
+

+ 121 - 0
dmp-task/pom.xml

@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>com.persagy</groupId>
+        <artifactId>dmp</artifactId>
+        <version>4.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dmp-task</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.persagy</groupId>
+            <artifactId>dmp-model</artifactId>
+            <version>4.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.persagy</groupId>
+            <artifactId>dmp-common</artifactId>
+            <version>4.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-jpa</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.21</version>
+        </dependency>
+        <dependency>
+            <groupId>com.querydsl</groupId>
+            <artifactId>querydsl-apt</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.querydsl</groupId>
+            <artifactId>querydsl-sql</artifactId>
+            <version>${querydsl.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.querydsl</groupId>
+            <artifactId>querydsl-jpa</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.vladmihalcea</groupId>
+            <artifactId>hibernate-types-52</artifactId>
+            <version>2.9.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <!-- 注册中心 -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
+        </dependency>
+
+        <!-- 配置中心 -->
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-config</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <finalName>dmp-task</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <fork>true</fork>
+                    <jvmArguments>-Dfile.encoding=UTF-8</jvmArguments>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <!--
+            -->
+            <plugin>
+                <groupId>com.mysema.maven</groupId>
+                <artifactId>apt-maven-plugin</artifactId>
+                <version>1.1.3</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>process</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>target/generated-sources</outputDirectory>
+                            <processor>com.querydsl.apt.jpa.JPAAnnotationProcessor</processor>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 50 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/TaskApp.java

@@ -0,0 +1,50 @@
+package com.persagy.dmp.dpt;
+
+import com.persagy.common.criteria.CriteriaUtils;
+import com.persagy.common.criteria.JpaUtils;
+import com.persagy.dmp.common.http.HttpUtils;
+import com.querydsl.jpa.impl.JPAQueryFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.context.annotation.Bean;
+
+import javax.persistence.EntityManager;
+
+/**
+ * @author: yaoll
+ * @date: 2020-10-22
+ * @verison: 1.0
+ */
+@Slf4j
+@SpringBootApplication
+@EnableDiscoveryClient
+public class TaskApp {
+
+	public static void main(String[] args) {
+		SpringApplication.run(TaskApp.class, args);
+	}
+
+	@Bean
+	public JPAQueryFactory jpaQueryFactory(EntityManager entityManager) {
+		return new JPAQueryFactory(entityManager);
+	}
+
+	@Bean
+	public CriteriaUtils criteriaUtils(@Autowired JPAQueryFactory jpaQueryFactory) {
+		return new CriteriaUtils(jpaQueryFactory);
+	}
+
+	@Bean
+	public JpaUtils jpaUtils(@Autowired EntityManager entityManager) {
+		return new JpaUtils(entityManager);
+	}
+
+	@Bean
+	public HttpUtils httpUtils() {
+		return new HttpUtils();
+	}
+
+}

+ 46 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/config/DmpMessageAspect.java

@@ -0,0 +1,46 @@
+package com.persagy.dmp.dpt.config;
+
+import com.persagy.common.web.BaseResponse;
+import com.persagy.dmp.dpt.jms.MessageProcesser;
+import com.persagy.dmp.rwd.model.DmpMessage;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Pointcut;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author: yaoll
+ * @date: 2020-10-22
+ * @verison: 1.0
+ */
+@Aspect
+@Component
+public class DmpMessageAspect {
+
+	@Autowired
+	private MessageProcesser messageProcesser;
+
+	@Pointcut("execution(public * com.persagy.dmp.*.controller.*.*(..))")
+	public void controllerPoint() {
+	}
+
+	@AfterReturning(returning = "ret", pointcut = "controllerPoint()")
+	public void doAfterReturning(Object ret) {
+		if (ret != null) {
+			if (ret instanceof BaseResponse) {
+				BaseResponse resp = (BaseResponse) ret;
+				if (resp.success()) {
+					List<DmpMessage> messageList = resp.getMessageList();
+					if (messageList != null && messageList.size() > 0) {
+						for (DmpMessage message : messageList) {
+							messageProcesser.convertAndSend(message);
+						}
+					}
+				}
+			}
+		}
+	}
+}

+ 116 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/config/RequestLogAspect.java

@@ -0,0 +1,116 @@
+package com.persagy.dmp.dpt.config;
+
+import com.persagy.common.json.JacksonMapper;
+import com.persagy.common.web.BaseResponse;
+import lombok.extern.slf4j.Slf4j;
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.annotation.*;
+import org.springframework.stereotype.Component;
+import org.springframework.web.context.request.RequestContextHolder;
+import org.springframework.web.context.request.ServletRequestAttributes;
+import org.springframework.web.multipart.MultipartFile;
+
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author: yaoll
+ * @date: 2020-09-22
+ * @verison: 1.0
+ */
+@Slf4j
+@Aspect
+@Component
+public class RequestLogAspect {
+
+	private static final ThreadLocal<LogData> dataStorage = new ThreadLocal();
+
+	private static final AtomicInteger flag = new AtomicInteger(10000000);
+
+
+	@Pointcut("execution(public * com.persagy.dmp.*.controller.*.*(..))")
+	public void controllerPoint() {
+	}
+
+	@Before("controllerPoint()")
+	public void doBefore(JoinPoint joinPoint) {
+		ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
+		HttpServletRequest request = attributes.getRequest();
+		LogData data = initFlag(request);
+		dataStorage.set(data);
+		log.info("{} REQUEST URL      : {} {}", data.intFlag, data.method, data.url);
+		log.info("{} REQUEST ARGS     : {}", data.intFlag, JacksonMapper.toSimpleJson(prepare(joinPoint)));
+	}
+
+	public static List<Object> prepare(JoinPoint joinPoint) {
+		Object[] args = joinPoint.getArgs();
+		List<Object> list = new LinkedList<>();
+		if (args != null && args.length > 0) {
+			for (Object arg : args) {
+				if (arg instanceof ServletRequest) {
+					continue;
+				}
+				if (arg instanceof ServletResponse) {
+					continue;
+				}
+				if (arg instanceof MultipartFile) {
+					continue;
+				}
+				list.add(arg);
+			}
+		}
+		return list;
+	}
+
+	@AfterReturning(returning = "ret", pointcut = "controllerPoint()")
+	public void doAfterReturning(Object ret) {
+
+		LogData data = dataStorage.get();
+		if (ret != null) {
+			if (ret instanceof BaseResponse) {
+				log.info("{} REQUEST DURATION : {} {} {}", data.intFlag, System.currentTimeMillis() - data.timestamp, ((BaseResponse) ret).getResult(), data.url);
+			} else {
+				log.info("{} REQUEST DURATION : {} {} {}", data.intFlag, System.currentTimeMillis() - data.timestamp, ret.getClass().getName(), data.url);
+			}
+		} else {
+			log.info("{} REQUEST DURATION : {} {} {}", data.intFlag, System.currentTimeMillis() - data.timestamp, "null", data.url);
+		}
+		dataStorage.remove();
+	}
+
+	@AfterThrowing(throwing = "ex", pointcut = "controllerPoint()")
+	public void doAfterThrowing(Throwable ex) {
+		LogData data = dataStorage.get();
+		log.error("{} REQUEST EXCEPTION: {} {} {}", data.intFlag, System.currentTimeMillis() - data.timestamp, data.url, ex.getMessage());
+		dataStorage.remove();
+	}
+
+	private static LogData initFlag(HttpServletRequest request) {
+		LogData data = new LogData();
+		data.timestamp = System.currentTimeMillis();
+		data.method = request.getMethod();
+
+		String requestURI = request.getRequestURI();
+		String queryString = request.getQueryString();
+		data.url = requestURI + (queryString == null ? "" : "?" + queryString);
+
+		int result = flag.addAndGet(1);
+		if (result >= 99999999) {
+			flag.set(10000000);
+		}
+		data.intFlag = result;
+		dataStorage.set(data);
+		return data;
+	}
+
+	private static class LogData {
+		Integer intFlag;
+		Long timestamp;
+		String url;
+		String method;
+	}
+}

+ 46 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/config/WebConfig.java

@@ -0,0 +1,46 @@
+package com.persagy.dmp.dpt.config;
+
+import com.persagy.common.json.JacksonMapper;
+import com.persagy.dmp.config.DmpParameterStorage;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.amqp.support.converter.MessageConverter;
+import org.springframework.boot.autoconfigure.http.HttpMessageConverters;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
+import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
+
+/**
+ * @author: yaoll
+ * @date: 2020-09-22
+ * @verison: 1.0
+ */
+@Configuration
+public class WebConfig implements WebMvcConfigurer {
+
+	@Bean
+	public DmpParameterStorage dmpParameterStorage() {
+		return new DmpParameterStorage();
+	}
+
+	@Override
+	public void addInterceptors(InterceptorRegistry registry) {
+		// 设置拦截的路径、不拦截的路径、优先级等等
+		registry.addInterceptor(dmpParameterStorage()).addPathPatterns("/**");
+	}
+
+	@Bean
+	public HttpMessageConverters customConverters() {
+		MappingJackson2HttpMessageConverter converter = new MappingJackson2HttpMessageConverter();
+		converter.setObjectMapper(JacksonMapper.nonEmptyMapper);
+		return new HttpMessageConverters(converter);
+	}
+
+	// JMS 使用
+	@Bean
+	public MessageConverter customMessageConvert() {
+		return new Jackson2JsonMessageConverter();
+	}
+
+}

+ 19 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/constant/TaskConstant.java

@@ -0,0 +1,19 @@
+package com.persagy.dmp.dpt.constant;
+
+public class TaskConstant {
+
+    // 任务调度 RMQ 消费默认并发
+    public static final int TASK_CONSUMER_DEFAULT_CONCURRENT = 10;
+
+    // 任务调度 RMQ 消费默认并发
+    public static final int TASK_CONSUMER_MAX_CONCURRENT = 50;
+
+    /**  */
+    public static final String TASK_CFG_CREATE = "create";
+    public static final String TASK_CFG_UPDATE = "update";
+    public static final String TASK_CFG_DELETE = "delete";
+
+    /** config 字段 json key */
+    public static final String CONFIG_EXCHANGE = "exchange";
+    public static final String CONFIG_ROUTING = "routing";
+}

+ 81 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/controller/TaskCfgController.java

@@ -0,0 +1,81 @@
+package com.persagy.dmp.dpt.controller;
+
+import com.persagy.common.criteria.JacksonCriteria;
+import com.persagy.common.web.MapResponse;
+import com.persagy.common.web.PagedResponse;
+import com.persagy.dmp.dpt.constant.TaskConstant;
+import com.persagy.dmp.dpt.entity.TaskCfg;
+import com.persagy.dmp.dpt.model.TaskCfgModel;
+import com.persagy.dmp.dpt.model.TaskRecordModel;
+import com.persagy.dmp.dpt.service.TaskCfgService;
+import com.persagy.dmp.dpt.service.TaskRecordService;
+import com.persagy.dmp.rwd.model.DmpMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.*;
+
+@Slf4j
+@RestController
+@RequestMapping("/task/cfg")
+public class TaskCfgController {
+
+	@Autowired
+	private TaskCfgService service;
+
+	@Autowired
+	private TaskRecordService taskRecordService;
+
+	@Autowired
+	private RabbitTemplate rabbitTemplate;
+
+	@PostMapping("/query")
+	public PagedResponse<TaskCfgModel> query(@RequestBody JacksonCriteria criteria) {
+		return service.query(criteria);
+	}
+	
+	@PostMapping("/create")
+	public MapResponse create(@RequestBody TaskCfgModel param){
+		MapResponse mapResponse = service.create(param);
+		// TODO 新增任务广播 MQ
+		String taskId = param.getId();
+
+		return mapResponse;
+	}
+	
+	@PostMapping("/update")
+	public MapResponse update(@RequestBody TaskCfgModel param){
+		MapResponse mapResponse = service.update(param);
+		// TODO  更新任务广播 MQ
+		String taskId = param.getId();
+		Map<String,Object> taskMQ = new HashMap<>();
+		taskMQ.put("taskId",taskId);
+		taskMQ.put("type","update");
+		taskMQ.put("groupCode", param.getGroupCode());
+		taskMQ.put("appId", param.getAppId());
+		taskMQ.put("projectId", param.getProjectId());
+		rabbitTemplate.convertAndSend("TaskExchanges", "", taskMQ);
+		return mapResponse;
+	}
+	
+	@PostMapping("/delete")
+	public MapResponse delete(@RequestBody TaskCfgModel param){
+		MapResponse mapResponse = service.delete(param);
+		// TODO  删除任务广播 MQ
+		String taskId = param.getId();
+		Map<String,Object> taskMQ = new HashMap<>();
+		taskMQ.put("taskId",taskId);
+		taskMQ.put("type","delete");
+		taskMQ.put("groupCode", param.getGroupCode());
+		taskMQ.put("appId", param.getAppId());
+		taskMQ.put("projectId", param.getProjectId());
+		rabbitTemplate.convertAndSend("TaskExchanges", "", taskMQ);
+		return mapResponse;
+	}
+
+}

+ 53 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/controller/TaskRecordController.java

@@ -0,0 +1,53 @@
+package com.persagy.dmp.dpt.controller;
+
+import com.persagy.common.criteria.JacksonCriteria;
+import com.persagy.common.web.MapResponse;
+import com.persagy.common.web.PagedResponse;
+import com.persagy.dmp.dpt.entity.TaskCfg;
+import com.persagy.dmp.dpt.model.TaskRecordModel;
+import com.persagy.dmp.dpt.service.TaskRecordService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.List;
+
+@RestController
+@RequestMapping("/task/record")
+public class TaskRecordController {
+	@Autowired
+	private TaskRecordService service;
+	
+	@PostMapping("/query")
+	public PagedResponse<TaskRecordModel> query(@RequestBody JacksonCriteria criteria) {
+		return service.query(criteria);
+	}
+	
+//	@PostMapping("/create")
+//	public MapResponse create(@RequestBody TaskRecordModel param){
+//		return service.create(param);
+//	}
+	
+	@PostMapping("/update")
+	public MapResponse update(@RequestBody TaskRecordModel param){
+		return service.update(param);
+	}
+	
+//	@PostMapping("/delete")
+//	public MapResponse delete(@RequestBody TaskRecordModel param){
+//		return service.delete(param);
+//	}
+	/**
+	 * 任务调度回调接口
+	 * @param taskIds
+	 * @return
+	 */
+	@PostMapping("/receive")
+	public MapResponse sendTaskToRmq(@RequestBody List<String> taskIds){
+		MapResponse response = new MapResponse();
+		service.saveAndSendMq(taskIds);
+		return response;
+	}
+}

+ 89 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/entity/TaskCfg.java

@@ -0,0 +1,89 @@
+package com.persagy.dmp.dpt.entity;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.vladmihalcea.hibernate.type.json.JsonStringType;
+import lombok.Getter;
+import lombok.Setter;
+import com.persagy.dmp.dpt.model.TaskCfgModel;
+import org.hibernate.annotations.GenericGenerator;
+import org.hibernate.annotations.Type;
+import org.hibernate.annotations.TypeDef;
+
+import javax.persistence.GeneratedValue;
+import javax.persistence.*;
+import java.util.Date;
+
+@Setter
+@Getter
+@Entity
+@TypeDef(name = "json", typeClass = JsonStringType.class)
+@Table(name = "dpt_task_cfg")
+public class TaskCfg {
+
+	@Id
+	@GeneratedValue(generator = "assignedId")
+	@GenericGenerator(name = "assignedId", strategy = "assigned")
+	private String id;
+	private String groupCode;
+	private String appId;
+	private String projectId;
+	private String code;
+	private String name;
+	private String type;
+
+	@Type(type = "json")
+	private ObjectNode config;
+	@Type(type = "json")
+	private ObjectNode params;
+	@Type(type = "json")
+	private ObjectNode timeRule;
+	private Date startTime;
+	private Date endTime;
+	private Integer responseTime;
+	private String level;
+	private Integer status;
+	private String remark;
+
+	public static TaskCfg fromModel(TaskCfgModel model) {
+		TaskCfg entity = new TaskCfg();
+		entity.setId(model.getId());
+		entity.setGroupCode(model.getGroupCode());
+		entity.setAppId(model.getAppId());
+		entity.setProjectId(model.getProjectId());
+		entity.setCode(model.getCode());
+		entity.setName(model.getName());
+		entity.setType(model.getType());
+		entity.setConfig(model.getConfig());
+		entity.setParams(model.getParams());
+		entity.setTimeRule(model.getTimeRule());
+		entity.setStartTime(model.getStartTime());
+		entity.setEndTime(model.getEndTime());
+		entity.setResponseTime(model.getResponseTime());
+		entity.setLevel(model.getLevel());
+		entity.setStatus(model.getStatus());
+		entity.setRemark(model.getRemark());
+		return entity;
+	}
+
+	public TaskCfgModel toModel() {
+		TaskCfgModel model = new TaskCfgModel();
+		model.setId(this.id);
+		model.setGroupCode(this.getGroupCode());
+		model.setAppId(this.getAppId());
+		model.setProjectId(this.getProjectId());
+		model.setCode(this.getCode());
+		model.setName(this.getName());
+		model.setType(this.getType());
+		model.setConfig(this.getConfig());
+		model.setParams(this.getParams());
+		model.setTimeRule(this.getTimeRule());
+		model.setStartTime(this.getStartTime());
+		model.setEndTime(this.getEndTime());
+		model.setResponseTime(this.getResponseTime());
+		model.setLevel(this.getLevel());
+		model.setStatus(this.getStatus());
+		model.setRemark(this.getRemark());
+		return model;
+	}
+
+}

+ 53 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/entity/TaskRecord.java

@@ -0,0 +1,53 @@
+package com.persagy.dmp.dpt.entity;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
+import lombok.Setter;
+import com.persagy.dmp.dpt.model.TaskRecordModel;
+import org.hibernate.annotations.GenericGenerator;
+import org.hibernate.annotations.Type;
+
+import javax.persistence.GeneratedValue;
+import javax.persistence.*;
+import java.util.Date;
+
+@Setter
+@Getter
+@Entity
+@Table(name = "dpt_task_record")
+public class TaskRecord {
+
+	@Id
+	@GeneratedValue(generator = "assignedId")
+	@GenericGenerator(name = "assignedId", strategy = "assigned")
+	private String id;
+	private String taskId;
+	private Date startTime;
+	private Date endTime;
+	private Integer success;
+	@Type(type = "json")
+	private ObjectNode errorInfo;
+
+	public static TaskRecord fromModel(TaskRecordModel model) {
+		TaskRecord entity = new TaskRecord();
+		entity.setId(model.getId());
+		entity.setTaskId(model.getTaskId());
+		entity.setStartTime(model.getStartTime());
+		entity.setEndTime(model.getEndTime());
+		entity.setSuccess(model.getSuccess());
+		entity.setErrorInfo(model.getErrorInfo());
+		return entity;
+	}
+
+	public TaskRecordModel toModel() {
+		TaskRecordModel model = new TaskRecordModel();
+		model.setId(this.id);
+		model.setTaskId(this.getTaskId());
+		model.setStartTime(this.getStartTime());
+		model.setEndTime(this.getEndTime());
+		model.setSuccess(this.getSuccess());
+		model.setErrorInfo(this.getErrorInfo());
+		return model;
+	}
+
+}

+ 55 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/jms/JmsConfig.java

@@ -0,0 +1,55 @@
+package com.persagy.dmp.dpt.jms;
+
+import com.persagy.common.json.JacksonMapper;
+import com.persagy.dmp.rwd.model.DmpMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @author: yaoll
+ * @date: 2020-10-22
+ * @verison: 1.0
+ */
+@Slf4j
+@Configuration
+public class JmsConfig {
+
+	@Value("${persagy.dmp.exchange}")
+	private String exchange;
+
+	@Value("${persagy.dmp.task.routingKey}")
+	private String taskRoutingKey;
+
+	@Value("${persagy.dmp.task.queue}")
+	private String taskQueue;
+
+	@Autowired
+	private MessageProcesser messageProcesser;
+
+	@Bean
+	public TopicExchange exchange() {
+		return new TopicExchange(exchange);
+	}
+
+	@Bean
+	public Queue queue() {
+		return new Queue(taskQueue, true);
+	}
+
+	@Bean
+	public Binding taskBinding() {
+		return BindingBuilder.bind(queue()).to(exchange()).with(taskRoutingKey);
+	}
+
+	// @RabbitListener(queues = "${persagy.dmp.task.queue}")    //监听器监听指定的Queue
+	public void processC(DmpMessage message) {
+		log.info("============================== Receive {}: {}", taskQueue, JacksonMapper.toSimpleJson(message));
+		messageProcesser.listen(message);
+	}
+
+}

+ 14 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/jms/MessageExecutor.java

@@ -0,0 +1,14 @@
+package com.persagy.dmp.dpt.jms;
+
+import com.persagy.dmp.rwd.model.DmpMessage;
+
+/**
+ * @author: yaoll
+ * @date: 2020-10-22
+ * @verison: 1.0
+ */
+public interface MessageExecutor {
+
+	void execute(DmpMessage message);
+
+}

+ 74 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/jms/MessageProcesser.java

@@ -0,0 +1,74 @@
+package com.persagy.dmp.dpt.jms;
+
+import com.persagy.dmp.common.SpringUtils;
+import com.persagy.dmp.config.DmpParameterStorage;
+import com.persagy.dmp.dpt.constant.TaskConstant;
+import com.persagy.dmp.rwd.model.DmpMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author: yaoll
+ * @date: 2020-10-22
+ * @verison: 1.0
+ */
+@Slf4j
+@Component
+public class MessageProcesser {
+
+	@Value("${persagy.dmp.exchange}")
+	private String exchange;
+
+	@Value("${persagy.dmp.task.routingKey}")
+	private String taskRoutingKey;
+
+	@Autowired
+	private RabbitTemplate rabbitTemplate;
+
+	public void convertAndSend(DmpMessage message) {
+		rabbitTemplate.convertAndSend(exchange, taskRoutingKey, message);
+	}
+
+	private static final Set<String> beanNames = new HashSet<>();
+
+	static {
+		beanNames.add(TaskConstant.TASK_CFG_CREATE);
+		beanNames.add(TaskConstant.TASK_CFG_UPDATE);
+		beanNames.add(TaskConstant.TASK_CFG_DELETE);
+	}
+
+	public void listen(DmpMessage message) {
+		String type = message.getType();
+		if (!beanNames.contains(type)) {
+			return;
+		}
+
+		String executorName = type + "Executor";
+		MessageExecutor executor = SpringUtils.getBean(executorName, MessageExecutor.class);
+		if (executor == null) {
+			log.error("bean for [" + executorName + "] not exists");
+		}
+
+		try {
+			DmpParameterStorage.ParameterData data = new DmpParameterStorage.ParameterData();
+			data.setGroupCode(message.getGroupCode());
+			data.setProjectId(message.getProjectId());
+			data.setAppId(message.getAppId());
+
+			DmpParameterStorage.set(data);
+
+			executor.execute(message);
+		} catch (Exception e) {
+			e.printStackTrace();
+		} finally {
+			DmpParameterStorage.clear();
+		}
+	}
+
+}

+ 29 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/model/TaskCfgModel.java

@@ -0,0 +1,29 @@
+package com.persagy.dmp.dpt.model;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
+import lombok.Setter;
+import java.util.Date;
+
+@Setter
+@Getter
+public class TaskCfgModel {
+
+	private String id;
+	private String groupCode;
+	private String appId;
+	private String projectId;
+	private String code;
+	private String name;
+	private String type;
+	private ObjectNode config;
+	private ObjectNode params;
+	private ObjectNode timeRule;
+	private Date startTime;
+	private Date endTime;
+	private Integer responseTime;
+	private String level;
+	private Integer status;
+	private String remark;
+
+}

+ 19 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/model/TaskRecordModel.java

@@ -0,0 +1,19 @@
+package com.persagy.dmp.dpt.model;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.Getter;
+import lombok.Setter;
+import java.util.Date;
+
+@Setter
+@Getter
+public class TaskRecordModel {
+
+	private String id;
+	private String taskId;
+	private Date startTime;
+	private Date endTime;
+	private Integer success;
+	private ObjectNode errorInfo;
+
+}

+ 9 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/repository/TaskCfgRepository.java

@@ -0,0 +1,9 @@
+package com.persagy.dmp.dpt.repository;
+
+import com.persagy.dmp.dpt.entity.TaskCfg;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.querydsl.QuerydslPredicateExecutor;
+
+public interface TaskCfgRepository extends JpaRepository<TaskCfg, String>, QuerydslPredicateExecutor<TaskCfg>{
+}
+

+ 9 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/repository/TaskRecordRepository.java

@@ -0,0 +1,9 @@
+package com.persagy.dmp.dpt.repository;
+
+import com.persagy.dmp.dpt.entity.TaskRecord;
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.data.querydsl.QuerydslPredicateExecutor;
+
+public interface TaskRecordRepository extends JpaRepository<TaskRecord, String>, QuerydslPredicateExecutor<TaskRecord>{
+}
+

+ 56 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/rmq/config/TaskRabbitConfig.java

@@ -0,0 +1,56 @@
+package com.persagy.dmp.dpt.rmq.config;
+
+import com.persagy.dmp.dpt.constant.TaskConstant;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TaskRabbitConfig {
+
+    /**
+     * 格式化输入输出格式
+     * @param connectionFactory
+     * @return
+     */
+    @Bean
+    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
+        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
+        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
+        return rabbitTemplate;
+    }
+    /*
+    @Bean
+    public Queue TaskQueue() {
+        return new Queue("TaskQueue",true);
+    }
+
+    @Bean
+    DirectExchange TaskExchange() {
+        return new DirectExchange("TaskExchange",true,false);
+    }
+
+    @Bean
+    Binding bindingDirect() {
+        return BindingBuilder.bind(TaskQueue()).to(TaskExchange()).with("TaskRouting");
+    }
+
+    @Bean("customContainerFactory")
+    public SimpleRabbitListenerContainerFactory containerFactory(
+            SimpleRabbitListenerContainerFactoryConfigurer configurer,
+            ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        factory.setConcurrentConsumers(TaskConstant.TASK_CONSUMER_DEFAULT_CONCURRENT);
+        factory.setMaxConcurrentConsumers(TaskConstant.TASK_CONSUMER_MAX_CONCURRENT);
+        configurer.configure(factory, connectionFactory);
+        return factory;
+    }*/
+}

+ 29 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/rmq/listener/TaskListener.java

@@ -0,0 +1,29 @@
+package com.persagy.dmp.dpt.rmq.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+@Slf4j
+@Component
+public class TaskListener {
+
+    /**
+     * 监听的队列名称 s
+     * @param message
+     */
+//    @RabbitHandler
+//    @RabbitListener(queues = "task_receive_queue")
+//    public void process1(Map message) {
+//        log.info("process1: " + message.toString());
+//    }
+
+//    @RabbitHandler
+//    @RabbitListener(queues = "task_test_queue")
+//    public void process2(Map message) {
+//        log.info("process2: " + message.toString());
+//    }
+}

+ 73 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskBaseService.java

@@ -0,0 +1,73 @@
+package com.persagy.dmp.dpt.service;
+
+import com.persagy.common.web.BaseResponse;
+import com.persagy.dmp.config.DmpParameterStorage;
+import com.persagy.dmp.dpt.repository.TaskCfgRepository;
+import com.persagy.dmp.dpt.repository.TaskRecordRepository;
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.UUID;
+
+public class TaskBaseService {
+
+    @Autowired
+    protected TaskCfgRepository taskCfgRepository;
+
+    @Autowired
+    protected TaskRecordRepository taskRecordRepository;
+
+    @Autowired
+    protected RabbitTemplate rabbitTemplate;
+
+    public static final String USER_ID = "userId";
+    public static final String APP_ID = "appId";
+    public static final String GROUP_CODE = "groupCode";
+    public static final String PROJECT_ID = "projectId";
+
+    @Getter
+    @Setter
+    protected static class OrgParam {
+        String groupCode;
+        String userId;
+        String appId;
+        String projectId;
+    }
+
+    protected <T> OrgParam prepareParam(BaseResponse<T> resp, String... requiredColumns) {
+        OrgParam param = new OrgParam();
+        param.groupCode = DmpParameterStorage.getGroupCode();
+        param.userId = DmpParameterStorage.getUserId();
+        param.projectId = DmpParameterStorage.getProjectId();
+        param.appId = DmpParameterStorage.getAppId();
+        if(requiredColumns != null){
+            for (int i = 0; i < requiredColumns.length; i++) {
+                String column = requiredColumns[i];
+                if (USER_ID.equals(column) && (param.userId == null || param.userId.isEmpty())) {
+                    resp.setFail("请求地址中的 userId 必须有值");
+                    return null;
+                }
+                if (GROUP_CODE.equals(column) && (param.groupCode == null || param.groupCode.isEmpty())){
+                    resp.setFail("请求地址中的 groupCode 必须有值");
+                    return null;
+                }
+                if (PROJECT_ID.equals(column) && (param.projectId == null || param.projectId.isEmpty())){
+                    resp.setFail("请求地址中的 projectId 必须有值");
+                    return null;
+                }
+                if (APP_ID.equals(column) && (param.appId == null || param.appId.isEmpty())){
+                    resp.setFail("请求地址中的 appId 必须有值");
+                    return null;
+                }
+            }
+        }
+        return param;
+    }
+
+    public static String getUUID(){
+        return UUID.randomUUID().toString().replace("-", "");
+    }
+
+}

+ 158 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskCfgService.java

@@ -0,0 +1,158 @@
+package com.persagy.dmp.dpt.service;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.persagy.dmp.dpt.constant.TaskConstant;
+import com.persagy.dmp.dpt.model.TaskRecordModel;
+import com.persagy.dmp.rwd.model.DmpMessage;
+import com.querydsl.core.types.dsl.BooleanExpression;
+import com.persagy.common.criteria.CriteriaUtils;
+import com.persagy.common.criteria.JacksonCriteria;
+import com.persagy.common.web.PagedResponse;
+import com.persagy.common.web.MapResponse;
+import javax.transaction.Transactional;
+
+import org.springframework.data.domain.Sort;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import com.persagy.dmp.dpt.model.TaskCfgModel;
+import org.springframework.beans.factory.annotation.Autowired;
+import com.persagy.dmp.dpt.entity.*;
+import com.persagy.dmp.dpt.repository.TaskCfgRepository;
+
+import java.util.stream.Stream;
+
+@Service
+public class TaskCfgService extends TaskBaseService{
+
+	@Autowired
+	private TaskCfgRepository taskCfgRepository;
+
+	@Autowired
+	private CriteriaUtils criteriaUtils;
+
+	private List<BooleanExpression> parse(ObjectNode criteria) {
+		// TODO
+		return null;
+	}
+
+	public PagedResponse<TaskCfgModel> query(JacksonCriteria criteria) {
+		PagedResponse<TaskCfg> resp = criteriaUtils.query(QTaskCfg.taskCfg, this::parse, criteria);
+		PagedResponse<TaskCfgModel> result = new PagedResponse<>();
+		result.setCount(resp.getCount());
+		List<TaskCfg> dataList = resp.getData();
+		if (dataList != null && dataList.size() > 0) {
+			List<TaskCfgModel> collect = dataList.stream().map(entity -> {
+				TaskCfgModel model = entity.toModel();
+				return model;
+			}).collect(Collectors.toList());
+			result.setData(collect);
+		}
+		return result;
+	}
+
+	@Transactional
+	public MapResponse create(TaskCfgModel param) {
+		MapResponse response = new MapResponse();
+		OrgParam orgParam = prepareParam(response, USER_ID, GROUP_CODE, PROJECT_ID, APP_ID);
+		if (!response.success()) {
+			response.setFail(response.getMessage());
+			return response;
+		}
+		TaskCfg entity = TaskCfg.fromModel(param);
+
+		entity.setId(getUUID());
+		entity.setAppId(orgParam.appId);
+		entity.setGroupCode(orgParam.groupCode);
+		entity.setProjectId(orgParam.projectId);
+
+		ObjectNode config = entity.getConfig();
+		System.out.println(config.get(TaskConstant.CONFIG_EXCHANGE).asText());
+		if (config.get(TaskConstant.CONFIG_EXCHANGE).asText().isEmpty()){
+			response.setFail("exchange 不能为空!");
+			return response;
+		}
+		if (config.get(TaskConstant.CONFIG_ROUTING).asText().isEmpty()){
+			response.setFail("routing 不能为空!");
+			return response;
+		}
+		entity.setStatus(1);
+		taskCfgRepository.save(entity);
+		response.add("id", entity.getId());
+
+		DmpMessage msg = new DmpMessage(TaskConstant.TASK_CFG_CREATE, entity.getId());
+		msg.setGroupCode(orgParam.groupCode);
+		msg.setProjectId(orgParam.projectId);
+		msg.setUserId(orgParam.userId);
+		msg.setAppId(orgParam.appId);
+		response.add(msg);
+
+		return response;
+	}
+
+	@Transactional
+	public MapResponse update(TaskCfgModel param) {
+		MapResponse response = new MapResponse();
+		// TODO
+		prepareParam(response, USER_ID, GROUP_CODE, PROJECT_ID, APP_ID);
+		if (!response.success()) {
+			response.setFail(response.getMessage());
+			return response;
+		}
+		TaskCfg taskCfg = TaskCfg.fromModel(param);
+
+		if (taskCfg.getId() == null){
+			response.setFail("更新操作id不能为空!");
+			return response;
+		}
+		Optional<TaskCfg> optional = taskCfgRepository.findById(taskCfg.getId());
+		if (!optional.isPresent()){
+			response.setFail("数据不存在!");
+			return response;
+		}
+		TaskCfg one = optional.get();
+
+		if (taskCfg.getConfig() != null){
+			ObjectNode config = taskCfg.getConfig();
+			if (config.get(TaskConstant.CONFIG_EXCHANGE).asText().isEmpty()){
+				response.setFail("exchange 不能为空!");
+				return response;
+			}
+			if (config.get(TaskConstant.CONFIG_ROUTING).asText().isEmpty()){
+				response.setFail("routing 不能为空!");
+				return response;
+			}
+		}
+
+		/** 集团编码不支持修改 */
+		taskCfg.setGroupCode(one.getGroupCode());
+
+		/** 项目 Id 不支持修改 */
+		taskCfg.setProjectId(one.getProjectId());
+
+		taskCfgRepository.save(one);
+		response.add("id", one.getId());
+		return response;
+	}
+
+	@Transactional
+	public MapResponse delete(TaskCfgModel param) {
+		MapResponse response = new MapResponse();
+		/** 查询要删除的文件是否存在 */
+		Optional<TaskCfg> optional = taskCfgRepository.findById(param.getId());
+		if (!optional.isPresent()){
+			response.setFail("数据不存在!");
+			return response;
+		}
+		TaskCfg one = optional.get();
+
+		/** 逻辑删除 */
+		one.setStatus(0);
+
+		taskCfgRepository.save(one);
+		response.add("id", one.getId());
+		return response;
+	}
+}
+

+ 147 - 0
dmp-task/src/main/java/com/persagy/dmp/dpt/service/TaskRecordService.java

@@ -0,0 +1,147 @@
+package com.persagy.dmp.dpt.service;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.persagy.common.criteria.CriteriaUtils;
+import com.persagy.common.criteria.JacksonCriteria;
+import com.persagy.common.web.MapResponse;
+import com.persagy.common.web.PagedResponse;
+import com.persagy.dmp.dpt.entity.QTaskRecord;
+import com.persagy.dmp.dpt.entity.TaskCfg;
+import com.persagy.dmp.dpt.entity.TaskRecord;
+import com.persagy.dmp.dpt.model.TaskRecordModel;
+import com.persagy.dmp.dpt.repository.TaskRecordRepository;
+import com.querydsl.core.types.dsl.BooleanExpression;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.transaction.Transactional;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Service
+public class TaskRecordService extends TaskBaseService{
+
+	@Autowired
+	private TaskRecordRepository taskRecordRepository;
+
+	@Autowired
+	private CriteriaUtils criteriaUtils;
+
+	private List<BooleanExpression> parse(ObjectNode criteria) {
+		// TODO
+		return null;
+	}
+
+	public PagedResponse<TaskRecordModel> query(JacksonCriteria criteria) {
+		PagedResponse<TaskRecord> resp = criteriaUtils.query(QTaskRecord.taskRecord, this::parse, criteria);
+		PagedResponse<TaskRecordModel> result = new PagedResponse<>();
+		result.setCount(resp.getCount());
+		List<TaskRecord> dataList = resp.getData();
+		if (dataList != null && dataList.size() > 0) {
+			List<TaskRecordModel> collect = dataList.stream().map(entity -> {
+				TaskRecordModel model = entity.toModel();
+				return model;
+			}).collect(Collectors.toList());
+			result.setData(collect);
+		}
+		return result;
+	}
+
+	@Transactional
+	public MapResponse create(TaskRecordModel param) {
+		MapResponse response = new MapResponse();
+		// TODO
+		TaskRecord entity = TaskRecord.fromModel(param);
+		taskRecordRepository.save(entity);
+		response.add("id", entity.getId());
+		return response;
+	}
+
+	@Transactional
+	public MapResponse update(TaskRecordModel param) {
+		MapResponse response = new MapResponse();
+		// TODO
+		TaskRecord entity = TaskRecord.fromModel(param);
+		taskRecordRepository.save(entity);
+		response.add("id", entity.getId());
+		return response;
+	}
+
+	@Transactional
+	public MapResponse delete(TaskRecordModel param) {
+		MapResponse response = new MapResponse();
+		// TODO
+		taskRecordRepository.deleteById(param.getId());
+		return response;
+	}
+
+	/**
+	 * 回调接口保存任务记录
+	 * @param taskRecord
+	 */
+	@Transactional
+	public void save(TaskRecord taskRecord){
+		taskRecordRepository.save(taskRecord);
+	}
+
+	public MapResponse saveAndSendMq(List<String> taskIds) {
+		MapResponse response = new MapResponse();
+
+		OrgParam orgParam = prepareParam(response, USER_ID, GROUP_CODE, PROJECT_ID, APP_ID);
+		if (!response.success()) {
+			response.setFail(response.getMessage());
+			return response;
+		}
+		if (taskIds == null || taskIds.isEmpty()) {
+			response.setFail("无数据!");
+			return response;
+		}
+		List<TaskCfg> taskCfgs = taskCfgRepository.findAllById(taskIds);
+		if (taskCfgs == null || taskCfgs.isEmpty()){
+			response.setFail("没有要发送的数据!");
+			return response;
+		}
+
+		/** 按级别排序 */
+		taskCfgs.sort((t1, t2) -> t2.getLevel().compareTo(t1.getLevel()));
+
+		for (TaskCfg taskCfg : taskCfgs) {
+			TaskRecord taskRecord = new TaskRecord();
+			String taskId = taskCfg.getId();
+			String taskRecordId = getUUID();
+			Date startDate = new Date();
+			/** 新增 taskRecord */
+			taskRecord.setId(taskRecordId);
+			taskRecord.setTaskId(taskId);
+			taskRecord.setStartTime(startDate);
+			taskRecord.setErrorInfo(taskCfg.getConfig());
+			taskRecord.setSuccess(0);
+
+			/** 保存任务记录 */
+			save(taskRecord);
+
+			/** 获取发送的路由与路由键 */
+			String exchange = taskCfg.getConfig().get("exchange").textValue();
+			String routing = taskCfg.getConfig().get("routing").textValue();
+
+			/** 封装MQ消息 */
+			Map<String, Object> taskMQ = new HashMap<>();
+			taskMQ.put("params", taskCfg.getParams());
+			taskMQ.put("taskId", taskId);
+			taskMQ.put("taskRecordId",taskRecordId);
+			taskMQ.put(GROUP_CODE, orgParam.groupCode);
+			taskMQ.put(USER_ID, orgParam.userId);
+			taskMQ.put(APP_ID, orgParam.appId);
+			taskMQ.put(PROJECT_ID, orgParam.projectId);
+
+			/** 发送MQ */
+			rabbitTemplate.convertAndSend(exchange, routing, taskMQ);
+		}
+		return response;
+	}
+
+}
+

+ 7 - 0
dmp-task/src/main/resources/bootstrap.yml

@@ -0,0 +1,7 @@
+spring:
+  application:
+    name: dmp-task
+  cloud:
+    config:
+      profile: dev
+      uri: http://localhost:9932

+ 55 - 0
dmp-task/src/main/resources/logback-spring.xml

@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration scan="true" scanPeriod="60 seconds" debug="false">
+
+    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
+    <property name="log_pattern" value="%-30thread %d{yyyyMMdd HH:mm:ss.SSS} %-5level %logger{140} - %msg%n"/>
+    <property name="app_name" value="task"/>
+    <springProperty scope="context" name="log_path" source="persagy.log.path"/>
+    <property name="log_file" value="${log_path}/${app_name}"/>
+    <contextName>${app_name}</contextName>
+
+    <appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
+        <layout class="ch.qos.logback.classic.PatternLayout">
+            <pattern>${log_pattern}</pattern>
+        </layout>
+    </appender>
+
+    <!-- 日志文件 -->
+    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${log_file}-%d{yyyyMMdd}.log</fileNamePattern>
+            <maxHistory>7</maxHistory>
+        </rollingPolicy>
+        <encoder>
+            <pattern>${log_pattern}</pattern>
+        </encoder>
+    </appender>
+
+    <logger name="org.hibernate" level="INFO"/>
+    <logger name="org.springframework" level="INFO"/>
+    <logger name="com.netflix" level="WARN"/>
+    <logger name="org.apache.http" level="INFO"/>
+
+    <springProfile name="log-dev">
+        <logger name="org.hibernate.SQL" level="DEBUG"/>
+        <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE"/>
+        <root level="info">
+            <appender-ref ref="stdout"/>
+        </root>
+    </springProfile>
+
+    <springProfile name="log-test">
+        <logger name="org.hibernate.SQL" level="DEBUG"/>
+        <logger name="org.hibernate.type.descriptor.sql.BasicBinder" level="TRACE"/>
+        <root level="info">
+            <appender-ref ref="file"/>
+        </root>
+    </springProfile>
+
+    <springProfile name="log-prod">
+        <root level="info">
+            <appender-ref ref="file"/>
+        </root>
+    </springProfile>
+
+</configuration> 

+ 1 - 0
pom.xml

@@ -16,6 +16,7 @@
         <module>dmp-rwd</module>
         <module>dmp-rwd-funcid-parser</module>
         <module>dmp-rwd-datautils</module>
+        <module>dmp-task</module>
     </modules>
 
     <packaging>pom</packaging>