Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
F
flink
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Service Desk
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Operations
Operations
Incidents
Environments
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Leona
web
flink
Commits
8b8dbd87
Commit
8b8dbd87
authored
Oct 30, 2019
by
duanledexianxianxian
😁
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
sync code
parent
1d7ea378
Changes
6
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
360 additions
and
1 deletion
+360
-1
maven/.gitignore
maven/.gitignore
+1
-0
maven/flink-quickstart.iml
maven/flink-quickstart.iml
+1
-0
maven/pom.xml
maven/pom.xml
+8
-0
maven/src/main/java/com/duanledexianxianxian/maven/flink/HotItems.java
...n/java/com/duanledexianxianxian/maven/flink/HotItems.java
+201
-1
maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java
...a/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java
+82
-0
maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main2.java
...a/com/duanledexianxianxian/maven/flink/sql/sql/Main2.java
+67
-0
No files found.
maven/.gitignore
View file @
8b8dbd87
...
...
@@ -107,3 +107,4 @@ buildNumber.properties
.mvn/wrapper/maven-wrapper.jar
/.idea/
/src/main/resources/UserBehavior.csv
maven/flink-quickstart.iml
View file @
8b8dbd87
...
...
@@ -67,6 +67,7 @@
<orderEntry
type=
"library"
name=
"Maven: com.google.errorprone:error_prone_annotations:2.0.18"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: com.google.j2objc:j2objc-annotations:1.1"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.codehaus.mojo:animal-sniffer-annotations:1.14"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-table-planner-blink_2.11:1.0-SNAPSHOT"
level=
"project"
/>
<orderEntry
type=
"library"
scope=
"RUNTIME"
name=
"Maven: org.slf4j:slf4j-log4j12:1.7.7"
level=
"project"
/>
<orderEntry
type=
"library"
scope=
"RUNTIME"
name=
"Maven: log4j:log4j:1.2.17"
level=
"project"
/>
<orderEntry
type=
"library"
name=
"Maven: org.apache.flink:flink-connector-kafka_2.11:1.9.0"
level=
"project"
/>
...
...
maven/pom.xml
View file @
8b8dbd87
...
...
@@ -85,6 +85,14 @@ under the License.
</dependency>
<dependency>
<groupId>
org.apache.flink
</groupId>
<artifactId>
flink-table-planner-blink_2.12
</artifactId>
<version>
1.9.1
</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
...
...
maven/src/main/java/com/duanledexianxianxian/maven/flink/HotItems.java
View file @
8b8dbd87
package
com.duanledexianxianxian.maven.flink
;
import
org.apache.flink.api.common.functions.AggregateFunction
;
import
org.apache.flink.api.common.functions.FilterFunction
;
import
org.apache.flink.api.common.state.ListState
;
import
org.apache.flink.api.common.state.ListStateDescriptor
;
import
org.apache.flink.api.java.io.PojoCsvInputFormat
;
import
org.apache.flink.api.java.tuple.Tuple
;
import
org.apache.flink.api.java.tuple.Tuple1
;
import
org.apache.flink.api.java.typeutils.PojoTypeInfo
;
import
org.apache.flink.api.java.typeutils.TypeExtractor
;
import
org.apache.flink.configuration.Configuration
;
import
org.apache.flink.core.fs.Path
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.streaming.api.functions.KeyedProcessFunction
;
import
org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
;
import
org.apache.flink.streaming.api.functions.windowing.WindowFunction
;
import
org.apache.flink.streaming.api.windowing.time.Time
;
import
org.apache.flink.streaming.api.windowing.windows.TimeWindow
;
import
org.apache.flink.util.Collector
;
import
java.io.File
;
import
java.net.URISyntaxException
;
import
java.net.URL
;
import
java.sql.Timestamp
;
import
java.util.ArrayList
;
import
java.util.Comparator
;
import
java.util.List
;
/**
* 热门商品推荐
...
...
@@ -17,7 +37,7 @@ import java.net.URL;
* 根据淘宝提供的用户行为数据,分析每天topn热点商品
*/
public
class
HotItems
{
public
static
void
main
(
String
[]
args
)
throws
URISyntax
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 创建 execution environment
StreamExecutionEnvironment
env
=
StreamExecutionEnvironment
.
getExecutionEnvironment
();
// 告诉系统按照 EventTime 处理时间
...
...
@@ -28,6 +48,186 @@ public class HotItems {
// UserBehavior.csv 的本地文件路径, 在 resources 目录下
URL
fileUrl
=
HotItems
.
class
.
getClassLoader
().
getResource
(
"UserBehavior.csv"
);
// 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
Path
filePath
=
Path
.
fromLocalFile
(
new
File
(
fileUrl
.
toURI
()));
PojoTypeInfo
<
UserBehavior
>
pojoType
=
(
PojoTypeInfo
<
UserBehavior
>)
TypeExtractor
.
createTypeInfo
(
UserBehavior
.
class
);
// 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
String
[]
fieldOrder
=
new
String
[]{
"userId"
,
"itemId"
,
"categoryId"
,
"behavior"
,
"timestamp"
};
// 创建 PojoCsvInputFormat
PojoCsvInputFormat
<
UserBehavior
>
csvInput
=
new
PojoCsvInputFormat
<>(
filePath
,
pojoType
,
fieldOrder
);
env
// 创建数据源,得到 UserBehavior 类型的 DataStream
.
createInput
(
csvInput
,
pojoType
)
// 抽取出时间和生成 watermark
.
assignTimestampsAndWatermarks
(
new
AscendingTimestampExtractor
<
UserBehavior
>()
{
@Override
public
long
extractAscendingTimestamp
(
UserBehavior
userBehavior
)
{
// 原始数据单位秒,将其转成毫秒
return
userBehavior
.
timestamp
*
1000
;
}
})
// 过滤出只有点击的数据
.
filter
(
new
FilterFunction
<
UserBehavior
>()
{
@Override
public
boolean
filter
(
UserBehavior
userBehavior
)
throws
Exception
{
// 过滤出只有点击的数据
return
userBehavior
.
behavior
.
equals
(
"pv"
);
}
})
.
keyBy
(
"itemId"
)
.
timeWindow
(
Time
.
minutes
(
60
),
Time
.
minutes
(
5
))
.
aggregate
(
new
CountAgg
(),
new
WindowResultFunction
())
.
keyBy
(
"windowEnd"
)
.
process
(
new
TopNHotItems
(
3
))
.
print
();
env
.
execute
(
"Hot Items Job"
);
}
/**
 * Computes the top-N most clicked items of each window.
 *
 * <p>The input stream is keyed by the window end timestamp. Every incoming
 * {@link ItemViewCount} is buffered in keyed list state, and an event-time timer
 * is registered at {@code windowEnd + 1}. When the timer fires, the buffered
 * counts are sorted by view count (descending) and the first {@code topSize}
 * entries are emitted as a single formatted string.
 */
public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {

    /** Maximum number of items listed in each emitted ranking. */
    private final int topSize;

    /** Buffers the item counts of the current key until the timer fires. */
    private ListState<ItemViewCount> itemState;

    /**
     * @param topSize maximum number of items to list per window
     */
    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    /**
     * Obtains the list state used to buffer the incoming counts.
     *
     * @param parameters configuration passed on to the superclass
     * @throws Exception if the superclass {@code open} fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /**
     * Buffers one item count and registers the timer that triggers the ranking.
     *
     * @param input     the per-item count of one window
     * @param context   gives access to the timer service
     * @param collector unused here; results are emitted from {@link #onTimer}
     */
    @Override
    public void processElement(ItemViewCount input, Context context, Collector<String> collector)
            throws Exception {
        // Keep every element in state until the ranking is computed.
        itemState.add(input);
        // Event-time timer at windowEnd + 1: once it fires, the counts belonging
        // to the window ending at windowEnd are treated as complete.
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    /**
     * Sorts the buffered counts and emits the formatted top-N ranking.
     *
     * @param timestamp the timer timestamp, i.e. {@code windowEnd + 1}
     * @param ctx       timer context (unused)
     * @param out       receives the formatted ranking string
     * @throws Exception if state access fails or the throttling sleep is interrupted
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Copy everything received for this key out of state.
        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // Clear the state right away to free the space.
        itemState.clear();

        // Highest view count first. Long.compare is used instead of casting the
        // difference to int, which can overflow and yield the wrong sign for
        // large counts.
        allItems.sort((left, right) -> Long.compare(right.viewCount, left.viewCount));

        // Format the ranking as a String so it can be printed directly.
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        // timestamp - 1 undoes the +1 applied when the timer was registered.
        result.append("时间: ").append(new Timestamp(timestamp - 1)).append("\n");
        int limit = Math.min(topSize, allItems.size());
        for (int i = 0; i < limit; i++) {
            ItemViewCount currentItem = allItems.get(i);
            // Example line (the rank is the zero-based index): No0: 商品ID=12224 浏览量=2413
            result.append("No").append(i).append(":")
                    .append(" 商品ID=").append(currentItem.itemId)
                    .append(" 浏览量=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");

        // Throttle the output to imitate results scrolling in real time.
        Thread.sleep(1000);
        out.collect(result.toString());
    }
}
/**
 * Window function that emits the result of a window: it wraps the pre-aggregated
 * count together with the item ID and the window end into an {@link ItemViewCount}.
 */
public static class WindowResultFunction
        implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {

    /**
     * @param key             the window key, i.e. the itemId
     * @param window          the window the count belongs to
     * @param aggregateResult the result of the aggregate function, i.e. the count
     * @param collector       receives the resulting {@link ItemViewCount}
     */
    @Override
    public void apply(
            Tuple key,
            TimeWindow window,
            Iterable<Long> aggregateResult,
            Collector<ItemViewCount> collector) throws Exception {
        @SuppressWarnings("unchecked") // the key is read as a one-field tuple holding the itemId
        Tuple1<Long> itemKey = (Tuple1<Long>) key;
        final long itemId = itemKey.f0;
        // Only the first element of the iterable is read.
        final long viewCount = aggregateResult.iterator().next();
        final ItemViewCount windowResult = ItemViewCount.of(itemId, window.getEnd(), viewCount);
        collector.collect(windowResult);
    }
}
/**
 * COUNT aggregate function: the accumulator starts at zero and grows by one for
 * every record added.
 */
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

    /** Starts counting from zero. */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /** Counts one more record; the record's content is not inspected. */
    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        final long incremented = acc + 1;
        return incremented;
    }

    /** The accumulator itself is the result. */
    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    /** Combines two partial counts by adding them. */
    @Override
    public Long merge(Long acc1, Long acc2) {
        return Long.sum(acc1, acc2);
    }
}
/**
 * Click count of an item (the output type of the window operation).
 */
public static class ItemViewCount {

    /** Item ID. */
    public long itemId;

    /** Window end timestamp. */
    public long windowEnd;

    /** Number of clicks on the item. */
    public long viewCount;

    /**
     * Creates an instance with all three fields populated.
     *
     * @param itemId    item ID
     * @param windowEnd window end timestamp
     * @param viewCount number of clicks on the item
     * @return a new {@code ItemViewCount} holding the given values
     */
    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        final ItemViewCount created = new ItemViewCount();
        created.viewCount = viewCount;
        created.windowEnd = windowEnd;
        created.itemId = itemId;
        return created;
    }
}
/**
 * User behavior record.
 */
