一个生产者,一个交换机,两个队列,两个消费者
绑定 #
在前面的示例中,我们已经创建了绑定。你可能还记得 代码如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定可以采用额外的路由键参数。为了避免 与basic_publish参数混淆,我们将它称为绑定键。这就是我们如何使用键创建绑定:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
直接交换(direct) #
与 扇出不同的是 扇出是广播模式 无脑通知到所有队列 ,队列无需知道路由规则,只需知道广播交换机就行
- 规则算法: 直接交换很简单 - 消息转到 binding key(绑定键) 与消息的 routing key(路由键) 完全匹配的队列。
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定 #
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。
示例 #
生产者 #
package com.ng.my.routing;
import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 生产者
* @author : chengdu
* @date : 2023/8/20-08
**/
public class EmitLogDirect {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String EXCHANGE_NAME = "logDirect";
@Test
public void acceptProducer() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//创建交换机 并指定直接交换模式
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//从控制台当中接受信息
logger.info("start");
//创建需要绑定的路由key 注意这里不说bingKey 是 routingKey
Map<String, String> bindingKeyMap = new HashMap<>(4);
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug", "调试 debug 信息");
TimeUnit.SECONDS.sleep(10);
Iterator<Map.Entry<String, String>> iterator = bindingKeyMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String> stringEntry = iterator.next();
//路由key
String routingKey = stringEntry.getKey();
String message = stringEntry.getValue();
logger.info("生产者 路由匹配算法key:"+routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
}
TimeUnit.MINUTES.sleep(2);
logger.info("end");
}
}
消费者 #
package com.ng.my.routing;
import cn.hutool.json.JSONUtil;
import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* 消费者
* @author : chengdu
* @date : 2023/8/20-08
**/
public class ReceiveLogsDirect {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String EXCHANGE_NAME = "logDirect";
@Test
public void executeWork() throws Exception{
Channel channel = RabbitMqUtils.getChannel();
//创建2个临时队列
String queueOne = channel.queueDeclare().getQueue();
String queueTwo = channel.queueDeclare().getQueue();
//第一个队列 绑定一个路由key
channel.queueBind(queueOne, EXCHANGE_NAME, "error");
//第二个队列分别和三个路由路由key相绑定
channel.queueBind(queueTwo, EXCHANGE_NAME, "info");
channel.queueBind(queueTwo, EXCHANGE_NAME, "error");
channel.queueBind(queueTwo, EXCHANGE_NAME, "warning");
//丢掉debug消息
// channel.queueBind(queueTwo, EXCHANGE_NAME, "debug");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
logger.info("consumerTag:" + consumerTag);
logger.info("properties:" + JSONUtil.toJsonStr(delivery.getProperties()));
logger.info("envelope:" + JSONUtil.toJsonStr(delivery.getEnvelope()));
System.out.println(" 消息 '" + message + "'");
};
//队列名称,自动应答
channel.basicConsume(queueOne, true, deliverCallback, consumerTag -> { });
channel.basicConsume(queueTwo, true, deliverCallback, consumerTag -> { });
TimeUnit.MINUTES.sleep(2);
logger.info("end");
}
}
总结 #
在生产者那是路由key 这里确定的是交换机和路由key的关系
在消费者那路由key 必须和生产者一致才行 路由key可以多个队列绑定相同的路由key