RocketMQ 基础

1. RocketMQ

1.1 设计理念

消息中间件的难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次
RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息至少被消费一次,但不承诺消息不会被消费者多次消费,其消费的幂等由消费者实现

1.2 启动顺序

  • 首先启动 NameServer,再启动 Broker 向 NameServer 注册(NameServer 对 Broker 进行心跳检测)
  • NameServer是RocketMQ的轻量级注册中心,负责存储和管理Broker集群以及Topic路由信息
  • Broker是RocketMQ的消息处理节点,负责接收生产者发送的消息并存储消息,以及响应消费者的消息读取请求

2. 路由中心NameServer

2.1 路由注册

  • (1) Broker发送心跳包
  • (2) NameServer处理心跳包

3. Topic

  • Group 标识一类消息的生产者、消费者;一个分组对应一个主题
  • RocketMQ 基于订阅发布机制,一个Topic拥有多个消息队列,一个Broker为每一主题默认创建4个读队列4个写队列

4. 消息发送

  • 同步发送:发送消息后,同步响应
  • 异步发送:发送消息后,异步响应
  • 单向发送:无返回值
public void demo() throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group");
    producer.start();

    Message msg = new Message(topic, tag, byte[]);

    // 单向发送
    producer.sendOneway(msg);

    // 同步发送
    SendResult result = producer.send(msg);

    // 异步回调
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            String msgId = sendResult.getMsgId;
        }
        @Override
        public void onException(Throwable e) {
            String msgId = sendResult.getMsgId;
        }
    });
}

5. 消息消费

  • 两种消费模式
    • 拉模式 (Pull Consumer)
    • 推模式 (Push Consumer)

5.1 拉模式

  • 消费者(Consumer) 主动从 Broker (消息服务器) 中拉取消息,它会周期性地向 Broker 发送请求,询问是否有新的消息可以消费
  • 消费者通过 DefaultMQPullConsumer 实现拉取逻辑,它会定期或根据需要调用 pull() 方法从 Broker 获取消息队列中的消息

5.2 推模式 (Push Consumer)

  • 在 RocketMQ 中,Push 模式实际上是基于 Pull 模式的一种封装优化,它并不是传统意义上的服务端主动推送消息给客户端。
  • 在推模式下,虽然消费者看起来像是被动接收消息,但实际上内部仍然是通过定期拉取的方式来实现的,只是对用户隐藏了这一细节
  • DefaultMQPushConsumer 作为推模式的消费者,在内部使用定时任务或者监听器机制自动执行拉取操作,对使用者而言更像是消息被推送过来
public void consumer() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("my_topic", "tag_test");
    // CONSUME_FROM_LAST_OFFSET(默认值):从最新的消息开始消费,即忽略之前的所有消息
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    // 批量消费的消息数量上限
    consumer.setConsumeMessageBatchMaxSize(1);

    // 消息事件监听
    consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
        try {
            for (MessageExt message : messages) {
                logger.info("consumer_message, topic={}, tags={}, msgId={}, properties={}, body={}",
                        message.getTopic(), message.getTags(), message.getMsgId(), message.getProperties(), new String(message.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // 消息重试
            logger.error("consumer_message_exception_retry, topic=my_topic, tags=tag_test, exception={}", e.getMessage());
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });

    consumer.start();
    logger.info("consumer_message_success");
}

【信息由网络或者个人提供,如有涉及版权请联系COOY资源网邮箱处理】

© 版权声明
THE END
喜欢就支持一下吧
点赞7 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容