Commit e0101e59 authored by duanledexianxianxian's avatar duanledexianxianxian 😁

sync

parent 88424a64
...@@ -52,6 +52,17 @@ ...@@ -52,6 +52,17 @@
<orderEntry type="library" name="Maven: org.apache.flink:flink-clients_2.11:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-clients_2.11:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-optimizer_2.11:1.9.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-optimizer_2.11:1.9.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-guava:18.0-7.0" level="project" /> <orderEntry type="library" name="Maven: org.apache.flink:flink-shaded-guava:18.0-7.0" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-planner_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-common:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala-bridge_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-reflect:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang:scala-compiler:2.11.12" level="project" />
<orderEntry type="library" name="Maven: org.scala-lang.modules:scala-xml_2.11:1.0.5" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-streaming-scala_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-java-bridge_2.11:1.9.1" level="project" />
<orderEntry type="library" name="Maven: org.apache.flink:flink-table-api-java:1.9.1" level="project" />
<orderEntry type="library" name="Maven: com.google.guava:guava:23.0" level="project" /> <orderEntry type="library" name="Maven: com.google.guava:guava:23.0" level="project" />
<orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" /> <orderEntry type="library" name="Maven: com.google.errorprone:error_prone_annotations:2.0.18" level="project" />
<orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" /> <orderEntry type="library" name="Maven: com.google.j2objc:j2objc-annotations:1.1" level="project" />
......
...@@ -66,6 +66,18 @@ under the License. ...@@ -66,6 +66,18 @@ under the License.
<version>${flink.version}</version> <version>${flink.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
......
...@@ -7,7 +7,7 @@ public class AppConstants { ...@@ -7,7 +7,7 @@ public class AppConstants {
/** /**
* 配置文件 * 配置文件
*/ */
public final static String PROPERTIES_FILE_NAME = "application.properties"; public final static String PROPERTIES_FILE_NAME = "/application.properties";
public static final String STREAM_PARALLELISM = "stream.parallelism"; public static final String STREAM_PARALLELISM = "stream.parallelism";
......
package com.duanledexianxianxian.maven.flink.common.schemas;
import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
 * Kafka schema that serializes arbitrary objects to UTF-8 JSON bytes on the
 * producer side and decodes consumed records back into UTF-8 strings.
 *
 * <p>NOTE(review): {@code topic} has no constructor and must be set via
 * {@link #setTopic(String)} before {@link #serialize} is called, otherwise
 * {@code ProducerRecord} rejects the null topic — confirm call sites.
 *
 * @author Administrator
 */
public class SimplekafkaStringSchema implements KafkaSerializationSchema<Object>, KafkaDeserializationSchema<String> {

    /** Target topic for produced records; must be set before serializing. */
    private String topic;

    /**
     * Serializes {@code element} as a JSON string encoded in UTF-8.
     *
     * <p>Fix: the previous version stored a raw {@code String} value in a raw
     * {@code ProducerRecord}, although the declared value type is
     * {@code byte[]} — that fails at runtime with the byte-array serializer.
     *
     * @param element   object to serialize (converted via fastjson)
     * @param timestamp optional record timestamp; currently not attached
     * @return producer record holding the UTF-8 JSON payload
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(Object element, @Nullable Long timestamp) {
        byte[] value = JSONObject.toJSONString(element).getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(this.topic, value);
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * The stream is unbounded; no element ever marks the end.
     */
    @Override
    public boolean isEndOfStream(String nextElement) {
        return false;
    }

    /**
     * Decodes the record value as a UTF-8 string.
     *
     * <p>Fix: the previous stub returned {@code null} for every record,
     * silently dropping all consumed data.
     *
     * @param record consumed Kafka record (value may be null for tombstones)
     * @return decoded string, or null when the record has no value
     */
    @Override
    public String deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] value = record.value();
        return value == null ? null : new String(value, StandardCharsets.UTF_8);
    }

    /**
     * Fix: the previous stub returned {@code null}; Flink requires a non-null
     * type to create serializers for the produced stream.
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
...@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ...@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
...@@ -54,18 +54,31 @@ import java.util.Properties; ...@@ -54,18 +54,31 @@ import java.util.Properties;
* @author Administrator * @author Administrator
*/ */
public class Main { public class Main {
/**
* kafka=>flink=>kafka
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtils.createParameterTool(args); /**
* 参数工具类
*/
ParameterTool parameterTool = ExecutionEnvUtils.createParameterTool(args);
// 创建执行环境上下文 // 创建执行环境上下文
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置窗口的时间单位为process time //设置窗口的时间单位为process time
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "113.105.144.9:9104");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("topic_first", new SimpleStringSchema(), properties);
Properties sourceProp = new Properties();
sourceProp.setProperty("bootstrap.servers", parameterTool.get("kafka.source.brokers"));
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(parameterTool.get("kafka.source.topic"), new SimpleStringSchema(), sourceProp);
//从最早开始消费 //从最早开始消费
DataStream<String> stream = env DataStream<String> stream = env
...@@ -75,7 +88,14 @@ public class Main { ...@@ -75,7 +88,14 @@ public class Main {
SingleOutputStreamOperator<Student> student = stream.map((MapFunction<String, Student>) value -> JSONObject.parseObject(value, Student.class)); SingleOutputStreamOperator<Student> student = stream.map((MapFunction<String, Student>) value -> JSONObject.parseObject(value, Student.class));
student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).addSink(new KafkaProducer<>).print(); Properties sinkProp = new Properties();
sinkProp.setProperty("bootstrap.servers", parameterTool.get("kafka.sink.brokers"));
// Deprecated
FlinkKafkaProducer producer = new FlinkKafkaProducer(parameterTool.get("kafka.sink.brokers"), parameterTool.get("kafka.sink.topic"), new SimpleStringSchema());
student.keyBy(Student::getSex).window(TumblingEventTimeWindows.of(Time.seconds(20))).aggregate(new Agg()).map(x -> JSONObject.toJSONString(x)).addSink(producer);
// execute program // execute program
env.execute("Flink Streaming Java API Skeleton"); env.execute("Flink Streaming Java API Skeleton");
......
package com.duanledexianxianxian.maven.flink.sql.sql;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Instant;
import java.util.Random;
/**
 * Flink Table/SQL demo: continuously generates events whose timestamps are
 * slightly out of order, assigns event-time timestamps and watermarks,
 * registers the stream as a table, runs a SQL query over it and prints the
 * resulting rows.
 *
 * <p>NOTE(review): the job is named "sort-streaming-data" but the query has
 * no ORDER BY, so rows are emitted unsorted — confirm intent.
 *
 * @author Administrator
 */
public class Main {
    // Upper bound (ms) on the random future offset added to each event's
    // timestamp, and the out-of-orderness tolerated by the watermark extractor.
    public static final int OUT_OF_ORDERNESS = 1000;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // parallelism 1 keeps all output on a single task for readable printing
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> source = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());

        // create a TableEnvironment for specific planner batch or streaming
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // convert DataStream to Table; "eventTime.rowtime" exposes the
        // record's event-time timestamp as a rowtime attribute column
        Table table = tableEnv.fromDataStream(source, "eventTime.rowtime");
        // register DataStream as Table
        tableEnv.registerTable("Zhisheng", table);
        Table sorted = tableEnv.sqlQuery("select * from Zhisheng ");
        DataStream<Row> rowDataStream = tableEnv.toAppendStream(sorted, Row.class);
        rowDataStream.print();

        // print the execution plan (debugging aid)
        // System.out.println(env.getExecutionPlan());
        env.execute("sort-streaming-data");
    }

    /**
     * Event carrying only an event-time timestamp.
     */
    public static class Event {
        Long eventTime;

        Event() {
            // event time = now plus a random offset in [0, OUT_OF_ORDERNESS),
            // which makes successive events arrive out of timestamp order
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }

        @Override
        public String toString() {
            return "Event{" +
                    "eventTime=" + eventTime +
                    '}';
        }
    }

    /**
     * Unbounded source that keeps emitting newly constructed events,
     * roughly one per millisecond, until cancelled.
     */
    private static class OutOfOrderEventSource extends RichSourceFunction<Event> {
        // volatile so the flag written by cancel() is visible to run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while (running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Assigns each event's own timestamp and emits watermarks that tolerate
     * up to OUT_OF_ORDERNESS milliseconds of lateness.
     */
    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }

        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}
#kafka source brokers
kafka.source.brokers=113.105.144.9:9104
#kafka source topic
kafka.source.topic=topic_first
#kafka sink brokers
kafka.sink.brokers=113.105.144.9:9104
#kafka sink topic
kafka.sink.topic=topic_second
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment