wudla 4 years ago
commit
f7e7df8642

+ 19 - 0
data_collect/pom.xml

@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>IOT</artifactId>
+        <groupId>org.example</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>data_collect</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>

+ 38 - 0
data_collect/src/main/scala/com/persagy/iot/app/IOTAgg.scala

@@ -0,0 +1,38 @@
+package com.persagy.iot.app
+
+import com.persagy.iot.bean.IOTData
+import org.apache.flink.api.common.functions.AggregateFunction
+
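+/** Incremental (pre-)aggregation: for each key, keep only the record with the latest eventTime inside the window. */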
+class IOTAgg() extends AggregateFunction[IOTData, IOTData, IOTData]{
+
+  override def createAccumulator(): IOTData = null
+
+  /** keep whichever record has the later eventTime */
+  override def add(in: IOTData, acc: IOTData): IOTData = {
+    if (acc == null || in.eventTime > acc.eventTime) in else acc
+  }
+
+  override def getResult(acc: IOTData): IOTData = acc
+
+  override def merge(acc: IOTData, acc1: IOTData): IOTData = {
+    if (acc == null) acc1
+    else if (acc1 == null) acc
+    else if (acc.eventTime >= acc1.eventTime) acc
+    else acc1
+  }
+}

+ 80 - 0
data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala

@@ -0,0 +1,80 @@
+package com.persagy.iot.app
+
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.func.{IOTSinkFunction, SplitData}
+import com.persagy.iot.utils.KafkaUtil
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
+
+object IOTApp {
+
+  /** only keep records whose type field is "report" */
+  val common_type: String = "report"
+
+  /** sentinel value meaning a function ID carries no reading */
+  val no_value: String = "ff ff ff ff"
+
+  def main(args: Array[String]): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    /* set parallelism */
+    env.setParallelism(1)
+
+    /* use event-time semantics */
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
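+    /* consume raw IoT records from Kafka as plain strings */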
+    val inputStream: DataStream[String] =
+      env.addSource( new FlinkKafkaConsumer[String](KafkaUtil.topic, new SimpleStringSchema(), KafkaUtil.kafkaConsumerProperties()) )
+
+    /* drop records that are not of type report */
+    val filterStream: DataStream[String] = inputStream.filter(data => {
+
+      val arr1: Array[String] = data.split("\t")
+      if (arr1.length != 4){
+        false
+      } else {
+        val arr2: Array[String] = arr1(3).split(";")
+        common_type.equals(arr2(2))
+      }
+    })
+
+    /* convert each raw record into a stream of IOTData instances */
+    val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData)
+//    iotFilterDataStream.print("iotFilterDataStream")
+
+//    inputStream.print("inputStream")
+
+    val waterStream: DataStream[IOTData] = iotFilterDataStream
+      /* assign watermarks, allowing events to arrive up to 30 minutes out of order */
+      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IOTData](Time.minutes(30)) {
+        override def extractTimestamp(element: IOTData): Long = {
+          element.eventTime - 1
+        }
+      })
+
+    /* side output for records that arrive after the allowed lateness */
+    val sideOutputTag = new OutputTag[IOTData]("late data")
+
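+    /* key by record id, 15-minute tumbling windows; pre-aggregate with IOTAgg and attach the window end via IOTWindowResult */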
+    val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.id)
+      .timeWindow(Time.minutes(15))
+      /* how late a record may still be processed by this window */
+      .allowedLateness(Time.minutes(60))
+      /* route late records to the side output */
+      .sideOutputLateData(sideOutputTag)
+      .aggregate(new IOTAgg(), new IOTWindowResult())
+
+    /* set which event-time field to use */
+//    val assignStream: DataStream[IOTData] = iotDataStream.assignAscendingTimestamps(_.eventTime)
+    windowsStream.addSink(new IOTSinkFunction)
+//    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTSourceFunction)
+
+    env.execute("iot data collect")
+  }
+
+
+}
+

+ 15 - 0
data_collect/src/main/scala/com/persagy/iot/app/IOTWindowResult.scala

@@ -0,0 +1,15 @@
+package com.persagy.iot.app
+
+import com.persagy.iot.bean.IOTData
+import org.apache.flink.streaming.api.scala.function.WindowFunction
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
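+/** Emits, per key and window, the single pre-aggregated record with windowEnd taken from the window metadata. */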
+class IOTWindowResult() extends WindowFunction[IOTData, IOTData, String, TimeWindow]{
+  override def apply(key: String,
+                     window: TimeWindow,
+                     input: Iterable[IOTData],
+                     out: Collector[IOTData]): Unit = {
+    out.collect(input.head.copy(windowEnd = window.getEnd))
+  }
+}

+ 12 - 0
data_collect/src/main/scala/com/persagy/iot/app/SendMessage.scala

@@ -0,0 +1,12 @@
+package com.persagy.iot.app
+
+import com.persagy.iot.utils.KafkaUtil
+
+object SendMessage {
+
+  def main(args: Array[String]): Unit = {
+    /** send test data to Kafka */
+    KafkaUtil.sendMessageToKafka(KafkaUtil.topic)
+  }
+
+}

+ 33 - 0
data_collect/src/main/scala/com/persagy/iot/bean/IOTData.scala

@@ -0,0 +1,33 @@
+package com.persagy.iot.bean
+
+case class IOTData(
+                    /** row key: status + building ID + meter ID + function ID */
+                    id: String,
+
+                    /** building ID */
+                    build: String,
+
+                    /** meter ID */
+                    sign: String,
+
+                    /** function ID */
+                    funId: String,
+
+                    /** meter reading */
+                    value: Double,
+
+                    /** event time as reported (string form) */
+                    eventTimeStr: String,
+
+                    /** event timestamp in milliseconds */
+                    eventTime: Long,
+
+                    /** time the record entered the system */
+                    sysTime: Long,
+
+                    /** window end time (minute-level precision) */
+                    windowEnd: Long,
+
+                    /** data status: 0 = normal, 1 = late, 2 = lost */
+                    status: Int
+                  )

+ 25 - 0
data_collect/src/main/scala/com/persagy/iot/func/IOTSinkFunction.scala

@@ -0,0 +1,25 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.client.Connection
+
+
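+/** HBase sink stub: the connection and write logic are not wired up yet; only log messages are printed. */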
+class IOTSinkFunction extends RichSinkFunction[IOTData] {
+
+  var conn: Connection = _
+
+  override def open(parameters: Configuration): Unit = {
+    println("init Hbase 数据。。。")
+//    conn = HbaseUtil.getHbaseConnection()
+  }
+
+  override def invoke(value: IOTData, context: SinkFunction.Context[_]): Unit = {
+    println("insert Hbase 数据。。。")
+  }
+
+  override def close(): Unit = {
+    println("closing HBase connection ...")
+    if (conn != null) conn.close()
+  }
+}

+ 69 - 0
data_collect/src/main/scala/com/persagy/iot/func/SplitData.scala

@@ -0,0 +1,69 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.app.IOTApp.no_value
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.IOTUtils.timestampConverter
+import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.util.Collector
+
+/**
+ * Each raw record carries multiple meter IDs and function IDs,
+ * so a custom flatMap splits it into individual IOTData records.
+ */
+class SplitData extends FlatMapFunction[String, IOTData] {
+
+  override def flatMap(input: String, collector: Collector[IOTData]): Unit = {
+
+    try {
+      /** parse the record and extract the report payload */
+      val arr1 = input.split("\t")
+      val builds: Array[String] = arr1(3).split("&")
+      for (elem <- builds) {
+
+        val arr2: Array[String] = elem.split(";")
+
+        val build: String = arr2(0)
+        val sign: String = arr2(5)
+        val eventTimeStr: String = arr2(3)
+        val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr)
+        val sysTime: Long = System.currentTimeMillis()
+
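+        /* function-ID/value pairs start at index 7 and alternate (funId, value); skip pairs whose value is the "no reading" sentinel */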
+        for (i <- 7 until (arr2.length, 2) if (!no_value.equals(arr2(i + 1)))) {
+          val funId: String = arr2(i)
+          val value: Double = arr2(i + 1).toDouble
+          val status: Int = getStatus(eventTime, sysTime)
+          /** row key: status + building + meter + function ID */
+          val id = status + build + sign + funId
+
+          /** build the IOTData instance */
+          val iotData: IOTData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
+
+          collector.collect(iotData)
+        }
+      }
+    } catch {
+      case _: Exception => // drop malformed records silently
+    }
+
+  }
+
+  /**
+   * Classify a record as normal, late, or lost.
+   * @param eventTime time the reading was produced (ms)
+   * @param sysTime time the record entered the system (ms)
+   */
+  def getStatus(eventTime: Long, sysTime: Long): Int = {
+
+    val delayMs: Long = sysTime - eventTime
+
+    if (delayMs <= 30 * 60 * 1000L) {
+      0
+    } else if (delayMs <= 60 * 60 * 1000L) {
+      1
+    } else {
+      2
+    }
+  }
+
+}

+ 5 - 0
data_collect/src/main/scala/com/persagy/iot/utils/DwConstant.scala

@@ -0,0 +1,5 @@
+package com.persagy.iot.utils
+
+class DwConstant {
+
+}

+ 27 - 0
data_collect/src/main/scala/com/persagy/iot/utils/HbaseUtil.scala

@@ -0,0 +1,27 @@
+package com.persagy.iot.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
+
+object HbaseUtil {
+
+  val KAFKA_TOPIC = "first"
+
+  val FAMILY_NAME = "data"
+
+  // ZooKeeper connection settings
+  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+
+  val HBASE_ZOOKEEPER_IP = "192.168.100.75,192.168.100.84,192.168.100.147"
+
+  val HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort"
+  val HBASE_ZOOKEEPER_PORT = "2181"
+
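+  /** Build an HBase Connection from the ZooKeeper quorum and client port configured above. */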
+  def getHbaseConnection(): Connection ={
+    val conf: Configuration = HBaseConfiguration.create()
+    conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP)
+    conf.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, HBASE_ZOOKEEPER_PORT)
+    ConnectionFactory.createConnection(conf)
+  }
+}

+ 18 - 0
data_collect/src/main/scala/com/persagy/iot/utils/IOTUtils.scala

@@ -0,0 +1,18 @@
+package com.persagy.iot.utils
+
+import java.text.SimpleDateFormat
+
+object IOTUtils {
+
+  /**
+   * Convert a date string to an epoch timestamp in milliseconds.
+   * @param formatPattern pattern of the input date string
+   * @param dateStr date string to convert
+   * @return epoch milliseconds
+   */
+  def timestampConverter(formatPattern: String, dateStr: String): Long ={
+    val simpleDateFormat = new SimpleDateFormat(formatPattern)
+    val timestamp = simpleDateFormat.parse(dateStr).getTime
+    timestamp
+  }
+}

+ 48 - 0
data_collect/src/main/scala/com/persagy/iot/utils/KafkaUtil.scala

@@ -0,0 +1,48 @@
+package com.persagy.iot.utils
+
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+
+import java.util.Properties
+import scala.io.BufferedSource
+
+object KafkaUtil {
+
+  val dev_host: String = "192.168.4.7:9092,192.168.4.8:9092,192.168.4.9:9092"
+  val test_host: String = "192.168.100.84:9092,192.168.100.75:9092,192.168.100.147:9092"
+  val topic: String = "iot_data"
+  val file_path: String = "/Users/king/Documents/persagy/dw/log.log2021-01-11-15.log"
+
+  def kafkaConsumerProperties(): Properties = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", dev_host)
+    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+    properties.setProperty("auto.offset.reset", "latest")
+    properties
+  }
+
+  def kafkaProducerProperties(): Properties = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", dev_host)
+    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    properties
+  }
+
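+  /** Replay a local log file into the given topic, one line per record (handy for local testing). */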
+  def sendMessageToKafka(topic: String): Unit = {
+
+    /** create a KafkaProducer to send the records */
+    val producer = new KafkaProducer[String, String](kafkaProducerProperties())
+
+    /** read the file and send each line as one record */
+    val bufferedSource: BufferedSource = scala.io.Source.fromFile(file_path)
+
+    for (line <- bufferedSource.getLines()) {
+      val record = new ProducerRecord[String, String](topic, line)
+      println(line)
+      producer.send(record)
+//      Thread.sleep(1000L)
+    }
+
+    /** flush any buffered records and release resources */
+    bufferedSource.close()
+    producer.close()
+  }
+
+}

+ 51 - 0
data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala

@@ -0,0 +1,51 @@
+package com.persagy.iot.utils
+
+import com.persagy.iot.app.IOTApp.no_value
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.IOTUtils.timestampConverter
+
+import java.text.SimpleDateFormat
+
+
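+/** Ad-hoc checks for the record-splitting and date-formatting logic; not part of the streaming job. */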
+object TestFunction {
+
+  def main(args: Array[String]): Unit = {
+    val elem: String = "1101080259;4;report;20210111150020;58440;9919;1;11301;635.0;"
+    val arr2: Array[String] = elem.split(";")
+
+    val build: String = arr2(0)
+    val sign: String = arr2(5)
+    val eventTimeStr: String = arr2(3)
+    val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr)
+    val sysTime: Long = System.currentTimeMillis()
+
+    for (i <- 7 until (arr2.length, 2) if (!no_value.equals(arr2(i + 1)))) {
+      val funId: String = arr2(i)
+      val value: Double = arr2(i + 1).toDouble
+      val status: Int = 1
+      /** row key: status + building + meter + function ID */
+      val id = status + build + sign + funId
+
+      /** build the IOTData instance */
+      val iotData: IOTData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
+      println(iotData)
+    }
+  }
+
+  def main1(args: Array[String]): Unit = {
+    val sdf: SimpleDateFormat = new SimpleDateFormat("yyyy MM dd HH:mm:ss")
+    val result1 = sdf.format(1610394300000L)
+    val result2 = sdf.format(1610393400000L)
+    println(result1)
+    println(result2)
+
+    println("to:")
+    for (i <- 2 to 5){
+      println(i)
+    }
+    println("until:")
+    for (i <- 2 until 5){
+      println(i)
+    }
+  }
+}

+ 117 - 0
pom.xml

@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>IOT</artifactId>
+    <packaging>pom</packaging>
+    <version>1.0-SNAPSHOT</version>
+    <modules>
+        <module>data_collect</module>
+    </modules>
+
+    <properties>
+        <flink.version>1.10.0</flink.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <kafka.version>2.2.0</kafka.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.49</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.1.24</version>
+        </dependency>
+
+        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hbase -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hbase_2.11</artifactId>
+            <version>1.10.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>1.3.1</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- compiles the Scala sources to class files -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.4.6</version>
+                <executions>
+                    <execution>
+                        <!-- bind to Maven's compile phase -->
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>