Spring Boot 3 集成 RabbitMQ 实践指南

news/2025/2/24 12:19:56

Spring Boot 3 集成 RabbitMQ 实践指南

1. RabbitMQ 核心原理

1.1 什么是RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,使用Erlang语言开发,基于AMQP(Advanced Message Queuing Protocol)协议实现。它支持多种消息传递模式,具有高可用性、可扩展性和可靠性等特点。

1.2 核心概念

1.2.1 基础组件
  1. Producer(生产者)

    • 消息的发送者
    • 负责创建消息并发布到RabbitMQ中
  2. Consumer(消费者)

    • 消息的接收者
    • 连接到RabbitMQ服务器并订阅队列
  3. Exchange(交换机)

    • 接收生产者发送的消息并根据路由规则转发到队列
    • 类型:
      • Direct Exchange:根据routing key精确匹配
      • Topic Exchange:根据routing key模式匹配
      • Fanout Exchange:广播到所有绑定队列
      • Headers Exchange:根据消息属性匹配
  4. Queue(队列)

    • 消息存储的地方
    • 支持持久化、临时、自动删除等特性
  5. Binding(绑定)

    • 交换机和队列之间的虚拟连接
    • 定义消息路由规则
1.2.2 高级特性
  1. 消息持久化

    • 交换机持久化:创建时设置durable=true
    • 队列持久化:创建时设置durable=true
    • 消息持久化:设置delivery-mode=2
  2. 消息确认机制

    • 生产者确认:Publisher Confirm和Return机制
    • 消费者确认:自动确认、手动确认、批量确认
  3. 死信队列(DLX)

    • 消息被拒绝且不重新入队
    • 消息过期(TTL)
    • 队列达到最大长度

1.3 应用场景

  1. 异步处理

    • 发送邮件、短信通知
    • 日志处理、报表生成
    • 文件处理、图片处理
  2. 应用解耦

    • 系统间通信
    • 服务解耦
    • 流程分离
  3. 流量控制

    • 削峰填谷
    • 请求缓冲
    • 流量整形
  4. 定时任务

    • 延迟队列
    • 定时处理
    • 任务调度

2. 环境搭建

2.1 基础环境

  • Spring Boot: 3.x
  • Java: 17+
  • RabbitMQ: 3.12+
  • Maven/Gradle

2.2 依赖配置

<dependencies>
    <!-- Spring Boot Starter AMQP -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <!-- Jackson -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
</dependencies>

2.3 基础配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 消息确认配置
    publisher-confirm-type: correlated  # 开启发布确认
    publisher-returns: true             # 开启发布返回
    template:
      mandatory: true                   # 消息路由失败返回
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual        # 手动确认
        prefetch: 1                     # 每次获取消息数量
        retry:
          enabled: true                 # 开启重试
          initial-interval: 1000        # 重试间隔时间
          max-attempts: 3               # 最大重试次数
          multiplier: 1.0              # 重试时间乘数
    # SSL配置(可选)
    ssl:
      enabled: false
      key-store: classpath:keystore.p12
      key-store-password: password
      trust-store: classpath:truststore.p12
      trust-store-password: password

3. 核心配置类

3.1 RabbitMQ配置类

@Configuration
@EnableRabbit
public class RabbitMQConfig {
    
    // 交换机名称
    public static final String BUSINESS_EXCHANGE = "business.exchange";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    
    // 队列名称
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    
    // 路由键
    public static final String BUSINESS_KEY = "business.key";
    public static final String DEAD_LETTER_KEY = "dead.letter.key";
    
    // 业务交换机
    @Bean
    public DirectExchange businessExchange() {
        return ExchangeBuilder.directExchange(BUSINESS_EXCHANGE)
                .durable(true)
                .build();
    }
    
    // 死信交换机
    @Bean
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE)
                .durable(true)
                .build();
    }
    
    // 业务队列
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(3);
        // 消息过期时间
        args.put("x-message-ttl", 60000);
        // 队列最大长度
        args.put("x-max-length", 1000);
        // 死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        args.put("x-dead-letter-routing-key", DEAD_LETTER_KEY);
        
        return QueueBuilder.durable(BUSINESS_QUEUE)
                .withArguments(args)
                .build();
    }
    
    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    
    // 业务绑定
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
                .to(businessExchange())
                .with(BUSINESS_KEY);
    }
    
    // 死信绑定
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with(DEAD_LETTER_KEY);
    }
    
    // 消息转换器
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    // RabbitTemplate配置
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

3.2 消息确认配置

@Configuration
@Slf4j
public class RabbitConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息发送到交换机成功: correlationData={}", correlationData);
        } else {
            log.error("消息发送到交换机失败: correlationData={}, cause={}", correlationData, cause);
            // 处理失败逻辑,如重试、告警等
        }
    }
    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息路由到队列失败: exchange={}, routingKey={}, replyCode={}, replyText={}, message={}",
                returned.getExchange(),
                returned.getRoutingKey(),
                returned.getReplyCode(),
                returned.getReplyText(),
                new String(returned.getMessage().getBody()));
        // 处理失败逻辑,如重试、告警等
    }
}

