Commit eee41c41 authored by duanledexianxianxian's avatar duanledexianxianxian 😁

sync

parent 8b8dbd87
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
</content> </content>
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.0-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.9.0" level="project" />
...@@ -41,7 +42,6 @@ ...@@ -41,7 +42,6 @@
<orderEntry type="library" name="Maven: com.typesafe.akka:akka-stream_2.11:2.5.21" level="project" /> <orderEntry type="library" name="Maven: com.typesafe.akka:akka-stream_2.11:2.5.21" level="project" />
<orderEntry type="library" name="Maven: org.reactivestreams:reactive-streams:1.0.2" level="project" /> <orderEntry type="library" name="Maven: org.reactivestreams:reactive-streams:1.0.2" level="project" />
<orderEntry type="library" name="Maven: com.typesafe:ssl-config-core_2.11:0.3.7" level="project" /> <orderEntry type="library" name="Maven: com.typesafe:ssl-config-core_2.11:0.3.7" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.1.1" level="project" />
<orderEntry type="library" name="Maven: com.typesafe.akka:akka-protobuf_2.11:2.5.21" level="project" /> <orderEntry type="library" name="Maven: com.typesafe.akka:akka-protobuf_2.11:2.5.21" level="project" />
<orderEntry type="library" name="Maven: com.typesafe.akka:akka-slf4j_2.11:2.5.21" level="project" /> <orderEntry type="library" name="Maven: com.typesafe.akka:akka-slf4j_2.11:2.5.21" level="project" />
<orderEntry type="library" name="Maven: org.clapper:grizzled-slf4j_2.11:1.3.2" level="project" /> <orderEntry type="library" name="Maven: org.clapper:grizzled-slf4j_2.11:1.3.2" level="project" />
...@@ -55,10 +55,6 @@ ...@@ -55,10 +55,6 @@
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner_2.11:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-common:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-common:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala-bridge_2.11:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala-bridge_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-reflect:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-scala_2.11:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-streaming-scala_2.11:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-streaming-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-java-bridge_2.11:1.9.1" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-java-bridge_2.11:1.9.1" level="project" />
...@@ -67,7 +63,17 @@ ...@@ -67,7 +63,17 @@
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" /> <orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" /> <orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14" level="project" /> <orderEntry type="library" name="Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.0-SNAPSHOT" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-reflect:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.5" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-runtime-blink_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:janino:3.0.9" level="project" />
<orderEntry type="library" name="Maven: org.codehaus.janino:commons-compiler:3.0.9" level="project" />
<orderEntry type="library" name="Maven: org.apache.calcite.avatica:avatica-core:1.15.0" level="project" />
<orderEntry type="library" name="Maven: org.reflections:reflections:0.9.10" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: org.slf4j:slf4j-log4j12:1.7.7" level="project" /> <orderEntry type="library" scope="RUNTIME" name="Maven: org.slf4j:slf4j-log4j12:1.7.7" level="project" />
<orderEntry type="library" scope="RUNTIME" name="Maven: log4j:log4j:1.2.17" level="project" /> <orderEntry type="library" scope="RUNTIME" name="Maven: log4j:log4j:1.2.17" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka_2.11:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-connector-kafka_2.11:1.9.0" level="project" />
......
...@@ -87,7 +87,7 @@ under the License. ...@@ -87,7 +87,7 @@ under the License.
<dependency> <dependency>
<groupId>org.apache.flink</groupId> <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId> <artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version> <version>1.9.1</version>
</dependency> </dependency>
......
...@@ -5,44 +5,70 @@ import com.google.common.collect.Lists; ...@@ -5,44 +5,70 @@ import com.google.common.collect.Lists;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.StringValue;
import java.util.List; import java.util.List;
/**
* The type Main 1.
*
* @author fengyuchenglun
* @version 1.0.0
*/
public class Main1 { public class Main1 {
/**
* The constant OUT_OF_ORDERNESS.
*/
public static final int OUT_OF_ORDERNESS = 1000;
private static final Object[][] INIT_DATA = {
{"ShangHai", "U0010", 1510365660000L},
{"BeiJing", "U1001", 1510365660000L},
{"BeiJing", "U2032", 1510366200000L},
{"BeiJing", "U1100", 1510366260000L},
{"ShangHai", "U0011", 1510373400000L},
};
/**
* The entry point of application.
*
* @param args the input arguments
* @throws Exception the exception
*/
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// Streaming 环境 // Streaming 环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance() //EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner() // .useBlinkPlanner()
.inStreamingMode() // .inStreamingMode()
.build(); // .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings); //StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 设置EventTime // 设置EventTime
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//方便我们查出输出数据 //方便我们查出输出数据
env.setParallelism(1); env.setParallelism(1);
List<PageAccess> pageAccessList = Lists.newArrayList(); List<PageAccess> pageAccessList = Lists.newArrayList();
initData(pageAccessList); initData(pageAccessList);
DataStream<PageAccess> stream = env.fromCollection(pageAccessList); DataStream<PageAccess> stream = env.fromCollection(pageAccessList).assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
// Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime"); // Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime");
tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime"); // register DataStream as Table
Table result = tableEnv.sqlQuery("select * from pageAccess where region='BeiJing' order by userId desc"); tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime,rowTime.rowtime");
Table result = tableEnv.sqlQuery("select * from pageAccess order by rowTime ,region asc");
DataStream<Row> rowDataStream = tableEnv.toAppendStream(result, Row.class); DataStream<Row> rowDataStream = tableEnv.toAppendStream(result, Row.class);
...@@ -53,6 +79,11 @@ public class Main1 { ...@@ -53,6 +79,11 @@ public class Main1 {
} }
/**
* Init data.
*
* @param pageAccessList the page access list
*/
public static void initData(List<PageAccess> pageAccessList) { public static void initData(List<PageAccess> pageAccessList) {
for (Object[] initDatum : INIT_DATA) { for (Object[] initDatum : INIT_DATA) {
PageAccess pageAccess = new PageAccess(String.valueOf(initDatum[0]), String.valueOf(initDatum[1]), Long.valueOf(String.valueOf(initDatum[2]))); PageAccess pageAccess = new PageAccess(String.valueOf(initDatum[0]), String.valueOf(initDatum[1]), Long.valueOf(String.valueOf(initDatum[2])));
...@@ -60,6 +91,12 @@ public class Main1 { ...@@ -60,6 +91,12 @@ public class Main1 {
} }
} }
/**
* The type Page access.
*
* @author fengyuchenglun
* @version 1.0.0
*/
@Data @Data
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
...@@ -72,11 +109,21 @@ public class Main1 { ...@@ -72,11 +109,21 @@ public class Main1 {
private Long accessTime; private Long accessTime;
} }
private static final Object[][] INIT_DATA = { /**
{"ShangHai", "U0010", 1510365660000L}, * 时间水印
{"BeiJing", "U1001", 1510365660000L}, */
{"BeiJing", "U2032", 1510366200000L}, private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<PageAccess> {
{"BeiJing", "U1100", 1510366260000L},
{"ShangHai", "U0011", 1510373400000L}, /**
}; * Instantiates a new Timestamps and watermarks.
*/
public TimestampsAndWatermarks() {
super(Time.milliseconds(0));
}
@Override
public long extractTimestamp(PageAccess pageAccess) {
return pageAccess.getAccessTime();
}
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment