Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
F
flink
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Leona
web
flink
Commits
88424a64
Commit
88424a64
authored
Oct 22, 2019
by
duanledexianxianxian
😁
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
sync
parent
04f84e14
Changes
7
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
377 additions
and
5 deletions
+377
-5
maven/flink-quickstart.iml
maven/flink-quickstart.iml
+68
-1
maven/pom.xml
maven/pom.xml
+6
-0
maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java
...va/com/duanledexianxianxian/maven/flink/StreamingJob.java
+61
-4
maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java
...ianxianxian/maven/flink/common/constant/AppConstants.java
+18
-0
maven/src/main/java/com/duanledexianxianxian/maven/flink/common/util/ExecutionEnvUtils.java
...anxianxian/maven/flink/common/util/ExecutionEnvUtils.java
+93
-0
maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
...com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
+131
-0
maven/src/main/resources/application.properties
maven/src/main/resources/application.properties
+0
-0
No files found.
maven/flink-quickstart.iml
View file @
88424a64
<?xml version="1.0" encoding="UTF-8"?>
<module
type=
"JAVA_MODULE"
version=
"4"
/>
\ No newline at end of file
<module
org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule=
"true"
type=
"JAVA_MODULE"
version=
"4"
>
<component
name=
"NewModuleRootManager"
LANGUAGE_LEVEL=
"JDK_1_8"
>
<output
url=
"file://$MODULE_DIR$/target/classes"
/>
<output-test
url=
"file://$MODULE_DIR$/target/test-classes"
/>
<content
url=
"file://$MODULE_DIR$"
>
<sourceFolder
url=
"file://$MODULE_DIR$/src/main/java"
isTestSource=
"false"
/>
<sourceFolder
url=
"file://$MODULE_DIR$/src/main/resources"
type=
"java-resource"
/>
<excludeFolder
url=
"file://$MODULE_DIR$/target"
/>
</content>
<orderEntry
type=
"inheritedJdk"
/>
<orderEntry
type=
"sourceFolder"
forTests=
"false"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-java:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-core:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-annotations:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-metrics-core:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.esotericsoftware.kryo:kryo:2.24.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.esotericsoftware.minlog:minlog:1.2"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.objenesis:objenesis:2.1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: commons-collections:commons-collections:3.2.2"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.commons:commons-compress:1.18"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-shaded-asm-6:6.2.1-7.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.commons:commons-lang3:3.3.2"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.commons:commons-math3:3.5"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.slf4j:slf4j-api:1.7.15"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.google.code.findbugs:jsr305:1.3.9"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:force-shading:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-streaming-java_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-runtime_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-queryable-state-client-java:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-hadoop-fs:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: commons-io:commons-io:2.4"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-shaded-netty:4.1.32.Final-7.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-shaded-jackson:2.9.8-7.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: commons-cli:commons-cli:1.3.1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.javassist:javassist:3.19.0-GA"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.scala-lang:scala-library:2.11.12"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe.akka:akka-actor_2.11:2.5.21"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe:config:1.3.3"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.scala-lang.modules:scala-java8-compat_2.11:0.7.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe.akka:akka-stream_2.11:2.5.21"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.reactivestreams:reactive-streams:1.0.2"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe:ssl-config-core_2.11:0.3.7"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.scala-lang.modules:scala-parser-combinators_2.11:1.1.1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe.akka:akka-protobuf_2.11:2.5.21"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.typesafe.akka:akka-slf4j_2.11:2.5.21"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.clapper:grizzled-slf4j_2.11:1.3.2"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.github.scopt:scopt_2.11:3.5.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.xerial.snappy:snappy-java:1.1.4"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.twitter:chill_2.11:0.7.6"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.twitter:chill-java:0.7.6"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-clients_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-optimizer_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-shaded-guava:18.0-7.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.google.guava:guava:23.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.google.errorprone:error_prone_annotations:2.0.18"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.google.j2objc:j2objc-annotations:1.1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14"
level=
"project"
/>
<orderEntry
type=
"library"
scope=
"RUNTIME"
name=
"Maven: org.slf4j:slf4j-log4j12:1.7.7"
level=
"project"
/>
<orderEntry
type=
"library"
scope=
"RUNTIME"
name=
"Maven: log4j:log4j:1.2.17"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-connector-kafka_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-connector-kafka-base_2.11:1.9.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.kafka:kafka-clients:2.2.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.github.luben:zstd-jni:1.3.8-1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.lz4:lz4-java:1.5.0"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.alibaba:fastjson:1.2.56"
level=
"project"
/>
<orderEntry
type=
"library"
scope=
"PROVIDED"
name=
"Maven: org.projectlombok:lombok:1.18.10"
level=
"project"
/>
</component>
</module>
\ No newline at end of file
maven/pom.xml
View file @
88424a64
...
...
@@ -66,6 +66,12 @@ under the License.
<version>
${flink.version}
</version>
<scope>
provided
</scope>
</dependency>
<dependency>
<groupId>
com.google.guava
</groupId>
<artifactId>
guava
</artifactId>
<version>
23.0
</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
...
...
maven/src/main/java/com/duanledexianxianxian/maven/flink/StreamingJob.java
View file @
88424a64
...
...
@@ -20,18 +20,26 @@ package com.duanledexianxianxian.maven.flink;
import
com.alibaba.fastjson.JSONObject
;
import
com.duanledexianxianxian.maven.flink.model.Student
;
import
com.google.common.collect.Lists
;
import
org.apache.flink.api.common.functions.AggregateFunction
;
import
org.apache.flink.api.common.functions.FlatMapFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.api.java.functions.KeySelector
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.KeyedStream
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
;
import
org.apache.flink.util.Collector
;
import
java.util.List
;
import
java.util.Properties
;
import
java.util.concurrent.TimeUnit
;
/**
* Skeleton for a Flink Streaming Job.
...
...
@@ -52,6 +60,8 @@ public class StreamingJob {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 创建执行环境上下文
final
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//设置窗口的时间单位为process time
env
.
setStreamTimeCharacteristic
(
TimeCharacteristic
.
IngestionTime
);
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
"bootstrap.servers"
,
"113.105.144.9:9104"
);
...
...
@@ -62,11 +72,11 @@ public class StreamingJob {
// consumer.setStartFromEarliest();
DataStream
<
String
>
stream
=
env
.
addSource
(
consumer
);
stream
.
print
();
//
stream.print();
// map 函数 flatMap是map的特殊形式
SingleOutputStreamOperator
<
Student
>
student
=
stream
.
map
((
MapFunction
<
String
,
Student
>)
value
->
JSONObject
.
parseObject
(
value
,
Student
.
class
));
student
.
print
();
//
student.print();
// flatMap函数
// SingleOutputStreamOperator<String> studentFlatMap = student.flatMap((FlatMapFunction<Student, String>) (value, out) -> {
...
...
@@ -77,9 +87,8 @@ public class StreamingJob {
// studentFlatMap.print();
KeyedStream
<
Student
,
Integer
>
keyBy
=
student
.
keyBy
((
KeySelector
<
Student
,
Integer
>)
value
->
value
.
getAge
()
);
student
.
keyBy
(
Student:
:
getSex
).
window
(
TumblingEventTimeWindows
.
of
(
Time
.
seconds
(
20
))).
aggregate
(
new
Agg
()).
print
(
);
keyBy
.
print
();
/*
...
...
@@ -106,4 +115,52 @@ public class StreamingJob {
// execute program
env
.
execute
(
"Flink Streaming Java API Skeleton"
);
}
/**
 * Window aggregate that collects every {@link Student} seen in a window together with a
 * running count.
 *
 * <p>Accumulator and result share the same shape: {@code f0} holds the students collected so
 * far and {@code f1} holds how many there are.
 */
