wudla committed 4 years ago
commit be360b0b94

+ 6 - 6
data_collect/src/main/scala/com/persagy/iot/app/IOTAgg.scala

@@ -1,13 +1,13 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.OriginalData
+import com.persagy.iot.bean.IOTData
 import org.apache.flink.api.common.functions.AggregateFunction
 
-class IOTAgg() extends AggregateFunction[OriginalData, OriginalData, OriginalData]{
+class IOTAgg() extends AggregateFunction[IOTData, IOTData, IOTData]{
 
-  override def createAccumulator(): OriginalData = null
+  override def createAccumulator(): IOTData = null
 
-  override def add(in: OriginalData, acc: OriginalData): OriginalData = {
+  override def add(in: IOTData, acc: IOTData): IOTData = {
 
     if (acc == null){
       in
@@ -20,9 +20,9 @@ class IOTAgg() extends AggregateFunction[OriginalData, OriginalData, OriginalDat
     }
   }
 
-  override def getResult(acc: OriginalData): OriginalData = acc
+  override def getResult(acc: IOTData): IOTData = acc
 
-  override def merge(acc: OriginalData, acc1: OriginalData): OriginalData = {
+  override def merge(acc: IOTData, acc1: IOTData): IOTData = {
     if (acc == null){
       acc1
     } else if (acc1 == null) {

+ 19 - 13
data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala

@@ -1,7 +1,7 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.OriginalData
-import com.persagy.iot.func.{IOTSinkFunction, SplitData}
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.func.{IOTOriginalSinkFunction, OriginalDataAlarm, SplitData}
 import com.persagy.iot.utils.KafkaUtil
 import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -43,37 +43,43 @@ object IOTApp {
     })
 
     /* convert to a stream of IOTData entities */
-    val iotFilterDataStream: DataStream[OriginalData] = filterStream.flatMap(new SplitData)
-//    iotFilterDataStream.print("iotFilterDataStream")
+    val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData)
 
-//    inputStream.print("inputStream")
+    /** ------------------------- process original (raw) data: start ------------------------- */
 
-    val waterStream: DataStream[OriginalData] = iotFilterDataStream
+    iotFilterDataStream.keyBy(_.rowKey)
+      .process(new OriginalDataAlarm)
+    iotFilterDataStream.addSink(new IOTOriginalSinkFunction)
+
+    /** ------------------------- process original (raw) data: end ------------------------- */
+
+    /** ------------------------ process interval (accuracy) data: start ------------------------ */
+    val waterStream: DataStream[IOTData] = iotFilterDataStream
       /* set the watermark time */
-      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OriginalData](Time.minutes(30)) {
-        override def extractTimestamp(element: OriginalData) = {
+      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IOTData](Time.minutes(30)) {
+        override def extractTimestamp(element: IOTData) = {
           element.eventTime - 1
         }
       })
 
     /* define the side-output tag */
-    val sideOutputTag = new OutputTag[OriginalData]("late data")
+    val sideOutputTag = new OutputTag[IOTData]("late-data")
 
-    val windowsStream: DataStream[OriginalData] = waterStream.keyBy(_.rowKey)
+    val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
       .timeWindow(Time.minutes(15))
       /* maximum allowed lateness */
       .allowedLateness(Time.minutes(60))
       /* route late data to the side output */
       .sideOutputLateData(sideOutputTag)
       .aggregate(new IOTAgg(), new IOTWindowResult())
+    /** ------------------------ process interval (accuracy) data: end ------------------------ */
 
     /* set the event-time field to use */
 //    val assignStream: DataStream[IOTData] = iotDataStream.assignAscendingTimestamps(_.eventTime)
 
     /* windowed data is saved to HBase; late data from the side output is compared against HBase before being written back */
-    windowsStream.addSink(new IOTSinkFunction)
-
-    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTSinkFunction)
+    windowsStream.addSink(new IOTOriginalSinkFunction)
+    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTOriginalSinkFunction)
 
     env.execute("iot data collect")
   }
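
This commit adds IOTAccuracySinkFunction but wires both the windowed stream and its late side output to IOTOriginalSinkFunction. If the aggregated 15-minute output is meant to land in the accuracy_data table, the wiring would look roughly like the sketch below (an assumption about intent, not part of this commit):

    // sketch only: send windowed output to the accuracy table,
    // keep late data going through the original-data sink for comparison
    windowsStream.addSink(new IOTAccuracySinkFunction)
    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTOriginalSinkFunction)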

+ 4 - 4
data_collect/src/main/scala/com/persagy/iot/app/IOTWindowResult.scala

@@ -1,15 +1,15 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.OriginalData
+import com.persagy.iot.bean.IOTData
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.util.Collector
 
-class IOTWindowResult() extends WindowFunction[OriginalData, OriginalData, String, TimeWindow]{
+class IOTWindowResult() extends WindowFunction[IOTData, IOTData, String, TimeWindow]{
   override def apply(key: String,
                      window: TimeWindow,
-                     input: Iterable[OriginalData],
-                     out: Collector[OriginalData]): Unit = {
+                     input: Iterable[IOTData],
+                     out: Collector[IOTData]): Unit = {
     out.collect(input.head.copy(windowEnd = window.getEnd))
   }
 }

+ 0 - 29
data_collect/src/main/scala/com/persagy/iot/bean/AccuracyData.scala

@@ -1,29 +0,0 @@
-package com.persagy.iot.bean
-
-/**
- * Entity class for accuracy (interval) data
- */
-case class AccuracyData(
-                    /** data status + building no. + meter no. + function no. + interval timestamp
-                     * format: 0:293:92:2343:1611818625644
-                     */
-                    rowKey: String,
-
-                    /** meter reading value */
-                    value: Double,
-
-//                    /** raw event-time string */
-//                    eventTimeStr: String,
-
-                    /** event timestamp */
-                    eventTime: Long,
-
-                    /** time the record entered the system */
-                    sysTime: Long,
-
-                    /** interval (window-end) timestamp */
-                    windowEnd: Long,
-
-                    /** data status: 0 = normal, 1 = late, 2 = missing */
-                    status: Int
-                  )

+ 1 - 1
data_collect/src/main/scala/com/persagy/iot/bean/OriginalData.scala

@@ -1,6 +1,6 @@
 package com.persagy.iot.bean
 
-case class OriginalData(
+case class IOTData(
                     /** data status + building no. + meter no. + function no. + original-data timestamp
                      * format: 0:293:92:2343:1611818625644
                      */
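
Only the rename from OriginalData to IOTData is shown in this hunk. Judging from the IOTData constructor calls in SplitData and TestFunction, the full case class presumably has roughly the shape sketched below — the field list and types are inferred from those call sites, not from the diff:

    // assumed shape of IOTData, inferred from its call sites; types are guesses
    case class IOTData(
      rowKey: String,       // building:meter:function:eventTime
      build: String,        // building number
      sign: String,         // meter number
      funId: String,        // function number
      value: Double,        // meter reading value
      eventTimeStr: String, // raw event-time string (yyyyMMddHHmmss)
      eventTime: Long,      // event timestamp in milliseconds
      sysTime: Long,        // time the record entered the system
      windowEnd: Long,      // window-end (interval) timestamp
      status: Int           // 0 = normal, 1 = late, 2 = missing
    )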

+ 48 - 0
data_collect/src/main/scala/com/persagy/iot/func/IOTAccuracySinkFunction.scala

@@ -0,0 +1,48 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.HbaseUtil
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.util.Try
+
+/**
+ * Writes accuracy (interval) data to HBase
+ */
+class IOTAccuracySinkFunction extends RichSinkFunction[IOTData] {
+
+  val accuracy_tableName: String = "accuracy_data"
+  val family: String = "data"
+  val sysTime: String = "sys_time"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var table: Table = _
+
+  override def open(parameters: Configuration): Unit = {
+    println("create HBase connection ......")
+    conn = HbaseUtil.getHbaseConnection()
+    table = conn.getTable(TableName.valueOf(accuracy_tableName))
+  }
+
+  /**
+   * Write one accuracy (interval) record into HBase.
+   */
+  override def invoke(iotData: IOTData, context: SinkFunction.Context[_]): Unit = {
+    val put = new Put(Bytes.toBytes(iotData.rowKey))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime))
+    Try(table.put(put))
+    println("insert into " + accuracy_tableName + " rowKey = " + iotData.rowKey)
+  }
+
+  override def close(): Unit = {
+    println("close HBase connection ......")
+    if (table != null) table.close()
+    if (conn != null) conn.close()
+  }
+
+  def now(): Long ={
+    System.currentTimeMillis()
+  }
+}

+ 63 - 0
data_collect/src/main/scala/com/persagy/iot/func/IOTOriginalSinkFunction.scala

@@ -0,0 +1,63 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.HbaseUtil
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.util.Try
+
+/**
+ * Writes original (raw) data to HBase
+ */
+class IOTOriginalSinkFunction extends RichSinkFunction[IOTData] {
+
+  val original_tableName: String = "original_data"
+  val family: String = "data"
+  val sysTime: String = "sys_time"
+  val status: String = "status"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var originalTable: Table = _
+
+  override def open(parameters: Configuration): Unit = {
+    println("create Hbase connection ......")
+    conn = HbaseUtil.getHbaseConnection()
+
+    println("create Hbase table ......")
+    originalTable = conn.getTable(TableName.valueOf(original_tableName))
+  }
+
+  override def invoke(iotData: IOTData, context: SinkFunction.Context[_]): Unit = {
+    // rowKey layout: building:meter:function:eventTime
+    val rowKey: String = iotData.rowKey
+    val put = new Put(Bytes.toBytes(rowKey))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(status), Bytes.toBytes(iotData.status))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime))
+    originalTable.put(put)
+    println("insert into " + original_tableName + " rowKey = " + rowKey)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (originalTable != null) {
+        originalTable.close()
+      }
+      if (conn != null) {
+        conn.close()
+      }
+      println("init Hbase create connection ")
+    } catch {
+      case e: Exception => System.err.println(e.getMessage)
+    }
+  }
+
+  def now(): Long ={
+    System.currentTimeMillis()
+  }
+}
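
Since Bytes.toBytes is overloaded per type, the value, status and sys_time cells are stored as binary-encoded Double, Int and Long, and must be decoded with the matching Bytes.toDouble / Bytes.toInt / Bytes.toLong on read. A minimal read-back sketch (not part of this commit; rowKey is a placeholder):

    // sketch only: fetch one row and decode the typed cells
    val get = new Get(Bytes.toBytes(rowKey))
    val result: Result = originalTable.get(get)
    val value   = Bytes.toDouble(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("value")))
    val status  = Bytes.toInt(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("status")))
    val sysTime = Bytes.toLong(result.getValue(Bytes.toBytes("data"), Bytes.toBytes("sys_time")))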

+ 0 - 38
data_collect/src/main/scala/com/persagy/iot/func/IOTSinkFunction.scala

@@ -1,38 +0,0 @@
-package com.persagy.iot.func
-
-import com.persagy.iot.bean.OriginalData
-import com.persagy.iot.utils.HbaseUtil
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
-import org.apache.hadoop.hbase.TableName
-import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.util.Bytes
-
-
-class IOTSinkFunction extends RichSinkFunction[OriginalData] {
-
-  var conn: Connection = _
-  var table: Table = _
-
-  override def open(parameters: Configuration): Unit = {
-    println("init Hbase 数据。。。")
-    conn = HbaseUtil.getHbaseConnection()
-  }
-
-  override def invoke(value: OriginalData, context: SinkFunction.Context[_]): Unit = {
-    val admin: Admin = conn.getAdmin
-
-    val put = new Put(Bytes.toBytes(now()))
-
-
-    println("insert Hbase 数据。。。")
-  }
-
-  override def close(): Unit = {
-    println("close Hbase 数据。。。")
-  }
-
-  def now(): Long ={
-    System.currentTimeMillis()
-  }
-}

+ 25 - 0
data_collect/src/main/scala/com/persagy/iot/func/OriginalDataAlarm.scala

@@ -0,0 +1,25 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+
+class OriginalDataAlarm extends KeyedProcessFunction[String, IOTData, IOTData] {
+
+  lazy val alarmTimeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("alarm-time", classOf[Long]))
+
+  override def processElement(iotData: IOTData,
+                              ctx: KeyedProcessFunction[String, IOTData, IOTData]#Context,
+                              out: Collector[IOTData]): Unit = {
+    val alarmTime: Long = iotData.eventTime + 1000 * 60 * 60
+    ctx.timerService().registerEventTimeTimer(alarmTime)
+    alarmTimeState.update(iotData.eventTime)
+  }
+
+  override def onTimer(timestamp: Long,
+                       ctx: KeyedProcessFunction[String, IOTData, IOTData]#OnTimerContext,
+                       out: Collector[IOTData]): Unit = {
+
+  }
+}
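
onTimer is left empty in this commit. Assuming the intent is to flag a key that has produced no newer reading within an hour of its last one, one possible completion is sketched below (not part of the commit):

    // sketch only: alarm when the timer fires and no newer reading has arrived for this key
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[String, IOTData, IOTData]#OnTimerContext,
                         out: Collector[IOTData]): Unit = {
      val lastEventTime = alarmTimeState.value()
      // this timer was registered at lastEventTime + 1h; if it still matches, nothing newer came in
      if (timestamp == lastEventTime + 1000 * 60 * 60) {
        println(s"no data for key ${ctx.getCurrentKey} since event time $lastEventTime")
        alarmTimeState.clear()
      }
    }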

+ 8 - 6
data_collect/src/main/scala/com/persagy/iot/func/SplitData.scala

@@ -1,18 +1,20 @@
 package com.persagy.iot.func
 
 import com.persagy.iot.app.IOTApp.no_value
-import com.persagy.iot.bean.OriginalData
+import com.persagy.iot.bean.IOTData
 import com.persagy.iot.utils.IOTUtils.timestampConverter
 import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.time.Time
 import org.apache.flink.util.Collector
 
 /**
  * Each record contains multiple function IDs and meter IDs,
  * so this custom flatMap function splits it into individual records.
  */
-class SplitData extends FlatMapFunction[String, OriginalData] {
+class SplitData extends FlatMapFunction[String, IOTData] {
 
-  override def flatMap(input: String, collector: Collector[OriginalData]): Unit = {
+
+  override def flatMap(input: String, collector: Collector[IOTData]): Unit = {
 
     try {
       /** parse the record and extract the reported data packet */
@@ -25,7 +27,7 @@ class SplitData extends FlatMapFunction[String, OriginalData] {
         val build: String = arr2(0).toString
         val sign: String = arr2(5).toString
         val eventTimeStr: String = arr2(3)
-        val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr)
+        val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr, Time.hours(8).toMilliseconds)
         val sysTime: Long = System.currentTimeMillis()
 
         for (i <- 7 until (arr2.length, 2) if (!no_value.equals(arr2(i + 1)))) {
@@ -33,10 +35,10 @@ class SplitData extends FlatMapFunction[String, OriginalData] {
           val value: Double = arr2(i + 1).toDouble
           val status: Int = getStatus(eventTime, sysTime)
           /** rowKey */
-          val rowKey = status + ":" + build + ":" + sign + ":" + funId
+          val rowKey = build + ":" + sign + ":" + funId + ":" + eventTime
 
           /** convert to an IOTData entity */
-          val iotData: OriginalData = OriginalData(rowKey, value, eventTimeStr, eventTime, sysTime, 0L, status)
+          val iotData: IOTData = IOTData(rowKey, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
 
           collector.collect(iotData)
         }
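
With this change the rowKey drops the status prefix and gains the event timestamp. Using the sample values from the format comment in the bean, a key now looks like the sketch below:

    // sketch only: old vs. new rowKey layout for building 293, meter 92, function 2343
    // old: 0:293:92:2343              (status:build:sign:funId)
    // new: 293:92:2343:1611818625644  (build:sign:funId:eventTime)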

+ 1 - 0
data_collect/src/main/scala/com/persagy/iot/utils/DwConstant.scala

@@ -2,4 +2,5 @@ package com.persagy.iot.utils
 
 class DwConstant {
 
+
 }

+ 16 - 8
data_collect/src/main/scala/com/persagy/iot/utils/HbaseUtil.scala

@@ -18,7 +18,8 @@ object HbaseUtil {
   // ZooKeeper connection settings
   val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
 
-  val HBASE_ZOOKEEPER_IP = "192.168.100.75,192.168.100.84,192.168.100.147"
+  val HBASE_ZOOKEEPER_IP_TEST = "192.168.100.75,192.168.100.84,192.168.100.147"
+  val HBASE_ZOOKEEPER_IP_DEV = "192.168.4.7,192.168.4.8,192.168.4.9"
 
   val HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort"
   val HBASE_ZOOKEEPER_PORT = "2181"
@@ -27,7 +28,7 @@ object HbaseUtil {
 
   def getHbaseConnection(): Connection ={
     val conf: Configuration = HBaseConfiguration.create()
-    conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP)
+    conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP_DEV)
     conf.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, HBASE_ZOOKEEPER_PORT)
     ConnectionFactory.createConnection(conf)
   }
@@ -59,8 +60,8 @@ object HbaseUtil {
       }
     }
     // get the table handle
-    val tableName = TableName.valueOf(name)
-    val table = conn.getTable(tableName)
+//    val tableName = TableName.valueOf(name)
+//    val table = conn.getTable(tableName)
 
     val put = new Put(rowKey.getBytes());
     put.addColumn(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes(valueMap.head._2.head._2.toString))
@@ -71,16 +72,16 @@ object HbaseUtil {
   /**
    * Create an HBase table.
    * @param conn HBase connection
-   * @param name the name of the table to create
+   * @param tableName the name of the table to create
    * @param familyNames at most 3 are recommended; more hurts performance
    */
-  def createTable(conn: Connection, name: String, familyNames: String*): Unit ={
+  def createTable(conn: Connection, tableName: String, familyNames: String*): Unit ={
 
     val admin: Admin = conn.getAdmin
-    val tableName: TableName = TableName.valueOf(name)
+    val tn: TableName = TableName.valueOf(tableName)
 
     // build the table descriptor
-    val hTableDescriptor = new HTableDescriptor(tableName)
+    val hTableDescriptor = new HTableDescriptor(tn)
 
     // iterate over the family names and create the column families
     for (familyName <- familyNames) {
@@ -90,4 +91,11 @@ object HbaseUtil {
 
     admin.createTable(hTableDescriptor)
   }
+
+  def main(args: Array[String]): Unit = {
+    // create the original-data table
+    createTable(getHbaseConnection(), "original_data", "data")
+    // create the accuracy (interval) data table
+    createTable(getHbaseConnection(), "accuracy_data", "data")
+  }
 }
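
createTable throws if the table already exists, so re-running main() against the same cluster will fail. A small guard using the standard Admin.tableExists check could look like the sketch below (not part of this commit):

    // sketch only: create a table only when it does not exist yet
    def createTableIfAbsent(conn: Connection, tableName: String, familyNames: String*): Unit = {
      val admin: Admin = conn.getAdmin
      if (!admin.tableExists(TableName.valueOf(tableName))) {
        createTable(conn, tableName, familyNames: _*)
      }
    }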

+ 9 - 2
data_collect/src/main/scala/com/persagy/iot/utils/IOTUtils.scala

@@ -8,11 +8,18 @@ object IOTUtils {
    * Convert a date/time string into an epoch timestamp (milliseconds).
    * @param dateStr the date/time string to convert
    * @param formatPattern the pattern used to parse dateStr
+   * @param offset time offset added to the result, in milliseconds
    * @return
    */
-  def timestampConverter(formatPattern: String, dateStr: String): Long ={
+  def timestampConverter(formatPattern: String, dateStr: String, offset: Long): Long = {
     val simpleDateFormat = new SimpleDateFormat(formatPattern)
     val timestamp = simpleDateFormat.parse(dateStr).getTime
-    timestamp
+    timestamp + offset
   }
+
+  /**
+   * Get the current time in milliseconds.
+   * @return current epoch time in milliseconds
+   */
+  def now(): Long = System.currentTimeMillis()
 }
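
A minimal usage sketch of the changed converter, mirroring the call site in SplitData (the sample date string is made up; the offset is simply added to the parsed timestamp, here +8 hours):

    // sketch only: convert a "yyyyMMddHHmmss" string to epoch millis, shifted by the offset
    import org.apache.flink.api.common.time.Time
    val ts: Long = IOTUtils.timestampConverter("yyyyMMddHHmmss", "20210128093045", Time.hours(8).toMilliseconds)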

+ 2 - 2
data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala

@@ -1,7 +1,7 @@
 package com.persagy.iot.utils
 
 import com.persagy.iot.app.IOTApp.no_value
-import com.persagy.iot.bean.OriginalData
+import com.persagy.iot.bean.IOTData
 import com.persagy.iot.utils.IOTUtils.timestampConverter
 
 import java.text.SimpleDateFormat
@@ -27,7 +27,7 @@ object TestFunction {
       val id = status + build + sign + funId
 
       /** convert to an IOTData entity */
-      val iotData: OriginalData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
+      val iotData: IOTData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
       println(iotData)
     }
   }