RabbitMq实战小结

简介

目前做的项目是一个爬虫项目,想要达到分布式爬虫的效果,故需要将待爬取的链接url放入一个消息队列中,团队选择了Rabbitmq,因为之前接触过mq消息的发送,但是针对其原理还是一无所知。这边我主要采取Spring + rabbitmq 结合的方式来讲解,当然原生的rabbitmq 的java包也可以参考地址:rabbitmq官方文档

RabbitMQ优势

  • RabbitMQ是最广泛部署的开源消息队列。
  • 在小型创业公司和大型企业,全球范围内拥有超过35,000个RabbitMQ生产部署
  • RabbitMQ是轻量级的,易于部署在内部和云端。 它支持多种消息协议。
  • RabbitMQ可以部署在分布式和联合配置中,以满足高规模,高可用性要求。
  • RabbitMQ在许多操作系统和云环境中运行,并为大多数流行语言提供了广泛的开发工具。

下载与安装

Downloading and Installing RabbitMQ

理解RabbitMQ的消息通信

上面图片的理解:

你必须首先连接到Rabbit,才能消费或者发布消息。你在应用程序和Rabbit代理服务器之间创建一条TCP连接。一旦TCP连接打开(你通过了认证),应用程序就可以创建一条AMQP信道。信道是建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去的。每条信道都会被指派一个唯一ID(AMQP库会帮你记住ID的)。不论是发布消息、订阅队列或是接收消息,这些动作都是通过信道完成的。

你也许会问为什么我们需要信道呢?为什么不直接通过TCP连接发送AMQP命令呢?主要原因在于对操作系统来说建立和销毁TCP会话是非常昂贵的开销。假设应用程序从队列消费消息,并根据服务需求合理调度线程。假设你只进行TCP连接,那么每个线程都需要自行连接到Rabbit。也就是说高峰期有每秒成百上千条连接。这不仅造成TCP连接的巨大浪费,而且操作系统每秒也就只能建立这点数量的连接。因此,你可能很快就碰到性能瓶颈了。如果我们为所有线程只使用一条TCP连接以满足性能方面的要求,但又能确保每个线程的私密性,就像拥有独立连接一样的话,那不就非常完美吗?这就是要引入信道概念的原因。线程启动后,会在现成的连接上创建一条信道,也就获得了连接到Rabbit上的私密通信路径,而不会给操作系统的TCP栈造成额外负担,如图所示。因此,你可以每秒成百上千次地创建信道而不会影响操作系统。在一条TCP连接上创建多少条信道是没有限制的。把它想象成一束光纤电缆就可以了。

RabbitMQ三种Exchange模式及RPC

三种模式是在《RabbitMQ实战》中看到并做总结:

一.Direct Exchange

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。

二.Fanout Exchange

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

三.Topic Exchange

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

还有一种形式Header 形式就直接略过了,Header和Direct实现完全一致,但性能会差很多。

实现RPC

首先要弄明白,RPC是个什么东西。

(RPC) Remote Procedure Call Protocol 远程过程调用协议

在一个大型的公司,系统由大大小小的服务构成,不同的团队维护不同的代码,部署在不同的机器。但是在做开发时候往往要用到其它团队的方法,因为已经有了实现。但是这些服务部署不同的机器上,想要调用就需要网络通信,这些代码繁琐且复杂,一不小心就会写的很低效。RPC协议定义了规划,其它的公司都给出了不同的实现。比如微软的wcf,以及现在火热的WebApi。

Spring 结合MQ

发送消息的Spring 配置模板(需要配置exchange和routerkey),解释:有了exchange和routerkey就会找到相应的Queue队列。

<bean id="cmdTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="connectionFactory"/>
    <property name="exchange" value="${filter.schedule.cmd.exchange}"/>
    <property name="routingKey" value="${filter.schedule.cmd.routingKey}"/>
    <property name="retryTemplate" ref="retryTemplate"/>
    <property name="messageConverter" ref="jsonMessageConverter"/>
</bean>

配置监听器

<bean id="connectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <property name="addresses" value="${filter.schedule.mq.addresses}"/>
        <property name="username" value="${filter.schedule.mq.username}"/>
        <property name="password" value="${filter.schedule.mq.password}"/>
        <property name="virtualHost" value="${filter.schedule.mq.virtualHost}"/>
        <property name="connectionTimeout" value="${filter.schedule.mq.connectionTimeout}"/>
        <property name="connectionLimit" value="${filter.schedule.mq.connectionLimit}"/>
        <property name="executor" ref="taskExecutor"/>
    </bean>
<bean name="cmdListener" class="com.unionpay.spiderframework.parserservice.AwareMqListener">
    </bean>
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual"  prefetch="5">
        <rabbit:listener queues="${filter.schedule.spider.listener.queue}" ref="cmdListener"/>
    </rabbit:listener-container>

消息确认代码可以参考 rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack

AwareMqListener class代码(可以实现消息确认ACK):

public class AwareMqListener implements ChannelAwareMessageListener {

    private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        channel.basicQos(5);
        fixedThreadPool.submit(new ParserTask(message, channel));
    }
}

不用消息确认,就直接实现MessageListener接口的onMessage接口。

参考

rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack

rabbitmq官网

《RabbitMQ实战:高效部署分布式消息队列》

欢迎大家关注:huazi's微信公众号