public static class Agg
        implements AggregateFunction<Student, Tuple2<List<Student>, Long>, Tuple2<List<Student>, Long>> {

    /**
     * Creates the container used by the following steps.
     *
     * <p>It starts as an empty list and a zero count (rather than null fields) so that
     * {@link #merge} is safe even for an accumulator that never went through {@link #add}.
     *
     * @return a new, empty accumulator
     */
    @Override
    public Tuple2<List<Student>, Long> createAccumulator() {
        List<Student> students = Lists.newArrayList();
        return new Tuple2<>(students, 0L);
    }

    /**
     * Called for each element added to the window.
     *
     * @param in  the element being added to the window
     * @param acc the accumulator created by {@link #createAccumulator()} or returned by an
     *            earlier call
     * @return an accumulator that additionally contains {@code in}
     */
    @Override
    public Tuple2<List<Student>, Long> add(Student in, Tuple2<List<Student>, Long> acc) {
        List<Student> students = studentsOf(acc);
        students.add(in);
        long total = countOf(acc) + 1;
        return new Tuple2<>(students, total);
    }

    /**
     * Called when the window fires; the accumulator itself is the result.
     *
     * @param acc the accumulator of the firing window
     * @return {@code acc}, unchanged
     */
    @Override
    public Tuple2<List<Student>, Long> getResult(Tuple2<List<Student>, Long> acc) {
        return acc;
    }

    /**
     * Combines two accumulators; only called when windows are merged.
     *
     * <p>The result is built in a fresh list so that neither input is modified, and null
     * fields on either side are treated as "empty" instead of raising a
     * {@link NullPointerException}.
     *
     * @param acc1 the first accumulator
     * @param acc2 the second accumulator
     * @return a new accumulator holding the students and the summed count of both inputs
     */
    @Override
    public Tuple2<List<Student>, Long> merge(
            Tuple2<List<Student>, Long> acc1, Tuple2<List<Student>, Long> acc2) {
        List<Student> merged = Lists.newArrayList();
        merged.addAll(studentsOf(acc1));
        merged.addAll(studentsOf(acc2));
        long total = countOf(acc1) + countOf(acc2);
        return new Tuple2<>(merged, total);
    }

    /** Returns the accumulator's student list, or a new empty list when the field is null. */
    private static List<Student> studentsOf(Tuple2<List<Student>, Long> acc) {
        if (acc.f0 != null) {
            return acc.f0;
        }
        List<Student> empty = Lists.newArrayList();
        return empty;
    }

    /** Returns the accumulator's count, treating a null field as zero. */
    private static long countOf(Tuple2<List<Student>, Long> acc) {
        return acc.f1 == null ? 0L : acc.f1;
    }
}
}
maven/src/main/java/com/duanledexianxianxian/maven/flink/common/constant/AppConstants.java
0 → 100644
View file @
88424a64
package
com.duanledexianxianxian.maven.flink.common.constant
;
/**
 * Names shared across the application: the bundled configuration file and the parameter keys
 * that are looked up in it.
 *
 * <p>This class only holds constants and cannot be instantiated.
 *
 * @author Administrator
 */
public final class AppConstants {

    /** Name of the configuration file. */
    public static final String PROPERTIES_FILE_NAME = "application.properties";

    /** Parameter key for the job parallelism. */
    public static final String STREAM_PARALLELISM = "stream.parallelism";

    /** Parameter key for the flag that turns checkpointing on or off. */
    public static final String STREAM_CHECKPOINT_ENABLE = "stream.checkpoint.enable";

    /** Parameter key for the checkpoint interval. */
    public static final String STREAM_CHECKPOINT_INTERVAL = "stream.checkpoint.interval";

    private AppConstants() {
        // Constants holder; not meant to be instantiated.
    }
}
maven/src/main/java/com/duanledexianxianxian/maven/flink/common/util/ExecutionEnvUtils.java
0 → 100644
View file @
88424a64
package
com.duanledexianxianxian.maven.flink.common.util
;
import
com.duanledexianxianxian.maven.flink.common.constant.AppConstants
;
import
org.apache.flink.api.common.restartstrategy.RestartStrategies
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
java.io.IOException
;
import
java.util.HashMap
;
import
java.util.Map
;
/**
 * Utilities for assembling the job's {@link ParameterTool} and for creating a
 * {@link StreamExecutionEnvironment} with the project's default settings.
 *
 * @author Administrator
 */
public class ExecutionEnvUtils {

    /** Parallelism used when {@link AppConstants#STREAM_PARALLELISM} is not configured. */
    private static final int DEFAULT_PARALLELISM = 5;

    /** Number of restart attempts after a failure. */
    private static final int RESTART_ATTEMPTS = 4;

    /** Delay between restart attempts, in milliseconds. */
    private static final long RESTART_DELAY_MS = 10000L;

    /** Checkpoint interval (ms) used when {@link AppConstants#STREAM_CHECKPOINT_INTERVAL} is not configured. */
    private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 1000;

    /**
     * Parameters assembled once at class initialization from the bundled properties file, the
     * JVM system properties and the environment variables, merged in that order.
     */
    public static final ParameterTool PARAMETER_TOOL = createParameterTool();

    private ExecutionEnvUtils() {
        // Static utility class; not meant to be instantiated.
    }

    /**
     * Builds a {@link ParameterTool} from the bundled properties file, the command-line
     * arguments, the JVM system properties and the environment variables, merged in that
     * order. With {@link ParameterTool#mergeWith}, values from a later source replace values
     * from an earlier one for the same key.
     *
     * @param args the command-line arguments of the job
     * @return the merged parameters
     * @throws Exception if the properties file is missing or cannot be read, or if the
     *                   arguments cannot be parsed
     */
    public static ParameterTool createParameterTool(final String[] args) throws Exception {
        return loadBundledProperties()
                .mergeWith(ParameterTool.fromArgs(args))
                .mergeWith(ParameterTool.fromSystemProperties())
                .mergeWith(ParameterTool.fromMap(getenv()));
    }

    /**
     * Builds the value of {@link #PARAMETER_TOOL}.
     *
     * <p>A configuration that cannot be loaded is reported as an {@link IllegalStateException}
     * carrying the original cause, instead of silently leaving the constant {@code null}.
     */
    private static ParameterTool createParameterTool() {
        try {
            return loadBundledProperties()
                    .mergeWith(ParameterTool.fromSystemProperties())
                    .mergeWith(ParameterTool.fromMap(getenv()));
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to load " + AppConstants.PROPERTIES_FILE_NAME, e);
        }
    }

