diff --git a/maven/flink-quickstart.iml b/maven/flink-quickstart.iml index 78b2cc53b203f0b97534bb1184cdc7b474339fb4..e9231310756e18deb654f127aaa9eca9668dbfb7 100644 --- a/maven/flink-quickstart.iml +++ b/maven/flink-quickstart.iml @@ -1,2 +1,69 @@ - \ No newline at end of file + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/maven/pom.xml b/maven/pom.xml index 2edbe7465681be39381e2a9e38cace02f7ae4d74..7ee292bdf0e34006ffd2514f076bf5149360fcaf 100644 --- a/maven/pom.xml +++ b/maven/pom.xml @@ -66,6 +66,12 @@ under the License. ${flink.version} provided + + com.google.guava + guava + 23.0 + + diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java index f814a7d8cb8401cdfe0e3bd27a85848e4d823e3c..1fcbc6cdc00d18f6a0844972f522bdb9b3ecebed 100644 --- a/maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java @@ -20,18 +20,26 @@ package com.duanledexianxianxian.maven.flink; import com.alibaba.fastjson.JSONObject; import com.duanledexianxianxian.maven.flink.model.Student; +import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; +import java.util.List; import java.util.Properties; +import java.util.concurrent.TimeUnit; /** * Skeleton for a Flink Streaming Job. @@ -52,6 +60,8 @@ public class StreamingJob { public static void main(String[] args) throws Exception { // 创建执行环境上下文 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + //设置窗口的时间单位为process time + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "113.105.144.9:9104"); @@ -62,11 +72,11 @@ public class StreamingJob { // consumer.setStartFromEarliest(); DataStream stream = env .addSource(consumer); - stream.print(); +// stream.print(); // map 函数 flatMap是map的特殊形式 SingleOutputStreamOperator student = stream.map((MapFunction) value -> JSONObject.parseObject(value, Student.class)); - student.print(); +// student.print(); // flatMap函数 // SingleOutputStreamOperator studentFlatMap = student.flatMap((FlatMapFunction) (value, out) -> { @@ -77,9 +87,8 @@ public class StreamingJob { // studentFlatMap.print(); - KeyedStream keyBy = student.keyBy((KeySelector) value -> value.getAge()); + student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).print(); - keyBy.print(); /* @@ -106,4 +115,52 @@ public class StreamingJob { // execute program env.execute("Flink Streaming Java API Skeleton"); } + + /** + * 自定义聚合函数 + */ + public static class Agg implements AggregateFunction, Long>, Tuple2, Long>> { + //创建一个数据统计的容器,提供给后续操作使用。 + @Override + public Tuple2, Long> createAccumulator() { + return new Tuple2, Long>(); + } + + //每个元素被添加进窗口的时候调用。 + //第一个参数是添加进窗口的元素,第二个参数是统计的容器(上面创建的那个)。 + @Override + public Tuple2, Long> add(Student in, Tuple2, Long> acc) { + List list = acc.f0; + Long count = acc.f1; + if (list == null) { + list = Lists.newArrayList(); + } + list.add(in); + if (count == null) { + count = 1L; + } else { + count++; + } + return new Tuple2<>(list, count); + } + + //窗口统计事件触发时调用来返回出统计的结果。 + @Override + public Tuple2, Long> getResult(Tuple2, Long> acc) { + return acc; + } + + //只有在当窗口合并的时候调用,合并2个容器 + @Override + public Tuple2, Long> merge(Tuple2, Long> acc1, Tuple2, Long> acc2) { + List list1 = acc1.f0; + List list2 = acc2.f0; + Long count1 = acc1.f1; + Long count2 = acc2.f1; + list1.addAll(list2); + return new Tuple2<>(list1, count1 + count2); + } + } } + + diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java new file mode 100644 index 0000000000000000000000000000000000000000..83a965a0606ca9b6d420ccce9a6c1a0eab28749e --- /dev/null +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java @@ -0,0 +1,18 @@ +package com.duanledexianxianxian.maven.flink.common.constant; + +/** + * @author Administrator + */ +public class AppConstants { + /** + * 配置文件 + */ + public final static String PROPERTIES_FILE_NAME = "application.properties"; + + + public static final String STREAM_PARALLELISM = "stream.parallelism"; + public static final String STREAM_CHECKPOINT_ENABLE = "stream.checkpoint.enable"; + public static final String STREAM_CHECKPOINT_INTERVAL = "stream.checkpoint.interval"; + + +} diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/util/ExecutionEnvUtils.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/util/ExecutionEnvUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..7b11ac06e2b1981144deac4bed3c87b187043aa7 --- /dev/null +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/common/util/ExecutionEnvUtils.java @@ -0,0 +1,93 @@ +package com.duanledexianxianxian.maven.flink.common.util; + +import com.duanledexianxianxian.maven.flink.common.constant.AppConstants; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * 执行环境参数工具类 + * + * @author Administrator + */ +public class ExecutionEnvUtils { + + /** + * Create parameter tool parameter tool. + * + * @param args the args + * @return the parameter tool + * @throws Exception the exception + */ + public static ParameterTool createParameterTool(final String[] args) throws Exception { + return ParameterTool + .fromPropertiesFile(ExecutionEnvUtils.class.getResourceAsStream(AppConstants.PROPERTIES_FILE_NAME)) + .mergeWith(ParameterTool.fromArgs(args)) + .mergeWith(ParameterTool.fromSystemProperties()) + .mergeWith(ParameterTool.fromMap(getenv())); + } + + + /** + * The constant PARAMETER_TOOL. + */ + public static final ParameterTool PARAMETER_TOOL = createParameterTool(); + + private static ParameterTool createParameterTool() { + try { + return ParameterTool + .fromPropertiesFile(ExecutionEnvUtils.class.getResourceAsStream(AppConstants.PROPERTIES_FILE_NAME)) + .mergeWith(ParameterTool.fromSystemProperties()) + .mergeWith(ParameterTool.fromMap(getenv())); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * 将环境变量中的值转换成map结构 + * + * @return + */ + private static Map getenv() { + Map map = new HashMap<>(); + for (Map.Entry entry : System.getenv().entrySet()) { + map.put(entry.getKey(), entry.getValue()); + } + return map; + } + + + /** + * 设置默认参数 + * + * @param parameterTool the parameter tool + * @return the stream execution environment + * @throws Exception the exception + */ + public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // 设置作业的默认并行度 + env.setParallelism(parameterTool.getInt(AppConstants.STREAM_PARALLELISM, 5)); + // 关闭系统日志 + env.getConfig().disableSysoutLogging(); + // 失败之后,重拾4次,每次间隔10秒 + env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); + if (parameterTool.getBoolean(AppConstants.STREAM_CHECKPOINT_ENABLE, true)) { + // create a checkpoint every 5 seconds + env.enableCheckpointing(parameterTool.getInt(AppConstants.STREAM_CHECKPOINT_INTERVAL, 1000)); + } + // make parameters available in the web interface + env.getConfig().setGlobalJobParameters(parameterTool); + // 默认是事件时间 + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + return env; + } +} + diff --git a/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java new file mode 100644 index 0000000000000000000000000000000000000000..d8c5dd6531023b02e0974fb53676b3c2206860c1 --- /dev/null +++ b/maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.duanledexianxianxian.maven.flink.sink.kafka; + +import com.alibaba.fastjson.JSONObject; +import com.duanledexianxianxian.maven.flink.common.util.ExecutionEnvUtils; +import com.duanledexianxianxian.maven.flink.model.Student; +import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +import java.util.List; +import java.util.Properties; + +/** + * Skeleton for a Flink Streaming Job. + * + *

For a tutorial how to write a Flink streaming application, check the + * tutorials and examples on the Flink Website. + * …… + *

To package your application into a JAR file for execution, run + * 'mvn clean package' on the command line. + * + *

If you change the name of the main class (with the public static void main(String[] args)) + * method, change the respective entry in the POM.xml file (simply search for 'mainClass'). + * + * @author Administrator + */ +public class Main { + + public static void main(String[] args) throws Exception { + final ParameterTool parameterTool = ExecutionEnvUtils.createParameterTool(args); + + // 创建执行环境上下文 + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + //设置窗口的时间单位为process time + env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); + Properties properties = new Properties(); + properties.setProperty("bootstrap.servers", "113.105.144.9:9104"); + + FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("topic_first", new SimpleStringSchema(), properties); + + //从最早开始消费 + DataStream stream = env + .addSource(consumer); + + // map 函数 flatMap是map的特殊形式 + SingleOutputStreamOperator student = stream.map((MapFunction) value -> JSONObject.parseObject(value, Student.class)); + + + student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).addSink(new KafkaProducer<>).print(); + + // execute program + env.execute("Flink Streaming Java API Skeleton"); + } + + /** + * 自定义聚合函数 + */ + public static class Agg implements AggregateFunction, Long>, Tuple2, Long>> { + //创建一个数据统计的容器,提供给后续操作使用。 + @Override + public Tuple2, Long> createAccumulator() { + return new Tuple2, Long>(); + } + + //每个元素被添加进窗口的时候调用。 + //第一个参数是添加进窗口的元素,第二个参数是统计的容器(上面创建的那个)。 + @Override + public Tuple2, Long> add(Student in, Tuple2, Long> acc) { + List list = acc.f0; + Long count = acc.f1; + if (list == null) { + list = Lists.newArrayList(); + } + list.add(in); + if (count == null) { + count = 1L; + } else { + count++; + } + return new Tuple2<>(list, count); + } + + //窗口统计事件触发时调用来返回出统计的结果。 + @Override + public Tuple2, Long> getResult(Tuple2, Long> acc) { + return acc; + } + + //只有在当窗口合并的时候调用,合并2个容器 + @Override + public Tuple2, Long> merge(Tuple2, Long> acc1, Tuple2, Long> acc2) { + List list1 = acc1.f0; + List list2 = acc2.f0; + Long count1 = acc1.f1; + Long count2 = acc2.f1; + list1.addAll(list2); + return new Tuple2<>(list1, count1 + count2); + } + } +} + + diff --git a/maven/src/main/resources/application.properties b/maven/src/main/resources/application.properties new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391