wudla %!s(int64=4) %!d(string=hai) anos
pai
achega
d9d808e9c2

+ 21 - 22
data_collect/src/main/scala/com/persagy/iot/app/IOTApp.scala

@@ -25,7 +25,7 @@ object IOTApp {
     env.setParallelism(1)
 
     /* 设置时间语义 */
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val inputStream: DataStream[String] =
       env.addSource( new FlinkKafkaConsumer[String](KafkaUtil.topic, new SimpleStringSchema(), KafkaUtil.kafkaConsumerProperties()) )
@@ -45,41 +45,40 @@ object IOTApp {
     /* 转化为实体类流 */
     val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData)
 
-    /** -------------- ---------- 处理原始数据 start ----------------------- */
+    val waterStream: DataStream[IOTData] = iotFilterDataStream
+      /* 设置水位时间 */
+      .assignAscendingTimestamps(_.eventTime - 1)
+
+    /** ------------------------ 处理原始数据 start ----------------------- */
 
-    iotFilterDataStream.keyBy(_.rowKey)
+    val keyedStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
       .process(new OriginalDataAlarm)
-    iotFilterDataStream.addSink(new IOTOriginalSinkFunction)
+    keyedStream.getSideOutput(new OutputTag[IOTData]("late-data")).print("outputTag:")
+//    keyedStream.addSink(new IOTOriginalSinkFunction)
 
     /** ------------------------- 处理原始数据 end ------------------------- */
 
     /** ------------------------ 处理分精度数据 start ------------------------ */
-    val waterStream: DataStream[IOTData] = iotFilterDataStream
-      /* 设置水位时间 */
-      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IOTData](Time.minutes(30)) {
-        override def extractTimestamp(element: IOTData) = {
-          element.eventTime - 1
-        }
-      })
+
 
     /* 定义侧输出流 */
-    val sideOutputTag = new OutputTag[IOTData]("late-data")
-
-    val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
-      .timeWindow(Time.minutes(15))
-      /* 允许处理数据的最迟时间 */
-      .allowedLateness(Time.minutes(60))
-      /* 侧输出流 */
-      .sideOutputLateData(sideOutputTag)
-      .aggregate(new IOTAgg(), new IOTWindowResult())
+//    val sideOutputTag = new OutputTag[IOTData]("late-data")
+//
+//    val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
+//      .timeWindow(Time.minutes(15))
+//      /* 允许处理数据的最迟时间 */
+//      .allowedLateness(Time.minutes(60))
+//      /* 侧输出流 */
+//      .sideOutputLateData(sideOutputTag)
+//      .aggregate(new IOTAgg(), new IOTWindowResult())
     /** ------------------------ 处理分精度数据 end ------------------------ */
 
     /* 设置要选取的事件时间 */
 //    val assignStream: DataStream[IOTData] = iotDataStream.assignAscendingTimestamps(_.eventTime)
 
     /* 开窗数据保存hbase,侧输出流的数据查询hbase对比后再存入hbase */
-    windowsStream.addSink(new IOTOriginalSinkFunction)
-    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTOriginalSinkFunction)
+//    windowsStream.addSink(new IOTOriginalSinkFunction)
+//    windowsStream.getSideOutput(sideOutputTag).addSink(new IOTOriginalSinkFunction)
 
     env.execute("iot data collect")
   }

+ 0 - 1
data_collect/src/main/scala/com/persagy/iot/func/IOTAccuracySinkFunction.scala

@@ -35,7 +35,6 @@ class IOTAccuracySinkFunction extends RichSinkFunction[IOTData] {
     put.addColumn(Bytes.toBytes(family), Bytes.toBytes(value), Bytes.toBytes(iotData.sysTime))
     put.addColumn(Bytes.toBytes(family), Bytes.toBytes(sysTime), Bytes.toBytes(iotData.value))
     Try(table.put(put))
-    println("insert into " + )
   }
 
   override def close(): Unit = {

+ 17 - 6
data_collect/src/main/scala/com/persagy/iot/func/OriginalDataAlarm.scala

@@ -3,23 +3,34 @@ package com.persagy.iot.func
 import com.persagy.iot.bean.IOTData
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.util.Collector
 
 class OriginalDataAlarm extends KeyedProcessFunction[String, IOTData, IOTData] {
 
-  lazy val alarmTimeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("alarm-time", classOf[Long]))
-
+//  lazy val alarmTimeState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("alarm-time", classOf[Long]))
+  lazy val alarmState: ValueState[IOTData] = getRuntimeContext.getState(new ValueStateDescriptor[IOTData]("alarm-IOTData", classOf[IOTData]))
   override def processElement(iotData: IOTData,
                               ctx: KeyedProcessFunction[String, IOTData, IOTData]#Context,
                               out: Collector[IOTData]): Unit = {
-    val alarmTime: Long = iotData.eventTime + 1000 * 60 * 60
-    ctx.timerService().registerEventTimeTimer(alarmTime)
-    alarmTimeState.update(iotData.eventTime)
+    // 迟到时间触发
+    val lateTime: Long = iotData.eventTime + 1000 * 60 * 30
+    // 丢数时间触发
+    val loseTime: Long = iotData.eventTime + 1000 * 60 * 60
+//    ctx.timerService().registerEventTimeTimer(loseTime)
+//    ctx.timerService().registerEventTimeTimer(lateTime)
+//    ctx.timerService().registerEventTimeTimer(System.currentTimeMillis() + 1000L * 3)
+    ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 1000L * 3)
+    alarmState.update(iotData)
+
+//    out.collect(IOTData(rowKey = ???, build = ???, sign = ???, funId = ???, value = ???, eventTimeStr = ???, eventTime = ???, sysTime = ???, windowEnd = ???, status = ???))
   }
 
   override def onTimer(timestamp: Long,
                        ctx: KeyedProcessFunction[String, IOTData, IOTData]#OnTimerContext,
                        out: Collector[IOTData]): Unit = {
-
+    val lateDataTag = new OutputTag[IOTData]("late-data")
+    ctx.output(lateDataTag, alarmState.value())
+    alarmState.clear()
   }
 }

+ 1 - 1
data_collect/src/main/scala/com/persagy/iot/utils/KafkaUtil.scala

@@ -41,7 +41,7 @@ object KafkaUtil {
       val record = new ProducerRecord[String, String](topic, line)
       println(line.toString)
       producer.send(record)
-//      Thread.sleep(1000L)
+//      Thread.sleep(500L)
     }
   }
 

+ 1 - 1
data_collect/src/main/scala/com/persagy/iot/utils/TestFunction.scala

@@ -16,7 +16,7 @@ object TestFunction {
     val build: String = arr2(0).toString
     val sign: String = arr2(5).toString
     val eventTimeStr: String = arr2(3)
-    val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr)
+    val eventTime: Long = timestampConverter("yyyyMMddHHmmss", eventTimeStr, 0L)
     val sysTime: Long = System.currentTimeMillis()
 
     for (i <- 7 until (arr2.length, 2) if (!no_value.equals(arr2(i + 1)))) {