    /**
     * Reads {@link AppConstants#PROPERTIES_FILE_NAME} from the root of the classpath.
     *
     * <p>The leading {@code /} matters: without it {@link Class#getResourceAsStream(String)}
     * resolves the name relative to this class's package, where the file does not live.
     * The stream is closed once the properties have been read.
     */
    private static ParameterTool loadBundledProperties() throws IOException {
        final String resource = "/" + AppConstants.PROPERTIES_FILE_NAME;
        try (java.io.InputStream in = ExecutionEnvUtils.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("Classpath resource not found: " + resource);
            }
            return ParameterTool.fromPropertiesFile(in);
        }
    }

    /**
     * Copies the process environment variables into a new map.
     *
     * @return a mutable copy of {@link System#getenv()}
     */
    private static Map<String, String> getenv() {
        return new HashMap<>(System.getenv());
    }

    /**
     * Creates a stream execution environment and applies the default settings.
     *
     * @param parameterTool the parameters to read the settings from
     * @return the configured stream execution environment
     * @throws Exception declared for callers; kept from the original signature
     */
    public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default parallelism of the job.
        env.setParallelism(
                parameterTool.getInt(AppConstants.STREAM_PARALLELISM, DEFAULT_PARALLELISM));

        // Turn off sysout logging.
        env.getConfig().disableSysoutLogging();

        // After a failure, retry 4 times with 10 seconds between attempts.
        env.getConfig().setRestartStrategy(
                RestartStrategies.fixedDelayRestart(RESTART_ATTEMPTS, RESTART_DELAY_MS));

        if (parameterTool.getBoolean(AppConstants.STREAM_CHECKPOINT_ENABLE, true)) {
            // Checkpoint at the configured interval; 1000 ms when nothing is configured.
            env.enableCheckpointing(
                    parameterTool.getInt(
                            AppConstants.STREAM_CHECKPOINT_INTERVAL,
                            DEFAULT_CHECKPOINT_INTERVAL_MS));
        }

        // Make the parameters available in the web interface.
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Event time is the default time characteristic.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        return env;
    }
}
maven/src/main/java/com/duanledexianxianxian/maven/flink/sink/kafka/Main.java
0 → 100644
View file @
88424a64
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.duanledexianxianxian.maven.flink.sink.kafka
;
import
com.alibaba.fastjson.JSONObject
;
import
com.duanledexianxianxian.maven.flink.common.util.ExecutionEnvUtils
;
import
com.duanledexianxianxian.maven.flink.model.Student
;
import
com.google.common.collect.Lists
;
import
org.apache.flink.api.common.functions.AggregateFunction
;
import
org.apache.flink.api.common.functions.MapFunction
;
import
org.apache.flink.api.common.serialization.SimpleStringSchema
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.api.java.utils.ParameterTool
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
;
import
org.apache.kafka.clients.producer.KafkaProducer
;
import
java.util.List
;
import
java.util.Properties
;
/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
* ……
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*
* @author Administrator
*/
public
class
Main
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
final
ParameterTool
parameterTool
=
ExecutionEnvUtils
.
createParameterTool
(
args
);
// 创建执行环境上下文
final
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
//设置窗口的时间单位为process time
env
.
setStreamTimeCharacteristic
(
TimeCharacteristic
.
IngestionTime
);
Properties
properties
=
new
Properties
();
properties
.
setProperty
(
"bootstrap.servers"
,
"113.105.144.9:9104"
);
FlinkKafkaConsumer
<
String
>
consumer
=
new
FlinkKafkaConsumer
<>(
"topic_first"
,
new
SimpleStringSchema
(),
properties
);
//从最早开始消费
DataStream
<
String
>
stream
=
env
.
addSource
(
consumer
);
// map 函数 flatMap是map的特殊形式
SingleOutputStreamOperator
<
Student
>
student
=
stream
.
map
((
MapFunction
<
String
,
Student
>)
value
->
JSONObject
.
parseObject
(
value
,
Student
.
class
));
student
.
keyBy
(
Student:
:
getSex
).
window
(
TumblingEventTimeWindows
.
of
(
Time
.
seconds
(
20
))).
aggregate
(
new
Agg
()).
addSink
(
new
KafkaProducer
<>).
print
();
// execute program
env
.
execute
(
"Flink Streaming Java API Skeleton"
);
}
/**
* 自定义聚合函数
*/
public
static
class
Agg
implements
AggregateFunction
<
Student
,
Tuple2
<
List
<
Student
>,
Long
>,
Tuple2
<
List
<
Student
>,
Long
>>
{
//创建一个数据统计的容器,提供给后续操作使用。
@Override
public
Tuple2
<
List
<
Student
>,
Long
>
createAccumulator
()
{
return
new
Tuple2
<
List
<
Student
>,
Long
>();
}
//每个元素被添加进窗口的时候调用。
//第一个参数是添加进窗口的元素,第二个参数是统计的容器(上面创建的那个)。
@Override
public
Tuple2
<
List
<
Student
>,
Long
>
add
(
Student
in
,
Tuple2
<
List
<
Student
>,
Long
>
acc
)
{
List
<
Student
>
list
=
acc
.
f0
;
Long
count
=
acc
.
f1
;
if
(
list
==
null
)
{
list
=
Lists
.
newArrayList
();
}
list
.
add
(
in
);
if
(
count
==
null
)
{
count
=
1L
;
}
else
{
count
++;
}
return
new
Tuple2
<>(
list
,
count
);
}
//窗口统计事件触发时调用来返回出统计的结果。
@Override
public
Tuple2
<
List
<
Student
>,
Long
>
getResult
(
Tuple2
<
List
<
Student
>,
Long
>
acc
)
{
return
acc
;
}
//只有在当窗口合并的时候调用,合并2个容器
@Override
public
Tuple2
<
List
<
Student
>,
Long
>
merge
(
Tuple2
<
List
<
Student
>,
Long
>
acc1
,
Tuple2
<
List
<
Student
>,
Long
>
acc2
)
{
List
<
Student
>
list1
=
acc1
.
f0
;
List
<
Student
>
list2
=
acc2
.
f0
;
Long
count1
=
acc1
.
f1
;
Long
count2
=
acc2
.
f1
;
list1
.
addAll
(
list2
);
return
new
Tuple2
<>(
list1
,
count1
+
count2
);
}
}
}
maven/src/main/resources/application.properties
0 → 100644
View file @
88424a64
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment