From 1ae838faed2593ed8fabcda62a0090f8ad9118f9 Mon Sep 17 00:00:00 2001
From: duanledexianxianxian
Date: Mon, 21 Oct 2019 10:17:17 +0800
Subject: [PATCH] add kafka demo

---
 .gitignore                                    | 83 +++++++++++++++++++
 README.md                                     |  5 +-
 kafka/pom.xml                                 | 36 ++++++++
 .../demo/kafka/Application.java               | 21 +++++
 .../demo/kafka/consumer/TestConsumer.java     | 26 ++++++
 .../kafka/controller/KafkaController.java     | 24 ++++++
 .../demo/kafka/listener/AppListener.java      | 46 ++++++++++
 .../demo/kafka/model/Student.java             | 20 +++++
 kafka/src/main/resources/application.yml      | 38 +++++++++
 pom.xml                                       | 58 +++++++++++++
 10 files changed, 356 insertions(+), 1 deletion(-)
 create mode 100644 .gitignore
 create mode 100644 kafka/pom.xml
 create mode 100644 kafka/src/main/java/com/duanledexianxianxian/demo/kafka/Application.java
 create mode 100644 kafka/src/main/java/com/duanledexianxianxian/demo/kafka/consumer/TestConsumer.java
 create mode 100644 kafka/src/main/java/com/duanledexianxianxian/demo/kafka/controller/KafkaController.java
 create mode 100644 kafka/src/main/java/com/duanledexianxianxian/demo/kafka/listener/AppListener.java
 create mode 100644 kafka/src/main/java/com/duanledexianxianxian/demo/kafka/model/Student.java
 create mode 100644 kafka/src/main/resources/application.yml
 create mode 100644 pom.xml

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..639b399
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,83 @@
+# Created by .ignore support plugin (hsz.mobi)
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn. Uncomment if using
+# auto-import.
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+### Maven template
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+.mvn/wrapper/maven-wrapper.jar
+
diff --git a/README.md b/README.md
index 2aa4975..174ed4a 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,6 @@
 # springboot-demo
 
-Spring boot demo
\ No newline at end of file
+Spring boot demo
+
+### Note
+1. Spring Boot configuration files consistently use the .yml format, e.g. application.yml
\ No newline at end of file
diff --git a/kafka/pom.xml b/kafka/pom.xml
new file mode 100644
index 0000000..c77bf41
--- /dev/null
+++ b/kafka/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <artifactId>demo-parent</artifactId>
+        <groupId>com.duanledexianxianxian.demo</groupId>
+        <version>1.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.56</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/Application.java b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/Application.java
new file mode 100644
index 0000000..fd0a87d
--- /dev/null
+++ b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/Application.java
@@ -0,0 +1,21 @@
+package com.duanledexianxianxian.demo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+ * @author duanledexianxianxian
+ * @date 2019/10/18 23:32
+ * @since 1.0.0
+ */
+@SpringBootApplication
+@Slf4j
+public class Application {
+
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+
+    }
+}
diff --git a/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/consumer/TestConsumer.java b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/consumer/TestConsumer.java
new file mode 100644
index 0000000..a101ab3
--- /dev/null
+++ b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/consumer/TestConsumer.java
@@ -0,0 +1,26 @@
+package com.duanledexianxianxian.demo.kafka.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * Kafka consumer test.
+ *
+ * @author fengyuchenglun
+ * @version 1.0.0
+ */
+@Component
+public class TestConsumer {
+
+    /**
+     * Listen.
+     *
+     * @param record the record
+     * @throws Exception the exception
+     */
+    @KafkaListener(topics = "topic_first")
+    public void listen(ConsumerRecord record) throws Exception {
+        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
+    }
+}
diff --git a/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/controller/KafkaController.java b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/controller/KafkaController.java
new file mode 100644
index 0000000..c6878ba
--- /dev/null
+++ b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/controller/KafkaController.java
@@ -0,0 +1,24 @@
+package com.duanledexianxianxian.demo.kafka.controller;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * @author duanledexianxianxian
+ * @date 2019/10/18 23:50
+ * @since 1.0.0
+ */
+@RestController
+public class KafkaController {
+    @Autowired
+    private KafkaTemplate kafkaTemplate;
+
+    @GetMapping("/message/send")
+    public boolean send(@RequestParam String message) {
+        kafkaTemplate.send("testTopic", message);
+        return true;
+    }
+}
diff --git a/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/listener/AppListener.java b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/listener/AppListener.java
new file mode 100644
index 0000000..e1a8b4b
--- /dev/null
+++ b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/listener/AppListener.java
@@ -0,0 +1,46 @@
+package com.duanledexianxianxian.demo.kafka.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.duanledexianxianxian.demo.kafka.model.Student;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import java.time.LocalDateTime;
+import java.util.Random;
+
+/**
+ * @author duanledexianxianxian
+ * @date 2019/10/19 0:44
+ * @since 1.0.0
+ */
+@Component
+@Slf4j
+public class AppListener implements ApplicationListener<ApplicationReadyEvent> {
+    @Autowired
+    private KafkaTemplate kafkaTemplate;
+
+    @Override
+    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
+        // start the producer thread, created with a lambda expression
+        new Thread(() -> {
+            log.info(Thread.currentThread().getName() + ": thread created with a lambda expression");
+            while (true) {
+                Student student = new Student();
+                student.setStudentId(String.valueOf(LocalDateTime.now().getNano()));
+                student.setName(Thread.currentThread().getName());
+                student.setAge(String.valueOf(new Random().nextInt()));
+                log.info("Send Data:{}", student);
+                kafkaTemplate.send("topic_first", JSONObject.toJSONString(student));
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+}
diff --git a/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/model/Student.java b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/model/Student.java
new file mode 100644
index 0000000..3bf32ec
--- /dev/null
+++ b/kafka/src/main/java/com/duanledexianxianxian/demo/kafka/model/Student.java
@@ -0,0 +1,20 @@
+package com.duanledexianxianxian.demo.kafka.model;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Student entity class.
+ *
+ * @author duanledexianxianxian
+ * @date 2019/10/19 0:32
+ * @since 1.0.0
+ */
+@Data
+public class Student implements Serializable {
+    private static final long serialVersionUID = -4572819276161640638L;
+    private String studentId;
+    private String name;
+    private String age;
+}
diff --git a/kafka/src/main/resources/application.yml b/kafka/src/main/resources/application.yml
new file mode 100644
index 0000000..9ad2db8
--- /dev/null
+++ b/kafka/src/main/resources/application.yml
@@ -0,0 +1,38 @@
+spring:
+  kafka:
+    # Kafka broker address; for a cluster, list multiple addresses separated by commas
+    bootstrap-servers: 113.105.144.9:9104
+    #=============== producer =======================
+    producer:
+      # Number of retries when a write fails. When the leader node fails, a replica node takes over as leader, and writes issued during the failover may fail.
+      # With retries=0 the producer never resends (no duplicates, but those writes can be lost); with retries>0 it resends once the replica has fully become the leader, so no messages are lost.
+      retries: 0
+      # Number of records sent per batch; the producer accumulates records and sends them in a single request
+      batch-size: 16384
+      # The producer buffers records and sends them in batches; data is sent once the buffer reaches buffer-memory (bytes)
+      buffer-memory: 33554432
+      # Serializers for the message key and value
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      properties:
+        # With buffering enabled, records are not sent immediately. Even if the batch never reaches the configured count or size, the producer still sends once linger.ms has elapsed.
+        linger.ms: 1
+        # The number of acknowledgements the producer requires the leader to have received before considering a request complete; controls the durability of records on the server side:
+        # acks=0: the producer does not wait for any acknowledgement from the server; the record is added to the socket buffer and considered sent. There is no guarantee the server received it, the retries setting has no effect (the client generally does not learn of failures), and the offset returned for each record is always -1.
+        # acks=1: the leader writes the record to its local log and responds without waiting for full acknowledgement from all replicas. If the leader fails immediately after acknowledging, but before the replicas have copied the record, the record is lost.
+        # acks=all: the leader waits for the full set of in-sync replicas to acknowledge the record. The record is not lost as long as at least one in-sync replica remains alive. This is the strongest guarantee and is equivalent to acks=-1.
+        # Allowed values: all, -1, 0, 1
+        acks: 1
+
+    #=============== consumer =======================
+    consumer:
+      group-id: test
+      # enable.auto.commit: true --> commit offsets automatically
+      enable-auto-commit: true
+      # When auto-commit is true, the frequency (in milliseconds) at which consumer offsets are committed to Kafka; the default is 5000
+      auto-commit-interval: 100ms
+      # Deserializers for the message key and value
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      properties:
+        session.timeout.ms: 15000
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..89c8ba3
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.duanledexianxianxian.demo</groupId>
+    <artifactId>demo-parent</artifactId>
+    <packaging>pom</packaging>
+    <version>1.0-SNAPSHOT</version>
+    <modules>
+        <module>kafka</module>
+    </modules>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.1.6.RELEASE</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.10</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <java.version>1.8</java.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
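
Not part of this patch, but a possible next step for the demo: AppListener publishes Student objects as fastjson-serialized JSON strings, while TestConsumer only prints the raw record value. A consumer that parses the payload back into a Student could look like the sketch below. This is a minimal, untested sketch; the class name StudentConsumer and the groupId "student-group" are illustrative, and it assumes the fastjson dependency and the topic_first topic already used above.

package com.duanledexianxianxian.demo.kafka.consumer;

import com.alibaba.fastjson.JSON;
import com.duanledexianxianxian.demo.kafka.model.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Hypothetical consumer that deserializes the Student JSON produced by AppListener.
 */
@Slf4j
@Component
public class StudentConsumer {

    // groupId is chosen for illustration; any group distinct from "test" also receives the messages
    @KafkaListener(topics = "topic_first", groupId = "student-group")
    public void listen(ConsumerRecord<String, String> record) {
        // The producer serializes Student with fastjson, so parse it back the same way
        Student student = JSON.parseObject(record.value(), Student.class);
        log.info("Received student: id={}, name={}, age={}",
                student.getStudentId(), student.getName(), student.getAge());
    }
}

Once the application is running, the endpoint added by KafkaController can be exercised with a plain GET request such as /message/send?message=hello; note that it publishes to testTopic, whereas the listeners in this patch consume topic_first.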