Docker部署Kafka与SpringBoot整合 | Eddie'Blog
Docker部署Kafka与SpringBoot整合

Docker部署Kafka与SpringBoot整合

eddie 558 2020-10-09

目录

一、镜像拉取

docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker pull sheepkiller/kafka-manager

二、编写 docker-compose.yml

2.1 新建目录
mkdir -p /data/kafka/

2.2 docker
###################
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: always
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092" 
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.x.xxx:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
    volumes:
      - /data/kafka/docker.sock:/var/run/docker.sock
    restart: always
  kafka-manager:
    image: sheepkiller/kafka-manager
    container_name: kafka-manager 
    ports:
      - "9000:9000" 
    environment:
      ZK_HOSTS: zookeeper:2181
    restart: always

三、启动服务

docker-compose up -d

四、容器测试

4.1 进入kafka容器
docker exec -it kafka bash

4.2 进入bin目录
cd /opt/kafka_2.13-2.6.0/bin/

4.3 创建一个topic, –zookeeper后面的参数为,容器的name
kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1  

4.4 查看刚刚创建的topic, –zookeeper后面的参数为,容器的name
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test

4.5 发布消息
bash-4.4# kafka-console-producer.sh --topic=test --broker-list kafka:9092
>hello,eddie
>i am eddie

4.6 接受消息
bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic test        
hello,eddie
i am eddie

五、Kafka-manager管理界面

http://192.168.x.xxx:9000/

Clusters --> Add Cluster

图片.png

六、Springboot Consumer 整合

6.1 application.yml

############################################################
#
#  服务配置
#
############################################################
server:
  servlet:
    context-path: /consumer
  port: 8002

############################################################
#
#  spring 与 lafla 整合
#
############################################################
spring:
  kafka:
    bootstrap-servers: 192.168.x.xxx:9092
    consumer:
      enable-auto-commit: false # 消息的签收机制:手工签收
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      ## 序列化配置
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual # 手工签收
      concurrency: 5

6.2 消费端接收消息

package com.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @author eddie.lee
 * @projectName springboot-kafka
 * @package com.kafka.consumer
 * @className kafkaConsumerService
 * @description
 * @date created in 2020-10-14 10:58
 * @modified by
 */
@Slf4j
@Component
public class KafkaConsumerService {

    @KafkaListener(groupId = "group02", topics = "topic02")
    public void onMessage(ConsumerRecord<String, Object> record,
                          Acknowledgment acknowledgment,
                          Consumer<?, ?> consumer) {
        log.info("消费端接收消息:{}" , record.value());
        // 手工签收机制
        //acknowledgment.acknowledge();
    }
}

6.3 查看组信息 (Consumer需要启动状态)

bash-4.4# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group group02

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
group02         topic02         0          0               0               0               consumer-2-6e2d9d53-32e2-4c74-b11e-a6f9e6ade8d2 /192.168.x.xx   consumer-2

七、Springboot Producer整合

7.1 Producer配置

############################################################
#
#  服务配置
#
############################################################
server:
  port: 8001
  servlet:
    context-path: /producer

############################################################
#
#  spring 与 lafla 整合
#
############################################################
spring:
  kafka:
    bootstrap-servers: 192.168.x.xxx:9092
    producer:
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
      ## 	这个是kafka生产端最重要的选项
      acks: 1
      ## 批量发送数据的配置
      batch-size: 16384
      ## 设置kafka 生产者内存缓存区的大小(32M)
      buffer-memory: 33554432
      ## kafka producer 发送消息失败时的一个重试的次数
      retries: 0
      ## kafka消息的序列化配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

7.2 模板服务

/**
 * @author eddie.lee
 * @projectName springboot-kafka
 * @package com.kafka.producer
 * @className KafkaProducerService
 * @description
 * @date created in 2020-10-09 16:50
 * @modified by
 */
@Slf4j
@Component
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, Object object) {

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("发送消息成功: " + result.toString());
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("发送消息失败: " + throwable.getMessage());
            }
        });
    }
}

7.3 单元测试

/**
 * @author eddie.lee
 * @projectName springboot-kafka
 * @package producer.test
 * @className ApplicationTests
 * @description
 * @date created in 2020-10-14 11:10
 * @modified by
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    /**
     * 包名必需和上面一直,不然会出现注入不成功
     * 错误例子: producer.test
     * 正确例子: com.kafka.producer.test
     */
    @Autowired
    private KafkaProducerService kafkaProducerService;

    @Test
    public void send() throws InterruptedException {

        String topic = "topic02";
        for(int i=0; i < 1000; i ++) {
            kafkaProducerService.sendMessage(topic, "hello kafka" + i);
            Thread.sleep(5);
        }

        Thread.sleep(Integer.MAX_VALUE);

    }

}

八、测试

8.1 发送方
图片.png

8.2 接收方
图片.png