一个需要跨越两个独立服务(OrderService
和 InventoryService
)的原子操作需求,最终把我们推向了自研一个轻量级两阶段提交(2PC)协调器的道路上。业务场景很简单:创建订单必须成功扣减库存,反之亦然,这是一个典型的“要么全成功,要么全失败”的分布式事务问题。放弃成熟的Seata或GTS等框架,并非出于傲慢,而是因为当前场景下的事务参与方和逻辑都极为固定,我们希望获得对超时、重试和故障排查日志的完全控制权,而不是引入一个庞大的、配置复杂的通用解决方案。
首要任务是定义协调器的状态机。这是整个2PC协议的核心,任何逻辑上的瑕疵都会导致数据不一致。
stateDiagram-v2 direction LR [*] --> PENDING: Start Transaction PENDING --> VOTING: Send Prepare note right of PENDING: 协调器等待所有参与者就绪 VOTING --> PREPARED: All Voted Yes note right of VOTING: 等待所有参与者投票
任何一个No或超时
都将转入ABORTING VOTING --> ABORTING: One Voted No / Timeout PREPARED --> COMMITTING: Send Commit COMMITTING --> COMMITTED: All Ack Commit note left of COMMITTED: 事务成功 ABORTING --> ABORTED: All Ack Abort note left of ABORTED: 事务失败 PREPARED --> ABORTING: Commit Failure COMMITTING --> ABORTING: Ack Timeout
基于这个状态图,我们用Go语言来定义协调器的核心数据结构和状态。
// pkg/coordinator/coordinator.go
package coordinator
import (
"context"
"sync"
"time"
"log"
"github.com/getsentry/sentry-go"
"github.com/google/uuid"
)
// TransactionState 定义了事务的各个状态
type TransactionState string
const (
StatePending TransactionState = "PENDING"
StateVoting TransactionState = "VOTING"
StatePrepared TransactionState = "PREPARED"
StateCommitting TransactionState = "COMMITTING"
StateCommitted TransactionState = "COMMITTED"
StateAborting TransactionState = "ABORTING"
StateAborted TransactionState = "ABORTED"
)
// Participant 定义了事务参与者的接口
// 在真实项目中,这会是一个RPC客户端
type Participant interface {
Prepare(ctx context.Context, txID string) (bool, error)
Commit(ctx context.Context, txID string) error
Abort(ctx context.Context, txID string) error
ID() string // 返回参与者的唯一标识
}
// Transaction 存储一个分布式事务的完整状态
type Transaction struct {
ID string
State TransactionState
Participants []Participant
// 使用map来记录每个参与者的投票结果
Votes map[string]bool
mu sync.Mutex
}
// Coordinator 负责管理和驱动事务状态的流转
type Coordinator struct {
transactions map[string]*Transaction
mu sync.RWMutex
// 定义各阶段的超时时间,这在生产环境中至关重要
voteTimeout time.Duration
commitTimeout time.Duration
}
// NewCoordinator 创建一个新的协调器实例
func NewCoordinator(voteTimeout, commitTimeout time.Duration) *Coordinator {
return &Coordinator{
transactions: make(map[string]*Transaction),
voteTimeout: voteTimeout,
commitTimeout: commitTimeout,
}
}
// Begin 启动一个新的事务
func (c *Coordinator) Begin(participants ...Participant) (*Transaction, error) {
txID := uuid.NewString()
tx := &Transaction{
ID: txID,
State: StatePending,
Participants: participants,
Votes: make(map[string]bool),
}
c.mu.Lock()
c.transactions[txID] = tx
c.mu.Unlock()
log.Printf("[INFO] Transaction %s began with %d participants", txID, len(participants))
// 异步执行事务,避免阻塞调用方
go c.execute(tx)
return tx, nil
}
设计的核心在于 execute
方法,它驱动整个事务流程。这里的错误处理和状态转换是系统的关键,也是最容易出错的地方。
// pkg/coordinator/executor.go
package coordinator
import (
"context"
"fmt"
"log"
"sync"
"github.com/getsentry/sentry-go"
)
func (c *Coordinator) execute(tx *Transaction) {
// 阶段一:投票 (Voting)
tx.mu.Lock()
tx.State = StateVoting
tx.mu.Unlock()
log.Printf("[INFO] Transaction %s entering VOTING phase", tx.ID)
// 使用带有超时的context来控制投票阶段
voteCtx, voteCancel := context.WithTimeout(context.Background(), c.voteTimeout)
defer voteCancel()
decision := c.collectVotes(voteCtx, tx)
// 阶段二:决策与执行 (Commit/Abort)
if decision {
log.Printf("[INFO] Transaction %s decision: COMMIT", tx.ID)
c.performCommit(tx)
} else {
log.Printf("[WARN] Transaction %s decision: ABORT", tx.ID)
// 当决策为ABORT时,这是一个关键的监控点
// 我们不仅要记录日志,更要将这个事件上报给Sentry
sentry.WithScope(func(scope *sentry.Scope) {
scope.SetTag("transaction_id", tx.ID)
scope.SetTag("decision_phase", "VOTING")
scope.SetLevel(sentry.LevelWarning)
// 附加更丰富的上下文信息
participantIDs := []string{}
for _, p := range tx.Participants {
participantIDs = append(participantIDs, p.ID())
}
scope.SetContext("transaction_details", map[string]interface{}{
"participants": participantIDs,
"votes_received": tx.Votes,
})
sentry.CaptureMessage(fmt.Sprintf("Transaction %s aborted due to failed vote or timeout", tx.ID))
})
c.performAbort(tx)
}
}
// collectVotes 并发地向所有参与者请求投票
func (c *Coordinator) collectVotes(ctx context.Context, tx *Transaction) bool {
var wg sync.WaitGroup
voteChan := make(chan bool, len(tx.Participants))
for _, p := range tx.Participants {
wg.Add(1)
go func(participant Participant) {
defer wg.Done()
// 为每个参与者准备带有Sentry Hub的上下文
hub := sentry.CurrentHub().Clone()
ctxWithSentry := sentry.SetHubOnContext(ctx, hub)
hub.Scope().SetTag("participant_id", participant.ID())
ok, err := participant.Prepare(ctxWithSentry, tx.ID)
if err != nil {
log.Printf("[ERROR] Participant %s failed to prepare for tx %s: %v", participant.ID(), tx.ID, err)
// 错误也是一种 "No" 投票
voteChan <- false
return
}
tx.mu.Lock()
tx.Votes[participant.ID()] = ok
tx.mu.Unlock()
voteChan <- ok
}(p)
}
wg.Wait()
close(voteChan)
// 检查投票结果
for vote := range voteChan {
if !vote {
return false // 任何一个参与者投了反对票,则整体失败
}
}
tx.mu.Lock()
tx.State = StatePrepared
tx.mu.Unlock()
return true
}
func (c *Coordinator) performCommit(tx *Transaction) {
// ... 实现Commit逻辑
}
func (c *Coordinator) performAbort(tx *Transaction) {
// ... 实现Abort逻辑
}
在编写 performCommit
和 performAbort
之前,停下来思考测试策略是绝对必要的。对于这种状态驱动的复杂逻辑,单元测试是第一道,也是最重要的防线。如果单元测试覆盖不全,任何集成测试或混沌测试都将是建立在流沙之上。
我们使用表驱动测试(Table-Driven Tests)来覆盖所有可能的状态转换路径。这里的关键是模拟 Participant
的行为,包括成功、失败、甚至超时。
// pkg/coordinator/coordinator_test.go
package coordinator
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// MockParticipant 用于模拟事务参与者的行为
type MockParticipant struct {
id string
prepareVote bool
prepareErr error
prepareLag time.Duration // 模拟网络延迟或处理耗时
commitErr error
abortErr error
// 用于断言方法是否被调用
prepareCalled bool
commitCalled bool
abortCalled bool
mu sync.Mutex
}
func (m *MockParticipant) Prepare(ctx context.Context, txID string) (bool, error) {
m.mu.Lock()
m.prepareCalled = true
m.mu.Unlock()
if m.prepareLag > 0 {
select {
case <-time.After(m.prepareLag):
case <-ctx.Done():
return false, ctx.Err() // 尊重上下文的超时
}
}
return m.prepareVote, m.prepareErr
}
func (m *MockParticipant) Commit(ctx context.Context, txID string) error {
m.mu.Lock()
m.commitCalled = true
m.mu.Unlock()
return m.commitErr
}
func (m *MockParticipant) Abort(ctx context.Context, txID string) error {
m.mu.Lock()
m.abortCalled = true
m.mu.Unlock()
return m.abortErr
}
func (m *MockParticipant) ID() string {
return m.id
}
func TestCoordinatorExecutionFlow(t *testing.T) {
tests := []struct {
name string
participants []*MockParticipant
voteTimeout time.Duration
expectCommit bool
expectAbort bool
finalState TransactionState
}{
{
name: "Happy Path - All Participants Vote Yes",
participants: []*MockParticipant{
{id: "p1", prepareVote: true},
{id: "p2", prepareVote: true},
},
voteTimeout: 100 * time.Millisecond,
expectCommit: true,
expectAbort: false,
finalState: StateCommitted,
},
{
name: "One Participant Votes No",
participants: []*MockParticipant{
{id: "p1", prepareVote: true},
{id: "p2", prepareVote: false}, // p2 投反对票
},
voteTimeout: 100 * time.Millisecond,
expectCommit: false,
expectAbort: true,
finalState: StateAborted,
},
{
name: "One Participant Fails to Prepare",
participants: []*MockParticipant{
{id: "p1", prepareVote: true},
{id: "p2", prepareErr: errors.New("database connection lost")},
},
voteTimeout: 100 * time.Millisecond,
expectCommit: false,
expectAbort: true,
finalState: StateAborted,
},
{
name: "Participant Times Out During Voting",
participants: []*MockParticipant{
{id: "p1", prepareVote: true},
{id: "p2", prepareVote: true, prepareLag: 150 * time.Millisecond}, // 延迟超过超时时间
},
voteTimeout: 50 * time.Millisecond, // 设置一个非常短的超时
expectCommit: false,
expectAbort: true,
finalState: StateAborted,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 为了测试,我们需要一个同步的方式来知道事务何时结束
// 在真实实现中是异步的,但测试需要确定性
var participants []Participant
for _, p := range tt.participants {
participants = append(participants, p)
}
// 注意:这里的 coordinator.execute 是私有方法
// 严格的单元测试应该只测试公有接口 Begin()
// 但为了演示核心逻辑,我们直接调用 execute
// 更好的实践是通过一个回调或channel来同步等待结果
c := NewCoordinator(tt.voteTimeout, 100*time.Millisecond)
tx := &Transaction{
ID: "test-tx",
State: StatePending,
Participants: participants,
Votes: make(map[string]bool),
}
c.execute(tx) // 直接驱动状态机
// 等待一小段时间让goroutine完成
time.Sleep(tt.voteTimeout + 200*time.Millisecond)
// 断言
for _, p := range tt.participants {
assert.True(t, p.prepareCalled, "participant %s Prepare should be called", p.id)
assert.Equal(t, tt.expectCommit, p.commitCalled, "participant %s Commit call status incorrect", p.id)
assert.Equal(t, tt.expectAbort, p.abortCalled, "participant %s Abort call status incorrect", p.id)
}
assert.Equal(t, tt.finalState, tx.State, "final transaction state is incorrect")
})
}
}
(为保持篇幅,performCommit
和 performAbort
的具体实现及测试在此省略,其逻辑与 collectVotes
类似,均为并发调用参与者接口并处理错误和超时。)
单元测试通过后,我们才有了在更真实环境中验证它的信心。集成了Sentry SDK,并配置了DSN,真正的挑战开始了:混沌测试。单元测试验证的是逻辑的确定性,而混沌测试验证的是系统在非确定性故障下的韧性。
我们的策略很简单:在MockParticipant
的基础上,引入一个“故障注入器”(Fault Injector)。
// pkg/chaos/injector.go
package chaos
import (
"math/rand"
"time"
)
// FaultInjector 决定是否以及如何注入故障
type FaultInjector struct {
// 启用故障注入
Enabled bool
// 操作失败的概率 (0.0 to 1.0)
FailureRate float64
// 引入延迟的概率
LatencyRate float64
// 延迟时间的上限
MaxLatency time.Duration
}
// ShouldFail 判断本次操作是否应该失败
func (fi *FaultInjector) ShouldFail() bool {
if !fi.Enabled {
return false
}
return rand.Float64() < fi.FailureRate
}
// InjectLatency 如果需要,则注入延迟
func (fi *FaultInjector) InjectLatency() {
if !fi.Enabled || rand.Float64() >= fi.LatencyRate {
return
}
latency := time.Duration(rand.Int63n(int64(fi.MaxLatency)))
time.Sleep(latency)
}
我们将这个故障注入器集成到 Participant
的一个实现中,这个实现可以代理到真实的RPC客户端。
// pkg/participant/chaotic_participant.go
package participant
import (
"context"
"errors"
"log"
"github.com/my-app/pkg/chaos"
"github.com/my-app/pkg/coordinator"
)
// ChaoticParticipant 包装一个真实的参与者,并注入故障
type ChaoticParticipant struct {
realParticipant coordinator.Participant
injector *chaos.FaultInjector
}
func NewChaoticParticipant(real coordinator.Participant, injector *chaos.FaultInjector) *ChaoticParticipant {
return &ChaoticParticipant{
realParticipant: real,
injector: injector,
}
}
func (p *ChaoticParticipant) Prepare(ctx context.Context, txID string) (bool, error) {
p.injector.InjectLatency()
if p.injector.ShouldFail() {
log.Printf("[CHAOS] Injecting failure in Prepare for participant %s, tx %s", p.ID(), txID)
return false, errors.New("chaos: injected Prepare failure")
}
return p.realParticipant.Prepare(ctx, txID)
}
// Commit 和 Abort 的实现类似...
func (p *ChaoticParticipant) ID() string {
return p.realParticipant.ID()
}
现在,我们可以启动一个测试脚本,该脚本在循环中创建事务,并使用 ChaoticParticipant
。
// cmd/chaos_runner/main.go
func main() {
// 初始化 Sentry
err := sentry.Init(sentry.ClientOptions{
Dsn: "YOUR_SENTRY_DSN_HERE",
TracesSampleRate: 1.0,
Environment: "chaos-test",
Release: "[email protected]",
})
if err != nil {
log.Fatalf("Sentry initialization failed: %v", err)
}
defer sentry.Flush(2 * time.Second)
coord := coordinator.NewCoordinator(2*time.Second, 3*time.Second)
// 定义真实的参与者 (可以是连接到测试数据库的真实服务)
p1 := NewRealParticipant("order-service")
p2 := NewRealParticipant("inventory-service")
// 定义故障注入策略
injector := &chaos.FaultInjector{
Enabled: true,
FailureRate: 0.2, // 20% 的概率操作会失败
LatencyRate: 0.3, // 30% 的概率会注入延迟
MaxLatency: 2500 * time.Millisecond, // 最高延迟2.5秒,可能会触发协调器2秒的投票超时
}
// 用混沌代理包装真实参与者
chaoticP1 := participant.NewChaoticParticipant(p1, injector)
chaoticP2 := participant.NewChaoticParticipant(p2, injector)
log.Println("Starting chaos test run...")
for i := 0; i < 100; i++ {
log.Printf("--- Iteration %d ---", i+1)
coord.Begin(chaoticP1, chaoticP2)
time.Sleep(1 * time.Second)
}
log.Println("Chaos test run finished.")
}
运行这个脚本后,Sentry后台开始出现我们期望看到的事件。其中一个典型的事件标题是 Transaction [some-uuid] aborted due to failed vote or timeout
。点开详情,我们在Tags
和Additional Data
中看到了通过sentry.WithScope
添加的宝贵上下文:
-
transaction_id
:a1b2c3d4-....
-
decision_phase
:VOTING
transaction_details
:-
participants
:["order-service", "inventory-service"]
-
votes_received
:{"order-service": true}
(这表明inventory-service
可能超时了,因为没有它的投票记录)
-
这个Sentry事件远比一条简单的日志"transaction aborted"
更有价值。它清晰地指出了哪个事务、在哪个阶段、因为什么原因(通过对比参与者列表和已收到的投票)失败。当故障是由于随机延迟注入导致超时时,这种上下文信息对于定位问题根源至关重要。我们甚至可以根据participant_id
标签对Sentry事件进行分组,快速发现某个特定服务是故障的主要来源。
这个从零构建、单元测试、再到混沌验证的过程,让我们对这个小小的2PC协调器的行为边界有了极大的信心。它并非一个通用的、万能的分布式事务解决方案。它的局限性非常明显:
- 同步阻塞: 整个协议在投票和提交阶段是阻塞的,参与者需要锁定资源直到第二阶段完成。这在高并发、低延迟场景下可能成为瓶颈。
- 协调器单点故障 (SPOF): 当前实现中,协调器本身是单点。如果协调器在
PREPARED
状态后崩溃,所有参与者将永久阻塞。生产级实现需要为协调器引入高可用机制,比如使用Raft协议构建协调器集群。 - 缺乏持久化: 如果协调器进程重启,所有进行中的事务状态都会丢失。一个健壮的实现必须将事务状态变更写入持久化的日志(如WAL)中。
尽管有这些局限,但对于我们特定的、低频但一致性要求高的场景,这个方案在可控性、可观测性和资源开销之间取得了恰当的平衡。未来的迭代路径也很清晰:首先为协调器增加基于磁盘的事务日志,以解决重启恢复问题。