1. Rabbitmq 架构及原理
消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信(维基百科)
MQ 的作用:
- 实现异步通信
- 系统解耦
- 流量削峰
MQ 带来的问题:
- 引入消息队列带来的延迟问题
- 增加了系统的复杂度
- 可能产生数据不一致的问题
消息丢失和消息重复消费的问题。一旦消息没有被正确地消费,就会带来数据不一致性的问题。
RabbitMQ 是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现。
关于AMQP 协议具体文档参考 https://www.amqp.org/sites/amqp.org/files/amqp.pdf
由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。rabbitmq简单架构如下:
Broker(代理/中介): RabbitMQ 用于收发消息的服务,默认是 5672 的端口。
Virtual Host(vhost):虚拟主机;标识一批交换机、消息队列和相关对象。
虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。
Banding:绑定,用于消息队列和交换机之间的关联。
一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。
直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,因为 TCP 连接是非常宝贵的资源,创建和释放也要消耗时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接
- Connection:无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个连接,这个连接是一个 TCP 的长连接。
2. RabbitMQ的六种工作模式:
https://www.rabbitmq.com/getstarted.html 官方网站有详细示意图
- simple简单模式
- work工作模式
- Publish/Subscribe 发布订阅模式(fanout)
- Routing 路由模式 (direct)
- Topics 主题模式(路由模式的一种)(topic)
- RPC
- Publisher Confirms 发布确认
具体demo参考如下地址:
3. 延迟队列实现(基于死信队列转发)
3.1 消息过期时间:
有两种设置方式:
- 通过队列属性设置消息过期时间,所有队列中的消息超过时间未被消费时,都会过期。
_, err := r.channel.QueueDeclare(
queueName,
true,
false,
false, // 队列解锁
false,
amqp.Table{
"x-message-ttl": 4000, // 在队列中声明ttl 超时时间 单位为毫秒级,类型为int
},
)
- 设置单条消息的过期时间,在发送消息的时候指定消息属性(推荐使用消息超时)。
expiration := "4000" // 4S 4000MS
err = r.channel.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
Expiration: expiration, // push 时 在消息本体上设置expiration超时时间,单位为毫秒级别 类型为 string
})
3.2 死信队列:
消息在某些情况下会变成死信(Dead Letter)
队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。
死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue),DLX 实际上也是普通的交换机,DLQ 也是普通的队列。
三种情况会让消息变成死信:
- 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false
- 消息过期
- 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。 > 关于这个队里的默认长度,官方没有给出,网上找了下有说是没有设置就动态增长不限。也就是根据你机器的配置情况了。
死信队列声明如下:
_, err := r.channel.QueueDeclare(
queueName, // 这里就是将一个队列声明为如下死信交换机的死信队列
true,
false,
false,
false,
amqp.Table{
"x-dead-letter-exchange": dlxExchange, // 声明当前队列绑定的 死信交换机
},
)
3.3 延迟队列demo:
pusher:
func (r *RabbitMQ) PublishDelayQueue(queue, message, dlxExchange, routing, expiration string) error {
defer r.CloseMq()
queueName := queue + "_delay"
_, err := r.channel.QueueDeclare(
queueName,
true,
false,
false, // 队列解锁
false,
amqp.Table{
"x-dead-letter-exchange": dlxExchange, // 声明当前队列绑定的 死信交换机
"x-dead-letter-routing-key": routing, // routing 模式路由名
},
)
if err != nil {
return err
}
// 注入消息 注册路由 routingKey
err = r.channel.Publish(
"",
queueName,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
Expiration: expiration,
})
if err != nil {
return err
}
fmt.Printf("push messag %s\n", message)
return nil
}
Consumer:
func (r *RabbitMQ) ConsumeDelayQueue(queueName, dlxExchange, routing string, f func(interface{})) error {
defer r.CloseMq()
err := r.channel.ExchangeDeclare(
dlxExchange,
RoutingKind, // 交换机类型 路由模式接收
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// 声明 死信队列(用于与死信交换机绑定)
q, err := r.channel.QueueDeclare(
queueName,
true,
false,
false,
false,
nil,
)
if err != nil {
return err
}
// 绑定队列到 exchange 中
err = r.channel.QueueBind(
q.Name,
routing,
dlxExchange,
false,
nil)
if err != nil {
return err
}
// 消费消息
data, err := r.channel.Consume(
q.Name,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
return err
}
forever := make(chan bool)
go func() {
for d := range data {
fmt.Printf("Received a message: %s\n", d.Body)
f(d.Body)
}
}()
<-forever
return nil
}