diff --git a/maven/flink-quickstart.iml b/maven/flink-quickstart.iml
index e9231310756e18deb654f127aaa9eca9668dbfb7..e0e5543b6613aa451b916f44a9aad7311dbe1b75 100644
--- a/maven/flink-quickstart.iml
+++ b/maven/flink-quickstart.iml
@@ -52,6 +52,17 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/maven/pom.xml b/maven/pom.xml
index 7ee292bdf0e34006ffd2514f076bf5149360fcaf..93a52ad9297dba2d7c3952cd2decd0d539975ef5 100644
--- a/maven/pom.xml
+++ b/maven/pom.xml
@@ -66,6 +66,18 @@ under the License.
 			<version>${flink.version}</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_2.11</artifactId>
+			<version>1.9.1</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
+			<version>1.9.1</version>
+		</dependency>
+
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java
index 83a965a0606ca9b6d420ccce9a6c1a0eab28749e..fa21e3ed202776646a49edc1e85393e32c1e6f2b 100644
--- a/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java
@@ -7,7 +7,7 @@ public class AppConstants {
     /**
      * Configuration file
      */
-    public final static String PROPERTIES_FILE_NAME = "application.properties";
+    public final static String PROPERTIES_FILE_NAME = "/application.properties";
 
     public static final String STREAM_PARALLELISM = "stream.parallelism";
 
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/schemas/SimplekafkaStringSchema.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/schemas/SimplekafkaStringSchema.java
new file mode 100644
index 0000000000000000000000000000000000000000..d5e54f62e4e283a67d2bee83d4c1f9f58a5fed07
--- /dev/null
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/schemas/SimplekafkaStringSchema.java
@@ -0,0 +1,45 @@
+package com.duanledexianxianxian.maven.flink.common.schemas;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Serializes objects to JSON strings for Kafka and deserializes Kafka records back to strings.
+ *
+ * @author Administrator
+ */
+public class SimplekafkaStringSchema implements KafkaSerializationSchema<Object>, KafkaDeserializationSchema<String> {
+    private String topic;
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(Object element, @Nullable Long timestamp) {
+        // Serialize the element to JSON and use it as the record value
+        return new ProducerRecord<>(this.topic, JSONObject.toJSONString(element).getBytes(StandardCharsets.UTF_8));
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public boolean isEndOfStream(String nextElement) {
+        return false;
+    }
+
+    @Override
+    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
+        return new String(record.value(), StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return TypeInformation.of(String.class);
+    }
+}
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
index d8c5dd6531023b02e0974fb53676b3c2206860c1..dec499e1f13bc7bb39c20efb38df0ce2df9c9745 100644
--- a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import java.util.List;
 import java.util.Properties;
@@ -54,18 +54,31 @@ import java.util.Properties;
  * @author Administrator
  */
 public class Main {
+    /**
+     * kafka => flink => kafka
+     *
+     * @param args program arguments
+     * @throws Exception if the job fails
+     */
     public static void main(String[] args) throws Exception {
-        final ParameterTool parameterTool = ExecutionEnvUtils.createParameterTool(args);
+        // Parameter tool
+        ParameterTool parameterTool = ExecutionEnvUtils.createParameterTool(args);
 
         // Create the stream execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Use ingestion time as the stream time characteristic for the windows
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-        Properties properties = new Properties();
-        properties.setProperty("bootstrap.servers", "113.105.144.9:9104");
-        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic_first", new SimpleStringSchema(), properties);
+
+        Properties sourceProp = new Properties();
+        sourceProp.setProperty("bootstrap.servers", parameterTool.get("kafka.source.brokers"));
+
+        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(parameterTool.get("kafka.source.topic"), new SimpleStringSchema(), sourceProp);
 
         // Consume from the earliest offset
         DataStream<String> stream = env
@@ -75,7 +88,14 @@ public class Main {
         SingleOutputStreamOperator<Student> student = stream.map((MapFunction<String, Student>) value -> JSONObject.parseObject(value, Student.class));
 
-        student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).addSink(new KafkaProducer<>).print();
+        Properties sinkProp = new Properties();
+        sinkProp.setProperty("bootstrap.servers", parameterTool.get("kafka.sink.brokers"));
+
+        // Deprecated constructor: broker list, topic, SerializationSchema
+        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new SimpleStringSchema());
+
+        student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).map(x -> JSONObject.toJSONString(x)).addSink(producer);
 
         // execute program
         env.execute("Flink Streaming Java API Skeleton");
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main.java
new file mode 100644
index 0000000000000000000000000000000000000000..4797531779b0cc01a970161acca3e6671ab983de
--- /dev/null
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main.java
@@ -0,0 +1,108 @@
+package com.duanledexianxianxian.maven.flink.sql.sql;
+
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * @author Administrator
+ */
+public class Main {
+    public static final int OUT_OF_ORDERNESS = 1000;
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setParallelism(1);
+
+        SingleOutputStreamOperator<Event> source = env.addSource(new OutOfOrderEventSource())
+                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
+
+        // create a TableEnvironment for the streaming planner
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
+
+        // convert DataStream to Table
+        Table table = tableEnv.fromDataStream(source, "eventTime.rowtime");
+
+        // register DataStream as Table
+        tableEnv.registerTable("Zhisheng", table);
+        Table sorted = tableEnv.sqlQuery("select * from Zhisheng");
+        DataStream<Row> rowDataStream = tableEnv.toAppendStream(sorted, Row.class);
+
+        rowDataStream.print();
+
+        // Print the execution plan
+//        System.out.println(env.getExecutionPlan());
+
+        env.execute("sort-streaming-data");
+    }
+
+
+    /**
+     * Event
+     */
+    public static class Event {
+
+        Long eventTime;
+
+        Event() {
+            // Generate data with an (out-of-order) event time
+            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
+        }
+
+        @Override
+        public String toString() {
+            return "Event{" +
+                    "eventTime=" + eventTime +
+                    '}';
+        }
+    }
+
+
+    /**
+     * Source that keeps generating events
+     */
+    private static class OutOfOrderEventSource extends RichSourceFunction<Event> {
+
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Event> ctx) throws Exception {
+            while (running) {
+                ctx.collect(new Event());
+                Thread.sleep(1);
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+
+    /**
+     * Timestamp and watermark assigner
+     */
+    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
+
+        public TimestampsAndWatermarks() {
+            super(Time.milliseconds(OUT_OF_ORDERNESS));
+        }
+
+        @Override
+        public long extractTimestamp(Event event) {
+            return event.eventTime;
+        }
+    }
+}
diff --git a/maven/src/main/resources/application.properties b/maven/src/main/resources/application.properties
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..cf7eb12031ccca328b2acdb1fe6942b0509351c4 100644
--- a/maven/src/main/resources/application.properties
+++ b/maven/src/main/resources/application.properties
@@ -0,0 +1,8 @@
+#kafka source brokers
+kafka.source.brokers=113.105.144.9:9104
+#kafka source topic
+kafka.source.topic=topic_first
+#kafka sink brokers
+kafka.sink.brokers=113.105.144.9:9104
+#kafka sink topic
+kafka.sink.topic=topic_second
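
Note: this change set adds SimplekafkaStringSchema but never uses it, and sink/kafka/Main.java still goes through the deprecated broker-list FlinkKafkaProducer constructor. Below is a minimal sketch (not part of the commit) of how the new schema could be wired into the non-deprecated constructor of the universal Kafka connector; it assumes the topic comes from the existing kafka.sink.topic property and that sinkProp holds the producer config, as in Main.java.

    // Sketch only: reuse the SimplekafkaStringSchema added in this change set
    SimplekafkaStringSchema schema = new SimplekafkaStringSchema();
    schema.setTopic(parameterTool.get("kafka.sink.topic"));

    // Constructor taking a KafkaSerializationSchema replaces the deprecated broker-list variant
    FlinkKafkaProducer<Object> jsonProducer = new FlinkKafkaProducer<>(
            parameterTool.get("kafka.sink.topic"),       // default topic
            schema,                                       // serializes any object to JSON bytes
            sinkProp,                                     // producer properties (bootstrap.servers, ...)
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);   // delivery guarantee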