|
@@ -1,11 +1,10 @@
|
|
|
package com.persagy.iot.app
|
|
|
|
|
|
import com.persagy.iot.bean.IOTData
|
|
|
-import com.persagy.iot.func.{IOTOriginalSinkFunction, OriginalDataAlarm, SplitData}
|
|
|
-import com.persagy.iot.utils.KafkaUtil
|
|
|
+import com.persagy.iot.func.{IOTAccuracySinkFunction, IOTOriginalSinkFunction, OriginalDataAlarm, SplitData}
|
|
|
+import com.persagy.iot.utils._
|
|
|
import org.apache.flink.api.common.serialization.SimpleStringSchema
|
|
|
import org.apache.flink.streaming.api.TimeCharacteristic
|
|
|
-import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
|
|
|
import org.apache.flink.streaming.api.scala._
|
|
|
import org.apache.flink.streaming.api.windowing.time.Time
|
|
|
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
|
|
@@ -19,13 +18,20 @@ object IOTApp {
|
|
|
val no_value: String = "ff ff ff ff"
|
|
|
|
|
|
def main(args: Array[String]): Unit = {
|
|
|
+
|
|
|
+// val params: ParameterTool = ParameterTool.fromArgs(args)
|
|
|
+// val lastTime: Long = params.getLong("lastTime")
|
|
|
+// val loseTime: Long = params.getLong("loseTime")
|
|
|
+// val earlyTime: Long = params.getLong("earlyTime")
|
|
|
+// val timeOffer: Long = params.getLong("timeOffer")
|
|
|
+
|
|
|
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
|
|
|
|
|
|
/* 设置并行度 */
|
|
|
env.setParallelism(1)
|
|
|
|
|
|
/* 设置时间语义 */
|
|
|
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
|
|
|
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
|
|
|
|
|
|
val inputStream: DataStream[String] =
|
|
|
env.addSource( new FlinkKafkaConsumer[String](KafkaUtil.topic, new SimpleStringSchema(), KafkaUtil.kafkaConsumerProperties()) )
|
|
@@ -42,44 +48,47 @@ object IOTApp {
|
|
|
}
|
|
|
})
|
|
|
|
|
|
- /* 转化为实体类流 */
|
|
|
- val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData)
|
|
|
-
|
|
|
- val waterStream: DataStream[IOTData] = iotFilterDataStream
|
|
|
- /* 设置水位时间 */
|
|
|
- .assignAscendingTimestamps(_.eventTime - 1)
|
|
|
+ /* 转化为实体类流,此处转换的实体类的 rowKey 不包含时间,需要在后续的处理中加入 */
|
|
|
+ val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData(1800L, 3600L, 1000L,0))
|
|
|
+// val iotFilterDataStream: DataStream[IOTData] = filterStream.flatMap(new SplitData(lastTime, loseTime, earlyTime, timeOffer))
|
|
|
|
|
|
/** ------------------------ 处理原始数据 start ----------------------- */
|
|
|
-
|
|
|
- val keyedStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
|
|
|
+ val keyedStream: DataStream[IOTData] = iotFilterDataStream.keyBy(_.rowKey)
|
|
|
.process(new OriginalDataAlarm)
|
|
|
+
|
|
|
keyedStream.getSideOutput(new OutputTag[IOTData]("late-data")).print("outputTag:")
|
|
|
-// keyedStream.addSink(new IOTOriginalSinkFunction)
|
|
|
+ keyedStream.addSink(new IOTOriginalSinkFunction)
|
|
|
|
|
|
/** ------------------------- 处理原始数据 end ------------------------- */
|
|
|
|
|
|
- /** ------------------------ 处理分精度数据 start ------------------------ */
|
|
|
|
|
|
+ /** ------------------------ 处理分精度数据 start ------------------------ */
|
|
|
+ val waterStream: DataStream[IOTData] = iotFilterDataStream
|
|
|
+ .map(iotData => {
|
|
|
+ /* 添加时间 */
|
|
|
+ val key: String = iotData.rowKey
|
|
|
+ val time: Time = Time.minutes(15)
|
|
|
+ val l: Long = IOTUtils.accuracyTime(iotData.eventTime, time)
|
|
|
+ iotData.copy(rowKey = key + ":" + l)
|
|
|
+ iotData
|
|
|
+ })
|
|
|
+ .assignAscendingTimestamps(_.eventTime - 1)
|
|
|
|
|
|
/* 定义侧输出流 */
|
|
|
-// val sideOutputTag = new OutputTag[IOTData]("late-data")
|
|
|
-//
|
|
|
-// val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
|
|
|
-// .timeWindow(Time.minutes(15))
|
|
|
-// /* 允许处理数据的最迟时间 */
|
|
|
-// .allowedLateness(Time.minutes(60))
|
|
|
-// /* 侧输出流 */
|
|
|
-// .sideOutputLateData(sideOutputTag)
|
|
|
-// .aggregate(new IOTAgg(), new IOTWindowResult())
|
|
|
- /** ------------------------ 处理分精度数据 end ------------------------ */
|
|
|
+ val sideOutputTag = new OutputTag[IOTData]("late-data")
|
|
|
|
|
|
- /* 设置要选取的事件时间 */
|
|
|
-// val assignStream: DataStream[IOTData] = iotDataStream.assignAscendingTimestamps(_.eventTime)
|
|
|
+ val windowsStream: DataStream[IOTData] = waterStream.keyBy(_.rowKey)
|
|
|
+ .timeWindow(Time.minutes(15))
|
|
|
+ .sideOutputLateData(sideOutputTag)
|
|
|
+ .aggregate(new IOTAgg(), new IOTWindowResult())
|
|
|
|
|
|
- /* 开窗数据保存hbase,侧输出流的数据查询hbase对比后再存入hbase */
|
|
|
-// windowsStream.addSink(new IOTOriginalSinkFunction)
|
|
|
-// windowsStream.getSideOutput(sideOutputTag).addSink(new IOTOriginalSinkFunction)
|
|
|
+ /* 开窗数据保存hbase,侧输出流的数据查询hbase,对比后再存入hbase */
|
|
|
+ windowsStream.addSink(new IOTAccuracySinkFunction)
|
|
|
+ windowsStream.getSideOutput(sideOutputTag).addSink(new IOTAccuracySinkFunction)
|
|
|
+ /** ------------------------ 处理分精度数据 end ------------------------ */
|
|
|
|
|
|
+ windowsStream.print("result: ")
|
|
|
+ windowsStream.getSideOutput(sideOutputTag).print("sideOutputTag:")
|
|
|
env.execute("iot data collect")
|
|
|
}
|
|
|
|