[TOC]
# 目录
## 一、镜像拉取
```bash
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
docker pull sheepkiller/kafka-manager
```
## 二、编写 docker-compose.yml
```yaml
2.1 新建目录
mkdir -p /data/kafka/
2.2 docker
###################
version: '3.2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    restart: always
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.x.xxx:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
    volumes:
      - /data/kafka/docker.sock:/var/run/docker.sock
    restart: always
  kafka-manager:
    image: sheepkiller/kafka-manager
    container_name: kafka-manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
    restart: always
```
## 三、启动服务
```bash
docker-compose up -d
```
## 四、容器测试
```bash
4.1 进入kafka容器
docker exec -it kafka bash
4.2 进入bin目录
cd /opt/kafka_2.13-2.6.0/bin/
4.3 创建一个topic, --zookeeper后面的参数为,容器的name
kafka-topics.sh --create --topic test --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
4.4 查看刚刚创建的topic, --zookeeper后面的参数为,容器的name
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic test
4.5 发布消息
bash-4.4# kafka-console-producer.sh --topic=test --broker-list kafka:9092
>hello,eddie
>i am eddie
4.6 接收消息
bash-4.4# kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic test
hello,eddie
i am eddie
```
## 五、Kafka-manager管理界面
```text
http://192.168.x.xxx:9000/
```
> Clusters --> Add Cluster

## 六、Springboot Consumer 整合
6.1 application.yml
```yaml
############################################################
#
# 服务配置
#
############################################################
server:
  servlet:
    context-path: /consumer
  port: 8002
############################################################
#
# spring 与 kafka 整合
#
############################################################
spring:
  kafka:
    bootstrap-servers: 192.168.x.xxx:9092
    consumer:
      enable-auto-commit: false # 消息的签收机制:手工签收
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      ## 序列化配置
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual # 手工签收
      concurrency: 5
```
6.2 消费端接收消息
```java
package com.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
 * Kafka consumer for topic "topic02" as part of consumer group "group02".
 *
 * <p>The listener container is configured with {@code ack-mode: manual} and
 * {@code enable-auto-commit: false} (see application.yml), so each record
 * must be acknowledged explicitly or its offset is never committed.
 *
 * @author eddie.lee
 * @projectName springboot-kafka
 * @package com.kafka.consumer
 * @className KafkaConsumerService
 * @date created in 2020-10-14 10:58
 */
@Slf4j
@Component
public class KafkaConsumerService {

    /**
     * Handles a single record delivered by the listener container.
     *
     * @param record         the consumed record; only its value is logged here
     * @param acknowledgment handle used to commit the offset manually
     * @param consumer       the underlying Kafka consumer (unused here)
     */
    @KafkaListener(groupId = "group02", topics = "topic02")
    public void onMessage(ConsumerRecord<String, Object> record,
                          Acknowledgment acknowledgment,
                          Consumer<?, ?> consumer) {
        log.info("消费端接收消息:{}", record.value());
        // 手工签收机制: with ack-mode=manual the offset is committed only when
        // acknowledge() is called; without this call the same messages would be
        // re-delivered after every restart of the consumer.
        acknowledgment.acknowledge();
    }
}
```
6.3 查看组信息 (Consumer需要启动状态)
```bash
bash-4.4# kafka-consumer-groups.sh --bootstrap-server kafka:9092 --describe --group group02
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
group02 topic02 0 0 0 0 consumer-2-6e2d9d53-32e2-4c74-b11e-a6f9e6ade8d2 /192.168.x.xx consumer-2
```
## 七、Springboot Producer整合
7.1 Producer配置
```yaml
############################################################
#
# 服务配置
#
############################################################
server:
  port: 8001
  servlet:
    context-path: /producer
############################################################
#
# spring 与 kafka 整合
#
############################################################
spring:
  kafka:
    bootstrap-servers: 192.168.x.xxx:9092
    producer:
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
      ## 这个是kafka生产端最重要的选项
      acks: 1
      ## 批量发送数据的配置
      batch-size: 16384
      ## 设置kafka 生产者内存缓存区的大小(32M)
      buffer-memory: 33554432
      ## kafka producer 发送消息失败时的一个重试的次数
      retries: 0
      ## kafka消息的序列化配置
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
7.2 模板服务
```java
/**
* @author eddie.lee
* @projectName springboot-kafka
* @package com.kafka.producer
* @className KafkaProducerService
* @description
* @date created in 2020-10-09 16:50
* @modified by
*/
@Slf4j
@Component
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
public void sendMessage(String topic, Object object) {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("发送消息成功: " + result.toString());
}
@Override
public void onFailure(Throwable throwable) {
log.error("发送消息失败: " + throwable.getMessage());
}
});
}
}
```
7.3 单元测试
```java
/**
 * Integration test driving {@link KafkaProducerService}.
 *
 * <p>NOTE: this test class must live under the application's base package
 * (correct: {@code com.kafka.producer.test}; wrong: {@code producer.test}),
 * otherwise component scanning misses the beans and injection fails.
 *
 * @author eddie.lee
 * @projectName springboot-kafka
 * @package producer.test
 * @className ApplicationTests
 * @date created in 2020-10-14 11:10
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @Test
    public void send() throws InterruptedException {
        final String topic = "topic02";
        int sent = 0;
        while (sent < 1000) {
            kafkaProducerService.sendMessage(topic, "hello kafka" + sent);
            Thread.sleep(5);
            sent++;
        }
        // Park the test thread so the asynchronous send callbacks can finish.
        Thread.sleep(Integer.MAX_VALUE);
    }
}
```
## 八、测试
8.1 发送方

8.2 接收方

Docker部署Kafka与SpringBoot整合