简介

Apache Pulsar是一个分布式消息队列,它主要由以下三部分组成。

组件作用
Broker负责producer和consumer的请求还有消息的复制与分发,Broker无状态不存储数据
Zookeeper存储元数据、集群配置,负责任务协调还有服务发现等
Bookkeeper消息数据还有cursors数据的持久化存储,Bookkeeper的每一个存储节点叫做bookie

producer往Pulsar发送数据,consumer从Pulsar接受数据,consumer接收数据的过程叫做subscription(订阅)。Pulsar有四种订阅模式

模式名模式
独占(exclusive)一个subscription只能有一个consumer,如果多个consumer使用相同的subscription去订阅一个topic就会报错
故障转移(failover)一个subscription允许多个comsumer,但是只有一个consumer会工作。当当前consumer失去连接时,其它的consumer才会上线消费
共享(shared)多个consumer可以使用同一个subscription去订阅一个topic,消息通过轮询的方式发送给consumer
共享键(key_shared)和上面类似,只是消息会根据key的不同发送给不同的consumer

Broker不存储数据,数据存储在Bookkeeper上。topic会分为多个partition,partition则会分配到不同的broker上,producer和consumer会与broker上的partition连接从而实现数据的发送和接收。Pulsar支持多层级的topic,可以设置是否持久化以及租户、命名空间还有topic的名称

{persistent|non-persistent}://tenant/namespace/topic

安装

我们有三个节点172.19.67.171172.19.67.190172.19.67.202。首先我们在这三个节点上下载Pulsar的安装包

useradd pulsarsu - pulsarwget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gztar -zxvf apache-pulsar-2.10.0-bin.tar.gzcd apache-pulsar-2.10.0

1. 安装Zookeeper集群,初始化Pulsar集群元数据

修改每个节点的conf/zookeeper.conf配置文件,添加如下配置

server.1=172.19.67.171:2888:3888server.2=172.19.67.190:2888:3888server.3=172.19.67.202:2888:3888

之后针对每个节点的序号,把对应的序号设置到该节点的dataDir目录下的myid文件中

mkdir -p data/zookeeperecho 1 > data/zookeeper/myid

修改完配置文件之后,在每个节点上启动Zookeeper的服务

bin/pulsar-daemon start zookeeper

启动好了集群之后使用如下命令写入Pulsar的元数据

bin/pulsar initialize-cluster-metadata \    --cluster pulsar-cluster-1 \    --zookeeper 172.19.67.171:2181 \    --configuration-store 172.19.67.171:2181 \    --web-service-url http://172.19.67.171:8080,172.19.67.190:8080,172.19.67.202:8080 \    --broker-service-url pulsar://172.19.67.171:6650,172.19.67.190:6650,172.19.67.202:6650

具体含义如下

参数含义
clusterPulsar的集群名称
zookeeperZookeeper的地址
configuration-store配置存储地址,使用Zookeeper的地址
web-service-urlPulsar集群web服务的地址
broker-service-urlbroker服务的地址

2. 安装Bookkeeper集群

设置所有节点的conf/bookkeeper.conf配置文件,添加Zookeeper连接信息

zkServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181

之后在每个节点启动Bookkeeper服务

bin/pulsar-daemon start bookie

之后使用如下命令验证Bookkeeper的集群状态

bin/bookkeeper shell simpletest --ensemble 3 --writeQuorum 3 --ackQuorum 3 --numEntries 3

3. 安装Pulsar Brokers

修改所有节点的conf/broker.conf配置文件

# 配置pulsar broker连接的zookeeper集群地址zookeeperServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181configurationStoreServers=172.19.67.171:2181,172.19.67.190:2181,172.19.67.202:2181# broker数据端口brokerServicePort=6650# broker web服务端口webServicePort=8080# pulsar 集群名字,和前面zookeeper初始化集群元数据时配置的一样clusterName=pulsar-cluster-1# 创建一个ledger时使用的bookie数量managedLedgerDefaultEnsembleSize=2# 每个消息的副本数量managedLedgerDefaultWriteQuorum=2# 完成写操作前等待副本ack的数量managedLedgerDefaultAckQuorum=2

之后在每个节点启动Pulsar服务

bin/pulsar-daemon start broker

使用客户端发送和消费消息

修改conf/client.conf文件

webServiceUrl=http://172.19.67.171:8080,172.19.67.190:8080,172.19.67.202:8080brokerServiceUrl=pulsar://172.19.67.171:6650,172.19.67.190:6650,172.19.67.202:6650

之后使用客户端进行消费

bin/pulsar-client consume \    persistent://public/default/pulsar-test \    -n 100 \    -s "consumer-test" \    -t "Exclusive"

新开一个窗口,使用生产者发送消息

bin/pulsar-client produce \    persistent://public/default/pulsar-test \    -n 1 \    -m "hello, this is a test for Pulsar"

参考

Pulsar 介绍与部署