简介
Apache Pulsar是一个分布式消息队列,它主要由以下三部分组成。
组件 | 作用 |
---|---|
Broker | 负责producer和consumer的请求还有消息的复制与分发,Broker无状态不存储数据 |
Zookeeper | 存储元数据、集群配置,负责任务协调还有服务发现等 |
Bookkeeper | 消息数据还有cursors数据的持久化存储,Bookkeeper的每一个存储节点叫做bookie |
producer往Pulsar发送数据,consumer从Pulsar接受数据,consumer接收数据的过程叫做subscription(订阅)。Pulsar有四种订阅模式
模式名 | 模式 |
---|---|
独占(exclusive) | 一个subscription只能有一个consumer,如果多个consumer使用相同的subscription去订阅一个topic就会报错 |
故障转移(failover) | 一个subscription允许多个comsumer,但是只有一个consumer会工作。当当前consumer失去连接时,其它的consumer才会上线消费 |
共享(shared) | 多个consumer可以使用同一个subscription去订阅一个topic,消息通过轮询的方式发送给consumer |
共享键(key_shared) | 和上面类似,只是消息会根据key的不同发送给不同的consumer |
Broker不存储数据,数据存储在Bookkeeper上。topic会分为多个partition,partition则会分配到不同的broker上,producer和consumer会与broker上的partition连接从而实现数据的发送和接收。Pulsar支持多层级的topic,可以设置是否持久化以及租户、命名空间还有topic的名称
{persistent|non-persistent}://tenant/namespace/topic
安装
我们有三个节点172.19.67.171
,172.19.67.190
,172.19.67.202
。首先我们在这三个节点上下载Pulsar的安装包
useradd pulsarsu - pulsarwget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gztar -zxvf apache-pulsar-2.10.0-bin.tar.gzcd apache-pulsar-2.10.0
1. 安装Zookeeper集群,初始化Pulsar集群元数据
修改每个节点的conf/zookeeper.conf
配置文件,添加如下配置
server.1=172.19.67.171:2888:3888server.2=172.19.67.190:2888:3888server.3=172.19.67.202:2888:3888
之后针对每个节点的序号,把对应的序号设置到该节点的dataDir
目录下的myid
文件中
mkdir -p data/zookeeperecho 1 > data/zookeeper/myid
修改完配置文件之后,在每个节点上启动Zookeeper的服务
bin/pulsar-daemon start zookeeper
启动好了集群之后使用如下命令写入Pulsar的元数据
bin/pulsar initialize-cluster-metadata \ --cluster pulsar-cluster-1 \ --zookeeper 172.19.67.171:2181 \ --configuration-store 172.19.67.171:2181 \ --web-service-url http://172.19.67.171:8080,172.19.67.190:8080,172.19.67.202:8080 \ --broker-service-url pulsar://172.19.67.171:6650,172.19.67.190:6650,172.19.67.202:6650
具体含义如下
参数 | 含义 |
---|---|
cluster | Pulsar的集群名称 |
zookeeper | Zookeeper的地址 |
configuration-store | 配置存储地址,使用Zookeeper的地址 |
web-service-url | Pulsar集群web服务的地址 |
broker-service-url | broker服务的地址 |
2. 安装Bookkeeper集群
设置所有节点的conf/bookkeeper.conf
配置文件,添加Zookeeper连接信息
zkServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181
之后在每个节点启动Bookkeeper服务
bin/pulsar-daemon start bookie
之后使用如下命令验证Bookkeeper的集群状态
bin/bookkeeper shell simpletest --ensemble 3 --writeQuorum 3 --ackQuorum 3 --numEntries 3
3. 安装Pulsar Brokers
修改所有节点的conf/broker.conf
配置文件
# 配置pulsar broker连接的zookeeper集群地址zookeeperServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181configurationStoreServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181# broker数据端口brokerServicePort=6650# broker web服务端口webServicePort=8080# pulsar 集群名字,和前面zookeeper初始化集群元数据时配置的一样clusterName=pulsar-cluster-1# 创建一个ledger时使用的bookie数量managedLedgerDefaultEnsembleSize=2# 每个消息的副本数量managedLedgerDefaultWriteQuorum=2# 完成写操作前等待副本ack的数量managedLedgerDefaultAckQuorum=2
之后在每个节点启动Pulsar服务
bin/pulsar-daemon start broker
使用客户端发送和消费消息
修改conf/client.conf
文件
webServiceUrl=http://172.19.67.171:8080,172.19.67.190:8080,172.19.67.202:8080brokerServiceUrl=pulsar://172.19.67.171:6650,172.19.67.190:6650,172.19.67.202:6650
之后使用客户端进行消费
bin/pulsar-client consume \ persistent://public/default/pulsar-test \ -n 100 \ -s "consumer-test" \ -t "Exclusive"
新开一个窗口,使用生产者发送消息
bin/pulsar-client produce \ persistent://public/default/pulsar-test \ -n 1 \ -m "hello, this is a test for Pulsar"