diff --git a/maven/flink-quickstart.iml b/maven/flink-quickstart.iml
index 8f86a21fa8e14f1459cecf81e25e38cfd12b643d..96ee94466ad43bdd5a12c8a8d2cbf161a71546aa 100644
--- a/maven/flink-quickstart.iml
+++ b/maven/flink-quickstart.iml
@@ -11,6 +11,11 @@
+
+
+
+
+
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java
new file mode 100644
index 0000000000000000000000000000000000000000..d954a5d1d3b31ae34f60cc72780ffdc05b80ec53
--- /dev/null
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java
@@ -0,0 +1,83 @@
+package com.duanledexianxianxian.maven.flink.dataset;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapPartitionFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+import java.util.stream.Collectors;
+
+/**
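+ * Demonstrates basic DataSet transformations (flatMap, mapPartition, filter, distinct, reduce)
+ * and aggregations (sum, min/max, minBy/maxBy, groupBy, sortGroup, first).
+ *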
+ * @author Administrator
+ */
+public class DataSetApi {
+ public static void main(String[] args) throws Exception {
+ // get environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Integer> dataSet = env.fromElements(1, 2, 3, 4, 4, 5, 6);
+
+//        DataSet<Tuple2<Integer, Integer>> result = dataSet
+//                .flatMap((FlatMapFunction<Integer, Tuple2<Integer, Integer>>) (x, y) -> y.collect(new Tuple2<>(x, 1))).returns(Types.TUPLE(Types.INT, Types.INT));
+
+
+//        DataSet<Tuple2<Integer, Integer>> result = dataSet.mapPartition(new MyMapPartitionFunction()).returns(Types.TUPLE(Types.INT, Types.INT));
+
+//        DataSet<Integer> result = dataSet.filter(x -> x > 5);
+
+
+//        DataSet<Integer> result = dataSet.distinct();
+
+//        // prints 25 (1 + 2 + 3 + 4 + 4 + 5 + 6)
+//        DataSet<Integer> result = dataSet.reduce((x, y) -> {
+//            return x + y;
+//        });
+
+
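+        // map each element to a (value, 1) tuple; returns(...) supplies the tuple type
+        // information that Flink cannot extract from the lambda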
+        DataSet<Tuple2<Integer, Integer>> result = dataSet
+                .flatMap((FlatMapFunction<Integer, Tuple2<Integer, Integer>>) (x, y) -> y.collect(new Tuple2<>(x, 1))).returns(Types.TUPLE(Types.INT, Types.INT));
+
+        // prints (6,7)
+ result.sum(1).print();
+
+        // prints (1,1)
+ result.min(0).print();
+
+        // prints (6,1)
+ result.max(0).sum(1).print();
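+        // note: sum/min/max only guarantee the value of the aggregated field,
+        // whereas minBy/maxBy below return a complete tuple of the DataSet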
+
+// (1,1)
+ result.minBy(0).print();
+
+// (6,1)
+ result.maxBy(0).print();
+
+
+// (3,1)
+// (1,1)
+// (5,1)
+// (6,1)
+// (2,1)
+// (4,2)
+ result.groupBy(0).sum(1).print();
+
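+        // for each key group, sort the group and emit at most its first 2 elements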
+ result.groupBy(0).sortGroup(0, Order.ASCENDING).first(2).print();
+
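+        // print() already triggers execution of the DataSet program, so env.execute() is not needed here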
+// env.execute();
+ }
+
+    public static class MyMapPartitionFunction implements MapPartitionFunction<Integer, Tuple2<Integer, Integer>> {
+
+ @Override
+        public void mapPartition(Iterable<Integer> values, Collector<Tuple2<Integer, Integer>> out) throws Exception {
+ values.forEach(x -> {
+ out.collect(new Tuple2<>(x, 1));
+ });
+ }
+ }
+
+}
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java
new file mode 100644
index 0000000000000000000000000000000000000000..94b47fdf7487f5a5e1aad1e4af9684a15f53abcc
--- /dev/null
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java
@@ -0,0 +1,39 @@
+package com.duanledexianxianxian.maven.flink.dataset;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Word count example for the Flink DataSet API.
+ *
+ * @author Administrator
+ */
+public class WordCountExample {
+ public static void main(String[] args) throws Exception {
+ // get environment
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<String> text = env.fromElements(
+ "Who's there?",
+ "I think I hear them. Stand, ho! Who's there?");
+
+        DataSet<Tuple2<String, Integer>> wordCounts = text
+ .flatMap(new LineSplitter())
+ .groupBy(0)
+ .sum(1);
+
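+        // prints one (word, count) tuple per distinct word, e.g. (Who's,2) and (I,2)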
+ wordCounts.print();
+ }
+
+    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
+            for (String word : line.split(" ")) {
+                out.collect(new Tuple2<>(word, 1));
+ }
+ }
+ }
+}
diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java
similarity index 93%
rename from maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java
rename to maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java
index 02d986b237cbd22c9e949e60bec1dd216ebd63ef..0151169c30a3492e58c2a4619f1cb98150159a37 100644
--- a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java
+++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java
@@ -22,7 +22,7 @@ import java.util.List;
* @author fengyuchenglun
* @version 1.0.0
*/
-public class Main1 {
+public class PageAccessSql {
/**
* The constant OUT_OF_ORDERNESS.
*/
@@ -60,21 +60,27 @@ public class Main1 {
         // parallelism 1 makes the output easier to inspect
env.setParallelism(1);
+
+        // initialize the test data
         List<PageAccess> pageAccessList = Lists.newArrayList();
         initData(pageAccessList);
+        // assign event-time timestamps and watermarks to the stream
         DataStream<PageAccess> stream = env.fromCollection(pageAccessList).assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
// Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime");
// register DataStream as Table
tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime,rowTime.rowtime");
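+        // "rowTime.rowtime" appends an event-time attribute to the registered table, backed by the stream's timestamps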
+        // ORDER BY: the primary sort field must be a time attribute
Table result = tableEnv.sqlQuery("select * from pageAccess order by rowTime ,region asc");
+        // convert the result Table back into a DataStream
         DataStream<Row> rowDataStream = tableEnv.toAppendStream(result, Row.class);
rowDataStream.print();
+ // execute
env.execute("Flink sql");
}
@@ -111,6 +117,7 @@ public class Main1 {
/**
      * Event-time watermark assigner.
+     * BoundedOutOfOrdernessTimestampExtractor handles out-of-order (late) events.
*/
     private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<PageAccess> {
@@ -123,6 +130,7 @@ public class Main1 {
@Override
public long extractTimestamp(PageAccess pageAccess) {
+            // extract the event time from the record
return pageAccess.getAccessTime();
}
}