wudla committed 4 years ago
commit 240e9a735f
19 changed files with 489 additions and 35 deletions
  1. data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala (+12 -9)
  2. data_collect/src/main/scala/com/persagy/iot/app/SendMessage.scala (+4 -4)
  3. data_collect/src/main/scala/com/persagy/iot/bean/IOTData.scala (+1 -0)
  4. data_collect/src/main/scala/com/persagy/iot/func/IOTAccuracySinkFunction.scala (+17 -2)
  5. data_collect/src/main/scala/com/persagy/iot/func/IOTAlarmSink.scala (+53 -0)
  6. data_collect/src/main/scala/com/persagy/iot/func/IOTLateSinkFunction.scala (+34 -0)
  7. data_collect/src/main/scala/com/persagy/iot/func/IOTOriginalSinkFunction.scala (+3 -7)
  8. data_collect/src/main/scala/com/persagy/iot/func/OriginalDataAlarm.scala (+2 -0)
  9. data_collect/src/main/scala/com/persagy/iot/func/SplitData.scala (+3 -3)
  10. data_collect/src/main/scala/com/persagy/iot/test/IOTAppTest.scala (+51 -0)
  11. data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala (+3 -3)
  12. data_transfer/src/main/scala/com/persagy/energy/app/DicToFile.scala (+24 -0)
  13. data_transfer/src/main/scala/com/persagy/energy/app/File2FileApp.scala (+16 -7)
  14. data_transfer/src/main/scala/com/persagy/energy/app/File2Hbase.scala (+35 -0)
  15. data_transfer/src/main/scala/com/persagy/energy/app/SendMessage.scala (+13 -0)
  16. data_transfer/src/main/scala/com/persagy/energy/sink/HbaseSink.scala (+48 -0)
  17. data_transfer/src/main/scala/com/persagy/energy/utils/DTUtils.scala (+4 -0)
  18. data_transfer/src/main/scala/com/persagy/energy/utils/HbaseUtil.scala (+103 -0)
  19. data_transfer/src/main/scala/com/persagy/energy/utils/KafkaUtil.scala (+63 -0)

+ 12 - 9
data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala

@@ -52,17 +52,18 @@ object IOTApp {
     val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData(1800L, 3600L, 1000L,0))
 //    val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData(lastTime, loseTime, earlyTime, timeOffer))
 
-    /** ------------------------ original data processing start ----------------------- */
+    /* ------------------------ original data processing start ----------------------- */
     val keyedStream: DataStream[IOTData] = iotFilterDataStream.keyBy(_.rowKey)
       .process(new OriginalDataAlarm)
 
-    keyedStream.getSideOutput(new OutputTag[IOTData]("late-data")).print("outputTag:")
     keyedStream.addSink(new IOTOriginalSinkFunction)
+//    keyedStream.print("original data: ")
+//    keyedStream.getSideOutput(new OutputTag[IOTData]("late-data")).print("data-loss alarm points: ")
 
-    /** ------------------------- original data processing end ------------------------- */
+    /* ------------------------- original data processing end ------------------------- */
 
 
-    /** ------------------------ accuracy data processing start ------------------------ */
+    /* ------------------------ accuracy data processing start ------------------------ */
     val waterStream: DataStream[IOTData] = iotFilterDataStream
       .map(iotData => {
         /* add timestamp */
@@ -78,17 +79,19 @@ object IOTApp {
     val sideOutputTag = new OutputTag[IOTData]("late-data")
 
     val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
-      .timeWindow(Time.minutes(15))
+      .timeWindow(Time.seconds(15))
       .sideOutputLateData(sideOutputTag)
       .aggregate(new IOTAgg(), new IOTWindowResult())
 
     /* windowed data is saved to HBase; late data from the side output queries HBase, is compared, then written back to HBase */
     windowsStream.addSink(new IOTAccuracySinkFunction)
-    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTAccuracySinkFunction)
-    /** ------------------------ accuracy data processing end ------------------------ */
+//    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTAccuracySinkFunction)
+
+//    windowsStream.print("accuracy data: ")
+//    windowsStream.getSideOutput(sideOutputTag).print("late accuracy data: ")
+    /* ------------------------ accuracy data processing end ------------------------ */
+
 
-    windowsStream.print("result: ")
-    windowsStream.getSideOutput(sideOutputTag).print("sideOutputTag:")
     env.execute("iot data collect")
   }
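
Note: the hunk above elides how `waterStream` actually receives its event-time timestamps. A minimal sketch of a typical assignment for this Flink Scala API version (an assumption for illustration, not code from this commit; the 3-second out-of-orderness bound is invented):

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.windowing.time.Time

    // hypothetical: assign event-time timestamps/watermarks after the map(...) shown in the hunk
    val waterStream: DataStream[IOTData] = iotFilterDataStream
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[IOTData](Time.seconds(3)) {
          override def extractTimestamp(element: IOTData): Long = element.eventTime
        })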
 

+ 4 - 4
data_collect/src/main/scala/com/persagy/iot/app/SendMessage.scala

@@ -5,9 +5,9 @@ import com.persagy.iot.utils.KafkaUtil
 
 object SendMessage {
 
-  def main(args: Array[String]): Unit = {
-    /** send data to Kafka */
-    KafkaUtil.sendMessageToKafka(KafkaUtil.topic)
-  }
+//  def main(args: Array[String]): Unit = {
+//    /* send data to Kafka */
+//    KafkaUtil.sendMessageToKafka(KafkaUtil.topic)
+//  }
 
 }

+ 1 - 0
data_collect/src/main/scala/com/persagy/iot/bean/IOTData.scala

@@ -1,5 +1,6 @@
 package com.persagy.iot.bean
 
+// TODO: delay delta for data points
 case class IOTData(
                     /** data status + building no. + meter no. + function id + original data creation time
                      * format: 0:293:92:2343:1611818625644

+ 17 - 2
data_collect/src/main/scala/com/persagy/iot/func/IOTAccuracySinkFunction.scala

@@ -4,6 +4,7 @@ import com.persagy.iot.bean.IOTData
 import com.persagy.iot.utils.HbaseUtil
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.util.Bytes
 
@@ -25,6 +26,7 @@ class IOTAccuracySinkFunction extends RichSinkFunction[IOTData] {
   override def open(parameters: Configuration): Unit = {
     println("init Hbase ......")
     conn = HbaseUtil.getHbaseConnection()
+    table = conn.getTable(TableName.valueOf(accuracy_tableName))
   }
 
   /**
@@ -32,12 +34,25 @@ class IOTAccuracySinkFunction extends RichSinkFunction[IOTData] {
    */
   override def invoke(iotData: IOTData, context: SinkFunction.Context[_]): Unit = {
     val put = new Put(Bytes.toBytes(iotData.rowKey))
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.sysTime))
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.value))
+    println(iotData.sysTime)
+    println(iotData.value)
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime.toString))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value.toString))
     Try(table.put(put))
   }
 
   override def close(): Unit = {
+    try {
+      if (table != null) {
+        table.close()
+      }
+      if (conn != null) {
+        conn.close()
+      }
+      println("Hbase connection closed")
+    } catch {
+      case e:Exception => System.err.println(e.getMessage)
+    }
     println("close Hbase 数据。。。")
   }
 

+ 53 - 0
data_collect/src/main/scala/com/persagy/iot/func/IOTAlarmSink.scala

@@ -0,0 +1,53 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.HbaseUtil
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Connection, Put, Table}
+import org.apache.hadoop.hbase.util.Bytes
+
+class IOTAlarmSink extends RichSinkFunction[IOTData] {
+
+
+  val original_tableName: String = "alarm_original_data"
+  val family: String = "data"
+  val sysTime: String = "sys_time"
+  val status: String = "status"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var originalTable: Table = _
+  override def open(parameters: Configuration): Unit = {
+    println("init Hbase connection ......")
+    conn = HbaseUtil.getHbaseConnection()
+
+    println("create Hbase table ......")
+    originalTable = conn.getTable(TableName.valueOf(original_tableName))
+  }
+
+  override def invoke(iotData: IOTData, context: SinkFunction.Context[_]): Unit = {
+    // rowKey format: building:meter:functionId:time
+    val rowKey: String = iotData.rowKey + ":" + iotData.eventTime
+    val put = new Put(Bytes.toBytes(rowKey))
+
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime))
+    originalTable.put(put)
+    println("insert into " + original_tableName + " rowKey = " + rowKey)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (originalTable != null) {
+        originalTable.close()
+      }
+      if (conn != null) {
+        conn.close()
+      }
+      println("Hbase connection closed")
+    } catch {
+      case e:Exception => System.err.println(e.getMessage)
+    }
+  }
+}

+ 34 - 0
data_collect/src/main/scala/com/persagy/iot/func/IOTLateSinkFunction.scala

@@ -0,0 +1,34 @@
+package com.persagy.iot.func
+
+import com.persagy.iot.bean.IOTData
+import com.persagy.iot.utils.HbaseUtil
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Connection, Table}
+
+class IOTLateSinkFunction  extends RichSinkFunction[IOTData]{
+
+  val accuracy_tableName: String = "accuracy_data"
+  val family: String = "data"
+  val sysTime: String = "sys_time"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var table: Table = _
+
+  override def open(parameters: Configuration): Unit = {
+    println("init Hbase ......")
+    conn = HbaseUtil.getHbaseConnection()
+
+    table = conn.getTable(TableName.valueOf(accuracy_tableName))
+  }
+
+  override def invoke(value: IOTData, context: SinkFunction.Context[_]): Unit = {
+
+  }
+
+  override def close(): Unit = {
+
+  }
+}
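
The `invoke` and `close` bodies above are still stubs. Given the comment in IOTApp.scala (windowed data is saved to HBase; late side-output data queries HBase, is compared, then written back), a hedged sketch of what this sink could do — the compare-on-sys_time rule is an assumption, not part of this commit, and it would also need `Get`, `Put` and `Bytes` imports added to the file:

    override def invoke(iotData: IOTData, context: SinkFunction.Context[_]): Unit = {
      // read the accuracy row already written by the window aggregate
      val stored: Array[Byte] = table.get(new Get(Bytes.toBytes(iotData.rowKey)))
        .getValue(Bytes.toBytes(family), Bytes.toBytes(sysTime))

      // assumed rule: only overwrite when the late record is newer than the stored one
      if (stored == null || Bytes.toString(stored).toLong < iotData.sysTime) {
        val put = new Put(Bytes.toBytes(iotData.rowKey))
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime.toString))
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value.toString))
        table.put(put)
      }
    }

    override def close(): Unit = {
      if (table != null) table.close()
      if (conn != null) conn.close()
    }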

+ 3 - 7
data_collect/src/main/scala/com/persagy/iot/func/IOTOriginalSinkFunction.scala

@@ -36,9 +36,9 @@ class IOTOriginalSinkFunction extends RichSinkFunction[IOTData] {
     // rowKey format: building:meter:functionId:time
     val rowKey: String = iotData.rowKey + ":" + iotData.eventTime
     val put = new Put(Bytes.toBytes(rowKey))
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value))
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(status), Bytes.toBytes(iotData.status))
-    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.value.toString))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(status), Bytes.toBytes(iotData.status.toString))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.sysTime.toString))
     originalTable.put(put)
     println("insert into " + original_tableName + " rowKey = " + rowKey)
   }
@@ -56,8 +56,4 @@ class IOTOriginalSinkFunction extends RichSinkFunction[IOTData] {
       case e:Exception => System.err.println(e.getMessage)
     }
   }
-
-  def now(): Long ={
-    System.currentTimeMillis()
-  }
 }

+ 2 - 0
data_collect/src/main/scala/com/persagy/iot/func/OriginalDataAlarm.scala

@@ -17,8 +17,10 @@ import org.apache.flink.util.Collector
  */
 class OriginalDataAlarm extends KeyedProcessFunction[String, IOTData, IOTData] {
 
+  /** latest data state for this key */
   lazy val alarmState: ValueState[IOTData] = getRuntimeContext.getState(new ValueStateDescriptor[IOTData]("alarm-IOTData", classOf[IOTData]))
 
+  /** timer for the data-loss trigger */
   lazy val loseTime: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("loseTime", classOf[Long]))
 
   override def processElement(iotData: IOTData,

+ 3 - 3
data_collect/src/main/scala/com/persagy/iot/func/SplitData.scala

@@ -21,7 +21,7 @@ class SplitData(lastTime: Long, loseTime: Long, earlyTime: Long, timeOffer: Long
   override def flatMap(input: String, collector: Collector[IOTData]): Unit = {
 
     try {
-      /** parse the input and extract the reported data packets */
+      /* parse the input and extract the reported data packets */
       val arr1 = input.split("\t")
       val builds: Array[String] = arr1(3).split("&")
       for (elem <- builds) {
@@ -38,10 +38,10 @@ class SplitData(lastTime: Long, loseTime: Long, earlyTime: Long, timeOffer: Long
           val value: Double = arr2(i + 1).toDouble
           val status: Int = getStatus(eventTime, sysTime, lastTime, loseTime, earlyTime)
 
-          /** rowKey */
+          /* rowKey */
           val rowKey = build + ":" + sign + ":" + funId
 
-          /** convert to the IOTData case class */
+          /* convert to the IOTData case class */
           val iotData: IOTData = IOTData(rowKey, build, sign, funId, value, eventTimeStr, eventTime, sysTime, 0L, status)
 
           collector.collect(iotData)

+ 51 - 0
data_collect/src/main/scala/com/persagy/iot/test/IOTAppTest.scala

@@ -0,0 +1,51 @@
+package com.persagy.iot.test
+
+import com.persagy.iot.utils._
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{Cell, CellUtil, TableName}
+
+import scala.collection.JavaConverters.asScalaBufferConverter
+
+object IOTAppTest {
+
+  /** only keep data whose type is "report" */
+  val common_type: String = "report"
+
+  /** marker used to check whether the function-id value exists */
+  val no_value: String = "ff ff ff ff"
+
+  val accuracy_tableName: String = "accuracy_data"
+  val family: String = "data"
+  val sysTime: String = "sys_time"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var table: Table = _
+
+  def main(args: Array[String]): Unit = {
+    conn = HbaseUtil.getHbaseConnection()
+
+    table = conn.getTable(TableName.valueOf(accuracy_tableName))
+
+    val rowKey: String = "1002:22:1"
+    val get = new Get(Bytes.toBytes(rowKey))
+
+    val result: Result = table.get(get)
+    val row: String = Bytes.toString(result.getRow)
+    for (cell:Cell <- result.listCells().asScala){
+      val string: String = Bytes.toString(CellUtil.cloneQualifier(cell))
+      val str: String = Bytes.toString(CellUtil.cloneFamily(cell))
+      println(str)
+      string match {
+        case "sys_time" => println(row + " : " + string + ":" + Bytes.toString(CellUtil.cloneValue(cell)))
+        case "value" => println(row + " : " + string + ":" + Bytes.toString(CellUtil.cloneValue(cell)))
+        case _ => println("no")
+      }
+
+    }
+  }
+
+
+}
+

+ 3 - 3
data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala

@@ -13,10 +13,10 @@ object TestFunction {
   def main1(args: Array[String]): Unit = {
     var data: String = "1101080259;4;report;20210111150020;58440;9919;1;11301;635.0;"
 
-    val arr2: Array[String] = data.split(";")
+    val arr2 = data.split(";")
 
-    val build: String = arr2(0).toString
-    val sign: String = arr2(5).toString
+    val build: String = arr2(0)
+    val sign: String = arr2(5)
     val eventTimeStr: String = arr2(3)
     val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr, 0L)
     val sysTime: Long = System.currentTimeMillis()

+ 24 - 0
data_transfer/src/main/scala/com/persagy/energy/app/DicToFile.scala

@@ -0,0 +1,24 @@
+package com.persagy.energy.app
+
+import com.persagy.energy.app.File2FileApp.filePath
+
+import java.io.File
+import scala.io.BufferedSource
+
+
+object DicToFile {
+  val filePath: String = "/Users/king/Downloads/fjd数据/fjd_0_near_1d_2016"
+//  val filePath: String = "/Users/king/Downloads/HbaseCat-2.2.1/fjd_0_near_1d_2016.json"
+  val outPath: String = "/Users/king/Downloads/fjd_0_near_1d_2016.txt"
+
+  def main(args: Array[String]): Unit = {
+    /** read data from the file and send it line by line */
+//    val bufferedSource: BufferedSource = scala.io.Source.fromFile(filePath)
+    val file = new File(filePath)
+  }
+
+  def subdirs(dir: File): Iterator[File] = {
+    val children = dir.listFiles.filter(_.isDirectory)
+    children.toIterator ++ children.toIterator.flatMap(subdirs _)
+  }
+}
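
`main` above only constructs the `File`, and `subdirs` is not yet wired to anything. A hedged usage sketch (the concatenation into `outPath` is an assumption about the intent, not code from this commit):

    import java.io.PrintWriter
    import scala.io.Source

    // hypothetical: walk filePath and all of its sub-directories, appending every file's lines to outPath
    def concatAll(): Unit = {
      val writer = new PrintWriter(outPath)
      try {
        val dirs = Iterator(new File(filePath)) ++ subdirs(new File(filePath))
        for (dir <- dirs; f <- Option(dir.listFiles()).getOrElse(Array.empty[File]) if f.isFile) {
          val source = Source.fromFile(f)
          try source.getLines().foreach(line => writer.println(line))
          finally source.close()
        }
      } finally writer.close()
    }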

+ 16 - 7
data_transfer/src/main/scala/com/persagy/energy/app/File2FileApp.scala

@@ -11,8 +11,8 @@ import scala.io.BufferedSource
 
 object File2FileApp {
 
-  val filePath: String = "/Users/king/Downloads/energy-1d.txt"
-  val outPath: String = "/Users/king/Downloads/energy-1d-to-hdfs1.txt"
+  val filePath: String = "/Users/king/Downloads/HbaseCat-2.2.1/fjd_0_near_1d_2020.json"
+  val outPath: String = "/Users/king/Downloads/fjd_0_near_1d_2020.txt"
 
   def main(args: Array[String]): Unit = {
 
@@ -26,17 +26,26 @@ object File2FileApp {
       val parser = new JSONParser(1)
       val nObject: JSONObject = parser.parse(line).asInstanceOf[JSONObject]
 
-      val energyModelSign: String = nObject.get("energyModelSign").toString
-      val energyModelNodeSign: String = nObject.get("energyModelNodeSign").toString
+//      val energyModelSign: String = nObject.get("energyModelSign").toString
+//      val energyModelNodeSign: String = nObject.get("energyModelNodeSign").toString
+//      val dataTime: String = DTUtils.timestampConverter("yyyyMMddHHmmss", nObject.get("data_time").toString, 0L).toString
+//      val dataValue: String = nObject.get("data_value").toString
+//      val building: String = nObject.get("building").toString
+//
+//      val result: String = energyModelSign + "\t" + energyModelNodeSign + "\t" + dataTime + "\t" + dataValue + "\t" + building
+
+      val building: String = nObject.get("building").toString
+      val meter: String = nObject.get("meter").toString
+      val funcId: String = nObject.get("funcid").toString
       val dataTime: String = DTUtils.timestampConverter("yyyyMMddHHmmss", nObject.get("data_time").toString, 0L).toString
+      val dataFlag: String = nObject.get("data_flag").toString
       val dataValue: String = nObject.get("data_value").toString
-      val building: String = nObject.get("building").toString
 
-      val result: String = energyModelSign + "\t" + energyModelNodeSign + "\t" + dataTime + "\t" + dataValue + "\t" + building
+      val result: String = building + "\t" + meter + "\t" + funcId + "\t" + dataTime + "\t" + dataFlag + "\t" + dataValue
 
       writer.println(result)
 
-      if (count == 1000){
+      if (count == 5000){
         writer.flush()
       }
     }
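
For reference, a hypothetical input/output pair for the loop above (field names come from the code; the values are invented):

    // hypothetical input line (one JSON object per line)
    val sampleIn = """{"building":"1101080259","meter":"9919","funcid":"11301","data_time":"20200111150020","data_flag":"0","data_value":"635.0"}"""
    // the loop emits the fields tab-separated, with data_time converted to epoch millis by DTUtils.timestampConverter:
    // 1101080259<TAB>9919<TAB>11301<TAB><epoch millis><TAB>0<TAB>635.0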

+ 35 - 0
data_transfer/src/main/scala/com/persagy/energy/app/File2Hbase.scala

@@ -0,0 +1,35 @@
+package com.persagy.energy.app
+
+import com.persagy.energy.sink.HbaseSink
+import com.persagy.energy.utils.KafkaUtil
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
+
+object File2Hbase {
+
+  def main(args: Array[String]): Unit = {
+
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    /* set parallelism */
+    env.setParallelism(1)
+
+    /* set the time characteristic */
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    val inputStream: DataStream[String] =
+      env.addSource( new FlinkKafkaConsumer[String](KafkaUtil.topic, new SimpleStringSchema(), KafkaUtil.kafkaConsumerProperties()) )
+
+    val value: DataStream[(String, String)] = inputStream.map(data => {
+      val strings: Array[String] = data.split("\t")
+      (strings(0), strings(1))
+    })
+
+//    value.print("energy_data: ")
+    value.addSink(new HbaseSink)
+
+    env.execute("energy : ")
+  }
+}

+ 13 - 0
data_transfer/src/main/scala/com/persagy/energy/app/SendMessage.scala

@@ -0,0 +1,13 @@
+package com.persagy.energy.app
+
+import com.persagy.energy.utils.KafkaUtil
+
+
+object SendMessage {
+
+  def main(args: Array[String]): Unit = {
+    /** send data to Kafka */
+    KafkaUtil.sendMessageToKafka(KafkaUtil.topic)
+  }
+
+}

+ 48 - 0
data_transfer/src/main/scala/com/persagy/energy/sink/HbaseSink.scala

@@ -0,0 +1,48 @@
+package com.persagy.energy.sink
+
+import com.persagy.energy.utils.HbaseUtil
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client.{Connection, Put, Table}
+import org.apache.hadoop.hbase.util.Bytes
+
+class HbaseSink extends RichSinkFunction[(String, String)]{
+
+  val energy_tableName: String = "energy_data"
+  val family: String = "data"
+  val value: String = "value"
+
+  var conn: Connection = _
+  var energyTable: Table = _
+
+  override def open(parameters: Configuration): Unit = {
+    println("create Hbase connection ......")
+    conn = HbaseUtil.getHbaseConnection()
+
+    println("create Hbase table ......")
+    energyTable = conn.getTable(TableName.valueOf(energy_tableName))
+  }
+
+  override def invoke(v: (String, String), context: SinkFunction.Context[_]): Unit = {
+    val rowKey: String = v._1
+    val put = new Put(Bytes.toBytes(rowKey))
+    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(v._2))
+    energyTable.put(put)
+    println("insert into " + energy_tableName + " rowKey = " + rowKey)
+  }
+
+  override def close(): Unit = {
+    try {
+      if (energyTable != null) {
+        energyTable.close()
+      }
+      if (conn != null) {
+        conn.close()
+      }
+      println("Hbase connection closed")
+    } catch {
+      case e:Exception => System.err.println(e.getMessage)
+    }
+  }
+}

+ 4 - 0
data_transfer/src/main/scala/com/persagy/energy/utils/DTUtils.scala

@@ -35,4 +35,8 @@ object DTUtils {
    * @return
    */
   def now(): Long = System.currentTimeMillis()
+
+  def main(args: Array[String]): Unit = {
+
+  }
 }

+ 103 - 0
data_transfer/src/main/scala/com/persagy/energy/utils/HbaseUtil.scala

@@ -0,0 +1,103 @@
+package com.persagy.energy.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
+
+import java.util
+import scala.collection.JavaConverters.asScalaBufferConverter
+import scala.util.Try
+
+object HbaseUtil {
+
+  val KAFKA_TOPIC = "first"
+
+  val FAMILY_NAME = "data"
+
+  // ZooKeeper connection settings
+  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+
+  val HBASE_ZOOKEEPER_IP_SF = "192.168.66.20,192.168.66.21,192.168.66.22"
+  val HBASE_ZOOKEEPER_IP_TEST = "192.168.100.75,192.168.100.84,192.168.100.147"
+  val HBASE_ZOOKEEPER_IP_DEV = "192.168.4.7,192.168.4.8,192.168.4.9"
+
+  val HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT = "hbase.zookeeper.property.clientPort"
+  val HBASE_ZOOKEEPER_PORT = "2181"
+
+  var conn: Connection = _
+
+  def getHbaseConnection(): Connection ={
+    val conf: Configuration = HBaseConfiguration.create()
+    conf.set(HBASE_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_IP_SF)
+    conf.set(HBASE_ZOOKEEPER_PROPERTY_CLIENT_PORT, HBASE_ZOOKEEPER_PORT)
+    val connection: Connection = ConnectionFactory.createConnection(conf)
+    connection
+  }
+
+  /**
+   * Insert a single row.
+   * @param conn HBase connection
+   * @param rowKey HBase rowKey
+   * @param name table name
+   * @param valueMap data format: Map[family, Map[qualifier, value]]
+   */
+  def insertData(conn: Connection, rowKey: String, name: String, valueMap: Map[Object, Map[Object, Object]]): Unit ={
+
+    // get the target table
+    val tableName = TableName.valueOf(name)
+    val table: Table = conn.getTable(tableName)
+
+    // build the Put from the Map[family, Map[qualifier, value]] structure
+    val put = new Put(rowKey.getBytes())
+    for ((family, qualifiers) <- valueMap; (qualifier, value) <- qualifiers) {
+      put.addColumn(Bytes.toBytes(family.toString), Bytes.toBytes(qualifier.toString), Bytes.toBytes(value.toString))
+    }
+
+    Try(table.put(put)).getOrElse(table.close()) // write to HBase; close the table if an error occurs
+  }
+
+  /**
+   * Create a table.
+   * @param conn HBase connection
+   * @param tableName name of the table to create
+   * @param familyNames at most 3 recommended, otherwise performance suffers
+   */
+  def createTable(conn: Connection, tableName: String, familyNames: String*): Unit ={
+
+    val admin: Admin = conn.getAdmin
+    val tn: TableName = TableName.valueOf(tableName)
+
+    // create the table descriptor
+    val hTableDescriptor = new HTableDescriptor(tn)
+
+    // iterate over familyNames and create the column families
+    for (familyName <- familyNames) {
+      val hColumnDescriptor = new HColumnDescriptor(familyName)
+      hTableDescriptor.addFamily(hColumnDescriptor)
+    }
+
+    admin.createTable(hTableDescriptor)
+  }
+
+  def main(args: Array[String]): Unit = {
+    // create the energy data table
+    createTable(getHbaseConnection(), "energy_data", "data")
+    // create the accuracy data table
+//    createTable(getHbaseConnection(), "accuracy_data", "data")
+  }
+}
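
A usage sketch for `insertData`, showing the `Map[family, Map[qualifier, value]]` shape described in its doc comment (the row key, table and cell values below are hypothetical):

    object HbaseUtilDemo {
      def main(args: Array[String]): Unit = {
        val conn = HbaseUtil.getHbaseConnection()
        // one "data" column family with two qualifiers, matching Map[family, Map[qualifier, value]]
        val cells: Map[Object, Map[Object, Object]] = Map(
          "data" -> Map("value" -> "635.0", "sys_time" -> System.currentTimeMillis().toString)
        )
        HbaseUtil.insertData(conn, "1002:22:1", "energy_data", cells)
        conn.close()
      }
    }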

+ 63 - 0
data_transfer/src/main/scala/com/persagy/energy/utils/KafkaUtil.scala

@@ -0,0 +1,63 @@
+package com.persagy.energy.utils
+
+import net.minidev.json.JSONObject
+import net.minidev.json.parser.JSONParser
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+
+import java.util.Properties
+import scala.io.BufferedSource
+
+object KafkaUtil {
+
+  val dev_host: String = "192.168.4.7:9092,192.168.4.8:9092,192.168.4.9:9092"
+  val test_host: String = "192.168.100.84:9092,192.168.100.75:9092,192.168.100.147:9092"
+  val topic: String = "energy_data"
+//  val file_path: String = "/Users/king/Documents/persagy/dw/log.log2021-01-11-15.log";
+  val filePath: String = "/Users/king/Downloads/energy-1d.txt"
+  val outPath: String = "/Users/king/Downloads/energy-1d-to-hdfs1.txt"
+
+  def kafkaConsumerProperties(): Properties = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", dev_host)
+    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
+    properties.setProperty("auto.offset.reset", "latest")
+    properties
+  }
+
+  def kafkaProducerProperties(): Properties = {
+    val properties = new Properties()
+    properties.setProperty("bootstrap.servers", dev_host)
+    properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+    properties
+  }
+
+  def sendMessageToKafka(topic: String): Unit = {
+
+    /** create a KafkaProducer and send the data */
+    val producer = new KafkaProducer[String, String](kafkaProducerProperties())
+
+    /** read data from the file and send it line by line */
+    val bufferedSource: BufferedSource = scala.io.Source.fromFile(filePath)
+
+    for (line <- bufferedSource.getLines()) {
+
+      val parser = new JSONParser(1)
+      val nObject: JSONObject = parser.parse(line).asInstanceOf[JSONObject]
+
+      val energyModelSign: String = nObject.get("energyModelSign").toString
+      val energyModelNodeSign: String = nObject.get("energyModelNodeSign").toString
+      val dataTime: String = DTUtils.timestampConverter("yyyyMMddHHmmss", nObject.get("data_time").toString, 0L).toString
+      val dataValue: String = nObject.get("data_value").toString
+      val building: String = nObject.get("building").toString
+
+      val result: String = building + ":" + energyModelSign + ":" + energyModelNodeSign + ":" + dataTime + "\t" + dataValue
+      println(result)
+      val record = new ProducerRecord[String, String](topic, result)
+      producer.send(record)
+//      Thread.sleep(1000L)
+    }
+
+    /* release resources; close() also flushes any buffered records */
+    bufferedSource.close()
+    producer.close()
+  }
+
+}