Bladeren bron

采集服务代码提交

fenghanchao 3 jaren geleden
bovenliggende
commit
5487a15a7d

+ 1 - 1
pom.xml

@@ -122,7 +122,7 @@
 						<manifest>
 							<addClasspath>true</addClasspath>
 							<classpathPrefix></classpathPrefix>
-							<mainClass>com.persagy.compute.main.ComputeMain</mainClass>
+							<mainClass>com.saga.main.Main</mainClass>
 						</manifest>
 					</archive>
 				</configuration>

+ 2 - 1
src/main/java/com/saga/thread/common/LoadConfigThread.java

@@ -663,7 +663,8 @@ public class LoadConfigThread extends Thread {
 						continue;
 					}
 					pty = Constant.toLower(pty);
-					Constant.activeMqBrokerUrl = rowData.get(pty.get("broker-url"));
+					Constant.activeMqBrokerUrl = rowData.get(pty.get("brokerurl"));
+					Constant.queueName = rowData.get(pty.get("queuename"));
 				}
 			}
 		}

+ 19 - 4
src/main/java/com/saga/thread/common/SaveThread.java

@@ -4,6 +4,9 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import com.alibaba.fastjson.JSONObject;
 import com.saga.util.MessageUtil;
@@ -181,10 +184,8 @@ public class SaveThread extends Thread {
 											record.receivetime = Constant.getOffSetTime(record);
 											record.funcID = Constant.getFunctionConv(record);
 
-											//TODO 往activeMq消息队列里发送消息
-											if (Constant.activeMqPointSet.contains(record.buildingSign + "-" + record.meterSign + "-" + record.funcID)) {
-												MessageUtil.sendMessage(JSONObject.toJSONString(record));
-											}
+											// 往activeMq消息队列里发送消息
+											sendMessage(record);
 
 											if (toDB(record.buildingSign, record.meterSign, record.funcID)) {
 												this.strecordList.add(record);
@@ -237,6 +238,20 @@ public class SaveThread extends Thread {
 		}
 	}
 
+	ExecutorService executorService = Executors.newFixedThreadPool(10);
+
+	private void sendMessage(Record record) {
+		executorService.execute(() -> {
+			if (Constant.activeMqPointSet.contains(record.buildingSign + "-" + record.meterSign + "-" + record.funcID)) {
+				try {
+					MessageUtil.sendMessage(JSONObject.toJSONString(record));
+				} catch (Exception e) {
+					LogUtil.error(e);
+				}
+			}
+		});
+	}
+
 	/**
 	 * 是否需要进库
 	 * 

+ 22 - 0
src/main/java/com/saga/thread/common/WTDBThread.java

@@ -2,7 +2,12 @@ package com.saga.thread.common;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
+import com.alibaba.fastjson.JSONObject;
+import com.saga.util.MessageUtil;
+import org.springframework.util.CollectionUtils;
 import org.zillion.util.log.LogUtil;
 
 import com.saga.entity.Record;
@@ -43,6 +48,7 @@ public class WTDBThread extends Thread {
 				}
 				Constant.BatchInsert(Constant.Database, wtdbrecordList);
 				Constant.BatchInsertUpload(wtdbrecordList);
+		//		sendMessage();
 				write = System.currentTimeMillis();
 				LogUtil.error("WTDBThread-" + this.getName() + ".write: " + write);
 				wtdbrecordList = new ArrayList<Record>();
@@ -53,6 +59,22 @@ public class WTDBThread extends Thread {
 		}
 	}
 
+/*	ExecutorService executorService = Executors.newFixedThreadPool(10);
+	private void sendMessage() {
+		wtdbrecordList.forEach(record -> {
+			//TODO ÍùactiveMqÏûÏ¢¶ÓÁÐÀï·¢ËÍÏûÏ¢
+			if (Constant.activeMqPointSet.contains(record.buildingSign + "-" + record.meterSign + "-" + record.funcID)) {
+				executorService.execute(() -> {
+					try {
+						MessageUtil.sendMessage(JSONObject.toJSONString(record));
+					} catch (Exception e) {
+						LogUtil.error(e);
+					}
+				});
+			}
+		});
+	}*/
+
 	private boolean delayToDb() {
 		if (wtdbrecordList.size() == 0
 				|| (wtdbrecordList.size() < 500 && (System.currentTimeMillis() - this.write < Constant.delay))) {

+ 2 - 0
src/main/java/com/saga/util/Constant.java

@@ -114,6 +114,8 @@ public class Constant {
 
 
 	public static String activeMqBrokerUrl;
+	public static String queueName;
+
 	public static Set<String> activeMqPointSet = new HashSet<>();
 
 	// static {

+ 23 - 9
src/main/java/com/saga/util/MessageUtil.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.commons.lang.StringUtils;
 import org.springframework.util.CollectionUtils;
+import org.zillion.util.log.LogUtil;
 
 import javax.jms.*;
 import java.util.HashMap;
@@ -16,8 +17,6 @@ import java.util.Map;
  **/
 public class MessageUtil {
 
-    public static final String QUEUE_NAME = "collection.queue.20220114";
-
     private static MessageProducer producer;
 
     private static Session session;
@@ -28,7 +27,8 @@ public class MessageUtil {
         }
 
         if (StringUtils.isBlank(Constant.activeMqBrokerUrl)) {
-            throw new Exception("读取不到activeMq的brokerUrl!");
+            LogUtil.error("读取不到activeMq的brokerUrl!");
+            return null;
         }
 
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD
@@ -41,22 +41,36 @@ public class MessageUtil {
         return session;
     }
 
-    public synchronized static MessageProducer getMessageProducer() throws Exception {
+    private static final long FIVE_MINUTES = 5 * 60 * 1000;
+
+    public synchronized static MessageProducer getMessageProducer(Session session) throws Exception {
+        if (session == null) {
+            return null;
+        }
+
         if (producer != null) {
             return producer;
         }
-
-        Session session = getSession();
         //创建queue
-        Queue queue = session.createQueue(QUEUE_NAME);//Destination
+        Queue queue = session.createQueue(Constant.queueName);//Destination
         //消息生产者
         producer = session.createProducer(queue);
+        producer.setTimeToLive(FIVE_MINUTES);
         return producer;
     }
 
     public static void sendMessage(String message) throws Exception {
-        MessageProducer messageProducer = getMessageProducer();
-        TextMessage textMessage = getSession().createTextMessage(message);
+        Session session = getSession();
+        if (session == null) {
+            LogUtil.error("未连接到ActiveMq服务!");
+            return;
+        }
+        MessageProducer messageProducer = getMessageProducer(session);
+        if (messageProducer == null) {
+            LogUtil.error("未连接到ActiveMq服务!");
+            return;
+        }
+        TextMessage textMessage = session.createTextMessage(message);
         messageProducer.send(textMessage);
     }