public static class UserBehavior {

    /** User ID. */
    public long userId;

    /** Item ID. */
    public long itemId;

    /** Item category ID. */
    public int categoryId;

    /** User behavior; one of "pv", "buy", "cart", "fav". */
    public String behavior;

    /** Timestamp at which the behavior happened, in seconds. */
    public long timestamp;
}
}
maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main1.java
0 → 100644
View file @
8b8dbd87
package
com.duanledexianxianxian.maven.flink.sql.sql
;
import
com.google.common.collect.Lists
;
import
lombok.AllArgsConstructor
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
import
org.apache.flink.api.java.tuple.Tuple2
;
import
org.apache.flink.streaming.api.TimeCharacteristic
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.table.api.EnvironmentSettings
;
import
org.apache.flink.table.api.Table
;
import
org.apache.flink.table.api.TableEnvironment
;
import
org.apache.flink.table.api.java.StreamTableEnvironment
;
import
org.apache.flink.types.Row
;
import
org.apache.flink.types.StringValue
;
import
java.util.List
;
/**
 * Demo job: registers an in-memory collection of {@link PageAccess} records as the
 * table {@code pageAccess}, runs a SQL query against it and prints the resulting rows.
 */
public class Main1 {

    /** Sample rows as {region, userId, accessTime}. */
    private static final Object[][] INIT_DATA = {
            {"ShangHai", "U0010", 1510365660000L},
            {"BeiJing", "U1001", 1510365660000L},
            {"BeiJing", "U2032", 1510366200000L},
            {"BeiJing", "U1100", 1510366260000L},
            {"ShangHai", "U0011", 1510373400000L},
    };

