构建基于 ZeroMQ 与 Cassandra 的 Monorepo 高吞吐量异步日志管道


当 Monorepo 内的 Go 微服务数量超过一百个,并且核心业务流量开始指数级增长时,一个过去被忽视的问题浮出水面:日志。传统的同步日志写入(无论是写入 stdout/stderr 还是文件)在高并发下开始显现其 I/O 瓶颈,直接拖慢了核心业务的响应时间。一个请求链路可能横跨5-10个服务,每个服务打印数条日志,导致磁盘 I/O 竞争和 CPU 上下文切换成本变得无法接受。我们需要一个日志解决方案,它必须是非阻塞的、高吞吐的,并且能与我们现有的技术栈深度整合。

定义问题与架构权衡

核心诉求非常明确:应用服务必须与日志写入的 I/O 过程完全解耦。应用进程的职责是“生产”日志,然后以近乎零成本的方式将其“发射”出去,后续的收集、处理、存储都不能对主业务线程产生任何性能影响。

方案A:业界标准 - ELK/EFK 栈

这是最常见的解决方案。应用将日志打印到 stdout,由部署在同一节点上的 Fluentd 或 Logstash Agent 收集,然后转发给 Elasticsearch 集群。

  • 优势: 生态成熟,拥有强大的查询和可视化能力(Kibana)。社区支持广泛。
  • 劣势:
    1. 性能回压: 当日志产生速率超过 Agent 处理或 Elasticsearch 接收能力时,回压机制可能最终导致应用进程的 stdout 管道阻塞,问题又回到了原点。
    2. 资源争抢: Agent 本身消耗 CPU 和内存,在资源敏感型的服务节点上,这种争抢是不可忽视的。
    3. 运维复杂度: 维护一个大规模、高写入量的 Elasticsearch 集群本身就是一项专业且成本高昂的工作。对于我们这种写入远大于查询,且查询模式固定的场景,ES 的全文索引能力有些过度设计。

方案B:定制化管道 - ZeroMQ + Cassandra

这个方案旨在用更轻量、更贴合我们场景的组件来解决问题。

  • 架构:

    1. 入口与追踪: Nginx 作为网关,为每个进入的请求生成唯一的 X-Request-ID
    2. 应用端 (Producer): Go-Gin 服务内嵌一个轻量级的日志库,该库不执行任何磁盘 I/O,而是通过 ZeroMQ 的 PUSH 套接字将结构化日志(JSON)异步推送到一个本地或网络端点。
    3. 传输层: ZeroMQ 负责在应用与日志收集器之间建立一个高性能、低延迟的消息通道。它是一个库,而非一个独立的 broker 服务,这大大降低了架构的复杂性。
    4. 收集端 (Consumer): 一个或多个独立的 Go 服务作为日志收集器,使用 ZeroMQ 的 PULL 套接字接收日志,进行批量处理。
    5. 存储层: 收集器将批量日志写入 Cassandra 集群。
  • 优势:

    1. 极致解耦: ZeroMQ 的 zmq.DONTWAIT 模式能确保日志发送操作是非阻塞的。如果下游拥堵,消息会根据高水位标记(HWM)策略被缓存或直接丢弃,应用主线程完全不受影响。这正是我们追求的“发射后不管”。
    2. 超高性能: ZeroMQ 在底层协议和内存管理上进行了深度优化,延迟可达微秒级,吞吐量极高。
    3. 存储匹配: Cassandra 的日志结构化存储(LSM-tree)架构天生为高并发写入而设计。其宽表模型非常适合存储结构化日志,特别是当我们的主要查询维度是 trace_id 时。
  • 劣势:

    1. 可靠性权衡: ZeroMQ 的核心哲学是速度。在默认的 PUSH/PULL 模式下,如果收集器崩溃,正在传输的或在缓冲区内的消息可能会丢失。这是一种“至多一次”的交付语义,对于非关键审计级别的日志,这种权衡是值得的。
    2. 查询能力: Cassandra 的查询能力远不如 Elasticsearch 灵活。你必须基于预先设计的查询模式来设计表结构(Query-Driven Modeling)。

最终决策

考虑到性能隔离是我们的首要矛盾,我们选择了方案 B。日志的微小丢失在业务上是可接受的,但业务服务的性能抖动是不可接受的。方案 B 的组件栈 (Go, ZeroMQ, Cassandra) 也与我们团队现有的技术能力高度吻合。

核心实现概览

首先,我们用 Mermaid 勾勒出整个数据流的架构。

graph TD
    subgraph "Request Entry"
        A[Client] --> B(Nginx)
    end

    subgraph "Monorepo Services"
        B -- "HTTP Request + X-Request-ID" --> C{Go-Gin Service A}
        C --> D{Go-Gin Service B}
    end

    subgraph "Logging Pipeline"
        C -- "Log Entry (JSON)" --> E(ZeroMQ PUSH Socket)
        D -- "Log Entry (JSON)" --> E
        E -- "ipc:///tmp/zlogger.sock" --> F[Log Collector Service]
        F -- "Batch Insert" --> G[(Cassandra Cluster)]
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#f9f,stroke:#333,stroke-width:2px
    style F fill:#ccf,stroke:#333,stroke-width:2px

1. Nginx: 注入追踪ID

这是链路追踪的起点。所有日志都必须能通过一个ID串联起来。

# /etc/nginx/nginx.conf

http {
    # ...
    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    # 为每个请求生成一个唯一的ID
    map $http_x_request_id $reqid {
        default $request_id;
        ""      $request_id;
    }

    server {
        listen 80;
        # ...

        location / {
            # 将 request_id 传递给上游服务
            proxy_set_header X-Request-ID $reqid;
            proxy_pass http://app_backend;
        }
    }
}

$request_id 是 Nginx 内置变量,它会生成一个32位的十六进制随机字符串。

2. Monorepo 中的异步日志包 zlog

我们在 Monorepo 的 pkg/zlog 目录下创建了一个专用的日志包。所有微服务都将依赖这个包,确保日志行为的一致性。

pkg/zlog/logger.go:

package zlog

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	zmq "github.com/pebbe/zmq4"
)

// LogLevel defines the level of logging.
type LogLevel string

const (
	LevelInfo  LogLevel = "INFO"
	LevelError LogLevel = "ERROR"
	LevelDebug LogLevel = "DEBUG"
)

// LogEntry represents a structured log message.
type LogEntry struct {
	Timestamp  string   `json:"timestamp"`
	Level      LogLevel `json:"level"`
	Service    string   `json:"service"`
	TraceID    string   `json:"trace_id"`
	Message    string   `json:"message"`
	Hostname   string   `json:"hostname"`
	PID        int      `json:"pid"`
}

// Logger is the async logger client.
type Logger struct {
	pusher    *zmq.Socket
	serviceName string
	hostname    string
	pid         int
	mu        sync.Mutex
}

var (
	globalLogger *Logger
	once         sync.Once
)

// InitGlobalLogger initializes the global logger instance. This should be called once on app startup.
// endpoint is the ZeroMQ endpoint, e.g., "ipc:///tmp/zlogger.sock" or "tcp://127.0.0.1:5555"
func InitGlobalLogger(serviceName, endpoint string) error {
	var err error
	once.Do(func() {
		pusher, sockErr := zmq.NewSocket(zmq.PUSH)
		if sockErr != nil {
			err = fmt.Errorf("failed to create ZeroMQ PUSH socket: %w", sockErr)
			return
		}

		// 设置高水位标记,防止无限缓存占用内存
		// 当未发送的消息达到1000条时,新的发送操作会立即返回EAGAIN错误或被丢弃
		if sockErr = pusher.SetSndhwm(1000); sockErr != nil {
			err = fmt.Errorf("failed to set sndhwm: %w", sockErr)
			return
		}

		if sockErr = pusher.Connect(endpoint); sockErr != nil {
			err = fmt.Errorf("failed to connect to ZeroMQ endpoint '%s': %w", endpoint, sockErr)
			return
		}

		hostname, _ := os.Hostname()

		globalLogger = &Logger{
			pusher:      pusher,
			serviceName: serviceName,
			hostname:    hostname,
			pid:         os.Getpid(),
		}
	})
	return err
}

// Close ensures the ZeroMQ socket is properly closed.
func Close() {
	if globalLogger != nil {
		globalLogger.mu.Lock()
		defer globalLogger.mu.Unlock()
		if globalLogger.pusher != nil {
			globalLogger.pusher.Close()
		}
	}
}

// Get returns the global logger instance.
func Get() *Logger {
	if globalLogger == nil {
		// Fallback to stdout if not initialized, this is a bad state.
		// In a real project, this might panic or have a more robust fallback.
		fmt.Println("FATAL: zlog is not initialized. Call InitGlobalLogger first.")
		os.Exit(1)
	}
	return globalLogger
}

// GinMiddleware injects the logger with trace ID into the Gin context.
func GinMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		traceID := c.GetHeader("X-Request-ID")
		if traceID == "" {
			// In case Nginx is bypassed or fails to set it, we generate one.
			// This part can be replaced with a more robust UUID generator.
			traceID = fmt.Sprintf("gen-%d", time.Now().UnixNano())
		}
		c.Set("trace_id", traceID)
		c.Next()
	}
}

// Info logs an informational message.
func (l *Logger) Info(c *gin.Context, format string, args ...interface{}) {
	l.log(c, LevelInfo, format, args...)
}

// Error logs an error message.
func (l *Logger) Error(c *gin.Context, err error, format string, args ...interface{}) {
	message := fmt.Sprintf(format, args...)
	fullMessage := fmt.Sprintf("%s | error: %v", message, err)
	l.log(c, LevelError, fullMessage)
}

