构建可观测的两阶段提交协调器及其在混沌测试下的韧性验证


一个需要跨越两个独立服务(OrderServiceInventoryService)的原子操作需求,最终把我们推向了自研一个轻量级两阶段提交(2PC)协调器的道路上。业务场景很简单:创建订单必须成功扣减库存,反之亦然,这是一个典型的“要么全成功,要么全失败”的分布式事务问题。放弃成熟的Seata或GTS等框架,并非出于傲慢,而是因为当前场景下的事务参与方和逻辑都极为固定,我们希望获得对超时、重试和故障排查日志的完全控制权,而不是引入一个庞大的、配置复杂的通用解决方案。

首要任务是定义协调器的状态机。这是整个2PC协议的核心,任何逻辑上的瑕疵都会导致数据不一致。

stateDiagram-v2
    direction LR
    [*] --> PENDING: Start Transaction

    PENDING --> VOTING: Send Prepare
    note right of PENDING: 协调器等待所有参与者就绪
    VOTING --> PREPARED: All Voted Yes
    note right of VOTING: 等待所有参与者投票
任何一个No或超时
都将转入ABORTING VOTING --> ABORTING: One Voted No / Timeout PREPARED --> COMMITTING: Send Commit COMMITTING --> COMMITTED: All Ack Commit note left of COMMITTED: 事务成功 ABORTING --> ABORTED: All Ack Abort note left of ABORTED: 事务失败 PREPARED --> ABORTING: Commit Failure COMMITTING --> ABORTING: Ack Timeout

基于这个状态图,我们用Go语言来定义协调器的核心数据结构和状态。

// pkg/coordinator/coordinator.go

package coordinator

import (
	"context"
	"sync"
	"time"
	"log"

	"github.com/getsentry/sentry-go"
	"github.com/google/uuid"
)

// TransactionState 定义了事务的各个状态
type TransactionState string

const (
	StatePending   TransactionState = "PENDING"
	StateVoting    TransactionState = "VOTING"
	StatePrepared  TransactionState = "PREPARED"
	StateCommitting TransactionState = "COMMITTING"
	StateCommitted TransactionState = "COMMITTED"
	StateAborting  TransactionState = "ABORTING"
	StateAborted   TransactionState = "ABORTED"
)

// Participant 定义了事务参与者的接口
// 在真实项目中,这会是一个RPC客户端
type Participant interface {
	Prepare(ctx context.Context, txID string) (bool, error)
	Commit(ctx context.Context, txID string) error
	Abort(ctx context.Context, txID string) error
	ID() string // 返回参与者的唯一标识
}

// Transaction 存储一个分布式事务的完整状态
type Transaction struct {
	ID           string
	State        TransactionState
	Participants []Participant
	// 使用map来记录每个参与者的投票结果
	Votes        map[string]bool 
	mu           sync.Mutex
}

// Coordinator 负责管理和驱动事务状态的流转
type Coordinator struct {
	transactions map[string]*Transaction
	mu           sync.RWMutex
	// 定义各阶段的超时时间,这在生产环境中至关重要
	voteTimeout    time.Duration
	commitTimeout  time.Duration
}

// NewCoordinator 创建一个新的协调器实例
func NewCoordinator(voteTimeout, commitTimeout time.Duration) *Coordinator {
	return &Coordinator{
		transactions:  make(map[string]*Transaction),
		voteTimeout:   voteTimeout,
		commitTimeout: commitTimeout,
	}
}

// Begin 启动一个新的事务
func (c *Coordinator) Begin(participants ...Participant) (*Transaction, error) {
	txID := uuid.NewString()
	tx := &Transaction{
		ID:           txID,
		State:        StatePending,
		Participants: participants,
		Votes:        make(map[string]bool),
	}

	c.mu.Lock()
	c.transactions[txID] = tx
	c.mu.Unlock()

	log.Printf("[INFO] Transaction %s began with %d participants", txID, len(participants))
	
	// 异步执行事务,避免阻塞调用方
	go c.execute(tx)

	return tx, nil
}

设计的核心在于 execute 方法,它驱动整个事务流程。这里的错误处理和状态转换是系统的关键,也是最容易出错的地方。

// pkg/coordinator/executor.go

package coordinator

import (
	"context"
	"fmt"
	"log"
	"sync"

	"github.com/getsentry/sentry-go"
)

