C# RabbitMQ使用
// 简单使用(一对一)
class RabbitMQSimple
{
// 1. RabbitMQ实例broker重启时,所有未申明durable的交换器和队列都会被删除,交换器未声明durable不会影响队列的持久化。
// 2. RabbitMQ中消息都被保存在队列中,所以如果队列被删除,消息不管有没有设置deliveryMode=2都会被删除。
static void rabbitMQDeclare() {
IModel channel = RabbitMQConnection.GetIConnection().CreateModel();
// durable: 队列在broker重启后是否还存在。
// exclusive: 1.仅声明的连接(连接下所有信道可访问)可访问;2.连接(不是信道)断开时队列会自动删除。
// autoDelete: 是否自动删除,所有消费者断开连接之后队列是否自动被删除。
// arguments: 配置队列中的消息什么时候会自动被删除。
channel.QueueDeclare("XXXXXX", true, false, false, null);
}
static void rabbitMQProducer()
{
IModel channel = RabbitMQConnection.GetIConnection().CreateModel();
for (int i = 0; i < 20; i++)
{
try
{
// 配置消息属性(是否持久化等)
IBasicProperties properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // 1非持久化,2持久化
channel.BasicPublish("", "XXXXX", properties, Encoding.UTF8.GetBytes(i.ToString()));
Console.WriteLine(string.Format("send:{0}", i.ToString()));
}
catch (Exception ex)
{
// NOTE 考虑rabbitmq服务重启等异常处理
Console.WriteLine(ex.Message);
Thread.Sleep(1000);
}
Thread.Sleep(100);
}
}
static void rabbitMQConsumer()
{
IModel channel = RabbitMQConnection.GetIConnection().CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
try
{
string msgBody = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine(string.Format("recive:{0}", msgBody));
// 应答消息:支持应答该消费者先前接收未ack的所有消息(multiple=true),应答后消息被队列删除。
channel.BasicAck(ea.DeliveryTag, false);
// 拒绝消息:消费端告诉队列这个消息我拒绝、不处理,一次只能处理一个消息,可控制删除或重新放回队列。
// requeue=true消息重新入队列,该消费者还是会消费到该消息;requeue=false消息丢弃或者进入死信队列。
// channel.BasicReject(ea.DeliveryTag, true);
// 否定消息:与BasicReject区别是支持nack该消费者先前接收未ack的所有消息(multiple=true)。
// channel.BasicNack(ea.DeliveryTag, true, true);
// 恢复消息:重新投递还未被确认的消息,requeue=true则尽可能分配给其他消费者,false则消息会重新被投递给自己。
// channel.BasicRecover(true);
}
catch (Exception ex)
{
// NOTE 考虑rabbitmq服务重启等异常处理
Console.WriteLine(ex.Message);
Thread.Sleep(1000);
}
Thread.Sleep(200);
};
// 消费端限流配置,非自动确认消息(channel.BasicConsume->autoAck=false)前提下生效,
// 如果一定数目(channel.BasicQos->prefetchCount)的消息是未确认状态,则队列不分配新的消息给该消费端。
// prefetchSize: 单条消息大小限制(0代表不限制,RabbitMQ尚未实现)。
// prefetchCount: 如果有N个消息还没有ack,则该consumer将block掉,直到有消息ack(no_ask=false下生效)。
// global: true->设置应用于channel级别,false->设置应用于consumer级别(RabbitMQ的channel级别尚未实现)。
channel.BasicQos(0, 10, false);
// autoAck:自动应答,消息被消费者取出就会从队列中删除(消息处理异常会丢失数据)。
channel.BasicConsume("XXXXXX", false, consumer);
}
public static void Start()
{
// 声明队列
rabbitMQDeclare();
// 生产者
Thread productThread = new Thread(rabbitMQProducer);
// productThread.Start();
// 消费者
rabbitMQConsumer();
Console.ReadKey();
}
}