work-queues

Work Queues #

一个生产者,一个默认的交换机,一个队列,两个消费

生产者 #

需要再idea.ext.options 中加上 -Deditable.java.test.console=true

import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

public class NewTask {
    private static final String TASK_QUEUE_NAME = "task_queue";
    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Test
    public void acceptProducer()throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        //从控制台当中接受信息
        logger.info("start");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();

            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            logger.info(" [x] Sent '" + message + "'");
        }

        TimeUnit.MINUTES.sleep(2);
        logger.info("end");
    }

}

消费者 #

import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import com.my.common.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

public class NewWorker {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private static final String TASK_QUEUE_NAME = "task_queue";

    @Test
    public void twoWork()throws Exception{
        oneWork() ;
    }


    @Test
    public void oneWork() throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        logger.info(DateUtil.now() +"等待接收消息....");

        //3:推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            logger.info("接收时间:"+DateUtil.now());
            //consumerTag 消费者标签,用来区分多个消费者
            logger.info("consumerTag:" + consumerTag);
            logger.info("properties:" + JSONUtil.toJsonStr(message.getProperties()));
            logger.info("envelope:" + JSONUtil.toJsonStr(message.getEnvelope()));
            logger.info("message:" + new String(message.getBody()));
            logger.info("");
        };
        //4:取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = consumerTag -> {
            //consumerTag 消费者标签,用来区分多个消费者
            logger.info("consumerTag:" + consumerTag);
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(TASK_QUEUE_NAME,true,deliverCallback,cancelCallback) ;
        TimeUnit.MINUTES.sleep(2);
    }

}

oneWork() twoWork() 分别执行就是两个消费者