浏览代码

服务器端功能性初步完成

jxing 6 年之前
父节点
当前提交
5dfcf6f130

+ 23 - 11
scheduler/src/main/java/cn/sagacloud/pojo/ChannelHandlerContextWrapper.java

@@ -18,14 +18,14 @@ public class ChannelHandlerContextWrapper {
     private final long sendingTimeOut = 10;            // 发送给客户端任务, 如果超过该时间未响应, 则超时, 并将发送的任务状态置为初始状态
     private final long lastRefuseTimeOut = 3600;       // 如果客户端拒绝任务, 则等待lastRefuseTimeOut秒, 才会再分配任务给该客户端
     private long lastRefuseTime = 0L;
-    private String macAddr;             // mac地址
+    private String clientInfo;             // mac地址
     // 任务发送中状态map,     key --> taskid, value --> 发送任务时间点,    如果超过15s, 即为超时.
     private Map<Integer, Long> taskSendingStatusMap = new HashMap<>();
 
     public ChannelHandlerContextWrapper(ChannelHandlerContext ctx) {
         this.ctx = ctx;
         this.lastRejectTime = 0;
-        macAddr = null;
+        clientInfo = null;
     }
 
     public void sendTask(TaskModel toBeSent) {
@@ -53,12 +53,12 @@ public class ChannelHandlerContextWrapper {
         return -1;
     }
 
-    public String getMacAddr() {
-        return macAddr;
+    public String getClientInfo() {
+        return clientInfo;
     }
 
-    public void setMacAddr(String macAddr) {
-        this.macAddr = macAddr;
+    public void setClientInfo(String clientInfo) {
+        this.clientInfo = clientInfo;
     }
 
     public ChannelHandlerContext getCtx() {
@@ -69,11 +69,11 @@ public class ChannelHandlerContextWrapper {
         return lastRejectTime;
     }
 
-    public void setLastRejectTime(long lastRejectTime) {
-        this.lastRejectTime = lastRejectTime;
+    public void setLastRejectTime(int taskId) {
+        this.lastRejectTime = CommonUtil.getTime();
+        DispatchTask.changeStatusByCmd(taskId, Command.RefuseTask, clientInfo);
     }
 
-
     public boolean isLastRefuseTimeOutPassed() {
         if(lastRefuseTime == 0)
             return true;
@@ -82,11 +82,23 @@ public class ChannelHandlerContextWrapper {
             return true;
         return false;
     }
-    // 是否加锁
+
     public void acceptTask(int taskId) {
         TaskStatus isAllow = DispatchTask.isAllowCmd(taskId, Command.AcceptTask);
         if(isAllow==null)
             return;
-        DispatchTask.changeStatusByCmd(taskId, Command.AcceptTask);
+        DispatchTask.changeStatusByCmd(taskId, Command.AcceptTask, clientInfo);
+    }
+
+    public void taskSuccess(MessageProto.Message message) {
+        DispatchTask.changeStatusByCmdWithReturnJson(message.getTaskId(), Command.TaskSuccess, clientInfo, message.getContent());
+    }
+
+    public void commandError(MessageProto.Message message) {
+        DispatchTask.changeStatusByCmdWithReturnJson(message.getTaskId(), Command.CommandError, clientInfo, message.getContent());
+    }
+
+    public void downloadError(MessageProto.Message message) {
+        DispatchTask.changeStatusByCmdWithReturnJson(message.getTaskId(), Command.DownloadError, clientInfo, message.getContent());
     }
 }

+ 30 - 0
scheduler/src/main/java/cn/sagacloud/pojo/ClientMessage.java

@@ -0,0 +1,30 @@
+package cn.sagacloud.pojo;
+
+
+import cn.sagacloud.proto.MessageProto;
+
+public class ClientMessage {
+    private ChannelHandlerContextWrapper ctxWrapper;
+    private MessageProto.Message message;
+
+    public ClientMessage(ChannelHandlerContextWrapper ctxWrapper, MessageProto.Message message) {
+        this.ctxWrapper = ctxWrapper;
+        this.message = message;
+    }
+
+    public ChannelHandlerContextWrapper getCtxWrapper() {
+        return ctxWrapper;
+    }
+
+    public void setCtxWrapper(ChannelHandlerContextWrapper ctxWrapper) {
+        this.ctxWrapper = ctxWrapper;
+    }
+
+    public MessageProto.Message getMessage() {
+        return message;
+    }
+
+    public void setMessage(MessageProto.Message message) {
+        this.message = message;
+    }
+}

+ 34 - 8
scheduler/src/main/java/cn/sagacloud/server/DispatchTask.java

@@ -43,14 +43,28 @@ public class DispatchTask implements Runnable {
             return null;
         return allowCmd.get(cmd);
     }
+    // 加锁
+    public static void changeStatusByCmd(int taskId, Command cmd, String clientInfo) {
+        TaskStatus nextStatus = isAllowCmd(taskId, cmd);
+        if(nextStatus == null)
+            return;
+        TaskModel task = DispatchTask.tasks.get(taskId);
+        if(task == null)
+            return;
+        task.setTask_last_client(clientInfo);
+        task.setTask_status(TaskStatus.getIdByTaskStatus(nextStatus));
+        updateTask(task);
+    }
 
-    public static void changeStatusByCmd(int taskId, Command cmd) {
+    public static void changeStatusByCmdWithReturnJson(int taskId, Command cmd, String clientInfo, String returnJson) {
         TaskStatus nextStatus = isAllowCmd(taskId, cmd);
         if(nextStatus == null)
             return;
         TaskModel task = DispatchTask.tasks.get(taskId);
         if(task == null)
             return;
+        task.setTask_last_client(clientInfo);
+        task.setTask_result_json(returnJson);
         task.setTask_status(TaskStatus.getIdByTaskStatus(nextStatus));
         updateTask(task);
     }
@@ -73,20 +87,32 @@ public class DispatchTask implements Runnable {
                 }
             }
             try {
-                Thread.sleep(1000);
+                Thread.sleep(5000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
+            if(cnt%20 == 0){
+                syncTask();
+            }
+        }
+    }
+
+    private void syncTask() {
+        try {
+            Map<Integer, TaskModel> tmpTasks = service.getAllTaskMapByStatus(Arrays.asList(0, 1, 2));
+            tasks = tmpTasks;
+            log.error("同步数据库任务完成");
+        } catch (Exception e) {
+            e.printStackTrace();
+            log.error("同步数据库失败");
         }
     }
 
     private void checkExecuteTimeOut() {
-        for(ChannelHandlerContextWrapper client : clientList){
-            int id = getOneExecuteTimeOutTaskId();
-            if(id > 0){
-                // 是否加锁
-                tasks.get(id).setTask_status(0);
-            }
+        int id = getOneExecuteTimeOutTaskId();
+        if(id > 0){
+            // 是否加锁
+            tasks.get(id).setTask_status(0);
         }
     }
     /**

+ 39 - 6
scheduler/src/main/java/cn/sagacloud/server/MessageHandler.java

@@ -1,14 +1,18 @@
 package cn.sagacloud.server;
 
 import cn.sagacloud.pojo.ChannelHandlerContextWrapper;
+import cn.sagacloud.pojo.ClientMessage;
 import cn.sagacloud.pojo.Command;
 import cn.sagacloud.proto.MessageProto;
+import cn.sagacloud.proto.MessageUtil;
 import cn.sagacloud.utils.CommonUtil;
-import io.netty.channel.ChannelHandlerContext;
-import org.checkerframework.checker.formatter.FormatUtil;
 
-public class MessageHandler {
-    public static void handle(MessageProto.Message message, ChannelHandlerContextWrapper ctx) {
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class MessageHandler implements Runnable {
+    public static ConcurrentLinkedQueue<ClientMessage> messageQueue = new ConcurrentLinkedQueue<>();
+
+    public static void handle(MessageProto.Message message, ChannelHandlerContextWrapper clientWrapper) {
         Command cmd;
         try {
             cmd = Command.valueOf(message.getCmd());
@@ -17,19 +21,48 @@ public class MessageHandler {
         }
         switch (cmd){
             case AcceptTask:
-                ctx.acceptTask(message.getTaskId());
+                clientWrapper.acceptTask(message.getTaskId());
                 break;
             case RefuseTask:
-                ctx.setLastRejectTime(CommonUtil.getTime());
+                clientWrapper.setLastRejectTime(message.getTaskId());
                 break;
             case ClientInfo:
+                clientWrapper.setClientInfo(message.getContent());
                 break;
             case TaskSuccess:
+                clientWrapper.taskSuccess(message);
                 break;
             case CommandError:
+                clientWrapper.commandError(message);
                 break;
             case DownloadError:
+                clientWrapper.downloadError(message);
                 break;
         }
     }
+
+    public static void offer(ClientMessage clientMessage) {
+        MessageUtil.printMessage(clientMessage.getMessage());
+        messageQueue.offer(clientMessage);
+    }
+
+    @Override
+    public void run() {
+        while(true){
+            ClientMessage clientMessage = messageQueue.poll();
+            if(clientMessage == null){
+                try {
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                continue;
+            }
+            handle(clientMessage.getMessage(), clientMessage.getCtxWrapper());
+        }
+    }
 }
+
+
+
+

+ 3 - 6
scheduler/src/main/java/cn/sagacloud/server/SchedulerHandler.java

@@ -5,11 +5,11 @@ package cn.sagacloud.server;
  */
 
 import cn.sagacloud.pojo.ChannelHandlerContextWrapper;
+import cn.sagacloud.pojo.ClientMessage;
 import cn.sagacloud.pojo.Command;
 import cn.sagacloud.pojo.TaskStatus;
 import cn.sagacloud.proto.MessageProto;
 import cn.sagacloud.proto.MessageUtil;
-import cn.sagacloud.utils.CommonUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
@@ -77,7 +77,7 @@ public class SchedulerHandler extends ChannelInboundHandlerAdapter {
     public void channelActive(ChannelHandlerContext ctx) {
         ctxw = new ChannelHandlerContextWrapper(ctx);
         this.clientList.add(ctxw);
-        //ctx.channel().writeAndFlush("connected !"+System.currentTimeMillis());
+        ctx.channel().writeAndFlush(MessageUtil.buildMessage(Command.ClientInfo.name(), 0, ""));
     }
 
     /**
@@ -91,10 +91,7 @@ public class SchedulerHandler extends ChannelInboundHandlerAdapter {
         try {
             // Do something with msg
             MessageProto.Message message = (MessageProto.Message)msg;
-            MessageUtil.printMessage(message);
-
-            MessageHandler.handle(message, ctxw);
-            //ctx.writeAndFlush(myMsg);  // 异步
+            MessageHandler.offer(new ClientMessage(ctxw, message));
         } finally {
             // 如果ctx write 或 writeAndFlush过的话, 就不用释放msg
             //ReferenceCountUtil.release(msg);

+ 4 - 1
scheduler/src/main/java/cn/sagacloud/server/Server.java

@@ -34,6 +34,7 @@ public class Server {
     public static ExecutorService pool = Executors.newFixedThreadPool(8);
     Config config;
     DispatchTask dispatchTask;
+    MessageHandler messageHandler;
     ArrayList<ChannelHandlerContextWrapper> clientList = new ArrayList<>();
     private static Logger log = Logger.getLogger(Server.class);
     public Server() throws Exception {
@@ -64,6 +65,7 @@ public class Server {
         // 初始化Task(从sql到内存)
         log.info("开始同步数据库中任务...");
         dispatchTask = new DispatchTask(clientList);
+        messageHandler = new MessageHandler();
     }
 
     public void start() throws InterruptedException {
@@ -72,7 +74,8 @@ public class Server {
             log.info("开始启动监听...");
             channel = bootstrap.bind(config.getPort()).sync().channel();
             log.info("开始分发任务...");
-            pool.submit(dispatchTask);
+            pool.submit(dispatchTask);        // 开启任务分发线程
+            pool.submit(messageHandler);      // 开启消息处理线程
             //pool.submit()
             log.info("服务器启动完毕");
             channel.closeFuture().sync();

+ 1 - 1
scheduler/src/main/resources/log4j.properties

@@ -10,7 +10,7 @@ log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,S
 ### Êä³öDEBUG ¼¶±ðÒÔÉϵÄÈÕÖ¾µ½=E://logs/error.log ###
 log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
 log4j.appender.D.File = E:/logs/log.log
-log4j.appender.D.Append = true
+log4j.appender.D.Append = true 
 log4j.appender.D.Threshold = DEBUG
 log4j.appender.D.layout = org.apache.log4j.PatternLayout
 log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n