基于NATS JetStream与Celery构建满足BASE原则的异步任务架构及其ELK可观测性闭环


一个生产级的系统,其价值不仅在于完成业务功能,更在于它如何应对失败、如何被观测、以及如何水平扩展。当面临需要处理耗时操作(如视频转码、报告生成、数据批处理)并向前端提供实时状态反馈的需求时,一个简单的同步请求-响应模型会迅速暴露出其脆弱性。API超时、服务耦合、状态丢失、问题追溯困难等问题会接踵而至。

我们的技术挑战明确:构建一个高吞吐、高可用的异步任务处理系统。具体要求如下:

  1. 解耦与韧性: Web服务接收到任务后必须立即响应,将任务可靠地移交给后台处理,自身不被阻塞。整个任务流程不能因为某个 worker 节点的崩溃而丢失。
  2. 实时状态同步: 前端界面必须能实时、无轮询地接收到任务状态的变更(例如:排队中、处理中、已完成、失败)。
  3. 可观测性: 任意一个任务,都必须能清晰地追踪其从API入口、进入消息队列、被worker消费、再到状态变更通知的完整生命周期日志。
  4. 最终一致性: 整个系统接受异步带来的最终一致性,即遵循BASE(Basically Available, Soft state, Eventually consistent)原则,而非强依赖ACID事务。

架构抉择:从紧耦合到事件驱动

最初的方案是传统的数据库轮询。API将任务状态写入数据库表,前端通过定时轮询API来获取最新状态。这种方案简单,但在负载增高时,数据库会成为性能瓶颈,轮询也给服务端和网络带来巨大浪费。这是一种典型的紧耦合设计,违背了我们的核心诉求。

因此,我们转向了完全异步的、事件驱动的架构。该架构的核心是组件间的彻底解耦,通过消息和事件进行通信。这自然地导向了BASE模型:服务基本可用,状态允许在中间过程中变化,数据最终会达到一致。这个选择带来了新的技术选型权衡。

graph TD
    subgraph "用户端 (Browser)"
        A[React App w/ Zustand Store]
    end

    subgraph "服务端 (Python Backend)"
        B(FastAPI) -- 1. 提交任务 (HTTP) --> A
        A -- 2. 立即返回 Task ID --> B
        B -- 3. 推送任务到Celery --> C[Redis/RabbitMQ as Celery Broker]
        D[Celery Worker] -- 4. 从Broker获取任务 --> C
        D -- 5. 执行长耗时任务 --> D
        D -- 6. 发布状态事件 --> E[NATS JetStream]
    end

    subgraph "实时通知层"
        F[WebSocket Gateway] -- 7. 订阅状态事件 --> E
        F -- 8. 推送状态到客户端 --> A
    end

    subgraph "可观测性平台"
        G[ELK Stack]
        B -- 写入结构化日志 --> G
        D -- 写入结构化日志 --> G
        F -- 写入结构化日志 --> G
    end

    A -- 9. WebSocket更新Zustand Store --> A

这个架构中,每个组件职责单一。FastAPI负责接收请求和任务分发,Celery负责任务的调度和执行,NATS JetStream负责状态事件的可靠广播,WebSocket Gateway则充当了后端事件系统和前端UI之间的桥梁。所有组件都将携带唯一trace_id的结构化日志推送到ELK Stack,形成完整的可观测性闭环。

技术选型深度解析

任务队列:为何选择Celery?

虽然可以直接使用Redis List作为简单的任务队列,但Celery提供了一个成熟的分布式任务队列框架。在真实项目中,我们需要的不只是一个队列,而是包括任务重试、错误处理、定时任务、结果存储、流量控制等一系列的配套功能。自己实现这些轮子费时费力且容易出错。Celery的稳定性和丰富特性是我们的首选。

事件总线:NATS JetStream vs Kafka/RabbitMQ

这是本次架构选型的核心决策点。我们需要一个组件来广播任务处理的中间和最终状态。

  • RabbitMQ: 功能强大,支持多种交换机类型,路由灵活。但对于我们这种纯粹的发布/订阅(Pub/Sub)广播场景,其复杂性有点过高。AMQP协议也相对较重。
  • Kafka: 业界领先的流处理平台,为高吞吐日志和数据流而生。但其基于分区日志的模式和对Zookeeper(新版已移除)的依赖,使其运维成本相对较高。对于我们的“状态事件通知”场景,有点像用牛刀杀鸡。
  • NATS JetStream: NATS本身是一个极其轻量、高性能的“消息传输神经系统”。而JetStream是其内置的持久化层。它完美地契合了我们的需求:
    • 简单: 核心NATS的概念非常简单,API清晰。
    • 高性能: NATS是业界性能最高的开源消息系统之一。
    • 持久化: JetStream提供了at-least-once的投递保证,确保worker宕机重启后事件不会丢失。
    • 轻量级: 单个二进制文件,无外部依赖,运维成本极低。

我们选择NATS JetStream,因为它在满足我们“可靠事件广播”需求的同时,提供了最佳的性能与运维复杂度平衡。

核心实现代码

以下是串联整个流程的核心代码片段,重点展示了组件间的交互和配置。

1. FastAPI端:任务提交与日志上下文

我们使用python-json-logger来确保所有日志都是结构化的JSON,并注入一个trace_id来贯穿整个请求生命周期。

# main.py
import logging
import uuid
from fastapi import FastAPI, BackgroundTasks
from pythonjsonlogger import jsonlogger
from celery_worker import process_video_task

# --- 日志配置 ---
# 确保日志包含trace_id,方便ELK追踪
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(
    '%(asctime)s %(name)s %(levelname)s %(trace_id)s %(message)s'
)
logHandler.setFormatter(formatter)
logger = logging.getLogger("api_server")
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
# 创建一个上下文过滤器
class TraceIdFilter(logging.Filter):
    def filter(self, record):
        # 这里只是一个示例,生产环境应从请求头或上下文中获取
        record.trace_id = str(uuid.uuid4())
        return True

logger.addFilter(TraceIdFilter())

app = FastAPI()

@app.post("/tasks/video")
async def create_video_task(video_url: str):
    """
    接收任务请求,立即返回,并通过Celery异步处理
    """
    task_id = str(uuid.uuid4())
    trace_id = logger.filters[0].filter(logging.LogRecord(None, None, "", 0, "", (), None, None)).trace_id

    # 日志记录了任务的入口点
    logger.info(f"Received task {task_id} for video {video_url}", extra={'trace_id': trace_id})
    
    # 将任务和trace_id一起发送给Celery
    process_video_task.delay(task_id, video_url, trace_id)

    return {"task_id": task_id, "status": "QUEUED"}

2. Celery Worker:任务处理与NATS事件发布

Worker是核心的处理单元。它执行任务,并通过NATS发布进度。

# celery_worker.py
import os
import time
import asyncio
import json
import logging
from celery import Celery
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrTimeout

# --- Celery 配置 ---
# 生产环境中应使用更可靠的broker,如RabbitMQ
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')

celery_app = Celery('tasks', broker=CELERY_BROKER_URL, backend=CELERY_RESULT_BACKEND)

# --- NATS JetStream 配置 ---
NATS_URL = os.environ.get('NATS_URL', 'nats://localhost:4222')
NATS_STREAM_NAME = "TASK_STATUS"
NATS_SUBJECT = "tasks.status.>" # 使用通配符主题

# --- 日志配置 (与API服务类似,确保结构化和trace_id) ---
logger = logging.getLogger("celery_worker")
# ... 省略与FastAPI中类似的日志配置 ...

class NatsPublisher:
    """
    一个简单的NATS JetStream发布者封装,用于复用连接
    """
    _nc = None
    _js = None

    async def connect(self):
        if not self._nc or not self._nc.is_connected:
            self._nc = NATS()
            await self._nc.connect(servers=[NATS_URL])
            self._js = self._nc.jetstream()
            # 确保Stream存在
            await self._js.add_stream(name=NATS_STREAM_NAME, subjects=[f"{NATS_SUBJECT}.*"])

    async def publish(self, task_id: str, status: str, trace_id: str, details: dict = None):
        if not self._js:
            await self.connect()

        event_data = {
            "task_id": task_id,
            "status": status,
            "timestamp": time.time(),
            "details": details or {},
            "trace_id": trace_id
        }
        payload = json.dumps(event_data).encode()
        subject_name = f"tasks.status.{task_id}"
        
        try:
            # 发布消息到JetStream,确保持久化
            ack = await self._js.publish(subject_name, payload)
            logger.info(f"Published status '{status}' for task {task_id} to NATS. Stream: {ack.stream}, Seq: {ack.seq}", extra={'trace_id': trace_id})
        except ErrTimeout:
            logger.error(f"Failed to publish status for task {task_id} due to timeout", extra={'trace_id': trace_id})
        except Exception as e:
            logger.error(f"An unexpected error occurred during NATS publish for task {task_id}: {e}", extra={'trace_id': trace_id})
            
    async def close(self):
        if self._nc and self._nc.is_connected:
            await self._nc.close()

# 实例化发布者
nats_publisher = NatsPublisher()

@celery_app.task(name='process_video_task')
def process_video_task(task_id: str, video_url: str, trace_id: str):
    """
    模拟视频处理任务
    """
    loop = asyncio.get_event_loop()

    try:
        # 1. 发布“处理中”状态
        logger.info(f"Starting to process task {task_id}", extra={'trace_id': trace_id})
        loop.run_until_complete(nats_publisher.publish(task_id, "PROCESSING", trace_id))

        # 2. 模拟耗时操作
        time.sleep(15) # 模拟视频转码
        
        # 模拟可能出现的错误
        if "error" in video_url:
            raise ValueError("Invalid video format")

        # 3. 发布“完成”状态
        result_details = {"output_path": f"/media/processed/{task_id}.mp4"}
        logger.info(f"Successfully processed task {task_id}", extra={'trace_id': trace_id})
        loop.run_until_complete(nats_publisher.publish(task_id, "COMPLETED", trace_id, result_details))
        
        return {"status": "success", "path": result_details["output_path"]}

    except Exception as e:
        # 4. 捕获异常,发布“失败”状态
        error_details = {"error_message": str(e)}
        logger.error(f"Failed to process task {task_id}: {e}", exc_info=True, extra={'trace_id': trace_id})
        loop.run_until_complete(nats_publisher.publish(task_id, "FAILED", trace_id, error_details))
        # 也可以在这里决定是否重试
        raise

这里的关键点在于,Celery worker的每一步关键状态变化都会通过NatsPublisher发布一个事件。这个事件体中同样包含了trace_id

3. WebSocket Gateway: 订阅NATS并推送前端

这个组件是连接后端事件和前端UI的桥梁。这里使用websocketsnats-py库实现。

# websocket_gateway.py
import asyncio
import json
import logging
from nats.aio.client import Client as NATS
import websockets

logger = logging.getLogger("websocket_gateway")
# ... 省略与FastAPI中类似的日志配置 ...

NATS_URL = 'nats://localhost:4222'
NATS_SUBJECT_TO_SUB = "tasks.status.>"
# 在生产环境中,你需要一个机制来映射 task_id 或 user_id 到对应的 websocket 连接
# 这里简化为广播给所有连接的客户端
CONNECTED_CLIENTS = set()

async def nats_subscriber():
    nc = NATS()
    await nc.connect(servers=[NATS_URL])
    js = nc.jetstream()
    
    logger.info(f"Subscribing to NATS subject '{NATS_SUBJECT_TO_SUB}'")

    # 创建一个持久的消费者,以防网关重启错过消息
    sub = await js.subscribe(NATS_SUBJECT_TO_SUB, durable="websocket_gateway")

    try:
        async for msg in sub.messages:
            try:
                data_str = msg.data.decode()
                event_data = json.loads(data_str)
                trace_id = event_data.get('trace_id', 'N/A')
                logger.info(f"Received event from NATS: {event_data}", extra={'trace_id': trace_id})
                
                # 广播给所有连接的客户端
                if CONNECTED_CLIENTS:
                    await asyncio.wait([client.send(data_str) for client in CONNECTED_CLIENTS])
                
                await msg.ack() # 确认消息处理完成
            except json.JSONDecodeError:
                logger.warning(f"Failed to decode NATS message: {msg.data}")
                await msg.ack() # 还是ack掉,防止坏消息阻塞队列
            except Exception as e:
                logger.error(f"Error processing NATS message: {e}")
                # 不ack,让NATS稍后重传

    finally:
        await sub.unsubscribe()
        await nc.close()

async def handler(websocket, path):
    CONNECTED_CLIENTS.add(websocket)
    logger.info(f"Client connected: {websocket.remote_address}")
    try:
        await websocket.wait_closed()
    finally:
        CONNECTED_CLIENTS.remove(websocket)
        logger.info(f"Client disconnected: {websocket.remote_address}")