4. 消息生产者

4.1 消息发送服务

@Service
@Slf4j
public class MessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessage(Object message, String exchange, String routingKey) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            log.info("消息发送成功: message={}, exchange={}, routingKey={}, correlationData={}",
                    message, exchange, routingKey, correlationData);
        } catch (Exception e) {
            log.error("消息发送异常: message={}, exchange={}, routingKey={}, correlationData={}, error={}",
                    message, exchange, routingKey, correlationData, e.getMessage());
            throw new RuntimeException("消息发送失败", e);
        }
    }
    
    public void sendDelayMessage(Object message, String exchange, String routingKey, long delayMillis) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        MessagePostProcessor messagePostProcessor = msg -> {
            msg.getMessageProperties().setDelay((int) delayMillis);
            return msg;
        };
        
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);
            log.info("延迟消息发送成功: message={}, exchange={}, routingKey={}, delay={}, correlationData={}",
                    message, exchange, routingKey, delayMillis, correlationData);
        } catch (Exception e) {
            log.error("延迟消息发送异常: message={}, exchange={}, routingKey={}, delay={}, correlationData={}, error={}",
                    message, exchange, routingKey, delayMillis, correlationData, e.getMessage());
            throw new RuntimeException("延迟消息发送失败", e);
        }
    }
}

5. 消息消费者

5.1 消息处理服务

@Service
@Slf4j
public class MessageConsumer {
    
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE)
    public void handleMessage(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            // 获取消息内容
            String messageBody = new String(message.getBody());
            log.info("收到消息: message={}, deliveryTag={}", messageBody, deliveryTag);
            
            // 业务处理
            processMessage(messageBody);
            
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("消息处理成功: deliveryTag={}", deliveryTag);
            
        } catch (Exception e) {
            log.error("消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            
            // 判断是否重新投递
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重试,拒绝消息: deliveryTag={}", deliveryTag);
                channel.basicReject(deliveryTag, false);
            } else {
                log.info("消息首次处理失败,重新投递: deliveryTag={}", deliveryTag);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
    
    private void processMessage(String message) {
        // 实现具体的业务逻辑
        log.info("处理消息: {}", message);
    }
}

5.2 死信消息处理

@Service
@Slf4j
public class DeadLetterConsumer {
    
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)
    public void handleDeadLetter(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        
        try {
            String messageBody = new String(message.getBody());
            log.info("收到死信消息: message={}, deliveryTag={}", messageBody, deliveryTag);
            
            // 死信消息处理逻辑
            processDeadLetter(messageBody);
            
            channel.basicAck(deliveryTag, false);
            log.info("死信消息处理成功: deliveryTag={}", deliveryTag);
            
        } catch (Exception e) {
            log.error("死信消息处理异常: deliveryTag={}, error={}", deliveryTag, e.getMessage());
            channel.basicReject(deliveryTag, false);
        }
    }
    
    private void processDeadLetter(String message) {
        // 实现死信消息处理逻辑
        log.info("处理死信消息: {}", message);
    }
}

6. 接口控制器

@RestController
@RequestMapping("/api/mq")
@Slf4j
public class MessageController {
    
    @Autowired
    private MessageProducer messageProducer;
    
    @PostMapping("/send")
    public ResponseEntity<String> sendMessage(@RequestBody MessageDTO message) {
        try {
            messageProducer.sendMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY);
            return ResponseEntity.ok("消息发送成功");
        } catch (Exception e) {
            log.error("消息发送失败: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("消息发送失败: " + e.getMessage());
        }
    }
    
    @PostMapping("/send/delay")
    public ResponseEntity<String> sendDelayMessage(
            @RequestBody MessageDTO message,
            @RequestParam long delayMillis) {
        try {
            messageProducer.sendDelayMessage(message.getContent(),
                    RabbitMQConfig.BUSINESS_EXCHANGE,
                    RabbitMQConfig.BUSINESS_KEY,
                    delayMillis);
            return ResponseEntity.ok("延迟消息发送成功");
        } catch (Exception e) {
            log.error("延迟消息发送失败: {}", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("延迟消息发送失败: " + e.getMessage());
        }
    }
}

7. 监控与运维

7.1 RabbitMQ管理界面

  • 访问地址:http://localhost:15672
  • 默认账号:guest/guest
  • 主要功能:
    • 队列监控
    • 交换机管理
    • 连接状态
    • 消息追踪

7.2 Prometheus + Grafana监控

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    static_configs:
      - targets: ['localhost:15692']

7.3 日志配置

logging:
  level:
    org.springframework.amqp: INFO
    com.your.package: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7.4 告警配置

@Configuration
public class RabbitMQAlertConfig {
    
    @Value("${alert.dingtalk.webhook}")
    private String webhookUrl;
    
    @Bean
    public AlertService alertService() {
        return new DingTalkAlertService(webhookUrl);
    }
}

8. 最佳实践

8.1 消息幂等性处理

@Service
public class MessageIdempotentHandler {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    public boolean isProcessed(String messageId) {
        String key = "mq:processed:" + messageId;
        return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
    }
}

8.2 消息重试策略

@Configuration
public class RetryConfig {
    
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        return retryTemplate;
    }
}