    public static void main(String[] args) throws Exception {
        // Streaming environment with a Blink-planner table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Parallelism 1 makes the printed output easier to inspect.
        env.setParallelism(1);

        List<PageAccess> pageAccessList = Lists.newArrayList();
        initData(pageAccessList);
        DataStream<PageAccess> accessStream = env.fromCollection(pageAccessList);

        tableEnv.registerDataStream("pageAccess", accessStream, "region,userId,accessTime");

        // NOTE(review): ORDER BY on a non-time column may be rejected by the planner
        // in streaming mode — confirm that this query actually runs.
        Table beiJingAccesses = tableEnv.sqlQuery(
                "select * from pageAccess where region='BeiJing' order by userId desc");

        DataStream<Row> rows = tableEnv.toAppendStream(beiJingAccesses, Row.class);
        rows.print();

        env.execute("Flink sql");
    }

    /**
     * Appends one {@link PageAccess} per row of {@code INIT_DATA} to the given list.
     *
     * @param pageAccessList the list that receives the sample records
     */
    public static void initData(List<PageAccess> pageAccessList) {
        for (int i = 0; i < INIT_DATA.length; i++) {
            Object[] row = INIT_DATA[i];
            String region = String.valueOf(row[0]);
            String userId = String.valueOf(row[1]);
            Long accessTime = Long.valueOf(String.valueOf(row[2]));
            pageAccessList.add(new PageAccess(region, userId, accessTime));
        }
    }

