基于Rust Rocket与SQL Server构建可观测的生成式AI RAG管道


业务部门最近对生成式AI的热情空前高涨,一个明确的需求摆在了我们面前:利用内部积累了近十年的SQL Server知识库,构建一个智能问答服务。Python技术栈的同事快速用LangChain和Flask搭了个原型,效果不错,但很快就暴露了生产环境的隐忧。原型在并发量稍高时响应延迟飙升,依赖管理和部署过程也显得相当脆弱。这是一个典型的从PoC(概念验证)走向Production(生产)的挑战,我们需要一个性能可靠、资源可控且易于运维的解决方案。

团队的技术栈主要是.NET和SQL Server,但对于这种需要高性能和低资源占用的API服务,我们决定评估一个更现代化的选项:Rust。Rust的内存安全、无GC的性能优势,以及其强大的生态系统,使其成为构建这类计算密集型和IO密集型服务的理想选择。Web框架我们选择了Rocket,它以其易用性和强大的类型安全著称。而监控,作为生产环境的眼睛,我们沿用公司标准,集成Datadog进行全链路追踪。

挑战的核心在于,如何将Rust这个“新贵”与SQL Server这个“传统豪门”高效、稳定地结合起来,并为整个复杂的RAG(Retrieval-Augmented Generation)管道提供深度可观测性。

架构构想与技术选型决策

我们的目标是构建一个单一的Web API服务,它接收用户问题,然后执行完整的RAG流程。

graph TD
    subgraph Rust Rocket Service
        A[HTTP POST /rag/query] --> B{Request Validation};
        B --> C[Generate Query Embedding];
        C --> D{Vector Search in SQL Server};
        D --> E[Retrieve Top-K Chunks];
        E --> F[Construct Prompt with Context];
        F --> G[Call External LLM API];
        G --> H{Process LLM Response};
        H --> I[HTTP 200 OK Response];
    end

    subgraph External Dependencies
        D --> SQL_DB[(SQL Server)];
        G --> LLM_API((Generative AI API));
    end

    subgraph Observability
        A -- Trace --> Datadog;
        C -- Span --> Datadog;
        D -- Span --> Datadog;
        G -- Span --> Datadog;
        I -- Trace End --> Datadog;
    end
  1. Web框架 - Rocket: 异步支持良好,通过代码生成实现的路由和请求守卫(Request Guards)极大地简化了开发,同时保证了编译时的安全检查。
  2. SQL Server驱动 - tiberius: 这是目前生态中最成熟的纯Rust TDS协议实现,支持异步操作,是与tokio生态系统集成的关键。
  3. 数据库连接池 - bb8: 任何生产级的数据库应用都离不开连接池。bb8是一个通用的、基于tokio的异步连接池实现,我们可以为tiberius编写一个管理器来接入它。
  4. Embedding模型 - rust-bert: 为了将用户问题和知识库文本转换为向量,我们需要一个本地的Embedding模型。rust-bert提供了对Hugging Face上众多预训练模型的支持,我们选择了一个轻量级的all-MiniLM-L6-v2模型。
  5. 可观测性 - datadog-apm-rust: Datadog官方提供的APM客户端,通过简单的宏和配置,可以为我们的关键函数和HTTP请求生成追踪Span,实现端到端的链路监控。

步骤化实现:从数据库连接到全链路追踪

1. 项目基础与依赖配置

我们的Cargo.toml文件反映了上述技术选型:

[package]
name = "enterprise_rag_api"
version = "0.1.0"
edition = "2021"

[dependencies]
rocket = { version = "0.5.0", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.0", features = ["full"] }

# SQL Server
tiberius = { version = "0.12", features = ["rust_decimal", "chrono", "tds73"] }
bb8 = "0.8"
bb8-tiberius = "0.11" # 注意:这个库可能需要自行适配或使用社区版本

# AI
rust-bert = "0.22.0"
tch = "0.14.0" # rust-bert's backend
reqwest = { version = "0.11", features = ["json", "rustls-tls"] } # For LLM API calls

# Observability
datadog-apm-rust = "0.2"

# Config & Logging
config = "0.13"
env_logger = "0.10"
log = "0.4"
once_cell = "1.18"

一个常见的错误是:直接在每个请求中创建数据库连接。这会导致巨大的性能开销和端口耗尽。使用连接池是强制性的。

2. 核心服务:数据库管理与连接池

与SQL Server的集成是第一个难点。我们需要一个健壮的、支持异步的连接池。

// src/db.rs

use bb8::{Pool, RunError};
use bb8_tiberius::{ConnectionManager, TiberiusPool};
use std::env;
use tiberius::{Client, Config, Query, AuthMethod, Row};
use tokio::net::TcpStream;
use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};

// 定义一个数据库管理结构,用于持有连接池
pub struct DbManager {
    pool: TiberiusPool,
}

impl DbManager {
    // 初始化连接池
    pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
        let db_host = env::var("DB_HOST").expect("DB_HOST must be set");
        let db_port_str = env::var("DB_PORT").unwrap_or_else(|_| "1433".to_string());
        let db_port = db_port_str.parse::<u16>().expect("Invalid DB_PORT");
        let db_user = env::var("DB_USER").expect("DB_USER must be set");
        let db_password = env::var("DB_PASSWORD").expect("DB_PASSWORD must be set");
        let db_name = env::var("DB_DATABASE").expect("DB_DATABASE must be set");
        
        let mut config = Config::new();
        config.host(&db_host);
        config.port(db_port);
        config.authentication(AuthMethod::sql_server(db_user, db_password));
        config.database(db_name);
        config.trust_cert(); // 在生产环境中应配置正确的证书

        let manager = ConnectionManager::new(config)
            .expect("Failed to create Tiberius connection manager");
        
        let pool = Pool::builder()
            .max_size(15) // 根据预估并发量设置
            .build(manager)
            .await?;
            
        Ok(DbManager { pool })
    }

    // 从连接池获取一个连接
    async fn get_conn(&self) -> Result<bb8::PooledConnection<'_, ConnectionManager>, RunError<tiberius::error::Error>> {
        self.pool.get().await
    }

    // 在真实项目中,我们假设知识库的文本块已经被预先处理并向量化存储
    // 表结构: KnowledgeChunks (ID, TextContent, VectorData VARBINARY(MAX))
    // 为了简化演示,我们将向量存储为逗号分隔的字符串,并在SQL中处理
    // 更好的方式是使用VARBINARY存储原始f32字节
    #[datadog::trace]
    pub async fn find_similar_chunks(&self, vector: &[f32], top_k: i32) -> Result<Vec<String>, Box<dyn std::error::Error>> {
        let mut conn = self.get_conn().await?;
        let vector_str = vector.iter().map(|f| f.to_string()).collect::<Vec<String>>().join(",");
        
        // 这里的坑在于: SQL Server 2019及以下版本没有内建的向量支持。
        // 我们必须手动实现余弦相似度计算。这在小数据集上可行,但大数据集性能会很差。
        // 这段SQL是这个方案的核心与瓶颈。
        let sql = r#"
            -- A temporary table to hold the query vector
            DECLARE @query_vector TABLE (i INT, val FLOAT);
            INSERT INTO @query_vector (i, val)
            SELECT
                i = ROW_NUMBER() OVER (ORDER BY (SELECT NULL)),
                value
            FROM STRING_SPLIT(@P1, ',');

            -- Calculate cosine similarity and rank
            WITH Similarities AS (
                SELECT 
                    k.ID,
                    k.TextContent,
                    SUM(CAST(dv.value AS FLOAT) * qv.val) / 
                        (SQRT(SUM(POWER(CAST(dv.value AS FLOAT), 2))) * SQRT(SUM(POWER(qv.val, 2) OVER()))) 
                    AS CosineSimilarity
                FROM 
                    KnowledgeChunks k
                CROSS APPLY 
                    STRING_SPLIT(k.VectorString, ',') WITH (i INT) AS dv -- Custom function or STRING_SPLIT trick
                JOIN 
                    @query_vector qv ON dv.i = qv.i
                GROUP BY
                    k.ID, k.TextContent
            )
            SELECT TOP (@P2) TextContent
            FROM Similarities
            ORDER BY CosineSimilarity DESC;
        "#;

        let mut query = Query::new(sql);
        query.bind(vector_str);
        query.bind(top_k);

        let mut stream = query.query(&mut conn).await?;
        let rows = stream.into_first_result().await?;
        
        let results = rows.into_iter()
            .map(|row| row.get::<&str, _>("TextContent").unwrap_or_default().to_string())
            .collect();
        
        Ok(results)
    }
}

这段代码的核心是find_similar_chunks函数。它展示了在没有原生向量支持的SQL Server中进行向量搜索的权衡。通过将向量拆分为表并进行聚合计算,我们实现了余弦相似度。这在生产中需要严格的性能测试和索引优化,或者考虑升级到支持向量的数据库版本。

3. 核心服务:AI与Embedding

AI部分分为两块:生成Embedding和调用大语言模型。

// src/ai_services.rs

use rust_bert::pipelines::sentence_embeddings::{
    SentenceEmbeddingsBuilder, SentenceEmbeddingsModelType,
};
use serde_json::json;
use std::env;
use std::error::Error;
use once_cell::sync::Lazy;
use reqwest::Client;

// 使用once_cell确保模型只在程序启动时加载一次,这是一个昂贵的操作。
static EMBEDDING_MODEL: Lazy<rust_bert::pipelines::sentence_embeddings::SentenceEmbeddingsModel> = Lazy::new(|| {
    log::info!("Loading sentence embedding model...");
    let model = SentenceEmbeddingsBuilder::remote(SentenceEmbeddingsModelType::AllMiniLmL6V2)
        .create_model()
        .expect("Failed to load sentence embedding model.");
    log::info!("Sentence embedding model loaded.");
    model
});

pub struct AiManager {
    llm_client: Client,
    llm_api_key: String,
    llm_endpoint: String,
}

impl AiManager {
    pub fn new() -> Self {
        Self {
            llm_client: Client::new(),
            llm_api_key: env::var("LLM_API_KEY").expect("LLM_API_KEY must be set"),
            llm_endpoint: env::var("LLM_ENDPOINT").expect("LLM_ENDPOINT must be set"),
        }
    }

    #[datadog::trace]
    pub fn generate_embedding(&self, text: &str) -> Result<Vec<f32>, Box<dyn Error>> {
        let embeddings = EMBEDDING_MODEL.encode(&[text])?;
        Ok(embeddings[0].clone())
    }

    #[datadog::trace(name = "call_llm_api")]
    pub async fn query_llm(&self, user_query: &str, context: Vec<String>) -> Result<String, Box<dyn Error>> {
        let context_str = context.join("\n---\n");
        
        // 一个常见的错误是直接将用户输入和数据库内容拼接成prompt,这容易导致Prompt注入。
        // 在真实项目中,需要对context和user_query进行清理和格式化。
        let prompt = format!(
            "Based on the following context, please answer the user's question. \
            If the context doesn't contain the answer, say you don't know.\n\n\
            Context:\n{}\n\nUser Question: {}\n\nAnswer:",
            context_str, user_query
        );

        let response = self.llm_client
            .post(&self.llm_endpoint)
            .bearer_auth(&self.llm_api_key)
            .json(&json!({
                "model": "gpt-4-turbo", // Or any other model
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 500
            }))
            .send()
            .await?;

        if !response.status().is_success() {
            let error_body = response.text().await?;
            return Err(format!("LLM API failed with status: {}", error_body).into());
        }

        let result: serde_json::Value = response.json().await?;
        let answer = result["choices"][0]["message"]["content"]
            .as_str()
            .unwrap_or_default()
            .to_string();
        
        // 为Datadog Span添加自定义元数据
        if let Some(mut span) = datadog_apm_rust::active_span() {
            if let Some(usage) = result.get("usage") {
                 span.set_tag("llm.usage.prompt_tokens", usage.get("prompt_tokens").and_then(|v| v.as_i64()));
                 span.set_tag("llm.usage.completion_tokens", usage.get("completion_tokens").and_then(|v| v.as_i64()));
            }
            span.set_tag("llm.model", "gpt-4-turbo");
        }

        Ok(answer)
    }
}

这里我们用once_cell::sync::Lazy来延迟加载Embedding模型,确保它在整个应用的生命周期中是单例。对LLM的调用则通过reqwest完成,并特别展示了如何在Datadog的追踪Span中添加自定义标签(如token使用量),这对于成本控制和性能分析至关重要。

4. API层:Rocket路由与服务集成

最后,我们将所有部分在Rocket的路由中串联起来。

```rust
// src/main.rs

#[macro_use]
extern crate rocket;

mod db;
mod ai_services;

use db::DbManager;
use ai_services::AiManager;
use rocket::serde::json::{Json, Value};
use rocket::{State, serde::Deserialize};
use rocket::http::Status;

#[derive(Deserialize)]
struct RagQuery {
query: String,
}

#[post(“/rag/query”, format = “json”, data = ““)]
#[datadog::trace]
async fn rag_query(
request: Json,
db_manager: &State,
ai_manager: &State,
) -> Result<Value, (Status, String)> {
if request.query.trim().is_empty() {
return Err((Status::BadRequest, “Query cannot be empty”.to_string()));
}

// 1. Generate embedding for the user query
let query_vector = ai_manager.generate_embedding(&request.query)
    .map_err(|e| (Status::InternalServerError, format!("Embedding failed: {}", e)))?;

// 2. Search for similar chunks in SQL Server
let context_chunks = db_manager.find_similar_chunks(&query_vector, 3).await
    .map_err(|e| (Status::InternalServerError, format!("Database search failed: {}", e)))?;

if context_chunks.is_empty() {
    return Ok(json!({ "answer": "I'm sorry, I couldn't find any relevant information in the knowledge base." }));
}

// 为当前追踪添加上下文信息
if let Some(mut span) = datadog_apm_rust::active_span() {
    span.set_tag("rag.retrieved_chunks", context_chunks.len() as i64);
}

// 3. Call LLM with context
let answer = ai_manager.query_llm(&request.query, context_chunks).await
    .map_err(|e| (Status::ServiceUnavailable, format!("LLM query failed: {}", e)))?;
    
Ok(json!({ "answer": answer }))

}

#[launch]
async fn rocket() -> _ {
// 初始化日志和Datadog Tracer
env_logger::init();
let _tracer = datadog_apm_rust::init_tracer(datadog_apm_rust::Config {
service: Some(“enterprise-rag-api”.to_string()),
env: Some(env::var(“APP_ENV”).unwrap_or_else(|_| “development”.to_string())),
..Default::default()
}).expect(“Failed to init Datadog tracer”);

let db_manager = DbManager::new().await.expect("Failed to create DbManager");
let ai_manager = AiManager::new();

rocket::build()
    .mount("/api/v1", routes

  目录