当前位置:首页 » 《关注互联网》 » 正文

消息中间件介绍&RabitMQ环境搭建(Linux)_熟透的蜗牛的博客

9 人参与  2022年01月26日 08:53  分类 : 《关注互联网》  评论

点击全文阅读


目录

一、什么是消息中间件

消息中间件的组成

二、应用场景

解耦

异步

削峰填谷

三、消息中间件选型

四、RabitMQ环境搭建

单机模式

集群模式

 配置镜像集群模式

五、RabitMQ消息类型

(1)订阅模型-Fanout

(2)订阅模型-Direct:

(3)订阅模型-Topic

六、代码

连接工具

点对点和工作队列模式

Fanout模式

 Direct模式

 Topic模式


一、什么是消息中间件

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

RabitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB(企业服务总线)整合。

消息中间件的组成

  • Broker:消息服务器,作为server提供消息核心服务。
  • Producer:消息生产者,业务的发起方,负责生产消息传输给broker。
  • Consumer:消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理。
  • Topic:主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的广播。
  • Queue:队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收。
  • Message:消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输。

二、应用场景

解耦

 如上图,未使用MQ的情况下,如果B、C、D三个系统,有需求改变,那么A系统都会响应的更改,使用MQ之后,我们只需要把数据发送的MQ中,B、C、D自己根据需求去mq订阅响应的内容即可,从而达到系统耦合的结果。

异步

 如果某些需求场景不需要立即返回数据结果,那么就可以采用MQ的形式,对消息异步的处理,这样可以提高系统的响应速度。

削峰填谷

如果请求超过了服务器承受的最大值,那么就可能会击垮服务器,这时候,把请求通过mq队列缓存起来,来限制请求的峰值,从而达到保护服务器的目的。

三、消息中间件选型

RabbitMQ   erlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。
RocketMQjava开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。
Kafka Scala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景,更偏向日志收集。
ActiveMQ    java开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

四、RabitMQ环境搭建

单机模式

#拉取镜像
docker pull rabbitmq:3.9-management
#创建容器并启动
[root@bogon ~]# docker run  -d -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

 登录 http://192.168.139.154:15672/#/ 默认账号密码是guest,guest

注意:如果安装过程出现这个错误,重启一下docker就可以解决了systemctl restart docker。

iptables failed: iptables --wait -t nat -A DOCKER -p tcp -d 0/0 --dport 15672 -j DNAT --to-destination 172.17.0.2:15672 ! -i docker0: iptables: No chain/target/match by that name

集群模式

安装容器

[root@bogon ~]# docker run -d --hostname rabbit1 --name rabbitmq1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:3.9-management
[root@bogon ~]# docker run -d --hostname rabbit2 --name rabbitmq2 -p 5673:5672 --link rabbitmq1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:3.9-management
[root@bogon ~]# docker run -d --hostname rabbit3 --name rabbitmq3 -p 5674:5672 --link rabbitmq1:rabbit1 --link rabbitmq2:rabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitmq_cookie' rabbitmq:3.9-management

添加节点

#节点1
[root@bogon ~]# docker exec -it rabbitmq1 bash
root@rabbit1:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@rabbit1 ...
root@rabbit1:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@rabbit1 ...
root@rabbit1:/# rabbitmqctl start_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Starting node rabbit@rabbit1 ...
root@rabbit1:/# 
#节点2
[root@bogon ~]# docker exec -it rabbitmq2 bash
root@rabbit2:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@rabbit2 ...
root@rabbit2:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@rabbit2 ...
root@rabbit2:/# rabbitmqctl join_cluster --ram rabbit@rabbit1  
root@rabbit2:/# rabbitmqctl start_app
#节点3
[root@bogon ~]# docker exec -it rabbitmq3 bash
root@rabbit3:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@rabbit3 ...
root@rabbit3:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@rabbit3 ...
root@rabbit3:/# rabbitmqctl join_cluster --ram rabbit@rabbit1
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Clustering node rabbit@rabbit3 with rabbit@rabbit1

13:45:31.790 [warn]  Feature flags: the previous instance of this node must have failed to write the `feature_flags` file at `/var/lib/rabbitmq/mnesia/rabbit@rabbit3-feature_flags`:

13:45:31.790 [warn]  Feature flags:   - list of previously disabled feature flags now marked as such: [:maintenance_mode_status]

13:45:32.000 [error] Failed to create a tracked connection table for node :rabbit@rabbit3: {:node_not_running, :rabbit@rabbit3}

13:45:32.001 [error] Failed to create a per-vhost tracked connection table for node :rabbit@rabbit3: {:node_not_running, :rabbit@rabbit3}

