文章目录
  1. 1. RabbitMQ多源配置及ack相关使用
    1. 1.0.0.1. server端行为
      1. 1.0.0.1.1. server端行为
    2. 1.0.0.2. 消息丢失的风险
  • 1.1. 对比
  • RabbitMQ多源配置及ack相关使用

    pom.xml:

    1
    2
    3
    4
    5
    <!--RabbitMQ依赖-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    RabbitMQConfig.java:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150

    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;

    @Slf4j
    @Configuration
    public class RabbitMQConfig {

    // mq主连接
    @Bean(name = "myConnectionFactory")
    @Primary
    public CachingConnectionFactory publicConnectionFactory(
    @Value("${v1.my.spring.rabbitmq.host}") String host,
    @Value("${v1.my.spring.rabbitmq.port}") int port,
    @Value("${v1.my.spring.rabbitmq.username}") String username,
    @Value("${v1.my.spring.rabbitmq.password}") String password,
    @Value("${v1.my.spring.rabbitmq.virtual-host}") String virtualHost,
    @Value("${v1.my.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
    @Value("${v1.my.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setPublisherConfirms(publisherConfirms);
    connectionFactory.setPublisherReturns(publisherReturns);
    return connectionFactory;
    }

    @Bean(name = "myRabbitTemplate")
    @Primary
    public RabbitTemplate publicRabbitTemplate(
    @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
    @Value("${v1.my.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
    RabbitTemplate myRabbitTemplate = new RabbitTemplate(connectionFactory);
    myRabbitTemplate.setMandatory(mandatory);
    myRabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
    if (!ack) {
    log.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", s, JSON.toJSONString(correlationData));
    } else {
    log.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", s, JSON.toJSONString(correlationData));
    }
    });
    myRabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
    // LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
    });
    return myRabbitTemplate;
    }

    @Bean(name = "myContainerFactory")
    @Primary
    public SimpleRabbitListenerContainerFactory insMessageListenerContainer(
    @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory,
    @Value("${v1.my.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
    @Value("${v1.my.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
    factory.setPrefetchCount(prefetch);
    return factory;
    }

    @Bean(name = "myRabbitAdmin")
    @Primary
    public RabbitAdmin publicRabbitAdmin(
    @Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;
    }


    //如果有多mq源,则把下面代码打开

    // @Bean(name = "v2ConnectionFactory")
    // public CachingConnectionFactory hospSyncConnectionFactory(
    // @Value("${v2.spring.rabbitmq.host}") String host,
    // @Value("${v2.spring.rabbitmq.port}") int port,
    // @Value("${v2.spring.rabbitmq.username}") String username,
    // @Value("${v2.spring.rabbitmq.password}") String password,
    // @Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,
    // @Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,
    // @Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {
    // CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    // connectionFactory.setHost(host);
    // connectionFactory.setPort(port);
    // connectionFactory.setUsername(username);
    // connectionFactory.setPassword(password);
    // connectionFactory.setVirtualHost(virtualHost);
    // connectionFactory.setPublisherConfirms(publisherConfirms);
    // connectionFactory.setPublisherReturns(publisherReturns);
    // return connectionFactory;
    // }
    //
    // @Bean(name = "v2RabbitTemplate")
    // public RabbitTemplate firstRabbitTemplate(
    // @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
    // @Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {
    // RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);
    // v2RabbitTemplate.setMandatory(mandatory);
    // v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {
    // if (!ack) {
    //// LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));
    // } else {
    //// LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));
    // }
    // });
    // v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
    //// LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});
    // });
    // return v2RabbitTemplate;
    // }
    //
    // @Bean(name = "v2ContainerFactory")
    // public SimpleRabbitListenerContainerFactory hospSyncFactory(
    // @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,
    // @Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,
    // @Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch
    // ) {
    // SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    // factory.setConnectionFactory(connectionFactory);
    // factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));
    // factory.setPrefetchCount(prefetch);
    // return factory;
    // }
    //
    // @Bean(name = "v2RabbitAdmin")
    // public RabbitAdmin iqianzhanRabbitAdmin(
    // @Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {
    // RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    // rabbitAdmin.setAutoStartup(true);
    // return rabbitAdmin;
    // }




    }

    application.yaml:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    v1:	
    my:
    spring:
    rabbitmq:
    host: xxx.com
    port: 5672
    username: xxx
    password: xxx
    virtual-host: xxx
    publisher-confirms: true
    publisher-returns: true
    listener:
    simple:
    acknowledge-mode: auto
    prefetch: 15
    template:
    mandatory: false

    acknowledge-mode说明

    1, 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL)

    • AcknowledgeMode.AUTO模式下,由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。

    自动确认ack,消费者代码参考:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @RabbitHandler
    public void consumer(String message){
    try {
    JSONObject messageJSON = JSON.parseObject(message);
    EventIdentity eventId = EventIdentity.valueOf(messageJSON.getString("eventId"));
    BaseEventMessageVO<?> eventMessageVO = eventId.parseJSONToVO(message);
    if (eventMessageVO != null)
    onMessage(eventMessageVO);
    else
    log.debug("解析message失败,无法取得合适的对象!!,message:{}",message);
    } catch (Exception e) {
    //捕获异常,保证ack消息正常确认,防止出现消息循环消费并堵塞队列
    log.error("queens xxx 消费失败!!,message:{}",message,e);
    }

    }
    • AcknowledgeMode.MANUAL模式需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。

    手动确认ack,消费者代码参考:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    public void receive(Message message, Channel channel) throws IOException {
    log.info("BaseIMReceiver.receive,mq头信息:{}", message.getMessageProperties());
    log.info("收到的mq消息:{}", message.getBody());
    IMMQMessage immqMessage = null;
    String msg = null;

    try {
    String body = new String(message.getBody(), "utf-8");
    immqMessage = JSON.parseObject(body, IMMQMessage.class);
    msg = immqMessage.getBody();
    log.info("接收转换的body消息:{}", msg);
    } catch (UnsupportedEncodingException e) {
    log.error("消息格式不正确,error msg:{}", message.getBody());
    }

    if (!StringUtils.hasText(msg)) {
    //ack返回true,告诉服务器收到这条消息 已经被我消费了 可以在队列删掉 这样以后就不会再发了
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    log.warn("转换mq的message出问题,将错误格式的消息从队列中清除!!,message:{}", message.getBody());
    return;
    }

    boolean ack = true;
    Exception exception = null;
    if (immqMessage.getRetrySize() < 3) {
    try {
    processing(msg);
    ack = true;
    } catch (Exception e) {
    ack = false;
    exception = e;
    }
    } else {
    log.warn("此消息重试3次仍然请求失败!!,message:{}", msg);
    try {
    saveRetryFailMessage(msg);
    } catch (Exception e) {
    log.error("保存重试3次仍然失败的消息出错!!,message:{}", msg, e);
    }
    }

    if (!ack) {
    log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    } else {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
    }

    server端行为

    • rabbitmq server推送给每个channel的消息数量有限制,会保证每个channel没有收到ack的消息数量不会超过prefetchCount。
    • server端会暂存没有收到ack的消息,等消费端ack后才会丢掉;如果收到消费端的nack(消费失败的标识)或connection断开没收到反馈,会将消息放回到原队列头部。

    这种模式不会丢消息,但效率较低,因为server端需要等收到消费端的答复之后才会继续推送消息,当然,推送消息和等待答复是异步的,可适当增大prefetchCount提高效率。

    注意,有ack的模式下,需要考虑setDefaultRequeueRejected(false),否则当消费消息抛出异常没有catch住时,这条消息会被rabbitmq放回到queue头部,再被推送过来,然后再抛异常再放回…死循环了。设置false的作用是抛异常时不放回,而是直接丢弃,所以可能需要对这条消息做处理,以免丢失。更详细的配置参考这里

    2, 无ack模式(AcknowledgeMode.NONE)

    server端行为
    • rabbitmq server默认推送的所有消息都认为已经消费成功,会无脑的向消费端推送消息。
    • 因为rabbitmq server认为推送的消息已被成功消费,所以推送出去的消息不会暂存在server端。

    消息丢失的风险

    当BlockingQueue堆满时(BlockingQueue一定会先满),server端推送消息会失败,然后断开connection。消费端从Socket读取Frame将会抛出SocketException,触发异常处理,shutdown掉connection和所有的channel,channel shutdown后WorkPool中的channel信息(包括channel inProgress,channel ready以及Map)全部清空,所以BlockingQueue中的数据会全部丢失。

    此外,服务重启时也需对内存中未处理完的消息做必要的处理,以免丢失。

    而在rabbitmq server,connection断掉后就没有消费者去消费这个queue,因此在server端会看到消息堆积的现象。

    对比

    • ack模式:效率高,存在丢失大量消息的风险。
    • ack模式:效率低,不会丢消息。

    参考文章

    源代码

    文章目录
    1. 1. RabbitMQ多源配置及ack相关使用
      1. 1.0.0.1. server端行为
        1. 1.0.0.1.1. server端行为
      2. 1.0.0.2. 消息丢失的风险
  • 1.1. 对比