8.3 消息序列化

@Configuration
public class MessageConverterConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setCreateMessageIds(true);
        return converter;
    }
}

8.4 消息追踪

@Aspect
@Component
@Slf4j
public class MessageTraceAspect {
    
    @Around("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
    public Object traceMessage(ProceedingJoinPoint joinPoint) throws Throwable {
        String messageId = MDC.get("messageId");
        log.info("开始处理消息: messageId={}", messageId);
        
        try {
            Object result = joinPoint.proceed();
            log.info("消息处理完成: messageId={}", messageId);
            return result;
        } catch (Exception e) {
            log.error("消息处理异常: messageId={}, error={}", messageId, e.getMessage());
            throw e;
        }
    }
}

9. 常见问题与解决方案

9.1 消息丢失问题

  1. 生产者确认机制
  2. 消息持久化
  3. 手动确认模式
  4. 集群高可用

9.2 消息重复消费

  1. 幂等性处理
  2. 消息去重
  3. 业务检查

9.3 消息堆积问题

  1. 增加消费者数量
  2. 提高处理效率
  3. 队列分片
  4. 死信队列处理

9.4 性能优化

  1. 合理设置预取数量
  2. 批量确认消息
  3. 消息压缩
  4. 连接池优化

10. 高可用部署

10.1 集群配置

spring:
  rabbitmq:
    addresses: rabbit1:5672,rabbit2:5672,rabbit3:5672
    username: admin
    password: password
    virtual-host: /

10.2 镜像队列

# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

10.3 负载均衡

# nginx.conf
upstream rabbitmq_cluster {
    server rabbit1:15672;
    server rabbit2:15672;
    server rabbit3:15672;
}

11. 参考资源

  1. Spring AMQP官方文档
  2. RabbitMQ官方文档
  3. Spring Boot官方文档

http://www.niftyadmin.cn/n/5864318.html

相关文章

UE_C++ —— Gameplay Tags

目录 一&#xff0c;Defining Gameplay Tags Adding Tags in Project Settings Importing Tags from Data Table Assets Defining Tags with C 二&#xff0c;Using Defined Gameplay Tags Applying Tags to Objects Evaluating Tags with Conditional Functions 三&am…

Python pip 缓存清理:全面方法与操作指南

在使用 Python 的 pip 进行包安装时&#xff0c;pip 会将下载的包缓存起来&#xff0c;以加快后续相同包的安装速度。不过&#xff0c;随着时间推移&#xff0c;缓存会占用大量磁盘空间&#xff0c;这时你可以对其进行清理。下面为你介绍不同操作系统下清理 pip 缓存的方法。 …

Node.js中如何修改全局变量的几种方式

Node.js中如何修改全局变量。我需要先理解他们的需求。可能他们是在开发过程中遇到了需要跨模块共享数据的情况&#xff0c;或者想要配置一些全局可访问的设置。不过&#xff0c;使用全局变量可能存在一些问题&#xff0c;比如命名冲突、难以维护和测试困难&#xff0c;所以我得…

2025版-Github账号注册详细过程

目录 1.访问GitHub官网 2. 点击“Sign up”按钮 3. 填写注册信息 4. 验证机器人 5. 点击“Create account”按钮 6. 验证邮箱 7. 完成注册 8. 初始设置&#xff08;可选&#xff09; 9. 开始使用 注意事项 1.访问GitHub官网 打开浏览器&#xff0c;访问 GitHub官网。 …

IDEA通过Maven使用JBLJavaToWeb插件创建Web项目

第一步&#xff1a;IDEA下载JBLJavaToWeb插件 File--->Settings--->Plugins--->Marketplace搜索: JBLJavaToWeb 第二步&#xff1a;创建普通Maven工程 第三步&#xff1a; 将普通Maven项目转换为Web项目

KubeKey一键安装部署k8s集群和KubeSphere详细教程

目录 一、KubeKey简介 二、k8s集群KubeSphere安装 集群规划 硬件要求 Kubernetes支持版本 操作系统要求 SSH免密登录 配置集群时钟 所有节点安装依赖 安装docker DNS要求 存储要求 下载 KubeKey 验证KubeKey 配置集群文件 安装集群 验证命令 登录页面 一、Ku…

R语言安装教程(附安装包)R语言4.3.2版本安装教程

文章目录 前言一、安装包下载二、R-4.3.2安装步骤三、rtools43安装步骤四、RStudio安装步骤 前言 本教程将详细、全面地为你介绍在 Windows 系统下安装 R 语言 4.3.2 的具体步骤。无论你是初涉数据领域的新手&#xff0c;还是希望更新知识体系的专业人士&#xff0c;只要按照本…

Java NIO与传统IO性能对比分析

Java NIO与传统IO性能对比分析 在Java中&#xff0c;I/O&#xff08;输入输出&#xff09;操作是开发中最常见的任务之一。传统的I/O方式基于阻塞模型&#xff0c;而Java NIO&#xff08;New I/O&#xff09;引入了非阻塞和基于通道&#xff08;Channel&#xff09;和缓冲区&a…