func (c *Coordinator) execute(tx *Transaction) {
	// 阶段一:投票 (Voting)
	tx.mu.Lock()
	tx.State = StateVoting
	tx.mu.Unlock()

	log.Printf("[INFO] Transaction %s entering VOTING phase", tx.ID)

	// 使用带有超时的context来控制投票阶段
	voteCtx, voteCancel := context.WithTimeout(context.Background(), c.voteTimeout)
	defer voteCancel()

	decision := c.collectVotes(voteCtx, tx)

	// 阶段二:决策与执行 (Commit/Abort)
	if decision {
		log.Printf("[INFO] Transaction %s decision: COMMIT", tx.ID)
		c.performCommit(tx)
	} else {
		log.Printf("[WARN] Transaction %s decision: ABORT", tx.ID)
		// 当决策为ABORT时,这是一个关键的监控点
		// 我们不仅要记录日志,更要将这个事件上报给Sentry
		sentry.WithScope(func(scope *sentry.Scope) {
			scope.SetTag("transaction_id", tx.ID)
			scope.SetTag("decision_phase", "VOTING")
			scope.SetLevel(sentry.LevelWarning)
			
			// 附加更丰富的上下文信息
			participantIDs := []string{}
			for _, p := range tx.Participants {
				participantIDs = append(participantIDs, p.ID())
			}
			scope.SetContext("transaction_details", map[string]interface{}{
				"participants": participantIDs,
				"votes_received": tx.Votes,
			})
			
			sentry.CaptureMessage(fmt.Sprintf("Transaction %s aborted due to failed vote or timeout", tx.ID))
		})
		c.performAbort(tx)
	}
}

// collectVotes 并发地向所有参与者请求投票
func (c *Coordinator) collectVotes(ctx context.Context, tx *Transaction) bool {
	var wg sync.WaitGroup
	voteChan := make(chan bool, len(tx.Participants))

	for _, p := range tx.Participants {
		wg.Add(1)
		go func(participant Participant) {
			defer wg.Done()
			
			// 为每个参与者准备带有Sentry Hub的上下文
			hub := sentry.CurrentHub().Clone()
			ctxWithSentry := sentry.SetHubOnContext(ctx, hub)
			hub.Scope().SetTag("participant_id", participant.ID())

			ok, err := participant.Prepare(ctxWithSentry, tx.ID)
			if err != nil {
				log.Printf("[ERROR] Participant %s failed to prepare for tx %s: %v", participant.ID(), tx.ID, err)
				// 错误也是一种 "No" 投票
				voteChan <- false
				return
			}

			tx.mu.Lock()
			tx.Votes[participant.ID()] = ok
			tx.mu.Unlock()
			voteChan <- ok
		}(p)
	}

	wg.Wait()
	close(voteChan)

	// 检查投票结果
	for vote := range voteChan {
		if !vote {
			return false // 任何一个参与者投了反对票,则整体失败
		}
	}
	
	tx.mu.Lock()
	tx.State = StatePrepared
	tx.mu.Unlock()
	return true
}

func (c *Coordinator) performCommit(tx *Transaction) {
	// ... 实现Commit逻辑
}

func (c *Coordinator) performAbort(tx *Transaction) {
	// ... 实现Abort逻辑
}

在编写 performCommitperformAbort 之前,停下来思考测试策略是绝对必要的。对于这种状态驱动的复杂逻辑,单元测试是第一道,也是最重要的防线。如果单元测试覆盖不全,任何集成测试或混沌测试都将是建立在流沙之上。

我们使用表驱动测试(Table-Driven Tests)来覆盖所有可能的状态转换路径。这里的关键是模拟 Participant 的行为,包括成功、失败、甚至超时。

// pkg/coordinator/coordinator_test.go

package coordinator

import (
	"context"
	"errors"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

// MockParticipant 用于模拟事务参与者的行为
type MockParticipant struct {
	id          string
	prepareVote bool
	prepareErr  error
	prepareLag  time.Duration // 模拟网络延迟或处理耗时
	commitErr   error
	abortErr    error
	
	// 用于断言方法是否被调用
	prepareCalled bool
	commitCalled  bool
	abortCalled   bool
	mu            sync.Mutex
}

func (m *MockParticipant) Prepare(ctx context.Context, txID string) (bool, error) {
	m.mu.Lock()
	m.prepareCalled = true
	m.mu.Unlock()

	if m.prepareLag > 0 {
		select {
		case <-time.After(m.prepareLag):
		case <-ctx.Done():
			return false, ctx.Err() // 尊重上下文的超时
		}
	}
	return m.prepareVote, m.prepareErr
}

func (m *MockParticipant) Commit(ctx context.Context, txID string) error {
	m.mu.Lock()
	m.commitCalled = true
	m.mu.Unlock()
	return m.commitErr
}

func (m *MockParticipant) Abort(ctx context.Context, txID string) error {
	m.mu.Lock()
	m.abortCalled = true
	m.mu.Unlock()
	return m.abortErr
}

func (m *MockParticipant) ID() string {
	return m.id
}

func TestCoordinatorExecutionFlow(t *testing.T) {
	tests := []struct {
		name              string
		participants      []*MockParticipant
		voteTimeout       time.Duration
		expectCommit      bool
		expectAbort       bool
		finalState        TransactionState
	}{
		{
			name: "Happy Path - All Participants Vote Yes",
			participants: []*MockParticipant{
				{id: "p1", prepareVote: true},
				{id: "p2", prepareVote: true},
			},
			voteTimeout:  100 * time.Millisecond,
			expectCommit: true,
			expectAbort:  false,
			finalState:   StateCommitted,
		},
		{
			name: "One Participant Votes No",
			participants: []*MockParticipant{
				{id: "p1", prepareVote: true},
				{id: "p2", prepareVote: false}, // p2 投反对票
			},
			voteTimeout:  100 * time.Millisecond,
			expectCommit: false,
			expectAbort:  true,
			finalState:   StateAborted,
		},
		{
			name: "One Participant Fails to Prepare",
			participants: []*MockParticipant{
				{id: "p1", prepareVote: true},
				{id: "p2", prepareErr: errors.New("database connection lost")},
			},
			voteTimeout:  100 * time.Millisecond,
			expectCommit: false,
			expectAbort:  true,
			finalState:   StateAborted,
		},
		{
			name: "Participant Times Out During Voting",
			participants: []*MockParticipant{
				{id: "p1", prepareVote: true},
				{id: "p2", prepareVote: true, prepareLag: 150 * time.Millisecond}, // 延迟超过超时时间
			},
			voteTimeout:  50 * time.Millisecond, // 设置一个非常短的超时
			expectCommit: false,
			expectAbort:  true,
			finalState:   StateAborted,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// 为了测试,我们需要一个同步的方式来知道事务何时结束
			// 在真实实现中是异步的,但测试需要确定性
			var participants []Participant
			for _, p := range tt.participants {
				participants = append(participants, p)
			}
			
			// 注意:这里的 coordinator.execute 是私有方法
			// 严格的单元测试应该只测试公有接口 Begin()
			// 但为了演示核心逻辑,我们直接调用 execute
			// 更好的实践是通过一个回调或channel来同步等待结果
			c := NewCoordinator(tt.voteTimeout, 100*time.Millisecond)
			tx := &Transaction{
				ID: "test-tx",
				State: StatePending,
				Participants: participants,
				Votes: make(map[string]bool),
			}

			c.execute(tx) // 直接驱动状态机
			
			// 等待一小段时间让goroutine完成
			time.Sleep(tt.voteTimeout + 200*time.Millisecond)

			// 断言
			for _, p := range tt.participants {
				assert.True(t, p.prepareCalled, "participant %s Prepare should be called", p.id)
				assert.Equal(t, tt.expectCommit, p.commitCalled, "participant %s Commit call status incorrect", p.id)
				assert.Equal(t, tt.expectAbort, p.abortCalled, "participant %s Abort call status incorrect", p.id)
			}
			assert.Equal(t, tt.finalState, tx.State, "final transaction state is incorrect")
		})
	}
}

(为保持篇幅,performCommitperformAbort 的具体实现及测试在此省略,其逻辑与 collectVotes 类似,均为并发调用参与者接口并处理错误和超时。)

单元测试通过后,我们才有了在更真实环境中验证它的信心。集成了Sentry SDK,并配置了DSN,真正的挑战开始了:混沌测试。单元测试验证的是逻辑的确定性,而混沌测试验证的是系统在非确定性故障下的韧性。

我们的策略很简单:在MockParticipant的基础上,引入一个“故障注入器”(Fault Injector)。

// pkg/chaos/injector.go

package chaos

import (
	"math/rand"
	"time"
)

// FaultInjector 决定是否以及如何注入故障
type FaultInjector struct {
	// 启用故障注入
	Enabled bool
	// 操作失败的概率 (0.0 to 1.0)
	FailureRate float64
	// 引入延迟的概率
	LatencyRate float64
	// 延迟时间的上限
	MaxLatency time.Duration
}

// ShouldFail 判断本次操作是否应该失败
func (fi *FaultInjector) ShouldFail() bool {
	if !fi.Enabled {
		return false
	}
	return rand.Float64() < fi.FailureRate
}

// InjectLatency 如果需要,则注入延迟
func (fi *FaultInjector) InjectLatency() {
	if !fi.Enabled || rand.Float64() >= fi.LatencyRate {
		return
	}
	latency := time.Duration(rand.Int63n(int64(fi.MaxLatency)))
	time.Sleep(latency)
}

我们将这个故障注入器集成到 Participant 的一个实现中,这个实现可以代理到真实的RPC客户端。

// pkg/participant/chaotic_participant.go

package participant

import (
	"context"
	"errors"
	"log"

	"github.com/my-app/pkg/chaos"
	"github.com/my-app/pkg/coordinator"
)

// ChaoticParticipant 包装一个真实的参与者,并注入故障
type ChaoticParticipant struct {
	realParticipant coordinator.Participant
	injector        *chaos.FaultInjector
}

func NewChaoticParticipant(real coordinator.Participant, injector *chaos.FaultInjector) *ChaoticParticipant {
	return &ChaoticParticipant{
		realParticipant: real,
		injector:        injector,
	}
}

func (p *ChaoticParticipant) Prepare(ctx context.Context, txID string) (bool, error) {
	p.injector.InjectLatency()
	if p.injector.ShouldFail() {
		log.Printf("[CHAOS] Injecting failure in Prepare for participant %s, tx %s", p.ID(), txID)
		return false, errors.New("chaos: injected Prepare failure")
	}
	return p.realParticipant.Prepare(ctx, txID)
}
// Commit 和 Abort 的实现类似...

func (p *ChaoticParticipant) ID() string {
	return p.realParticipant.ID()
}

现在,我们可以启动一个测试脚本,该脚本在循环中创建事务,并使用 ChaoticParticipant

// cmd/chaos_runner/main.go

func main() {
	// 初始化 Sentry
	err := sentry.Init(sentry.ClientOptions{
		Dsn: "YOUR_SENTRY_DSN_HERE",
		TracesSampleRate: 1.0,
		Environment: "chaos-test",
		Release: "[email protected]",
	})
	if err != nil {
		log.Fatalf("Sentry initialization failed: %v", err)
	}
	defer sentry.Flush(2 * time.Second)

	coord := coordinator.NewCoordinator(2*time.Second, 3*time.Second)

	// 定义真实的参与者 (可以是连接到测试数据库的真实服务)
	p1 := NewRealParticipant("order-service")
	p2 := NewRealParticipant("inventory-service")

	// 定义故障注入策略
	injector := &chaos.FaultInjector{
		Enabled:     true,
		FailureRate: 0.2, // 20% 的概率操作会失败
		LatencyRate: 0.3, // 30% 的概率会注入延迟
		MaxLatency:  2500 * time.Millisecond, // 最高延迟2.5秒,可能会触发协调器2秒的投票超时
	}

	// 用混沌代理包装真实参与者
	chaoticP1 := participant.NewChaoticParticipant(p1, injector)
	chaoticP2 := participant.NewChaoticParticipant(p2, injector)

	log.Println("Starting chaos test run...")
	for i := 0; i < 100; i++ {
		log.Printf("--- Iteration %d ---", i+1)
		coord.Begin(chaoticP1, chaoticP2)
		time.Sleep(1 * time.Second)
	}
	log.Println("Chaos test run finished.")
}

运行这个脚本后,Sentry后台开始出现我们期望看到的事件。其中一个典型的事件标题是 Transaction [some-uuid] aborted due to failed vote or timeout。点开详情,我们在TagsAdditional Data中看到了通过sentry.WithScope添加的宝贵上下文:

  • transaction_id: a1b2c3d4-....
  • decision_phase: VOTING
  • transaction_details:
    • participants: ["order-service", "inventory-service"]
    • votes_received: {"order-service": true} (这表明 inventory-service 可能超时了,因为没有它的投票记录)

这个Sentry事件远比一条简单的日志"transaction aborted"更有价值。它清晰地指出了哪个事务、在哪个阶段、因为什么原因(通过对比参与者列表和已收到的投票)失败。当故障是由于随机延迟注入导致超时时,这种上下文信息对于定位问题根源至关重要。我们甚至可以根据participant_id标签对Sentry事件进行分组,快速发现某个特定服务是故障的主要来源。

这个从零构建、单元测试、再到混沌验证的过程,让我们对这个小小的2PC协调器的行为边界有了极大的信心。它并非一个通用的、万能的分布式事务解决方案。它的局限性非常明显:

  1. 同步阻塞: 整个协议在投票和提交阶段是阻塞的,参与者需要锁定资源直到第二阶段完成。这在高并发、低延迟场景下可能成为瓶颈。
  2. 协调器单点故障 (SPOF): 当前实现中,协调器本身是单点。如果协调器在PREPARED状态后崩溃,所有参与者将永久阻塞。生产级实现需要为协调器引入高可用机制,比如使用Raft协议构建协调器集群。
  3. 缺乏持久化: 如果协调器进程重启,所有进行中的事务状态都会丢失。一个健壮的实现必须将事务状态变更写入持久化的日志(如WAL)中。

尽管有这些局限,但对于我们特定的、低频但一致性要求高的场景,这个方案在可控性、可观测性和资源开销之间取得了恰当的平衡。未来的迭代路径也很清晰:首先为协调器增加基于磁盘的事务日志,以解决重启恢复问题。


  目录