13:45:32.001 [error] Failed to create a per-user tracked connection table for node :rabbit@rabbit3: {:node_not_running, :rabbit@rabbit3}
root@rabbit3:/# rabbitmqctl start_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Starting node rabbit@rabbit3 ...

 此时安装完成的为普通集群模式

  • Exchange 的元数据信息在所有节点上是一致的,而 Queue(存放消息的队列)的完整数据则只会存在于创建它的那个节点上。其他节点只知道这个 queue 的 metadata 信息和一个指向 queue 的 owner node 的指针;
  • RabbitMQ 集群会始终同步四种类型的内部元数据(类似索引):
  1. 队列元数据:队列名称和它的属性;
  2. 交换器元数据:交换器名称、类型和属性;
  3. 绑定元数据:一张简单的表格展示了如何将消息路由到队列;
  4. vhost元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性;因此,当用户访问其中任何一个 RabbitMQ 节点时,通过 rabbitmqctl 查询到的元数据信息都是相同的。
  • 无法实现高可用性,当创建 queue 的节点故障后,其他节点是无法取到消息实体的。如果做了消息持久化,那么得等创建 queue 的节点恢复后,才可以被消费。如果没有持久化的话,就会产生消息丢失的现象。

 配置镜像集群模式

概念:
把队列做成镜像队列,让各队列存在于多个节点中,属于 RabbitMQ 的高可用性方案。镜像模式和普通模式的不同在于,queue和 message 会在集群各节点之间同步,而不是在 consumer 获取数据时临时拉取。

特点:
(1)实现了高可用性。部分节点挂掉后,不会影响 rabbitmq 的使用。
(2)降低了系统性能。镜像队列数量过多,大量的消息同步也会加大网络带宽开销。
(3)适合对可用性要求较高的业务场景。

 

name:随便取,策略名称
Pattern:^ 匹配符,只有一个^代表匹配所有
Definition:ha-mode=all 为匹配类型,分为3种模式:all(表示所有的queue)

或者使用命令:
#rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

添加一个队列

 可见队列已经同步到其他节点上。

五、RabitMQ消息类型

第一种:点对点 生产者将消息发送到队列,然后消费者从队列中取消息依次消费,消费之后,消息出队列,本次消费结束

第二种:工作队列。又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。消息在多个消费者共享,但是一个消息只能被一个消费者消费。
总之:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消
费,就会消失,因此任务是不会被重复执行的 。

第三种:发布订阅、Routing(路由键)、Topics(主题)

1、1个生产者,多个消费者

2、每一个消费者都有自己的一个队列

3、生产者没有将消息直接发送到队列,而是发送到了交换机

4、每个队列都要绑定到交换机

5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型direct(直连交换机,把消息交给符合指定routing key 的队列)、topic(通配符,把消息交给符合routing pattern(路由模式) 的队列)、headers 和fanout(扇形交换机或者广播,将消息交给所有绑定到交换机的队列)。Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。

(1)订阅模型-Fanout

Fanout,也称为广播。
在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个消费者
  • 2) 每个消费者有自己的queue(队列)
  • 3) 每个队列都要绑定到Exchange(交换机)
  • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
  • 5) 交换机把消息发送给绑定过的所有队列
  • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

(2)订阅模型-Direct:

 

在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

(3)订阅模型-Topic


Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符

通配符规则:#:匹配一个或多个词

*:匹配不多不少恰好 1 个词

六、代码

连接工具

package com.xiaojie.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 创建mq连接
 * @date 2021/9/24 22:54
 */
public class MyConnection {

    public static Connection getConnection() throws  IOException, TimeoutException {
        // 1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置连接地址
        connectionFactory.setHost("192.168.139.154");
        // 3.设置端口号
        connectionFactory.setPort(5672);
        // 4.设置账号和密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // 5.设置VirtualHost
        connectionFactory.setVirtualHost("/xiaojie");
        return connectionFactory.newConnection();
    }
}

点对点和工作队列模式

package com.xiaojie.rabbitmq.p2p;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 点对点生产者
 * 生产者生产消息时如果没有对应的队列,则直接遗弃消息,并不会报错。
 * @date 2021/9/24 22:53
 */
public class PProvider {

    //定义队列
    private static final String QUEUE_NAME = "myqueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("生产者启动成功..");
        // 1.创建连接
        Connection connection = MyConnection.getConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        //创建队列,如果队列存在则使用这个队列,不存在则创建
        //第一个参数,对列名称  myqueue
        //第二个参数,是否持久话,false表示不持久化数据,MQ停掉后数据就会丢失
        //第三个参数,是否队列私有化,false表示所有的消费者都可以访问,true表示只有第一次拥有它的消费者才可以一直使用,其他消费者不能访问
        //第四个参数,是否自动删除,false连接停掉后不自动删除掉这个队列
        //第五个参数,其他额外的参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String msg = "测试点对点消息" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息成功:" + msg);
        }
        channel.close();
        connection.close();
    }
}
package com.xiaojie.rabbitmq.p2p;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: TODO
 * @date 2021/9/24 23:15
 */
