Rabbitmq实战golang实现

1. Rabbitmq 架构及原理

消息队列,又叫做消息中间件。是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信(维基百科)

MQ 的作用:

  1. 实现异步通信
  2. 系统解耦
  3. 流量削峰

MQ 带来的问题:

  1. 引入消息队列带来的延迟问题
  2. 增加了系统的复杂度
  3. 可能产生数据不一致的问题

消息丢失和消息重复消费的问题。一旦消息没有被正确地消费,就会带来数据不一致性的问题。

RabbitMQ 是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现。

关于AMQP 协议具体文档参考 https://www.amqp.org/sites/amqp.org/files/amqp.pdf

由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优点。rabbitmq简单架构如下:

架构示意图

  • Broker(代理/中介): RabbitMQ 用于收发消息的服务,默认是 5672 的端口。

  • Virtual Host(vhost):虚拟主机;标识一批交换机、消息队列和相关对象。

虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 /。

  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。

  • Banding:绑定,用于消息队列和交换机之间的关联。

一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。

直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,因为 TCP 连接是非常宝贵的资源,创建和释放也要消耗时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接

  • Connection:无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个连接,这个连接是一个 TCP 的长连接。

2. RabbitMQ的六种工作模式:

https://www.rabbitmq.com/getstarted.html 官方网站有详细示意图

  1. simple简单模式

  1. work工作模式

  1. Publish/Subscribe 发布订阅模式(fanout)

  1. Routing 路由模式 (direct)

  1. Topics 主题模式(路由模式的一种)(topic)

  1. RPC

  1. Publisher Confirms 发布确认

具体demo参考如下地址:

go操作RabbitMQ


3. 延迟队列实现(基于死信队列转发)

3.1 消息过期时间:

有两种设置方式:

  1. 通过队列属性设置消息过期时间,所有队列中的消息超过时间未被消费时,都会过期。
_, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false, // 队列解锁
		false,
		amqp.Table{
			"x-message-ttl": 4000, // 在队列中声明ttl 超时时间 单位为毫秒级,类型为int
		},
	)
  1. 设置单条消息的过期时间,在发送消息的时候指定消息属性(推荐使用消息超时)。
expiration := "4000" // 4S 4000MS
err = r.channel.Publish(
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
			Expiration:  expiration, // push 时 在消息本体上设置expiration超时时间,单位为毫秒级别 类型为 string
		})

3.2 死信队列:

消息在某些情况下会变成死信(Dead Letter)

队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。

死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue),DLX 实际上也是普通的交换机,DLQ 也是普通的队列。

三种情况会让消息变成死信:

  • 消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue== false
  • 消息过期
  • 队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX。 > 关于这个队里的默认长度,官方没有给出,网上找了下有说是没有设置就动态增长不限。也就是根据你机器的配置情况了。

死信队列声明如下:

_, err := r.channel.QueueDeclare(
		queueName, // 这里就是将一个队列声明为如下死信交换机的死信队列
		true,
		false,
		false, 
		false,
		amqp.Table{
			"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
		},
	)

3.3 延迟队列demo:

延迟队列原理示意图

pusher:

func (r *RabbitMQ) PublishDelayQueue(queue, message, dlxExchange, routing, expiration string) error {
	defer r.CloseMq()
	queueName := queue + "_delay"
	_, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false, // 队列解锁
		false,
		amqp.Table{
			"x-dead-letter-exchange":    dlxExchange, // 声明当前队列绑定的 死信交换机
			"x-dead-letter-routing-key": routing,     // routing 模式路由名
		},
	)
	if err != nil {
		return err
	}

	// 注入消息 注册路由 routingKey
	err = r.channel.Publish(
		"",
		queueName,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
			Expiration:  expiration,
		})
	if err != nil {
		return err
	}

	fmt.Printf("push messag %s\n", message)
	return nil
}

Consumer:

func (r *RabbitMQ) ConsumeDelayQueue(queueName, dlxExchange, routing string, f func(interface{})) error {
	defer r.CloseMq()
	err := r.channel.ExchangeDeclare(
		dlxExchange,
		RoutingKind, // 交换机类型 路由模式接收
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 声明 死信队列(用于与死信交换机绑定)
	q, err := r.channel.QueueDeclare(
		queueName,
		true,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	// 绑定队列到 exchange 中
	err = r.channel.QueueBind(
		q.Name,
		routing,
		dlxExchange,
		false,
		nil)
	if err != nil {
		return err
	}

	// 消费消息
	data, err := r.channel.Consume(
		q.Name,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return err
	}

	forever := make(chan bool)
	go func() {
		for d := range data {
			fmt.Printf("Received a message: %s\n", d.Body)
			f(d.Body)
		}
	}()
	<-forever
	return nil
}

参考:

RabbitMQ工作模型与基本原理

rabbitmq消息队列原理

go操作RabbitMQ