Commit 0fc74f13 authored by duanledexianxianxian's avatar duanledexianxianxian 😁

dataStream api

parent 1ae838fa
......@@ -32,5 +32,10 @@
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
</project>
......@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
* @author fengyuchenglun
* @version 1.0.0
*/
@Component
//@Component
public class TestConsumer {
/**
......
......@@ -3,6 +3,10 @@ package com.duanledexianxianxian.demo.kafka.listener;
import com.alibaba.fastjson.JSONObject;
import com.duanledexianxianxian.demo.kafka.model.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
......@@ -11,6 +15,7 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.*;
/**
* @author duanledexianxianxian
......@@ -23,24 +28,39 @@ public class AppListener implements ApplicationListener<ApplicationReadyEvent> {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
//labdam方式
new Thread(() -> {
log.info(Thread.currentThread().getName() + ":使用lambda表达式创建线程");
ThreadFactory namedThreadFactory = new BasicThreadFactory.Builder().namingPattern("producer-kafka-pool-%d").daemon(true).build();
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
int i = 0;
while (true) {
Student student = new Student();
student.setStudentId(String.valueOf(LocalDateTime.now().getNano()));
student.setName(Thread.currentThread().getName());
student.setAge(String.valueOf(new Random().nextInt()));
log.info("Send Data:{}", student);
kafkaTemplate.send("topic_first", JSONObject.toJSONString(student));
int finalI = i;
pool.execute(() -> {
this.sendData(finalI);
// sleep 5 seconds
});
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}).start();
private void sendData(int i) {
Student student = new Student();
student.setStudentId("duanledexianxianxian" + i);
student.setName(RandomStringUtils.randomAlphabetic(10));
student.setAge(new Random().nextInt(19) + 6);
student.setSex(Byte.valueOf(String.valueOf((new Random().nextInt(2)))));
log.info("Thread:{} Send Data:{}", Thread.currentThread().getName(), student);
kafkaTemplate.send("topic_first", JSONObject.toJSONString(student));
}
}
......@@ -14,7 +14,22 @@ import java.io.Serializable;
@Data
public class Student implements Serializable {
private static final long serialVersionUID = -4572819276161640638L;
/**
* 学生学号
*/
private String studentId;
/**
* 学生姓名
*/
private String name;
private String age;
/**
* 学生年龄
*/
private Integer age;
/**
* 学生性别
* 0-男
* 1-女
*/
private Byte sex;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment