Explorar o código

修改报告生成线程的队列策略

lixing %!s(int64=3) %!d(string=hai) anos
pai
achega
0d90a5bf53

+ 38 - 14
src/main/java/com/persagy/apm/report/outline/service/impl/GenerateReportThreadPool.java

@@ -25,14 +25,27 @@ public class GenerateReportThreadPool {
     @Value("${maxGenerateMinutes:5}")
     private int maxGenerateMinutes;
 
-    private final ExecutorService es;
+    /**
+     * 最大生成报告任务数
+     */
+    private final Integer maxGenerateTaskCount = 1000;
+
+    private final ExecutorService generateReportEs;
+    private final ExecutorService listenGenerateResultEs;
 
     public GenerateReportThreadPool() {
-        // 初始化线程池
-        es = ExecutorBuilder.create()
+        // 初始化生成报告任务线程池
+        generateReportEs = ExecutorBuilder.create()
                 .setCorePoolSize(5)
                 .setMaxPoolSize(10)
-                .setWorkQueue(new LinkedBlockingQueue<>(1000))
+                .setWorkQueue(new LinkedBlockingQueue<>(maxGenerateTaskCount))
+                .setHandler(new ThreadPoolExecutor.AbortPolicy())
+                .build();
+        // 初始化监测报告生成结果线程池
+        listenGenerateResultEs = ExecutorBuilder.create()
+                .setCorePoolSize(5)
+                .setMaxPoolSize(10)
+                .setWorkQueue(new LinkedBlockingQueue<>(maxGenerateTaskCount + 200))
                 .setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
                 .build();
     }
@@ -44,7 +57,7 @@ public class GenerateReportThreadPool {
      * @author lixing
      * @version V1.0 2021/5/24 4:09 下午
      */
-    public void generateReport(ReportOutline reportOutline) {
+    public String generateReport(ReportOutline reportOutline) {
         String userId = PoemsContext.getContext().getUserId();
         String pd = PoemsContext.getContext().getPd();
         String loginDevice = PoemsContext.getContext().getLoginDevice();
@@ -57,15 +70,26 @@ public class GenerateReportThreadPool {
             return "done";
         });
 
-        es.execute(future);
-        es.execute(() -> {
-            try {
-                future.get(maxGenerateMinutes, TimeUnit.MINUTES);
-            } catch (InterruptedException | ExecutionException | TimeoutException e) {
-                log.error("生成报告发生异常", e);
-                future.cancel(true);
-                reportGenerator.updateReportOutlineInfoWhenFailure(reportOutline);
+        try {
+            generateReportEs.execute(future);
+            listenGenerateResultEs.execute(() -> {
+                try {
+                    future.get(maxGenerateMinutes, TimeUnit.MINUTES);
+                } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                    log.error("生成报告发生异常", e);
+                    future.cancel(true);
+                    reportGenerator.updateReportOutlineInfoWhenFailure(reportOutline);
+                }
+            });
+        } catch (Exception e) {
+            if (e instanceof RejectedExecutionException) {
+                log.error("当前报告生成任务数达到最大值:{}", maxGenerateTaskCount);
             }
-        });
+            log.error("生成报告发生异常", e);
+            future.cancel(true);
+            reportGenerator.updateReportOutlineInfoWhenFailure(reportOutline);
+            return "当前报告生成任务数达到最大值:" + maxGenerateTaskCount + ",请稍后再试";
+        }
+        return null;
     }
 }

+ 6 - 1
src/main/java/com/persagy/apm/report/outline/service/impl/ReportOutlineServiceImpl.java

@@ -38,6 +38,7 @@ import com.persagy.apm.report.userconfig.reporttype.constant.enums.BelongTypeEnu
 import com.persagy.apm.report.userconfig.reporttype.model.ReportType;
 import com.persagy.apm.report.userconfig.reporttype.model.vo.ResponseReportTypeListItemVO;
 import com.persagy.apm.report.userconfig.reporttype.service.IReportTypeService;
+import com.persagy.common.exception.BusinessException;
 import org.apache.commons.lang.StringUtils;
 import org.assertj.core.util.Lists;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,6 +50,7 @@ import java.text.Collator;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -102,7 +104,10 @@ public class ReportOutlineServiceImpl extends ServiceImpl<ReportOutlineMapper, R
         setDefaultValue(reportOutline);
         saveOrUpdate(reportOutline);
         // 后台生成报告
-        generateReportThreadPool.generateReport(reportOutline);
+        String result = generateReportThreadPool.generateReport(reportOutline);
+        if (StringUtils.isNotBlank(result)) {
+            throw new BusinessException(result);
+        }
         return reportOutline.getId();
     }
 

+ 112 - 0
src/test/java/com/persagy/apm/report/indicator/factory/EnergyEfficiencyFactoryTest.java

@@ -1,12 +1,16 @@
 package com.persagy.apm.report.indicator.factory;
 
+import cn.hutool.core.thread.ExecutorBuilder;
+import com.persagy.apm.common.context.poems.PoemsContext;
 import com.persagy.apm.report.dependencies.energyefficiencyweb.model.vo.EnergyEfficiencyDataVO;
 import com.persagy.apm.report.detail.model.vo.AttrValueVO;
 import junit.framework.TestCase;
 import org.assertj.core.util.Lists;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.*;
 
 public class EnergyEfficiencyFactoryTest extends TestCase {
 
@@ -33,4 +37,112 @@ public class EnergyEfficiencyFactoryTest extends TestCase {
         AttrValueVO maxDays = energyEfficiencyFactory.getMinDays(dailyEnergyEfficiency);
         System.out.println(maxDays.getValue());
     }
+
+    public void testThreadPool() throws InterruptedException {
+        ExecutorService es = ExecutorBuilder.create()
+                .setCorePoolSize(5)
+                .setMaxPoolSize(10)
+                .setWorkQueue(new LinkedBlockingQueue<>(10))
+                .setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
+                .build();
+        int count = 1000;
+        List<FutureTask<String>> tasks = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            System.out.println("次数" + i);
+            // 添加Future任务,过期时间为5分钟
+            FutureTask<String> future = new FutureTask<>(() -> {
+                System.out.println("开始执行");
+                Thread.sleep(5000);
+                return Thread.currentThread().getId() + "done";
+            });
+            tasks.add(future);
+        }
+        for (FutureTask<String> task : tasks) {
+            es.execute(task);
+            es.execute(() -> {
+                try {
+                    System.out.println("获取结果");
+                    System.out.println(task.get());
+                } catch (InterruptedException | ExecutionException e) {
+                    System.out.println("发生异常");
+                    task.cancel(true);
+                }
+            });
+        }
+
+        Thread.sleep(5000 * count);
+    }
+
+    public void testThreadPool1() throws InterruptedException {
+        ExecutorService es = ExecutorBuilder.create()
+                .setCorePoolSize(5)
+                .setMaxPoolSize(10)
+                .setWorkQueue(new LinkedBlockingQueue<>(10))
+                .setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
+                .build();
+        int count = 1000;
+        List<FutureTask<String>> tasks = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            System.out.println("次数" + i);
+            // 添加Future任务,过期时间为5分钟
+            FutureTask<String> task = new FutureTask<>(() -> {
+                System.out.println("开始执行");
+                Thread.sleep(5000);
+                return Thread.currentThread().getId() + "done";
+            });
+            es.execute(task);
+            es.execute(() -> {
+                try {
+                    System.out.println("获取结果");
+                    System.out.println(task.get());
+                } catch (InterruptedException | ExecutionException e) {
+                    System.out.println("发生异常");
+                    task.cancel(true);
+                }
+            });
+        }
+        Thread.sleep(5000 * count);
+    }
+
+
+    public void testThreadPool2() throws InterruptedException {
+        ExecutorService es = ExecutorBuilder.create()
+                .setCorePoolSize(5)
+                .setMaxPoolSize(10)
+                .setWorkQueue(new LinkedBlockingQueue<>(100))
+                .setHandler(new ThreadPoolExecutor.AbortPolicy())
+                .build();
+        ExecutorService es1 = ExecutorBuilder.create()
+                .setCorePoolSize(5)
+                .setMaxPoolSize(10)
+                .setWorkQueue(new LinkedBlockingQueue<>(100))
+                .setHandler(new ThreadPoolExecutor.CallerRunsPolicy())
+                .build();
+        int count = 1000;
+        List<FutureTask<String>> tasks = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            System.out.println("次数" + i);
+            // 添加Future任务,过期时间为5分钟
+            FutureTask<String> task = new FutureTask<>(() -> {
+                System.out.println("开始执行");
+                Thread.sleep(5000);
+                return Thread.currentThread().getId() + "done";
+            });
+            try {
+                es.submit(task);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            //            es1.execute(() -> {
+//                try {
+//                    System.out.println("获取结果");
+//                    System.out.println(task.get());
+//                } catch (InterruptedException | ExecutionException e) {
+//                    System.out.println("发生异常");
+//                    task.cancel(true);
+//                }
+//            });
+        }
+        Thread.sleep(5000 * count);
+    }
 }