Saga模式与分布式事务
约 1542 字大约 5 分钟
distributedsaga
2025-05-28
概述
Saga模式是1987年由Hector Garcia-Molina和Kenneth Salem在论文《Sagas》中提出的一种分布式事务解决方案。它将一个长事务拆分为一系列本地事务,每个本地事务都有对应的补偿事务。如果某个步骤失败,则按逆序执行已完成步骤的补偿事务,实现最终一致性。Saga模式在微服务架构中被广泛采用。
核心概念
一个Saga由一系列子事务 T1, T2, ..., Tn 组成,每个子事务 Ti 都有对应的补偿事务 Ci。
执行规则:
- 成功路径:T1 → T2 → T3 → ... → Tn
- 补偿路径:Ti 失败时,执行 Ci-1 → Ci-2 → ... → C1
两种实现模式
编排模式(Choreography)
各服务通过事件(Event)进行协作,没有中心协调者。每个服务监听事件并执行自己的本地事务,然后发布新的事件。
// 编排模式 - 订单服务
@Service
public class OrderService {
@EventListener
public void onOrderCreateRequest(CreateOrderCommand cmd) {
Order order = Order.create(cmd);
order.setStatus(OrderStatus.PENDING);
orderRepository.save(order);
// 发布事件,触发下一步
eventPublisher.publish(new OrderCreatedEvent(order.getId(), order.getItems()));
}
@EventListener
public void onSagaCompleted(SagaCompletedEvent event) {
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.CONFIRMED);
orderRepository.save(order);
}
@EventListener
public void onPaymentFailed(PaymentFailedEvent event) {
// 补偿:取消订单
Order order = orderRepository.findById(event.getOrderId());
order.setStatus(OrderStatus.CANCELLED);
orderRepository.save(order);
}
}
// 编排模式 - 库存服务
@Service
public class InventoryService {
@EventListener
public void onOrderCreated(OrderCreatedEvent event) {
try {
inventoryRepository.reserve(event.getItems());
eventPublisher.publish(new InventoryReservedEvent(event.getOrderId()));
} catch (InsufficientInventoryException e) {
eventPublisher.publish(new InventoryReservationFailedEvent(event.getOrderId()));
}
}
@EventListener
public void onPaymentFailed(PaymentFailedEvent event) {
// 补偿:恢复库存
inventoryRepository.release(event.getOrderId());
}
}协调模式(Orchestration)
由一个中央协调器(Saga Execution Coordinator, SEC)负责管理Saga的执行流程。
// 协调模式 - Saga定义
public class OrderSagaDefinition {
public SagaDefinition<OrderSagaData> sagaDefinition() {
return SagaDefinition.<OrderSagaData>builder()
.step()
.invokeParticipant(this::createOrder)
.withCompensation(this::cancelOrder)
.step()
.invokeParticipant(this::reserveInventory)
.withCompensation(this::releaseInventory)
.step()
.invokeParticipant(this::processPayment)
.withCompensation(this::refundPayment)
.step()
.invokeParticipant(this::sendNotification)
// 最后一步通常不需要补偿
.build();
}
}
// Saga协调器实现
public class SagaOrchestrator<T> {
private final SagaDefinition<T> definition;
private final SagaStateRepository stateRepository;
public void execute(T sagaData) {
SagaState state = new SagaState(sagaData);
stateRepository.save(state);
List<SagaStep<T>> steps = definition.getSteps();
for (int i = 0; i < steps.size(); i++) {
state.setCurrentStep(i);
stateRepository.save(state);
try {
steps.get(i).invoke(sagaData);
state.markStepCompleted(i);
} catch (Exception e) {
state.setStatus(SagaStatus.COMPENSATING);
stateRepository.save(state);
compensate(steps, sagaData, i - 1);
state.setStatus(SagaStatus.COMPENSATED);
stateRepository.save(state);
return;
}
}
state.setStatus(SagaStatus.COMPLETED);
stateRepository.save(state);
}
private void compensate(List<SagaStep<T>> steps, T data, int fromStep) {
for (int i = fromStep; i >= 0; i--) {
try {
steps.get(i).compensate(data);
} catch (Exception e) {
// 补偿失败需要重试或人工介入
log.error("Compensation failed at step {}", i, e);
retryCompensation(steps.get(i), data);
}
}
}
}两种模式对比
| 特性 | 编排模式 | 协调模式 |
|---|---|---|
| 耦合度 | 低(事件驱动) | 中(依赖协调器) |
| 复杂度 | 简单Saga容易,复杂Saga难维护 | 统一管理,复杂Saga更清晰 |
| 单点风险 | 无 | 协调器是单点 |
| 可观测性 | 难以追踪全局状态 | 集中管理,易于监控 |
| 适用场景 | 参与者少(3-4个) | 参与者多,流程复杂 |
隔离性挑战
Saga不具备ACID中的隔离性(Isolation),可能出现以下异常:
语义锁(Semantic Lock)
通过业务状态字段作为"锁",防止其他Saga干扰:
public class OrderWithSemanticLock {
private OrderStatus status; // PENDING, APPROVED, CANCELLED
// 创建订单时设置为PENDING(语义锁)
public void create() {
this.status = OrderStatus.PENDING;
// PENDING状态的订单不会被其他流程处理
}
// Saga完成后解锁
public void approve() {
this.status = OrderStatus.APPROVED;
}
// 补偿时释放锁
public void cancel() {
this.status = OrderStatus.CANCELLED;
}
}使用Temporal实现Saga
Temporal(前身为Cadence,由Uber开源)是流行的工作流引擎,非常适合实现Saga模式。
// Temporal Workflow实现Saga
@WorkflowInterface
public interface OrderSagaWorkflow {
@WorkflowMethod
void processOrder(OrderRequest request);
}
@WorkflowImpl
public class OrderSagaWorkflowImpl implements OrderSagaWorkflow {
private final OrderActivities orderActs = Workflow.newActivityStub(
OrderActivities.class, ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.setRetryOptions(RetryOptions.newBuilder()
.setMaximumAttempts(3).build())
.build());
private final InventoryActivities invActs = Workflow.newActivityStub(
InventoryActivities.class, ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(30))
.build());
private final PaymentActivities payActs = Workflow.newActivityStub(
PaymentActivities.class, ActivityOptions.newBuilder()
.setStartToCloseTimeout(Duration.ofSeconds(60))
.build());
@Override
public void processOrder(OrderRequest request) {
Saga saga = new Saga(new Saga.Options.Builder().build());
try {
// 步骤1: 创建订单
String orderId = orderActs.createOrder(request);
saga.addCompensation(() -> orderActs.cancelOrder(orderId));
// 步骤2: 预留库存
invActs.reserveInventory(orderId, request.getItems());
saga.addCompensation(() -> invActs.releaseInventory(orderId));
// 步骤3: 扣款
payActs.processPayment(orderId, request.getAmount());
saga.addCompensation(() -> payActs.refundPayment(orderId));
// 步骤4: 确认订单
orderActs.confirmOrder(orderId);
} catch (ActivityFailure e) {
// 自动执行补偿
saga.compensate();
throw e;
}
}
}Spring实现示例
// 使用Spring状态机实现Saga协调器
@Configuration
public class OrderSagaStateMachineConfig
extends StateMachineConfigurerAdapter<SagaState, SagaEvent> {
@Override
public void configure(StateMachineTransitionConfigurer<SagaState, SagaEvent> transitions)
throws Exception {
transitions
.withExternal()
.source(SagaState.STARTED).target(SagaState.ORDER_CREATED)
.event(SagaEvent.CREATE_ORDER)
.action(createOrderAction())
.and()
.withExternal()
.source(SagaState.ORDER_CREATED).target(SagaState.INVENTORY_RESERVED)
.event(SagaEvent.RESERVE_INVENTORY)
.action(reserveInventoryAction())
.and()
.withExternal()
.source(SagaState.INVENTORY_RESERVED).target(SagaState.PAYMENT_PROCESSED)
.event(SagaEvent.PROCESS_PAYMENT)
.action(processPaymentAction())
.and()
// 补偿路径
.withExternal()
.source(SagaState.INVENTORY_RESERVED).target(SagaState.COMPENSATING)
.event(SagaEvent.PAYMENT_FAILED)
.action(compensateAction());
}
}总结
Saga模式是微服务架构下管理分布式事务的主流方案。编排模式适合简单场景,保持了服务间的低耦合;协调模式适合复杂流程,提供了更好的可控性和可观测性。使用Saga时需要特别注意隔离性问题,通过语义锁等策略来缓解。工程实践中推荐使用Temporal等成熟的工作流引擎来实现Saga,避免重复造轮子。
贡献者
更新日志
2026/3/14 13:09
查看所有更新日志
9f6c2-feat: organize wiki content and refresh site setup于