async def main():
    # 同时运行NATS订阅者和WebSocket服务器
    server = websockets.serve(handler, "localhost", 8765)
    await asyncio.gather(server, nats_subscriber())

if __name__ == "__main__":
    asyncio.run(main())

4. 前端React + Zustand: 响应式状态管理

前端使用Zustand来管理任务列表的状态。它的简洁性非常适合这种由外部事件驱动状态变更的场景。

// store.js
import create from 'zustand';

// Zustand store
const useTaskStore = create((set) => ({
  tasks: {}, // a map of taskId -> taskObject
  updateTask: (taskData) => {
    set((state) => ({
      tasks: {
        ...state.tasks,
        [taskData.task_id]: {
          ...state.tasks[taskData.task_id], // 保留旧信息
          ...taskData,
        },
      },
    }));
  },
  addTask: (taskId) => {
    set((state) => ({
      tasks: {
        ...state.tasks,
        [taskId]: { task_id: taskId, status: 'QUEUED' },
      },
    }));
  },
}));

export default useTaskStore;

// TaskComponent.jsx
import React, { useEffect } from 'react';
import useTaskStore from './store';

const TaskTracker = () => {
  const { tasks, updateTask } = useTaskStore();

  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8765');

    ws.onopen = () => {
      console.log('WebSocket connection established.');
    };

    ws.onmessage = (event) => {
      try {
        const taskData = JSON.parse(event.data);
        console.log('Received task update:', taskData);
        // 调用Zustand的action来更新状态
        updateTask(taskData);
      } catch (error) {
        console.error('Failed to parse WebSocket message:', error);
      }
    };

    ws.onclose = () => {
      console.log('WebSocket connection closed.');
      // 生产环境应加入重连逻辑
    };

    // 清理函数
    return () => {
      ws.close();
    };
  }, [updateTask]); // 依赖updateTask以防其引用变化

  return (
    <div>
      <h2>Task Status</h2>
      <ul>
        {Object.values(tasks).map(task => (
          <li key={task.task_id}>
            Task {task.task_id}: <strong>{task.status}</strong>
            {task.details?.output_path && ` - Output: ${task.details.output_path}`}
            {task.details?.error_message && ` - Error: ${task.details.error_message}`}
          </li>
        ))}
      </ul>
    </div>
  );
};

5. ELK 可观测性闭环

所有服务都已配置为输出带有trace_id的JSON日志。下一步是配置ELK Stack来收集和展示它们。

Filebeat/Logstash 配置:
Filebeat会从各个服务的日志文件或Docker日志驱动中收集日志,并发送给Logstash。Logstash的配置文件如下所示,它会解析JSON并正确处理时间戳。

# logstash.conf
input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
  }
  date {
    match => [ "asctime", "ISO8601" ] # 根据你的日志时间格式
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}

在Kibana中,我们可以通过简单的查询 trace_id: "the-uuid-we-got-from-api" 来筛选出与单个任务相关的所有日志,无论它们来自FastAPI、Celery worker还是WebSocket网关。这将清晰地展现任务的完整生命周期,极大地方便了调试和问题排查。

架构的局限性与未来展望

这套架构并非银弹。它引入了更多的运维组件,系统的复杂性也随之增加。它是一个典型的最终一致性系统,不适用于需要强事务和即时一致性的场景,例如金融交易。

此外,WebSocket Gateway 是一个潜在的单点瓶颈。在生产环境中,它需要被设计成可水平扩展的无状态服务,并通过负载均衡器对外提供服务。客户端连接的管理也需要更精细化的设计,例如使用Redis来维护用户ID与WebSocket连接的映射关系。

未来的优化方向可以包括:

  1. 引入OpenTelemetry: 使用更标准的分布式追踪方案替代简单的trace_id,可以获得更丰富的上下文信息和可视化拓扑,并与ELK APM等工具无缝集成。
  2. Saga模式: 对于需要跨多个服务执行的复杂业务流程,可以引入Saga模式来管理分布式事务,NATS可以作为Saga日志的可靠载体。
  3. 服务网格: 当微服务数量增多时,可以考虑引入服务网格(如Istio, Linkerd)来标准化服务间的通信、安全和可观测性,将部分基础设施逻辑从应用代码中剥离。

  目录