wudla 4 years ago
parent
commit
518eabd870

+ 6 - 6
data_collect/src/main/scala/com/persagy/iot/app/IOTAgg.scala

@@ -1,13 +1,13 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
 import org.apache.flink.api.common.functions.AggregateFunction
 
-class IOTAgg() extends AggregateFunction[IOTData, IOTData, IOTData]{
+class IOTAgg() extends AggregateFunction[OriginalData, OriginalData, OriginalData]{
 
-  override def createAccumulator(): IOTData = null
+  override def createAccumulator(): OriginalData = null
 
-  override def add(in: IOTData, acc: IOTData): IOTData = {
+  override def add(in: OriginalData, acc: OriginalData): OriginalData = {
 
     if (acc == null){
       in
@@ -20,9 +20,9 @@ class IOTAgg() extends AggregateFunction[IOTData, IOTData, IOTData]{
     }
   }
 
-  override def getResult(acc: IOTData): IOTData = acc
+  override def getResult(acc: OriginalData): OriginalData = acc
 
-  override def merge(acc: IOTData, acc1: IOTData): IOTData = {
+  override def merge(acc: OriginalData, acc1: OriginalData): OriginalData = {
     if (acc == null){
       acc1
     } else if (acc1 == null) {

+ 11 - 8
data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala

@@ -1,6 +1,6 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
 import com.persagy.iot.func.{IOTSinkFunction, SplitData}
 import com.persagy.iot.utils.KafkaUtil
 import org.apache.flink.api.common.serialization.SimpleStringSchema
@@ -43,23 +43,23 @@ object IOTApp {
     })
 
     /* 转化为实体类流 */
-    val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData)
+    val iotFilterDataStream: DataStream[OriginalData] = filterStream.flatMap(new SplitData)
 //    iotFilterDataStream.print("iotFilterDataStream")
 
 //    inputStream.print("inputStream")
 
-    val waterStream: DataStream[IOTData] = iotFilterDataStream
+    val waterStream: DataStream[OriginalData] = iotFilterDataStream
       /* 设置水位时间 */
-      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IOTData](Time.minutes(30)) {
-        override def extractTimestamp(element: IOTData) = {
+      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OriginalData](Time.minutes(30)) {
+        override def extractTimestamp(element: OriginalData) = {
           element.eventTime - 1
         }
       })
 
     /* 定义侧输出流 */
-    val sideOutputTag = new OutputTag[IOTData]("late data")
+    val sideOutputTag = new OutputTag[OriginalData]("late data")
 
-    val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.id)
+    val windowsStream: DataStream[OriginalData] = waterStream.keyBy(_.rowKey)
       .timeWindow(Time.minutes(15))
       /* 允许处理数据的最迟时间 */
       .allowedLateness(Time.minutes(60))
@@ -69,8 +69,11 @@ object IOTApp {
 
     /* 设置要选取的事件时间 */
 //    val assignStream: DataStream[IOTData] = iotDataStream.assignAscendingTimestamps(_.eventTime)
+
+    /* 开窗数据保存hbase,侧输出流的数据查询hbase对比后再存入hbase */
     windowsStream.addSink(new IOTSinkFunction)
-//    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTSourceFunction)
+
+    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTSinkFunction)
 
     env.execute("iot data collect")
   }

+ 4 - 4
data_collect/src/main/scala/com/persagy/iot/app/IOTWindowResult.scala

@@ -1,15 +1,15 @@
 package com.persagy.iot.app
 
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
 import org.apache.flink.streaming.api.scala.function.WindowFunction
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.util.Collector
 
-class IOTWindowResult() extends WindowFunction[IOTData, IOTData, String, TimeWindow]{
+class IOTWindowResult() extends WindowFunction[OriginalData, OriginalData, String, TimeWindow]{
   override def apply(key: String,
                      window: TimeWindow,
-                     input: Iterable[IOTData],
-                     out: Collector[IOTData]): Unit = {
+                     input: Iterable[OriginalData],
+                     out: Collector[OriginalData]): Unit = {
     out.collect(input.head.copy(windowEnd = window.getEnd))
   }
 }

+ 29 - 0
data_collect/src/main/scala/com/persagy/iot/bean/AccuracyData.scala

@@ -0,0 +1,29 @@
+package com.persagy.iot.bean
+
+/**
+ * 分精度数据实体类
+ */
+case class AccuracyData(
+                    /** 数据状态+楼号+表号+功能号+分精度时间
+                     * 格式: 0:293:92:2343:1611818625644
+                     */
+                    rowKey: String,
+
+                    /** 读取的表值 */
+                    value: Double,
+
+//                    /** 产生的时间字段 */
+//                    eventTimeStr: String,
+
+                    /** 产生的时间戳 */
+                    eventTime: Long,
+
+                    /** 进入系统的时间 */
+                    sysTime: Long,
+
+                    /** 分精度时间 */
+                    windowEnd: Long,
+
+                    /** 数据状态: 0:正常,1:迟到,2:丢数 */
+                    status: Int
+                  )

+ 5 - 3
data_collect/src/main/scala/com/persagy/iot/bean/IOTData.scala

@@ -1,8 +1,10 @@
 package com.persagy.iot.bean
 
-case class IOTData(
-                    /** 数据状态+楼号+表号+功能号 */
-                    id: String,
+case class OriginalData(
+                    /** 数据状态+楼号+表号+功能号+原始数据产生时间
+                     * 格式: 0:293:92:2343:1611818625644
+                     */
+                    rowKey: String,
 
                     /** 楼号 */
                     build: String,

+ 18 - 5
data_collect/src/main/scala/com/persagy/iot/func/IOTSinkFunction.scala

@@ -1,25 +1,38 @@
 package com.persagy.iot.func
 
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
+import com.persagy.iot.utils.HbaseUtil
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
-import org.apache.hadoop.hbase.client.Connection
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
 
 
-class IOTSinkFunction extends RichSinkFunction[IOTData] {
+class IOTSinkFunction extends RichSinkFunction[OriginalData] {
 
   var conn: Connection = _
+  var table: Table = _
 
   override def open(parameters: Configuration): Unit = {
     println("init Hbase 数据。。。")
-//    conn = HbaseUtil.getHbaseConnection()
+    conn = HbaseUtil.getHbaseConnection()
   }
 
-  override def invoke(value: IOTData, context: SinkFunction.Context[_]): Unit = {
+  override def invoke(value: OriginalData, context: SinkFunction.Context[_]): Unit = {
+    val admin: Admin = conn.getAdmin
+
+    val put = new Put(Bytes.toBytes(now()))
+    // NOTE(review): `admin` and `put` are created but never used — `value` is
+    // never actually written to HBase here; this sink currently only prints.
     println("insert Hbase 数据。。。")
   }
 
   override def close(): Unit = {
     println("close Hbase 数据。。。")
   }
+
+  def now(): Long ={
+    System.currentTimeMillis()
+  }
 }

+ 5 - 5
data_collect/src/main/scala/com/persagy/iot/func/SplitData.scala

@@ -1,7 +1,7 @@
 package com.persagy.iot.func
 
 import com.persagy.iot.app.IOTApp.no_value
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
 import com.persagy.iot.utils.IOTUtils.timestampConverter
 import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.util.Collector
@@ -10,9 +10,9 @@ import org.apache.flink.util.Collector
  * 由于每一条数据包含多个功能号,表号
  * 自定义 flatMap 函数
  */
-class SplitData extends FlatMapFunction[String, IOTData] {
+class SplitData extends FlatMapFunction[String, OriginalData] {
 
-  override def flatMap(input: String, collector: Collector[IOTData]): Unit = {
+  override def flatMap(input: String, collector: Collector[OriginalData]): Unit = {
 
     try {
       /** 处理数据,提取 数据上报包 */
@@ -33,10 +33,10 @@ class SplitData extends FlatMapFunction[String, IOTData] {
           val value: Double = arr2(i + 1).toDouble
           val status: Int = getStatus(eventTime, sysTime)
           /** rowKey */
-          val id = status + build + sign + funId
+          val rowKey = status + ":" + build + ":" + sign + ":" + funId // NOTE(review): OriginalData.rowKey doc says the key ends with the event timestamp — missing here; adding it would also make keyBy(_.rowKey) per-event. Confirm intended key.
 
           /** 转为 iotData 实体类 */
-          val iotData: IOTData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
+          val iotData: OriginalData = OriginalData(rowKey, value, eventTimeStr, eventTime, sysTime, 0L, status) // NOTE(review): 7 args here vs 10 in TestFunction — verify OriginalData's field list after the rename.
 
           collector.collect(iotData)
         }

+ 49 - 2
data_collect/src/main/scala/com/persagy/iot/utils/HbaseUtil.scala

@@ -1,8 +1,11 @@
 package com.persagy.iot.utils
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.HBaseConfiguration
-import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
+import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
+import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory, Put}
+import org.apache.hadoop.hbase.util.Bytes
+
+import scala.util.Try
 
 object HbaseUtil {
 
@@ -18,10 +21,54 @@ object HbaseUtil {
   val HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "hbase.zookeeper.property.clientPort"
   val HBASE_ZOOKEEPER_PORT = "2181"
 
+  var conn: Connection = _
+
   def getHbaseConnection(): Connection ={
     val conf: Configuration = HBaseConfiguration.create()
     conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP)
     conf.set(HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, HBASE_ZOOKEEPER_PORT)
     ConnectionFactory.createConnection(conf)
   }
+
+  /**
+   *
+   * @param conn Hbase 连接
+   * @param rowKey Hbase 的 rowKey
+   * @param name 表名
+   * @param valueMap 数据格式为:Map[family, Map[qualifier, value]],
+   */
+  def insertData(conn: Connection, rowKey: String, name: String, valueMap: Map[Object, Map[Object, Object]]): Unit ={
+
+    //获取表连接
+    val tableName = TableName.valueOf(name)
+    val table = conn.getTable(tableName)
+    val put = new Put(Bytes.toBytes(rowKey))
+    for ((family, columns) <- valueMap; (qualifier, value) <- columns)
+      put.addColumn(Bytes.toBytes(family.toString), Bytes.toBytes(qualifier.toString), Bytes.toBytes(value.toString))
+
+    try table.put(put) finally table.close() // write to HBase; always release the table handle (the old Try(...).getOrElse closed it only on failure)
+  }
+
+  /**
+   * 创建表
+   * @param conn Hbase 连接
+   * @param name 要创建的表名
+   * @param familyNames 建议不超过3个,否则会影响性能
+   */
+  def createTable(conn: Connection, name: String, familyNames: String*): Unit ={
+
+    val admin: Admin = conn.getAdmin
+    val tableName: TableName = TableName.valueOf(name)
+
+    //创建表描述
+    val hTableDescriptor = new HTableDescriptor(tableName)
+
+    //遍历 familyName,创建列族
+    for (familyName <- familyNames) {
+      val hColumnDescriptor = new HColumnDescriptor(familyName)
+      hTableDescriptor.addFamily(hColumnDescriptor)
+    }
+
+    if (!admin.tableExists(tableName)) admin.createTable(hTableDescriptor)
+  }
 }

+ 2 - 2
data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala

@@ -1,7 +1,7 @@
 package com.persagy.iot.utils
 
 import com.persagy.iot.app.IOTApp.no_value
-import com.persagy.iot.bean.IOTData
+import com.persagy.iot.bean.OriginalData
 import com.persagy.iot.utils.IOTUtils.timestampConverter
 
 import java.text.SimpleDateFormat
@@ -27,7 +27,7 @@ object TestFunction {
       val id = status + build + sign + funId
 
       /** 转为 iotData 实体类 */
-      val iotData: IOTData = IOTData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
+      val iotData: OriginalData = OriginalData(id, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
       println(iotData)
     }
   }