public class PConsumer {
    private static final String QUEUE_NAME = "myqueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = MyConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费消息msg:" + msg);
            }
        };
        // 3.创建我们的监听的消息
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

 

package com.xiaojie.rabbitmq.p2p;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 模拟工作队列,即多个消费者消费消息,
 * 结果是消息均等消费,就是在工作队列模式下,默认情况下消息是均摊到每个消费者的。
 * @date 2021/9/24 23:15
 */
public class PConsumer2 {
    private static final String QUEUE_NAME = "myqueue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = MyConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费消息msg:" + msg);
            }
        };
        // 3.创建我们的监听的消息
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

Fanout模式

package com.xiaojie.rabbitmq.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: fanout模式生产者
 * @date 2021/9/24 23:45
 */
public class Provider {
    public static final  String EXCHANGE="my_fanout_exchange";


    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型(扇形交换机)
        channel.exchangeDeclare(EXCHANGE, "fanout");
        //创建消息
        String msg="fanout交换机消息。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功。。。。。。");
        //关闭通道,关闭连接
        channel.close();
        connection.close();
    }
}
package com.xiaojie.rabbitmq.fanout;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 短信消费者
 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者
 * @date 2021/9/24 23:47
 */
public class SmsConsumer {
    //交换机
    private static final  String EXCHANGE="my_fanout_exchange";
    //队列
    private  static final String SMS_QUEUE="sms_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MyConnection.getConnection();
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(SMS_QUEUE, false, false, false, null);
        //绑定队列到交换机
        channel.queueBind(SMS_QUEUE, EXCHANGE, "");
        System.out.println("短信消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的短信消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(SMS_QUEUE,true, consumer);
    }
}
package com.xiaojie.rabbitmq.fanout;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 微信消费者
 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者
 * @date 2021/9/24 23:47
 */
public class WxConsumer {
    //交换机
    private static final  String EXCHANGE="my_fanout_exchange";
    //队列
    private  static final String WX_QUEUE="wx_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MyConnection.getConnection();
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(WX_QUEUE, false, false, false, null);
        //绑定队列到交换机
        channel.queueBind(WX_QUEUE, EXCHANGE, "");
        System.out.println("微信消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的短信消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(WX_QUEUE,true, consumer);
    }
}

 Direct模式

package com.xiaojie.rabbitmq.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: direct生产者
 * @date 2021/9/24 23:39
 */
public class Provider {
    public static final  String EXCHANGE="my_direct_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型
        channel.exchangeDeclare(EXCHANGE, "direct");
        //路由键
        String routingKey="email";
        //创建消息
        String msg="direct---交换机的消息。。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes());
        System.out.println("生产者启动成功。。。。。");
        //关闭连接,管道
        channel.close();
        connection.close();
    }
}
package com.xiaojie.rabbitmq.direct;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 路由键交换机
 * @date 2021/9/25 0:01
 */
public class Consumer {
    public static String EMAIL_QUEUE_FANOUT="email_queue";
    public static final  String EXCHANGE="my_direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建mq连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null);
        //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey
        channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email");
        System.out.println("邮件消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer);
    }
}

 Topic模式

package com.xiaojie.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: topic生产者 通配符模式
 * @date 2021/9/24 23:39
 */
public class Provider {
    public static final  String EXCHANGE="my_topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型
        channel.exchangeDeclare(EXCHANGE, "topic");
        //路由键
        String routingKey="email.send";
        //创建消息
        String msg="topic---交换机的消息。。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes());
        System.out.println("生产者启动成功。。。。。");
        //关闭连接,管道
        channel.close();
        connection.close();
    }
}

 

package com.xiaojie.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 路由键交换机
 * 通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词
 * @date 2021/9/25 0:01
 */
public class Consumer {
    public static String EMAIL_QUEUE_FANOUT="email_queue";
    public static final  String EXCHANGE="my_topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建mq连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null);
        //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey
        //通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词
        channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email.#");
        System.out.println("邮件消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer);
    }
}

完整代码:springboot-redis: Springboot整合redis哨兵机制哨兵配置请参考:https://blog.csdn.net/weixin_39555954/article/details/120147401

参考:docker部署RabbitMQ集群 - Alan6 - 博客园

RabbitMQ之五种消息模型 - 早上起床喝酸_奶 - 博客园


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/33919.html

队列  消息  交换机  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1