func (l *Logger) log(c *gin.Context, level LogLevel, format string, args ...interface{}) {
	traceID, _ := c.Get("trace_id").(string)

	entry := LogEntry{
		Timestamp:  time.Now().UTC().Format(time.RFC3339Nano),
		Level:      level,
		Service:    l.serviceName,
		TraceID:    traceID,
		Message:    fmt.Sprintf(format, args...),
		Hostname:   l.hostname,
		PID:        l.pid,
	}

	raw, err := json.Marshal(entry)
	if err != nil {
		// If JSON marshaling fails, something is very wrong. Log to stderr.
		fmt.Fprintf(os.Stderr, "zlog: failed to marshal log entry: %v\n", err)
		return
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	// The core of the non-blocking mechanism.
	// zmq.DONTWAIT ensures this call never blocks. If the HWM is reached,
	// it returns immediately with an error, effectively dropping the log.
	_, err = l.pusher.SendBytes(raw, zmq.DONTWAIT)
	if err != nil {
		// Log send failure to stderr for debugging, but don't block the app.
		// This is an accepted data loss scenario.
		fmt.Fprintf(os.Stderr, "zlog: failed to send log via ZeroMQ: %v\n", err)
	}
}

使用示例 (在某个服务中):

// services/user-service/main.go
package main

import (
	"fmt"
	"my-monorepo/pkg/zlog"
	"net/http"

	"github.com/gin-gonic/gin"
)

func main() {
    // 在服务启动时初始化
	if err := zlog.InitGlobalLogger("user-service", "ipc:///tmp/zlogger.sock"); err != nil {
		panic(fmt.Sprintf("Failed to initialize logger: %v", err))
	}
	defer zlog.Close()

	r := gin.Default()
	r.Use(zlog.GinMiddleware()) // 注入中间件

	r.GET("/users/:id", func(c *gin.Context) {
		userID := c.Param("id")
        
        // 使用日志
		zlog.Get().Info(c, "Fetching user data for ID: %s", userID)

		if userID == "0" {
			zlog.Get().Error(c, fmt.Errorf("invalid user id"), "Validation failed for user ID")
			c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid user ID"})
			return
		}
        
		zlog.Get().Info(c, "User data retrieved successfully for ID: %s", userID)
		c.JSON(http.StatusOK, gin.H{"user_id": userID, "name": "John Doe"})
	})

	r.Run(":8080")
}

3. Cassandra 数据模型设计

日志数据的查询模式非常固定:根据 trace_id 查询一个请求的完整链路日志。因此,trace_id 是最理想的分区键。timestamp 作为聚类键,可以保证同一个请求的日志按时间排序。

-- 在 cqlsh 中执行
CREATE KEYSPACE IF NOT EXISTS logs WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'};

USE logs;

CREATE TABLE IF NOT EXISTS service_logs (
    trace_id text,
    event_time timestamp,
    service text,
    hostname text,
    level text,
    message text,
    pid int,
    PRIMARY KEY (trace_id, event_time)
) WITH CLUSTERING ORDER BY (event_time ASC)
  AND default_time_to_live = 2592000; -- 日志保留30天
  • PRIMARY KEY (trace_id, event_time): 这意味着所有具有相同 trace_id 的日志都会被存储在 Cassandra 的同一个分区中,并且物理上按 event_time 排序。这使得 SELECT * FROM service_logs WHERE trace_id = 'some-id' 操作极为高效。
  • default_time_to_live: 设置了 TTL,Cassandra 会自动删除过期数据,避免了手动清理的麻烦。

4. 日志收集器服务 log-collector

这是一个独立的、可以水平扩展的 Go 应用。它不断地从 ZeroMQ 拉取日志,批量写入 Cassandra。

cmd/log-collector/main.go:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/gocql/gocql"
	zmq "github.com/pebbe/zmq4"
)

const (
	batchSize    = 100 // 每100条日志写入一次
	batchTimeout = 2 * time.Second // 或者每2秒写入一次
	cassandraHost = "127.0.0.1:9042"
	keyspace     = "logs"
	zmqEndpoint  = "ipc:///tmp/zlogger.sock"
)

// LogEntry mirrors the structure from the zlog package.
type LogEntry struct {
	Timestamp string `json:"timestamp"`
	Level     string `json:"level"`
	Service   string `json:"service"`
	TraceID   string `json:"trace_id"`
	Message   string `json:"message"`
	Hostname  string `json:"hostname"`
	PID       int    `json:"pid"`
}

func main() {
	// 初始化 ZeroMQ PULL socket
	puller, err := zmq.NewSocket(zmq.PULL)
	if err != nil {
		log.Fatalf("Failed to create ZMQ PULL socket: %v", err)
	}
	defer puller.Close()

	if err := puller.Bind(zmqEndpoint); err != nil {
		log.Fatalf("Failed to bind ZMQ endpoint '%s': %v", zmqEndpoint, err)
	}
	// 确保 socket 文件权限正确,以便其他服务可以连接
	if _, err := os.Stat("/tmp/zlogger.sock"); err == nil {
		if err := os.Chmod("/tmp/zlogger.sock", 0777); err != nil {
			log.Printf("Warning: failed to chmod socket file: %v", err)
		}
	}

	log.Printf("Log collector started. Listening on %s", zmqEndpoint)

	// 初始化 Cassandra session
	cluster := gocql.NewCluster(cassandraHost)
	cluster.Keyspace = keyspace
	cluster.Consistency = gocql.Quorum
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatalf("Failed to connect to Cassandra: %v", err)
	}
	defer session.Close()

	log.Println("Successfully connected to Cassandra.")

	var buffer []LogEntry
	var mu sync.Mutex
	ticker := time.NewTicker(batchTimeout)

	// 启动一个goroutine来处理定时刷新
	go func() {
		for range ticker.C {
			mu.Lock()
			if len(buffer) > 0 {
				flushBuffer(session, buffer)
				buffer = nil // 清空缓冲区
			}
			mu.Unlock()
		}
	}()

	// 主循环,接收日志
	for {
		raw, err := puller.RecvBytes(0)
		if err != nil {
			log.Printf("Error receiving from ZMQ: %v", err)
			continue
		}

		var entry LogEntry
		if err := json.Unmarshal(raw, &entry); err != nil {
			log.Printf("Error unmarshaling log entry: %v", err)
			continue
		}
		
		mu.Lock()
		buffer = append(buffer, entry)
		if len(buffer) >= batchSize {
			flushBuffer(session, buffer)
			buffer = nil // 清空缓冲区
		}
		mu.Unlock()
	}
}

func flushBuffer(session *gocql.Session, entries []LogEntry) {
	if len(entries) == 0 {
		return
	}

	start := time.Now()
	batch := session.NewBatch(gocql.LoggedBatch)
	stmt := `INSERT INTO service_logs (trace_id, event_time, service, hostname, level, message, pid) VALUES (?, ?, ?, ?, ?, ?, ?)`

	for _, entry := range entries {
		eventTime, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
		if err != nil {
			log.Printf("Invalid timestamp format '%s', skipping entry: %v", entry.Timestamp, err)
			continue
		}
		batch.Query(stmt,
			entry.TraceID,
			eventTime,
			entry.Service,
			entry.Hostname,
			entry.Level,
			entry.Message,
			entry.PID,
		)
	}
	
	if err := session.ExecuteBatch(batch); err != nil {
		log.Printf("ERROR: Failed to execute batch insert into Cassandra: %v", err)
		// 在生产环境中,这里应该有重试逻辑或将失败的批次写入死信队列
		return
	}
	
	log.Printf("Flushed %d log entries to Cassandra in %v", len(entries), time.Since(start))
}

这个收集器是无状态的,可以简单地启动多个实例绑定到不同的 ZMQ 端点(或使用 ZMQ 代理)来实现高可用和扩展性。

架构的局限性与未来迭代

尽管此方案解决了我们面临的核心性能瓶颈,但它并非银弹。在真实项目中,我们必须清楚它的边界。

  1. 消息可靠性: 当前 PUSH/PULL 模式在收集器重启或网络分区时会丢失数据。如果需要更高的可靠性,可以考虑引入 ZeroMQ 的 ROUTER/DEALER 模式配合心跳和重连机制,但这会增加客户端和服务端的复杂度。对于需要严格“至少一次”语义的场景,Kafka 仍然是更合适的选择,但那也意味着我们将重新引入重量级 broker 的运维成本。

  2. 查询的局限性: Cassandra 的查询模型是固定的。如果未来需要进行复杂的、非预期的聚合分析或全文搜索,当前 schema 无法支持。一个可能的演进方向是,收集器进行双写:一份写入 Cassandra 用于快速的链路追踪查询,另一份经过采样或过滤后,写入 ClickHouse 或 Elasticsearch 用于数据分析。

  3. 高水位标记(HWM)的影响: zlog 包中设置的 Sndhwm 是一个关键参数。它定义了在下游阻塞时,发送方愿意在内存中缓存多少条消息。这个值太小,流量高峰时会丢弃大量日志;这个值太大,又可能在极端情况下耗尽应用服务的内存。需要根据服务的内存预算和日志峰值速率进行精细调整和持续监控。

  4. IPC vs TCP: 示例中使用了 ipc:// (Inter-Process Communication),它通过 Unix domain socket 通信,性能极高,但限制了应用和收集器必须在同一台物理机上。在容器化环境中,更通用的做法是使用 tcp://,将收集器部署为独立的 DaemonSetService,这会引入微小的网络延迟,但换来了部署上的灵活性。


  目录