From 63e929a748fe4c3360683137aa87828e903bc268 Mon Sep 17 00:00:00 2001 From: duanledexianxianxian Date: Thu, 31 Oct 2019 17:55:30 +0800 Subject: [PATCH] sync --- maven/flink-quickstart.iml | 5 ++ .../maven/flink/dataset/DataSetApi.java | 83 +++++++++++++++++++ .../maven/flink/dataset/WordCountExample.java | 39 +++++++++ .../sql/{Main1.java => PageAccessSql.java} | 10 ++- 4 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java create mode 100644 maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java rename maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/{Main1.java => PageAccessSql.java} (93%) diff --git a/maven/flink-quickstart.iml b/maven/flink-quickstart.iml index 8f86a21..96ee944 100644 --- a/maven/flink-quickstart.iml +++ b/maven/flink-quickstart.iml @@ -11,6 +11,11 @@ + + + + + diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java new file mode 100644 index 0000000..d954a5d --- /dev/null +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/DataSetApi.java @@ -0,0 +1,83 @@ +package com.duanledexianxianxian.maven.flink.dataset; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.MapPartitionFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +import java.util.stream.Collectors; + +/** + * @author Administrator + */ +public class DataSetApi { + public static void main(String[] args) throws Exception { + // get environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet dataSet = env.fromElements(1, 2, 3, 4, 4, 5, 6); + +// DataSet> result = dataSet +// .flatMap((FlatMapFunction>) (x, y) -> y.collect(new Tuple2(x, 1))).returns(Types.TUPLE(Types.INT, Types.INT)); + + +// DataSet> result = dataSet.mapPartition(new MyMapPartitionFunction()).returns(Types.TUPLE(Types.INT, Types.INT)); + +// DataSet result = dataSet.filter(x -> x > 5); + + +// DataSet result = dataSet.distinct(); + +// // 输出25 +// DataSet result = dataSet.reduce((x, y) -> { +// return x + y; +// }); + + + DataSet> result = dataSet + .flatMap((FlatMapFunction>) (x, y) -> y.collect(new Tuple2(x, 1))).returns(Types.TUPLE(Types.INT, Types.INT)); + + // 输出(6,7) + result.sum(1).print(); + + // 输出(1,1) + result.min(0).print(); + + //输出(6,1) + result.max(0).sum(1).print(); + +// (1,1) + result.minBy(0).print(); + +// (6,1) + result.maxBy(0).print(); + + +// (3,1) +// (1,1) +// (5,1) +// (6,1) +// (2,1) +// (4,2) + result.groupBy(0).sum(1).print(); + + result.groupBy(0).sortGroup(0, Order.ASCENDING).first(2).print(); + +// env.execute(); + } + + public static class MyMapPartitionFunction implements MapPartitionFunction> { + + @Override + public void mapPartition(Iterable values, Collector> out) throws Exception { + values.forEach(x -> { + out.collect(new Tuple2<>(x, 1)); + }); + } + } + +} diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java new file mode 100644 index 0000000..94b47fd --- /dev/null +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/dataset/WordCountExample.java @@ -0,0 +1,39 @@ +package com.duanledexianxianxian.maven.flink.dataset; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; + +/** + * Example for Dataset + * + * @author Administrator + */ +public class WordCountExample { + public static void main(String[] args) throws Exception { + // get environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + DataSet text = env.fromElements( + "Who's there?", + "I think I hear them. Stand, ho! Who's there?"); + + DataSet> wordCounts = text + .flatMap(new LineSplitter()) + .groupBy(0) + .sum(1); + + wordCounts.print(); + } + + public static class LineSplitter implements FlatMapFunction> { + @Override + public void flatMap(String line, Collector> out) { + for (String word : line.split(" ")) { + out.collect(new Tuple2(word, 1)); + } + } + } +} diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java similarity index 93% rename from maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java rename to maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java index 02d986b..0151169 100644 --- a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/PageAccessSql.java @@ -22,7 +22,7 @@ import java.util.List; * @author fengyuchenglun * @version 1.0.0 */ -public class Main1 { +public class PageAccessSql { /** * The constant OUT_OF_ORDERNESS. */ @@ -60,21 +60,27 @@ public class Main1 { //方便我们查出输出数据 env.setParallelism(1); + + // 初始化数据 List pageAccessList = Lists.newArrayList(); initData(pageAccessList); + // 设置水印 DataStream stream = env.fromCollection(pageAccessList).assignTimestampsAndWatermarks(new TimestampsAndWatermarks()); // Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime"); // register DataStream as Table tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime,rowTime.rowtime"); + // oder by 主排序必须是时间属性字段 Table result = tableEnv.sqlQuery("select * from pageAccess order by rowTime ,region asc"); + // table=>DataStream DataStream rowDataStream = tableEnv.toAppendStream(result, Row.class); rowDataStream.print(); + // execute env.execute("Flink sql"); } @@ -111,6 +117,7 @@ public class Main1 { /** * 时间水印 + * BoundedOutOfOrdernessTimestampExtractor处理乱序 */ private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor { @@ -123,6 +130,7 @@ public class Main1 { @Override public long extractTimestamp(PageAccess pageAccess) { + // 提取eventTime return pageAccess.getAccessTime(); } } -- GitLab