Commit 04f84e14 authored by duanledexianxianxian's avatar duanledexianxianxian 😁

init project

parent 96939b06
......@@ -92,6 +92,22 @@ under the License.
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<version>1.18.10</version>
</dependency>
</dependencies>
<build>
......
......@@ -18,7 +18,20 @@
package com.duanledexianxianxian.maven.flink;
import com.alibaba.fastjson.JSONObject;
import com.duanledexianxianxian.maven.flink.model.Student;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
* Skeleton for a Flink Streaming Job.
......@@ -31,12 +44,43 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*
* @author Administrator
*/
public class StreamingJob {
public static void main(String[] args) throws Exception {
// 创建执行环境上下文
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "113.105.144.9:9104");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic_first", new SimpleStringSchema(), properties);
//从最早开始消费
// consumer.setStartFromEarliest();
DataStream<String> stream = env
.addSource(consumer);
stream.print();
// map 函数 flatMap是map的特殊形式
SingleOutputStreamOperator<Student> student = stream.map((MapFunction<String, Student>) value -> JSONObject.parseObject(value, Student.class));
student.print();
// flatMap函数
// SingleOutputStreamOperator<String> studentFlatMap = student.flatMap((FlatMapFunction<Student, String>) (value, out) -> {
// for (String word : value.getName().split("")) {
// out.collect(word);
// }
// }).returns(String.class);
// studentFlatMap.print();
KeyedStream<Student, Integer> keyBy = student.keyBy((KeySelector<Student, Integer>) value -> value.getAge());
keyBy.print();
/*
* Here, you can start creating your execution plan for Flink.
......
package com.duanledexianxianxian.maven.flink.model;
import lombok.Data;
import java.io.Serializable;
/**
* 学生实体类
*
* @author duanledexianxianxian
* @date 2019/10/19 0:32
* @since 1.0.0
*/
@Data
public class Student implements Serializable {
private static final long serialVersionUID = -4572819276161640638L;
/**
* 学生学号
*/
private String studentId;
/**
* 学生姓名
*/
private String name;
/**
* 学生年龄
*/
private Integer age;
/**
* 学生性别
* 0-男
* 1-女
*/
private Byte sex;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment