Publish/Subscribe

Publish/Subscribe (发布/订阅) #

一个生产者,一个交换机,两个队列,两个消费者

  • 生产者是发送消息的用户应用程序。
  • 队列是存储消息的缓冲区。
  • 消费者是接收消息的用户应用程序。

创建 扇形交换机(fanout) #

RabbitMQ 中消息传递模型的核心思想是生产者 从不将任何消息直接发送到队列。实际上,很多时候 生产者甚至不知道消息是否会传递给任何所有队列

有几种可用的交换类型:direct, topic, headers , fanout 本次使用 fanout并且使用它的日志

channel.exchangeDeclare("logs", "fanout");

fanout 交换非常简单。正如您可能从 名称,它只是将收到的所有消息广播到所有队列让它知道

临时队列 #

String queueName = channel.queueDeclare().getQueue();

绑定 #

我们已经创建了一个扇出交换和一个队列。现在我们需要 告诉交换机将消息发送到我们的队列。这种关系 交换和队列之间称为绑定。

channel.queueBind(queueName, "logs", "");

从现在开始,日志交换会将消息附加到我们的队列中。

将一切整合在一起 #

发出日志消息的生产者程序看起来不多 与上一教程不同。最重要的变化是 我们现在希望将消息发布到我们的日志交换,而不是 无名者。我们需要在发送时提供一个路由密钥,但它 对于扇出交换,将忽略值。下面是 EmitLog.java 程序的代码:

生产者 #

package com.ng.my.publish_subscribe;

import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

/**
 * @author : chengdu
 * @date :  2023/8/20-08
 **/
public class EmitLog {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private static final String EXCHANGE_NAME = "logs";

    @Test
    public void acceptProducer()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        //创建交换机 并指定广播模式
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //从控制台当中接受信息
        logger.info("start");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            logger.info(" [x] Sent '" + message + "'");
            //将消息推入到广播模式的交换机中
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }

        TimeUnit.MINUTES.sleep(2);
        logger.info("end");
    }

}

消费者 #


package com.ng.my.publish_subscribe;

import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author : chengdu
 * @date :  2023/8/20-08
 **/
public class ReceiveLogs {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private static final String EXCHANGE_NAME = "logs";
    @Test
    public void executeWork() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //创建一个具有随机名称的队列,或者, 甚至更好 - 让服务器为我们选择一个随机队列名称 (这也是为什么发布的时候压根没有指定  路由key)
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        logger.info("queueName:"+queueName);

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //队列名称,自动应答
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

        TimeUnit.MINUTES.sleep(2);
        logger.info("end");
    }

}

总结 #

生产者将消息推送到广播模式的交换机中 消费者自己创建随机队列 然后将随机队列与广播模式的交换机绑定 最后就是可以直接处理了