Commit 7bd5609d authored by duanledexianxianxian's avatar duanledexianxianxian 😁

sync

parent e0101e59
...@@ -106,3 +106,4 @@ buildNumber.properties ...@@ -106,3 +106,4 @@ buildNumber.properties
.mvn/timing.properties .mvn/timing.properties
.mvn/wrapper/maven-wrapper.jar .mvn/wrapper/maven-wrapper.jar
/.idea/
...@@ -7,7 +7,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -7,7 +7,10 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
...@@ -35,8 +38,12 @@ public class Main { ...@@ -35,8 +38,12 @@ public class Main {
Table table = tableEnv.fromDataStream(source, "eventTime.rowtime"); Table table = tableEnv.fromDataStream(source, "eventTime.rowtime");
// register DataStream as Table // register DataStream as Table
tableEnv.registerTable("Zhisheng", table); // tableEnv.registerTable("Zhisheng", table);
Table sorted = tableEnv.sqlQuery("select * from Zhisheng "); Table sorted = table.window(Over.partitionBy("eventTime")
.orderBy("eventTime.desc")
.preceding("20.second")
.following("CURRENT_RANGE")
.as("window")).select("eventTime ");
DataStream<Row> rowDataStream = tableEnv.toAppendStream(sorted, Row.class); DataStream<Row> rowDataStream = tableEnv.toAppendStream(sorted, Row.class);
rowDataStream.print(); rowDataStream.print();
...@@ -80,7 +87,7 @@ public class Main { ...@@ -80,7 +87,7 @@ public class Main {
public void run(SourceContext<Event> ctx) throws Exception { public void run(SourceContext<Event> ctx) throws Exception {
while (running) { while (running) {
ctx.collect(new Event()); ctx.collect(new Event());
Thread.sleep(1); Thread.sleep(5000);
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment