kafka
kafka
- kafka是分布式发布-订阅消息系统
- 主要特点
- 同时为发布和订阅提供高吞吐量.kafka每秒可以产生约25万消息(50MB),每秒处理55万消息(110MB)
- 可进行持久化操作.将消息持久化到磁盘,因此可用于批量消费
- 分布式系统,易于向外扩展.所有的producer、broker和consumer都会有多个,均为分布式.无需停机即可扩展机器
- 消息被处理的状态是在consumer端维护,而不是server端维护.失败时自动平衡
- 支持online和offline的场景
kafka架构
kafka为显示架构:producer、broker(kafka)和consumer都可以有多个.producer和consumer实现kafka注册的接口,数据从producer产生,broker承担一个中间件缓存和分发的作用.broker分发注册到系统中consumer.broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存.基于简单、高性能,且与编程语言无关的TCP协议
- 基本概念
- topic:特指kafka处理的消息源(feeds of message)的不同分类
- partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列.
- message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息
- producer:消息和数据生产者,向kafka的一个topic发布消息的过程叫做producers
- consumers:消息和数据消费者,订阅topic并处理其发布的消息的过程叫做consumers
- broker:缓存代理,kafka集群中的一台或多台服务器统称为broker
- 发送消息流程
- producer根据指定的partition方法(round-robin,hash等)将消息发布到指定topic的partition里面
- kafka集群收到producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),不关注消息是否被消费
- consumer从集群pull数据,并控制获取消息的offset
- 吞吐量、负载均衡、消息拉取、扩展性
- 内存访问:直接使用linux文件系统的cache,来高效缓存数据,对数据进行读取和写入
- 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘中,充分利用磁盘的顺序读写性能
- zero-copy:减少IO操作步骤
- 对消息的处理
- 支持数据批量发送
安装
- zookeeper环境
- 解压命令
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
- 修改配置文件
broker.id=0 port=9092 host.name=192.168.150.70 advertised.host.name=192.168.150.70 log.dirs=/usr/local/kafka_2.12/kafka-logs num.partitions=2 zookeeper.connect=192.168.150.70:2181,192.168.150.71:2181,192.168.150.72:2181
- 建立日志文件夹
mkdir /usr/local/kafka_2.12/kafka-logs
- 启动kafka
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &
常用命令
- 创建topic主题命令
kafka-topics.sh --zookeeper 192.168.150.70:2181 --create --topic topic1 --partitions 1 --replication-f ## --zookeeper为zookeeper服务列表 ## --create命令后--topic为创建topic并指定topic name ## --partitions为指定分区数量 ## --relication-factor为指定副本集数量
- 查看topic列表命令
kafka-topics.sh --zookeeper 192.168.150.70:2181 --list
- kafka命令发送数据
kafka-console-producer.sh --broker-list 192.168.150.51:9092 --topic topic1
- kafka命令接收数据
kafka-console-consumer.sh --bootstrap-server 192.168.150.51:9092 --topic topic1 --from-beginning
- 删除topic命令
kafka-topics.sh --zookeeper 192.168.150.70:2181 --delete --topic topic1