TAOCARTS 知识

跨境电商独立站数据同步方案:本地消息表 + RocketMQ 实现最终一致性-阿里云开发者社区

2026-06-26 系统功能介绍

一、分布式数据同步的挑战

在微服务架构中,跨服务的数据同步是经典难题。以Taocarts跨境电商独立站系统为例:用户下单后,订单数据需要同步到商品服务(扣减库存)、物流服务(生成运单)、通知服务(发送邮件)等多个下游系统。

如果采用强一致性的分布式事务(如2PC、TCC),性能和复杂性都会急剧上升。更务实的方案是采用“最终一致性”——允许短暂的不一致窗口,但保证数据最终会同步成功。

二、本地消息表方案

消息表设计

sql

CREATE TABLE

local_message

(

id

bigint PRIMARY KEY AUTO_INCREMENT,

message_id

varchar(64) NOT NULL COMMENT '消息唯一ID',

topic

varchar(64) NOT NULL COMMENT 'MQ Topic',

payload

text NOT NULL COMMENT '消息内容(JSON)',

status

tinyint NOT NULL DEFAULT 0 COMMENT '0-待发送, 1-已发送, 2-发送失败',

retry_count

int NOT NULL DEFAULT 0,

max_retries

int NOT NULL DEFAULT 3,

next_retry_time

datetime DEFAULT NULL,

created_at

datetime DEFAULT CURRENT_TIMESTAMP,

updated_at

datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

UNIQUE KEY

uk_message_id

(

message_id

),

KEY

idx_status_next_retry

(

status

,

next_retry_time

)

);

订单创建时写入本地消息

在订单创建的事务中,同时插入本地消息记录。确保订单和消息要么同时成功,要么同时失败。

java

@Service

@Transactional

public class OrderService {

@Autowired

private OrderMapper orderMapper;

@Autowired

private LocalMessageMapper messageMapper;

public void createOrder(OrderDTO orderDTO) {

// 1. 保存订单

Order order = new Order();

order.setOrderNo(generateOrderNo());

order.setUserId(orderDTO.getUserId());

order.setAmount(orderDTO.getAmount());

order.setStatus(OrderStatus.PENDING_PAYMENT);

orderMapper.insert(order);

// 2. 保存本地消息

LocalMessage msg = new LocalMessage();

msg.setMessageId(UUID.randomUUID().toString());

msg.setTopic("ORDER_CREATED");

msg.setPayload(JSON.toJSONString(order));

msg.setStatus(0);

msg.setNextRetryTime(new Date());

messageMapper.insert(msg);

}

}

三、后台任务扫描与重试机制

使用定时任务扫描待发送的消息,发送到RocketMQ。发送成功后更新状态为“已发送”;发送失败则增加重试次数,并设置下次重试时间(指数退避)。

java

@Component

public class MessageSendScheduler {

@Autowired

private LocalMessageMapper messageMapper;

@Autowired

private RocketMQTemplate rocketMQTemplate;

@Scheduled(fixedDelay = 5000)

public void sendPendingMessages() {

List messages = messageMapper.selectPendingMessages(100);

for (LocalMessage msg : messages) {

try {

SendResult result = rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload());

if (result.getSendStatus() == SendStatus.SEND_OK) {

msg.setStatus(1);

messageMapper.updateById(msg);

}

} catch (Exception e) {

msg.setRetryCount(msg.getRetryCount() + 1);

if (msg.getRetryCount() >= msg.getMaxRetries()) {

msg.setStatus(2); // 失败,进入死信

alertService.send("消息发送失败,进入死信队列: " + msg.getMessageId());

} else {

// 指数退避:2^retryCount 分钟

long delayMinutes = 1L << msg.getRetryCount();

msg.setNextRetryTime(new Date(System.currentTimeMillis() + delayMinutes * 60 * 1000));

}

messageMapper.updateById(msg);

}

}

}

}

四、消费端幂等性处理

下游服务消费消息时必须保证幂等性,使用Redis记录已处理的消息ID防止重复消费。

java

@Component

@RocketMQMessageListener(topic = "ORDER_CREATED", consumerGroup = "inventory-consumer")

public class InventoryConsumer implements RocketMQListener {

@Autowired

private RedisTemplate redisTemplate;

@Autowired

private InventoryService inventoryService;

@Override

public void onMessage(String message) {

JSONObject json = JSON.parseObject(message);

String messageId = json.getString("messageId");

// 幂等检查

String key = "processed:" + messageId;

Boolean success = redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(24));

if (Boolean.FALSE.equals(success)) {

log.info("消息已处理过,跳过: {}", messageId);

return;

}

try {

Order order = json.getObject("order", Order.class);

inventoryService.deductStock(order.getProductId(), order.getQuantity());

} catch (Exception e) {

// 处理失败,删除幂等标记,让消息重试

redisTemplate.delete(key);

throw e;

}

}

}

五、死信处理与人工介入

对于进入死信队列的消息,系统会自动发送告警到钉钉群,运维人员可通过管理后台查看、手动重发或修复数据。

java

@RestController

@RequestMapping("/admin/messages")

public class DeadLetterController {

@Autowired

private LocalMessageMapper messageMapper;

@Autowired

private RocketMQTemplate rocketMQTemplate;

@PostMapping("/retry/{id}")

public Result retry(@PathVariable Long id) {

LocalMessage msg = messageMapper.selectById(id);

if (msg.getStatus() != 2) {

return Result.error("只有死信消息可以重试");

}

rocketMQTemplate.syncSend(msg.getTopic(), msg.getPayload());

msg.setStatus(0);

msg.setRetryCount(0);

msg.setNextRetryTime(new Date());

messageMapper.updateById(msg);

return Result.success();

}

}

Taocarts系统通过本地消息表 + RocketMQ方案成功解耦了订单创建与库存扣减、物流生成、通知发送等操作,生产环境运行一年来,消息送达率达到99.99%,仅有极少数死信通过人工介入快速修复