    /** One page access record. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class PageAccess {

        /** Region. */
        private String region;

        /** User name. */
        private String userId;

        /** Access time. */
        private Long accessTime;
    }
}
maven/src/main/java/com/duanledexianxianxian/maven/flink/sql/sql/Main2.java
0 → 100644
View file @
8b8dbd87
package
com.duanledexianxianxian.maven.flink.sql.sql
;
import
org.apache.flink.streaming.api.datastream.DataStream
;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
;
import
org.apache.flink.table.api.Table
;
import
org.apache.flink.table.api.java.StreamTableEnvironment
;
import
org.apache.flink.types.Row
;
import
java.util.Arrays
;
/**
 * Demo job: converts an in-memory stream of {@link Order}s into a table, converts
 * that table back into an append stream of rows and prints them.
 *
 * @author Administrator
 */
public class Main2 {

    public static void main(String[] args) throws Exception {
        // Set up the execution environment and the table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orderA = streamOf(env,
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2));

        // Created but not consumed by the rest of this job.
        DataStream<Order> orderB = streamOf(env,
                new Order(2L, "pen", 3),
                new Order(2L, "rubber", 3),
                new Order(4L, "beer", 1));

        // Convert the DataStream to a Table, then back to an append stream of rows.
        Table tableA = tableEnv.fromDataStream(orderA, "user, product, amount");
        DataStream<Row> rows = tableEnv.toAppendStream(tableA, Row.class);
        rows.print();

        env.execute("Flink sql");
    }

    /** Builds a stream from the given orders, in the order they are passed. */
    private static DataStream<Order> streamOf(StreamExecutionEnvironment env, Order... orders) {
        return env.fromCollection(Arrays.asList(orders));
    }

    /**
     * Simple POJO.
     */
    public static class Order {

        public Long user;

        public String product;

        public int amount;

        public Order() {
        }

        public Order(Long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        @Override
        public String toString() {
            return new StringBuilder("Order{")
                    .append("user=").append(user)
                    .append(", product='").append(product).append('\'')
                    .append(", amount=").append(amount)
                    .append('}')
                    .toString();
        }
    }
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment