Commit 63e929a7 authored by duanledexianxianxian's avatar duanledexianxianxian 😁

sync

parent eee41c41
...@@ -11,6 +11,11 @@ ...@@ -11,6 +11,11 @@
<orderEntry type="inheritedJdk" /> <orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.0-SNAPSHOT" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.0-SNAPSHOT" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner-blink_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.5" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.0.4" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-runtime-blink_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-java:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-core:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-annotations:1.9.0" level="project" />
......
package com.duanledexianxianxian.maven.flink.dataset;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import java.util.stream.Collectors;
/**
 * Demonstrates basic Flink DataSet transformations and aggregations:
 * flatMap, sum, min/max, minBy/maxBy, groupBy and sortGroup.
 *
 * @author Administrator
 */
public class DataSetApi {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 4, 5, 6);

        // Alternative transformations that could be applied instead:
        //   numbers.mapPartition(new MyMapPartitionFunction()).returns(Types.TUPLE(Types.INT, Types.INT))
        //   numbers.filter(v -> v > 5)
        //   numbers.distinct()
        //   numbers.reduce((a, b) -> a + b)   // prints 25

        // Turn each element into a (value, 1) pair. The explicit cast plus
        // returns(...) supplies the tuple type information erased from the lambda.
        DataSet<Tuple2<Integer, Integer>> counts = numbers
                .flatMap((FlatMapFunction<Integer, Tuple2<Integer, Integer>>) (value, collector) ->
                        collector.collect(new Tuple2<Integer, Integer>(value, 1)))
                .returns(Types.TUPLE(Types.INT, Types.INT));

        // prints (6,7)
        counts.sum(1).print();
        // prints (1,1)
        counts.min(0).print();
        // prints (6,1)
        counts.max(0).sum(1).print();
        // prints (1,1)
        counts.minBy(0).print();
        // prints (6,1)
        counts.maxBy(0).print();
        // per-key counts, e.g. (4,2) since 4 occurs twice
        counts.groupBy(0).sum(1).print();
        // first two elements of each sorted group
        counts.groupBy(0).sortGroup(0, Order.ASCENDING).first(2).print();
        // print() already triggers execution, so no env.execute() is needed here
    }

    /**
     * Emits a (value, 1) pair for every element of a partition.
     */
    public static class MyMapPartitionFunction
            implements MapPartitionFunction<Integer, Tuple2<Integer, Integer>> {
        @Override
        public void mapPartition(Iterable<Integer> values, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            for (Integer value : values) {
                out.collect(new Tuple2<>(value, 1));
            }
        }
    }
}
package com.duanledexianxianxian.maven.flink.dataset;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Classic word-count example on the Flink DataSet API.
 *
 * @author Administrator
 */
public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // Obtain the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        // Tokenize each line, group by the word (tuple field 0)
        // and sum the per-word counts (tuple field 1).
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);

        wordCounts.print();
    }

    /**
     * Splits a line on single spaces and emits a (word, 1) pair per token.
     */
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String token : line.split(" ")) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
...@@ -22,7 +22,7 @@ import java.util.List; ...@@ -22,7 +22,7 @@ import java.util.List;
* @author fengyuchenglun * @author fengyuchenglun
* @version 1.0.0 * @version 1.0.0
*/ */
public class Main1 { public class PageAccessSql {
/** /**
* The constant OUT_OF_ORDERNESS. * The constant OUT_OF_ORDERNESS.
*/ */
...@@ -60,21 +60,27 @@ public class Main1 { ...@@ -60,21 +60,27 @@ public class Main1 {
//方便我们查出输出数据 //方便我们查出输出数据
env.setParallelism(1); env.setParallelism(1);
// 初始化数据
List<PageAccess> pageAccessList = Lists.newArrayList(); List<PageAccess> pageAccessList = Lists.newArrayList();
initData(pageAccessList); initData(pageAccessList);
// 设置水印
DataStream<PageAccess> stream = env.fromCollection(pageAccessList).assignTimestampsAndWatermarks(new TimestampsAndWatermarks()); DataStream<PageAccess> stream = env.fromCollection(pageAccessList).assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
// Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime"); // Table pageAccess = tableEnv.fromDataStream(stream, "region,userId,accessTime");
// register DataStream as Table // register DataStream as Table
tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime,rowTime.rowtime"); tableEnv.registerDataStream("pageAccess", stream, "region,userId,accessTime,rowTime.rowtime");
// oder by 主排序必须是时间属性字段
Table result = tableEnv.sqlQuery("select * from pageAccess order by rowTime ,region asc"); Table result = tableEnv.sqlQuery("select * from pageAccess order by rowTime ,region asc");
// table=>DataStream
DataStream<Row> rowDataStream = tableEnv.toAppendStream(result, Row.class); DataStream<Row> rowDataStream = tableEnv.toAppendStream(result, Row.class);
rowDataStream.print(); rowDataStream.print();
// execute
env.execute("Flink sql"); env.execute("Flink sql");
} }
...@@ -111,6 +117,7 @@ public class Main1 { ...@@ -111,6 +117,7 @@ public class Main1 {
/** /**
* 时间水印 * 时间水印
* BoundedOutOfOrdernessTimestampExtractor处理乱序
*/ */
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<PageAccess> { private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<PageAccess> {
...@@ -123,6 +130,7 @@ public class Main1 { ...@@ -123,6 +130,7 @@ public class Main1 {
@Override @Override
public long extractTimestamp(PageAccess pageAccess) { public long extractTimestamp(PageAccess pageAccess) {
// 提取eventTime
return pageAccess.getAccessTime(); return pageAccess.getAccessTime();
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment