在一个复杂的业务场景中,用户的一个操作,比如“确认下单”,背后可能触发一连串的服务调用:创建订单、扣减积分、锁定库存、发放优惠券。这些服务独立部署,各自拥有独立的数据库。当这个流程中的任何一步失败,整个系统的数据一致性就面临严峻挑战。更棘手的是,如果用户在发起操作的瞬间网络中断,前端应用如何保证这个操作最终会被执行,而不是无声无息地丢失?
传统的数据库ACID事务无法跨越服务边界,而两阶段提交(2PC)这类强一致性方案在分布式环境中又过于笨重,对系统可用性是巨大的打击。我们需要的,是一种更具弹性的最终一致性方案。Saga模式便是为此而生。
但标准的Saga模式主要解决的是后端服务间的一致性问题。它并未触及那个最不可靠的环节:客户端到服务端的网络连接。本文将记录一次完整的实践:我们如何将Saga模式延伸至客户端,利用Service Worker的离线能力,结合作为Saga协调器的API网关,以及具备强一致性与高可用性的CockroachDB,构建一个全链路、可容忍网络中断的分布式事务解决方案。
技术选型与架构决策
在真实项目中,技术选型从来不是孤立的。我们面临的核心痛点是:保证一个由用户发起、跨越多个微服务、且可能在网络不佳环境下启动的业务流程的最终一致性。
分布式事务模型:Saga
我们选择了编排式(Orchestration)Saga。由一个中心化的协调器(我们的API网关)来驱动整个流程,负责调用每个服务的正向操作(Transation)和反向补偿操作(Compensation)。相比于 choreography(编舞式),这种方式的依赖关系更清晰,更容易追踪和调试整个事务的状态,对于我们这个场景来说,可维护性更高。Saga日志与业务数据的存储:CockroachDB
Saga模式的可靠性严重依赖于其执行日志的持久化。这个日志本身就是状态机,必须高可用且强一致。CockroachDB作为一个分布式SQL数据库,提供了原生的Serializable隔离级别和跨节点的ACID事务保障。用它来存储Saga日志,可以确保协调器本身即使发生故障重启,也能从日志中恢复事务的上下文,继续执行或回滚。同时,业务数据也存储在CockroachDB中,利用其分布式能力保证业务数据的高可用。客户端持久化与离线保证:Service Workers
这是整个方案的关键创新点。我们不能信任客户端到服务器的第一次请求一定成功。Service Worker运行在浏览器后台,独立于页面生命周期,拥有拦截网络请求、访问本地缓存(IndexedDB)和后台同步(Background Sync)的能力。它成为了我们Saga事务的“客户端前哨”,负责在用户意图产生时,就将其持久化在本地,然后以可靠的方式尝试与后端协调器通信。Saga协调器与API入口:API网关 (Node.js/Express)
一个轻量级的Node.js应用作为API网关,它不仅仅是流量转发,更核心的职责是扮演Saga协调器。它接收来自Service Worker的请求,记录Saga启动状态,然后按预定步骤调用下游服务。前端状态反馈:Chakra UI
一个复杂的异步流程,必须给用户清晰、即时的反馈。Chakra UI以其组合式和可访问性强的组件,能帮助我们快速构建出能反映“处理中”、“已提交”、“成功”、“失败”等多种状态的界面,并通过useToast
等钩子提供非侵入式的通知。
架构与流程概览
在深入代码之前,我们先通过图表理清整个流程。
sequenceDiagram participant User as 用户 participant ReactUI as Chakra UI 界面 participant SW as Service Worker participant IDB as IndexedDB participant Gateway as API网关 (Saga协调器) participant CockroachDB as 数据库 participant OrderSvc as 订单服务 participant PointSvc as 积分服务 User->>ReactUI: 点击“确认下单” ReactUI->>SW: 发起 fetch('/api/create-order', payload) activate SW SW->>IDB: 将请求存入'sync-queue' (持久化用户意图) SW-->>ReactUI: (可选)立即响应,UI进入pending状态 SW->>Gateway: 尝试发送请求 deactivate SW Note right of SW: 如果此时网络中断...
请求失败,但已存入IDB activate Gateway Gateway->>CockroachDB: 创建Saga Log, 状态: PENDING Gateway-->>CockroachDB: 返回Saga Transaction ID Gateway-->>SW: 响应 202 Accepted (Saga已受理) deactivate Gateway Note over SW, Gateway: 后续流程由网关异步驱动 activate Gateway Gateway->>OrderSvc: 1. 创建订单 activate OrderSvc OrderSvc->>CockroachDB: INSERT into orders OrderSvc-->>Gateway: 成功 deactivate OrderSvc Gateway->>CockroachDB: 更新Saga Log, step 1: SUCCEEDED Gateway->>PointSvc: 2. 扣减积分 activate PointSvc PointSvc->>CockroachDB: UPDATE user_points PointSvc-->>Gateway: 失败 (例如:积分不足) deactivate PointSvc Note over Gateway, CockroachDB: 检测到失败,开始补偿 Gateway->>CockroachDB: 更新Saga Log, step 2: FAILED Gateway->>OrderSvc: 1. 补偿:取消订单 activate OrderSvc OrderSvc->>CockroachDB: UPDATE orders SET status = 'CANCELLED' OrderSvc-->>Gateway: 补偿成功 deactivate OrderSvc Gateway->>CockroachDB: 更新Saga Log, 状态: COMPENSATED deactivate Gateway Note over SW: 网络恢复后,Background Sync触发 SW->>Gateway: (通过Sync Event) 重试之前失败的请求
数据库设计:Saga的心脏
一切从数据结构开始。在CockroachDB中,我们需要两类表:业务表和Saga日志表。
-- file: schema.sql
-- CockroachDB Schema for our offline-capable Saga
-- Saga Log Table: 状态机的心脏
-- 这张表记录了每个分布式事务的生命周期。
-- 协调器崩溃后,可以根据这张表来恢复和继续流程。
CREATE TABLE saga_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
saga_name VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL CHECK (status IN ('PENDING', 'SUCCEEDED', 'FAILED', 'COMPENSATING', 'COMPENSATED')),
current_step INT NOT NULL DEFAULT 0,
payload JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Saga Step Log Table: 记录每个步骤的细节
-- 用于精细化追踪和故障排查。
CREATE TABLE saga_step_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
saga_id UUID NOT NULL REFERENCES saga_logs(id),
step_name VARCHAR(100) NOT NULL,
step_order INT NOT NULL,
status VARCHAR(20) NOT NULL CHECK (status IN ('PENDING', 'SUCCEEDED', 'FAILED')),
request_payload JSONB,
response_payload JSONB,
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (saga_id, step_order)
);
-- Business Table: Orders
CREATE TABLE orders (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL,
product_id UUID NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'PENDING' CHECK (status IN ('PENDING', 'CONFIRMED', 'CANCELLED')),
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Business Table: User Points
CREATE TABLE user_points (
user_id UUID PRIMARY KEY,
balance INT NOT NULL CHECK (balance >= 0),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- CockroachDB 的索引对于性能至关重要
CREATE INDEX ON saga_logs (status, updated_at);
CREATE INDEX ON saga_step_logs (saga_id);
这里的saga_logs
表是核心,status
字段驱动着整个事务状态的流转。payload
存储了发起事务所需要的全部上下文信息。
后端实现:微服务与Saga协调器
1. 模拟微服务
在真实项目中,这些是独立部署的服务。为了演示,我们用Express在同一个应用中模拟它们,但保持接口独立。
// file: services.js
// 模拟订单和积分微服务
const express = require('express');
const { Pool } = require('pg');
// CockroachDB connection pool
const pool = new Pool({
// connection string for CockroachDB
connectionString: process.env.DATABASE_URL,
});
const createService = (name) => {
const router = express.Router();
router.use(express.json());
// 正向操作 (Transaction)
router.post('/execute', async (req, res) => {
console.log(`[${name} Service] Received execute request`);
try {
// 模拟业务逻辑和可能的失败
if (name === 'points' && req.body.userId === 'user-id-with-insufficient-points') {
throw new Error('Insufficient points');
}
// 实际数据库操作
// ... (e.g., pool.query('INSERT ...'))
console.log(`[${name} Service] Execution successful`);
res.status(200).json({ success: true, message: `${name} executed` });
} catch (error) {
console.error(`[${name} Service] Execution failed:`, error.message);
// 返回一个结构化的错误,方便协调器处理
res.status(500).json({ success: false, message: error.message, code: 'EXECUTION_ERROR' });
}
});
// 补偿操作 (Compensation)
router.post('/compensate', async (req, res) => {
console.log(`[${name} Service] Received compensate request`);
try {
// 补偿逻辑必须是幂等的,且尽可能保证成功
// ... (e.g., pool.query('UPDATE orders SET status = CANCELLED ...'))
console.log(`[${name} Service] Compensation successful`);
res.status(200).json({ success: true, message: `${name} compensated` });
} catch (error) {
console.error(`[${name} Service] Compensation failed:`, error.message);
// 补偿失败是严重问题,需要告警和人工介入
res.status(500).json({ success: false, message: `CRITICAL: Compensation failed for ${name}`, code: 'COMPENSATION_ERROR' });
}
});
return router;
};
const app = express();
app.use('/order', createService('order'));
app.use('/points', createService('points'));
// ... server listening logic ...
这里的关键点在于,每个服务的/execute
接口可能会失败,而/compensate
接口必须被设计成高度可靠且幂等。
2. API网关:Saga协调器
这是后端的“大脑”,负责驱动整个流程。
// file: gateway.js
const express = require('express');
const { Pool } = require('pg');
const axios = require('axios'); // For making requests to other services
const { v4: uuidv4 } = require('uuid');
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
const app = express();
app.use(express.json());
// 定义Saga流程
const CREATE_ORDER_SAGA = {
name: 'CREATE_ORDER',
steps: [
{ name: 'create_order', serviceUrl: 'http://localhost:3001/order' },
{ name: 'deduct_points', serviceUrl: 'http://localhost:3002/points' },
// ... more steps like lock_inventory, issue_coupon
]
};
// 启动Saga的API入口
app.post('/api/create-order', async (req, res) => {
const payload = req.body;
const client = await pool.connect();
try {
// 1. 创建Saga日志,持久化意图
const result = await client.query(
'INSERT INTO saga_logs (saga_name, status, payload) VALUES ($1, $2, $3) RETURNING id',
[CREATE_ORDER_SAGA.name, 'PENDING', payload]
);
const sagaId = result.rows[0].id;
// 2. 立即返回202 Accepted,表示请求已被接受,将异步处理
// 这是为了应对客户端超时,并告知客户端Saga已启动
res.status(202).json({ sagaId, message: 'Saga process started.' });
// 3. 异步执行Saga流程
executeSaga(sagaId, payload);
} catch (error) {
console.error('Failed to start saga:', error);
res.status(500).json({ message: 'Internal Server Error' });
} finally {
client.release();
}
});
async function executeSaga(sagaId, payload) {
const sagaDefinition = CREATE_ORDER_SAGA;
let currentStep = 0;
try {
for (const step of sagaDefinition.steps) {
currentStep++;
await pool.query('UPDATE saga_logs SET current_step = $1 WHERE id = $2', [currentStep, sagaId]);
// 调用正向操作
await axios.post(`${step.serviceUrl}/execute`, payload);
}
// 所有步骤成功
await pool.query('UPDATE saga_logs SET status = $1 WHERE id = $2', ['SUCCEEDED', sagaId]);
console.log(`Saga ${sagaId} SUCCEEDED`);
} catch (error) {
console.error(`Saga ${sagaId} FAILED at step ${currentStep}:`, error.response?.data || error.message);
await pool.query('UPDATE saga_logs SET status = $1 WHERE id = $2', ['FAILED', sagaId]);
// 启动补偿流程
await compensateSaga(sagaId, payload, currentStep - 1);
}
}
async function compensateSaga(sagaId, payload, lastSuccessfulStep) {
const sagaDefinition = CREATE_ORDER_SAGA;
await pool.query('UPDATE saga_logs SET status = $1 WHERE id = $2', ['COMPENSATING', sagaId]);
console.log(`Starting compensation for saga ${sagaId} from step ${lastSuccessfulStep}`);
try {
for (let i = lastSuccessfulStep - 1; i >= 0; i--) {
const step = sagaDefinition.steps[i];
// 调用补偿操作
await axios.post(`${step.serviceUrl}/compensate`, payload);
}
await pool.query('UPDATE saga_logs SET status = $1 WHERE id = $2', ['COMPENSATED', sagaId]);
console.log(`Saga ${sagaId} COMPENSATED`);
} catch (error) {
console.error(`CRITICAL: Compensation for saga ${sagaId} failed. Manual intervention required.`);
// 这里的失败是灾难性的,需要引入告警机制
}
}
// ... server listening logic ...
这个协调器的实现有几个要点:
- 入口点分离:
/api/create-order
接口只负责快速地创建Saga记录并返回202
,真正的执行是异步的。这避免了客户端长时间等待。 - 状态持久化:每一步执行前后,都会更新
saga_logs
表。如果协调器在executeSaga
函数执行中途崩溃,重启后可以通过查询PENDING
状态的Saga来恢复流程。 - 补偿逻辑:当任何一步失败,
compensateSaga
会从最后一个成功的步骤开始,逆序执行补偿操作。
客户端实现:Service Worker与Chakra UI的协同
现在,我们将战线转移到客户端,这是保证“离线可用”的核心。
1. Service Worker:离线能力的基石
Service Worker的注册过程是标准的,这里不再赘述。核心在于fetch
事件拦截和sync
事件处理。
// file: public/sw.js
// 使用一个简单的库来操作IndexedDB
import { openDB } from 'idb';
const DB_NAME = 'saga-requests';
const STORE_NAME = 'sync-queue';
// 初始化IndexedDB
const dbPromise = openDB(DB_NAME, 1, {
upgrade(db) {
if (!db.objectStoreNames.contains(STORE_NAME)) {
db.createObjectStore(STORE_NAME, { keyPath: 'id', autoIncrement: true });
}
},
});
// 拦截特定API请求
self.addEventListener('fetch', (event) => {
const url = new URL(event.request.url);
if (event.request.method === 'POST' && url.pathname === '/api/create-order') {
event.respondWith(handleApiRequest(event));
}
});
async function handleApiRequest(event) {
const requestClone = event.request.clone();
const body = await requestClone.json();
try {
// 直接尝试发送请求
const response = await fetch(requestClone);
// 如果服务器正常受理,直接返回响应
if (response.status === 202) {
sendMessageToClient({ type: 'SAGA_ACCEPTED', payload: body });
return response;
}
// 对于其他错误,也走离线逻辑
throw new Error('Server error, queuing for background sync.');
} catch (error) {
console.log('Fetch failed, queuing for background sync.');
// 网络错误或服务器错误,将请求存入IndexedDB并注册后台同步
await queueRequestForSync(event.request);
// 立即向客户端反馈,告知已进入离线模式
sendMessageToClient({ type: 'SAGA_QUEUED_OFFLINE', payload: body });
// 返回一个模拟的成功响应,让UI可以进入pending状态
// 这是一个设计权衡,欺骗UI,让用户以为提交成功了。
return new Response(JSON.stringify({ message: 'Request queued for background sync.' }), {
status: 202, // 模拟Accepted状态
headers: { 'Content-Type': 'application/json' },
});
}
}
// 将请求存入IndexedDB
async function queueRequestForSync(request) {
const db = await dbPromise;
const body = await request.json();
const headers = {};
for (const [key, value] of request.headers.entries()) {
headers[key] = value;
}
await db.add(STORE_NAME, {
url: request.url,
method: request.method,
body,
headers,
});
// 注册一个名为 'saga-sync' 的后台同步事件
await self.registration.sync.register('saga-sync');
}
// 处理后台同步事件
self.addEventListener('sync', (event) => {
if (event.tag === 'saga-sync') {
event.waitUntil(processSyncQueue());
}
});
async function processSyncQueue() {
const db = await dbPromise;
const requests = await db.getAll(STORE_NAME);
for (const req of requests) {
try {
const response = await fetch(req.url, {
method: req.method,
headers: req.headers,
body: JSON.stringify(req.body),
});
if (response.ok) {
// 发送成功,从队列中删除
await db.delete(STORE_NAME, req.id);
sendMessageToClient({ type: 'SAGA_SYNC_SUCCESS', payload: req.body });
} else {
// 如果服务器返回错误,可能需要更复杂的重试策略
console.error('Sync failed with server error:', response.status);
}
} catch (error) {
// 网络依然不通,请求会保留在队列中,等待下一次sync事件
console.log('Sync failed, will retry later.');
// 确保waitUntil不会因为这个错误而提前结束
// 这样sync事件会稍后重试
throw error;
}
}
}
// 与客户端UI通信
async function sendMessageToClient(message) {
const clients = await self.clients.matchAll();
for (const client of clients) {
client.postMessage(message);
}
}
这段代码是客户端韧性的核心。它将用户操作的意图(fetch
请求)从一个瞬时的网络行为,转变成一个持久化的、保证最终会被执行的任务。
2. Chakra UI 前端组件
前端需要监听来自Service Worker的消息,并用Chakra UI的组件来优雅地展示状态。
// file: components/OrderForm.js
import { useState, useEffect } from 'react';
import {
Box,
Button,
FormControl,
FormLabel,
Input,
VStack,
useToast,
Spinner,
Alert,
AlertIcon,
} from '@chakra-ui/react';
function OrderForm() {
const [isLoading, setIsLoading] = useState(false);
const [status, setStatus] = useState('idle'); // idle, pending, offline, success, failed
const toast = useToast();
// 监听来自Service Worker的消息
useEffect(() => {
const handleMessage = (event) => {
const { type, payload } = event.data;
if (type === 'SAGA_ACCEPTED') {
setStatus('pending');
toast({
title: '订单已提交',
description: '正在后台处理您的订单,您可以安全关闭此页面。',
status: 'info',
duration: 9000,
isClosable: true,
});
} else if (type === 'SAGA_QUEUED_OFFLINE') {
setStatus('offline');
toast({
title: '网络中断',
description: '您的请求已保存,网络恢复后将自动提交。',
status: 'warning',
duration: 9000,
isClosable: true,
});
}
// 在实际应用中,还需要一个WebSocket或轮询机制来获取Saga的最终成功或失败状态
};
navigator.serviceWorker.addEventListener('message', handleMessage);
return () => {
navigator.serviceWorker.removeEventListener('message', handleMessage);
};
}, [toast]);
const handleSubmit = async (event) => {
event.preventDefault();
setIsLoading(true);
setStatus('pending');
const payload = {
userId: 'user-id-with-sufficient-points', // or 'user-id-with-insufficient-points' to test failure
productId: 'prod-123',
amount: 99.99,
};
try {
// 发出的请求会被Service Worker拦截
await fetch('/api/create-order', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
});
// 注意:即使离线,由于SW的respondWith,这里可能不会抛出错误
} catch (error) {
// 这个catch块可能永远不会执行
console.error('This should not happen if SW is active:', error);
} finally {
setIsLoading(false);
}
};
return (
<Box p={8} maxWidth="500px" borderWidth={1} borderRadius={8} boxShadow="lg">
<form onSubmit={handleSubmit}>
<VStack spacing={4}>
<FormControl isRequired>
<FormLabel>产品ID</FormLabel>
<Input defaultValue="prod-123" isReadOnly />
</FormControl>
{/* ... 其他表单字段 ... */}
{status === 'offline' && (
<Alert status='warning'>
<AlertIcon />
网络连接已断开。您的请求已暂存,将在恢复连接后自动处理。
</Alert>
)}
<Button
type="submit"
colorScheme="teal"
width="full"
isLoading={isLoading || status === 'pending'}
loadingText="提交中..."
>
确认下单
</Button>
</VStack>
</form>
</Box>
);
}
这个UI组件通过监听Service Worker的message
事件来更新自身状态,为用户提供了非常清晰的即时反馈。无论网络状况如何,用户的点击操作都会得到一个确定的响应。
局限性与未来迭代方向
这个方案虽然解决了核心问题,但在生产环境中应用,还需考虑以下几点:
最终状态通知:当前实现中,前端只知道Saga被受理或离线暂存。但Saga最终是成功还是失败,前端是不知道的。在真实项目中,需要一套通知机制,比如WebSocket或服务器发送事件(SSE),当Saga状态在数据库中变为
SUCCEEDED
或COMPENSATED
时,主动推送消息给客户端。协调器的健壮性:我们的协调器是单点的。虽然它本身是无状态的(状态在CockroachDB),但如果进程崩溃,需要有机制(如Kubernetes的自动重启)来恢复它。此外,还需要一个恢复任务,在协调器启动时,扫描数据库中所有处于
PENDING
或COMPENSATING
状态的Saga,并重新驱动它们。幂等性保障:Service Worker的后台同步可能会重试请求。API网关和下游服务必须对请求做幂等性处理。一个常见的做法是,在客户端生成一个唯一的请求ID(
request-uuid
),在Saga的生命周期中传递它,并在服务端进行检查。补偿逻辑的复杂性:我们演示的补偿逻辑很简单。但真实的补偿操作可能非常复杂,比如,如果优惠券已经被用户使用,该如何补偿?这涉及到业务层面的设计,有时甚至需要人工介入。补偿失败的告警和处理流程必须是运维体系的一部分。
这套架构的真正价值在于,它将系统的韧性边界从服务器端扩展到了用户的设备上,构建了一个真正意义上的端到端的、对网络故障有极强容忍度的业务流程处理系统。它承认并拥抱了分布式环境的不可靠性,并通过分层的持久化和状态机,最终实现了业务的一致性。