RabbitMQ实战
前言
RabbitMQ是基于AMQP高级消息队列协议的消息中间件,基于Erlang语言编写。继承了Erlang的高性能,健壮及可伸缩的特性。当前业界常用的消息中间件主要包括ActiveMQ、RabbitMQ、RocketMQ以及kafka,其中RabbitMQ主要突出点在于其首次部署难度低、与接入开发语言无关。从一般企业使用情况来看,RabbitMQ较为适合中小型企业对非强关联业务的应用。
RabbitMQ组成
在实际使用之前先有理解RabbitMQ在物理层面是由什么组成的。RabbitMQ实际应用上是一个第三方服务,类似Zookeeper等方式启动。通过代码对需要发送的消息投放到RabbitMQ服务中,而服务内部会自行处理(是否持久化,按哪种策略派发等)后再发放到对应的消费者中去。
因此RabbitMQ实际上划分为几个部分:
消息生产者:产生并投放消息
MQ消息队列:固化并按策略派发消息
消息消费者:获取并使用消息
MQ消息队列:
RabbitMQ消息队列中存在几个概念:交换器、队列、绑定、路由键
队列通过路由键Routing key的某种规则绑定到交换器中,生产者把消息投放到交换器时再根据绑定的路由键将特定的消息路由到指定的队列上,订阅了指定队列的消费者即可接受到对应的消息。
一、简单消息队列
只创建队列,生产者和消费者支持多对多关系,每次消息的消费都是循环消费到对应的消费者上:
/* 定义队列 */ @Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("queue"); } }
消费者:
/* 注册消费者订阅指定队列 */ @Component public class Consumer { @RabbitListener(queues = "queue") public void receiver(User user) { System.out.println("Receive:" + user.toString()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
生产者:
消息生成并投放到消息队列中,这里涉及两个内容。产生消息,这个消息可以是不同的字符串,也可以是一个复杂的业务对象。投放消息,需要设置投放的交换器、队列以及是否需要持久化等。
基于springboot的源码实现:
@Component public class Provider { @Autowired private AmqpTemplate template; public void send(int i) { template.convertAndSend("queue", new User(i, "zqliang", "123456")); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } }
这里通过Jackson2JsonMessageCoverter来序列化需要传输的消息,最后消息是通过json的方式在队列里传输,因此在接收消息时也需要通过相同的方式反序列化。convertAndSend方法包含了几个参数,这里只是通过队列来投放消息。
二、基于路由键匹配的消息队列
消息路由键支持topic规则匹配,通过#可以设置通配路由键。当消息投放的路由键满足#通配规则时,相关队列订阅的消费者都将受到该消息:
路由配置:
@Configuration public class RabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(RabbitConfig.message, true); } @Bean public Queue queueMessages() { return new Queue(RabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
消费者:
@Component public class Consumer { @RabbitListener(queues = RabbitConfig.message) public void receiverTopic(User user) { System.out.println("receiverTopicMessage:" + user.toString()); } @RabbitListener(queues = RabbitConfig.messages) public void receiverTopic1(User user) { System.out.println("receiverTopicMessages:" + user.toString()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }
生产者:
@Component public class Provider { @Autowired private AmqpTemplate template; public void send() { template.convertAndSend("exchange", "topic.message", new User(1, "zqliang", "123456")); template.convertAndSend("exchange", "topic.messages", new User(2, "zqliang", "123456")); } }
上述案例中,生产者分别投放了路由键为topic.message和topic.messages的消息,由于通过exchange交换器绑定的路由键中topic.message对两个队列都满足,所以第一条消息两个消费者都可以消费
三、基于广播方式投放消息
通过fanoutExchange方式投放的消息,会以广播的方式投放到所有以FanoutExchange绑定方式的队列中:
路由配置:
@Configuration public class RabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } }
生产者:
@Component public class Provider { @Autowired private AmqpTemplate template; public void send(int i) { template.convertAndSend("fanoutExchange", "", new User(i, "zqliang", "123456")); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } }
通过该方式可以对所有消费者以广播的方式投放消息。
持久化消息
有时候由于消息队列异常,导致消息队列出现重启等操作时,需要对未消费部分的消息进行持久化,以便下次启动队列后订阅的消费能够继续消费未消费的队列。
@Bean public Queue queueMessage() { return new Queue(RabbitConfig.message, true); }
持久化操作只要在创建队列是传入durable参数即可,true表示需要持久化。
队列中的消息都以append的方式写到一个文件中,当这个文件的大小超过指定的限制大小后,关闭这个文件再创建一个新的文件供消息的写入。文件名(*.rdq)从0开始然后依次累加。当某个消息被删除时,并不立即从文件中删除相关信息,而是做一些记录,当垃圾数据达到一定比例时,启动垃圾回收处理,将逻辑相邻的文件中的数据合并到一个文件中。
消息写入磁盘的条件:
1、消息本身在publish的时候就要求消息写入磁盘;
2、内存紧张,需要将部分内存中的消息转移到磁盘;
写入过程:
1、写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘);
2、有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘;
3、每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。
如何确保消息不丢失?
上面持久化操作就是为了确保消息不丢失的一种机制,但是,有时候对于多消费者的情况下会出现部分消费者还没消费的情况下机器异常。持久化后的消息在服务恢复时需要重新被消费,这个时候单靠持久化还不行。Message acknowledgment机制就是消费端消费完成要通知服务端,服务端才把消息从内存删除。同时为了确保RabbitMQ的高可用,还可以对单节点的服务进行集群化部署。当时这里我觉得就没必要了,因为使用RabbitMQ的初衷就是其部署简单,如果要是需要超强可用性的应用场景可以考虑更换其他MQ。
可视化管理
启动可视化管理服务后可以在浏览器上查看相关消息的传输情况,调整队列,交换器,以及登录人员的授权等。
windows下可视化启动:
rabbitmq-plugins.bat enable rabbitmq_management
启动后访问地址:
guest / guest
总结
不管最后使用哪种MQ服务,只要能够理解并正式应用起来都是触类旁通的吧。原始的扫表操异步作现在可以通过MQ来完成了,说明技术的革新确实可以解放生产力啊!!
0 Responses to "RabbitMQ实战"