MySQL 数据转存 Redis、通过 Redis 修改数据并同步回 MySQL

主动更新 + 消息队列异步同步 策略,平衡性能与一致性。


一、整体架构


二、代码实现(Spring Boot + JPA + Redis + RabbitMQ)

1. 依赖配置(pom.xml

<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
</dependencies>

2. 配置文件(application.yml

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/mydb
    username: root
    password: password
  jpa:
    hibernate.ddl-auto: update
  redis:
    host: localhost
    port: 6379
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

3. 核心代码

(1) MySQL 实体类(User.java
@Entity
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String email;
    // Getters & Setters
}
(2) Redis 数据同步服务(RedisSyncService.java
@Service
public class RedisSyncService {
    @Autowired
    private UserRepository userRepository;
    @Autowired
    private RedisTemplate<String, User> redisTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 1. 初始加载:MySQL → Redis
    @PostConstruct
    public void initLoadToRedis() {
        List<User> users = userRepository.findAll();
        users.forEach(user -> 
            redisTemplate.opsForValue().set("user:" + user.getId(), user)
        );
    }

    // 2. 修改 Redis 数据并触发同步
    public void updateUserViaRedis(Long id, User newUser) {
        // 更新 Redis
        redisTemplate.opsForValue().set("user:" + id, newUser);
        
        // 发送消息到队列(异步更新 MySQL)
        rabbitTemplate.convertAndSend("user.update.queue", newUser);
    }
}
(3) RabbitMQ 消费者(异步更新 MySQL)
@Component
public class UserUpdateListener {
    @Autowired
    private UserRepository userRepository;

    @RabbitListener(queues = "user.update.queue")
    public void handleUserUpdate(User user) {
        userRepository.save(user); // 更新 MySQL
        System.out.println("MySQL updated: " + user.getId());
    }
}
(4) 控制器层(UserController.java
@RestController
@RequestMapping("/users")
public class UserController {
    @Autowired
    private RedisSyncService redisSyncService;

    @PutMapping("/{id}")
    public ResponseEntity<String> updateUser(@PathVariable Long id, @RequestBody User user) {
        redisSyncService.updateUserViaRedis(id, user);
        return ResponseEntity.ok("Redis updated. MySQL sync in progress...");
    }
}

三、关键逻辑说明

  1. 初始数据加载

    • 服务启动时,从 MySQL 加载全量数据到 Redis(@PostConstruct)。

  2. 数据修改流程

    • 用户通过 API 修改 Redis 数据 → 触发消息队列事件 → 消费者异步更新 MySQL。

  3. 一致性保障

    • 最终一致性:通过消息队列解耦,避免同步阻塞(容忍毫秒级延迟)。

    • 异常处理:消息队列自带重试机制(RabbitMQ ACK),防止更新失败。


四、优化扩展建议

  1. 双删策略(应对并发脏读)
    在更新 MySQL 前后删除 Redis 缓存,减少不一致时间窗口:

    public void updateUserWithDoubleDelete(Long id, User user) {
        redisTemplate.delete("user:" + id);      // 第一次删除
        userRepository.save(user);              // 更新 MySQL
        rabbitTemplate.convertAndSend(...);      // 异步二次删除(延迟1秒)
    }
  2. 定时补偿任务
    定期扫描 MySQL 与 Redis 差异数据,修复不一致:

    @Scheduled(fixedDelay = 60000)
    public void checkDataConsistency() {
        // 对比 Redis 与 MySQL 数据,以 MySQL 为准修复
    }
  3. 批量操作优化
    使用 pipeline 批量读写 Redis,减少网络开销。


五、注意事项

  • 事务问题:Redis 不支持回滚,需避免 MySQL 更新失败导致 Redis 数据错误(通过消息队列重试解决)。

  • 缓存穿透:对空值设置短 TTL(如 redisTemplate.opsForValue().set(key, null, 60, TimeUnit.SECONDS))。

  • 性能监控:使用 Spring Actuator 监控 Redis 和 RabbitMQ 状态。


MySQL 数据转存 Redis、通过 Redis 修改数据并同步回 MySQL
https://uniomo.com/archives/wei-ming-ming-wen-zhang-HpbRzPUk
作者
雨落秋垣
发布于
2025年09月01日
许可协议