利用 Apache Hudi 增量流构建支持 LlamaIndex 的近实时混合检索 RAG 管道


我们团队维护的一个基于 RAG 的内部知识库系统遇到了一个棘手的瓶颈:数据新鲜度。最初的架构简单粗暴,每晚通过 Spark 作业全量读取源数据,计算 embedding,然后将数百万文档向量完全重建到 OpenSearch 索引中。这个过程耗时4-6小时,意味着白天的知识更新最快也要到第二天才能被检索到。对于一个需要处理实时工单、产品文档更新的系统,24小时的延迟是完全无法接受的。问题的核心在于,我们缺乏一个有效的方式来捕捉源数据的变更(CDC),只能被迫采用全量刷新的笨办法。

初步的构想是引入一个消息队列(如 Kafka)和 Debezium 来捕获上游数据库的变更。但这会引入一套全新的、复杂的流处理基础设施,运维成本很高,并且对于我们已经基于数据湖的分析体系来说,显得格格不入。我们需要一个能在数据湖层面解决这个问题的方案。Apache Hudi 的增量查询(Incremental Query)能力进入了我们的视野。它允许我们像查询流数据一样,拉取两次提交之间的所有变更数据(Inserts, Updates, Deletes)。这正是我们需要的。

最终的技术选型敲定为一套组合拳,目标是实现一个数据延迟在分钟级的近实时 RAG 管道:

  • Apache Hudi (MoR Table): 作为知识库的“源真相”。选择 Merge-on-Read 类型是为了优化写入性能,同时其增量查询能力是整个方案的基石。
  • OpenSearch: 作为主要的向量数据库。它提供强大的可扩展性和混合检索能力(向量+关键词)。
  • Redis: 承担双重角色。首先,作为一级缓存,存储高频访问文档的 embedding,降低对 OpenSearch 的压力;其次,利用其内建的向量搜索功能,为特定场景(如最新文档)提供一个超低延迟的检索通道。
  • LlamaIndex: 作为 RAG 的编排框架。它的可插拔设计允许我们轻松定制数据源、向量存储和检索逻辑,将这套混合系统无缝集成。

整个架构的目标是清晰的:当 Hudi 表发生变更时,一个下游的 Spark 作业能通过增量查询高效地捕获这些变更,然后将它们传播到 OpenSearch 和 Redis,最终通过 LlamaIndex 提供给大语言模型。

架构概览

在深入代码之前,我们先用 Mermaid 图来描绘一下整个数据流。

flowchart TD
    subgraph 数据源与湖仓
        A[业务系统数据] --> B(Spark Ingestion Job);
        B --> C{Apache Hudi MoR Table};
    end

    subgraph 增量处理与索引
        C -- 1. 增量拉取变更数据 --> D(Incremental Spark Job);
        D -- 2. 计算 Embedding --> E[Embedding Service];
        E -- 3. Upsert 向量与元数据 --> F(OpenSearch Index);
        E -- 4. Cache/Upsert 热点向量 --> G(Redis Vector Store);
    end

    subgraph RAG 应用层
        H[User Query] --> I(LlamaIndex Application);
        I -- 5. 混合检索 --> F;
        I -- 6. 缓存/快速检索 --> G;
        F -- 7. 返回相关文档 --> I;
        G -- 8. 返回相关文档 --> I;
        I -- 9. 构建 Prompt --> J(Large Language Model);
        J -- 10. 返回答案 --> I;
        I -- 11. 最终结果 --> H;
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#bbf,stroke:#333,stroke-width:2px
    style I fill:#9f9,stroke:#333,stroke-width:2px

步骤一:构建 Hudi 表作为知识源

一切的起点是一个可靠、可增量查询的 Hudi 表。我们选择 Merge-on-Read (MoR) 表类型,因为它将数据写入速度置于优先地位,仅将变更写入 log 文件,这对于需要快速摄取更新的场景非常理想。

以下是一个 PySpark 示例,用于初始化并持续写入数据到 Hudi 表。

# hudi_ingestion.py
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, SaveMode

def setup_spark_session():
    """配置支持 Hudi 的 Spark Session"""
    return SparkSession.builder \
        .appName("HudiKnowledgeBaseIngestion") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0") \
        .getOrCreate()

def write_to_hudi(spark, df, table_name, db_name, record_key, precombine_key, partition_path):
    """向 Hudi 表写入数据"""
    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
        'hoodie.datasource.write.recordkey.field': record_key,
        'hoodie.datasource.write.precombine.field': precombine_key,
        'hoodie.datasource.write.partitionpath.field': partition_path,
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
        'hoodie.index.type': 'BLOOM',
        'hoodie.bloom.index.update.partition.path': 'true',
        'hoodie.finalize.write.parallelism': 2,
        'hoodie.upsert.shuffle.parallelism': 2,
    }
    
    db_table_path = f"/path/to/lakehouse/{db_name}/{table_name}"

    df.write.format("hudi") \
        .options(**hudi_options) \
        .mode(SaveMode.Append) \
        .save(db_table_path)

if __name__ == "__main__":
    spark = setup_spark_session()

    # 模拟一批新的或更新的文档
    docs_data = [
        (1, "doc_001", "Apache Hudi 是一种...", "tech", "2023-11-20 10:00:00"),
        (2, "doc_002", "LlamaIndex 是一个...", "ai", "2023-11-20 11:00:00"),
        # 模拟一次更新
        (3, "doc_001", "Apache Hudi is a streaming data lake platform...", "tech", "2023-11-21 14:00:00"),
    ]
    columns = ["id", "doc_id", "content", "category", "updated_at"]
    docs_df = spark.createDataFrame(docs_data, columns)

    # 这里的 `updated_at` 作为 precombine key, 确保我们总能拿到最新的文档版本
    write_to_hudi(
        spark=spark,
        df=docs_df,
        table_name="knowledge_base",
        db_name="rag",
        record_key="doc_id",
        precombine_key="updated_at",
        partition_path="category"
    )

    spark.stop()

生产环境注意事项:

  • Precombine Key (precombine.field): 必须选择一个能代表记录新旧顺序的字段,通常是更新时间戳。这是 Hudi 处理更新时解决冲突的关键。
  • Record Key (recordkey.field): 业务上的唯一标识符,用于定位需要更新的记录。
  • Partitioning: 合理的分区策略对性能至关重要。这里我们按 category 分区,可以加速特定类别的增量查询。

步骤二:实现增量拉取与向量化

这是整个管道的核心。我们将编写一个独立的 Spark 作业,定期运行(例如,通过 Airflow 每5分钟调度一次),它会检查 Hudi 表的最新提交,并拉取从上次处理点以来的所有变更。

# incremental_vectorizer.py
import os
import time
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# 假设这是一个外部服务, 生产中应使用更健壮的客户端
EMBEDDING_SERVICE_URL = "http://localhost:8000/embed"
OPENSEARCH_HOST = "localhost"
OPENSEARCH_PORT = 9200
REDIS_HOST = "localhost"
REDIS_PORT = 6379

# 用于追踪处理进度的文件
CHECKPOINT_FILE = "/path/to/checkpoints/rag_kb_checkpoint.txt"

def get_last_commit_time():
    """从检查点文件读取上次成功处理的 Hudi commit time"""
    if not os.path.exists(CHECKPOINT_FILE):
        return "0"  # 从头开始
    with open(CHECKPOINT_FILE, 'r') as f:
        return f.read().strip()

def save_commit_time(commit_time):
    """将最新的 Hudi commit time 保存到检查点文件"""
    with open(CHECKPOINT_FILE, 'w') as f:
        f.write(str(commit_time))

def setup_spark_session():
    # 与 ingestion 作业相同的配置
    return SparkSession.builder \
        .appName("HudiIncrementalVectorizer") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0") \
        .getOrCreate()

def process_partition(iterator):
    """
    这是在每个 Spark Executor 上运行的函数。
    它处理一个分区的数据,计算 embedding 并写入 OpenSearch 和 Redis。
    """
    from opensearchpy import OpenSearch, helpers
    import redis

    os_client = OpenSearch(
        hosts=[{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
        http_auth=('user', 'password'), # 根据实际情况配置
        use_ssl=True, verify_certs=False, ssl_show_warn=False
    )
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
    
    actions = []
    for row in iterator:
        doc_id = row['doc_id']
        content = row['content']
        # _hoodie_is_deleted 是 Hudi MoR 表增量查询时提供的特殊列
        is_deleted = row.get('_hoodie_is_deleted', False)

        if is_deleted:
            # 处理删除操作
            actions.append({'_op_type': 'delete', '_index': 'knowledge_base_vector', '_id': doc_id})
            redis_client.delete(f"kb_hot:{doc_id}")
            continue

        try:
            # 1. 获取 Embedding
            response = requests.post(EMBEDDING_SERVICE_URL, json={'text': content})
            response.raise_for_status()
            vector = response.json()['embedding']

            # 2. 准备 OpenSearch 的 upsert 操作
            doc = {
                'doc_id': doc_id,
                'content': content,
                'category': row['category'],
                'updated_at': row['updated_at'],
                'content_vector': vector
            }
            actions.append({'_op_type': 'index', '_index': 'knowledge_base_vector', '_id': doc_id, '_source': doc})

            # 3. 写入 Redis (这里可以加入业务逻辑判断是否为热点数据)
            # 在此简化为全部写入
            redis_client.hset(f"kb_hot:{doc_id}", mapping={
                "content": content,
                "vector_blob": ",".join(map(str, vector)) # 简单序列化
            })

        except requests.RequestException as e:
            # 生产级代码需要有更完善的日志和重试机制
            print(f"Error processing doc_id {doc_id}: {e}")
            continue
    
    if actions:
        try:
            helpers.bulk(os_client, actions)
        except Exception as e:
            # 处理批量写入失败
            print(f"Bulk indexing to OpenSearch failed: {e}")

if __name__ == "__main__":
    spark = setup_spark_session()
    table_path = "/path/to/lakehouse/rag/knowledge_base"

    # 1. 获取 Hudi 表的所有提交记录
    commits = spark.sql(f"SELECT distinct(_hoodie_commit_time) as commitTime FROM hudi.table_changes('rag.knowledge_base', 'latest') ORDER BY commitTime").collect()
    commit_times = [row['commitTime'] for row in commits]

    if not commit_times:
        print("No new commits found.")
        spark.stop()
        exit()

    # 2. 确定增量查询的范围
    last_processed_commit = get_last_commit_time()
    latest_commit = commit_times[-1]

    if last_processed_commit == latest_commit:
        print(f"Already processed up to commit {latest_commit}.")
        spark.stop()
        exit()

    print(f"Processing commits from {last_processed_commit} to {latest_commit}")

    # 3. 执行增量查询
    incremental_df = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", last_processed_commit) \
        .option("hoodie.datasource.read.end.instanttime", latest_commit) \
        .load(table_path)
    
    # 过滤掉仅元数据变更的行
    valid_changes_df = incremental_df.filter(col("doc_id").isNotNull())
    
    if valid_changes_df.rdd.isEmpty():
        print("No actual data changes in the new commits.")
    else:
        # 4. 对每个分区进行处理
        valid_changes_df.foreachPartition(process_partition)

    # 5. 更新检查点
    save_commit_time(latest_commit)
    print(f"Successfully processed up to commit {latest_commit}.")

    spark.stop()

这里的坑在于:

  • Checkpoint管理: 必须可靠地存储上次处理的 commit_time。如果失败,可能会导致数据重复处理或丢失。将 checkpoint 存储在 HDFS、S3 或一个可靠的数据库中是必要的。
  • 处理删除: MoR 表的增量查询会为被删除的记录提供一个 _hoodie_is_deleted = true 的列。下游作业必须捕获这个信号,并在 OpenSearch 和 Redis 中执行相应的删除操作,否则向量库会充满脏数据。
  • 空提交: Hudi 可能因为表服务(如compaction)产生没有数据变更的提交。增量查询结果可能为空,代码需要优雅地处理这种情况。
  • Executor 端的依赖: process_partition 函数在 executor 上执行,它需要的 Python 库(如 opensearch-py, redis)必须在所有工作节点上都可用。

步骤三:在 LlamaIndex 中实现混合检索

现在数据已经近乎实时地同步到了 OpenSearch 和 Redis,最后一步是让 LlamaIndex 能够智能地利用这两个数据源。我们将创建一个自定义的检索器(Retriever),它会先查询 Redis 获取低延迟的候选结果,然后使用 OpenSearch 进行更精细的语义重排或补充。

# custom_retriever.py
import numpy as np
import json
from typing import List
from llama_index.core.schema import NodeWithScore, QueryBundle
from llama_index.core.retrievers import BaseRetriever
from llama_index.vector_stores.opensearch import OpenSearchVectorStore
from llama_index.core import VectorStoreIndex, StorageContext
from opensearchpy import OpenSearch
import redis

class HybridCacheRetriever(BaseRetriever):
    """
    一个混合检索器:
    1. 首先查询 Redis 快速获取可能的候选者 (例如,最新的N个文档)。
    2. 然后使用 OpenSearch 对用户的原始查询进行深度语义搜索。
    3. 合并并去重两个来源的结果。
    """
    def __init__(self, opensearch_retriever, redis_client, top_k=5):
        self._opensearch_retriever = opensearch_retriever
        self._redis_client = redis_client
        self._top_k = top_k
        super().__init__()

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        # 1. OpenSearch 深度语义搜索
        opensearch_nodes = self._opensearch_retriever.retrieve(query_bundle)
        
        # 2. Redis 快速通道 (这里的逻辑可以非常灵活)
        # 示例:假设我们想总是包含最新的几个文档作为上下文
        # 生产中,这里可以实现更复杂的逻辑,例如基于用户画像的个性化缓存召回
        redis_nodes = []
        try:
            # 这是一个简化的示例,假设 Redis 中缓存了最新的文档
            # 实际中,你可能需要一个独立的机制来维护这个"最新"列表
            latest_doc_ids = self._redis_client.zrevrange("kb:latest", 0, 2)
            for doc_id in latest_doc_ids:
                cached_data = self._redis_client.hgetall(f"kb_hot:{doc_id}")
                if cached_data:
                    node = NodeWithScore(
                        node=TextNode(text=cached_data['content'], id_=doc_id),
                        score=0.95 # 给缓存结果一个较高的固定分数
                    )
                    redis_nodes.append(node)
        except redis.RedisError as e:
            print(f"Redis retrieval failed: {e}")

        # 3. 合并与去重
        all_nodes = opensearch_nodes + redis_nodes
        seen_ids = set()
        unique_nodes = []
        for node in sorted(all_nodes, key=lambda x: x.score, reverse=True):
            if node.node.id_ not in seen_ids:
                unique_nodes.append(node)
                seen_ids.add(node.node.id_)

        return unique_nodes[:self._top_k]

# --- 集成应用 ---
if __name__ == "__main__":
    from llama_index.core import get_response_synthesizer
    from llama_index.core.query_engine import RetrieverQueryEngine
    from llama_index.embeddings.huggingface import HuggingFaceEmbedding
    from llama_index.core.node_parser import SimpleNodeParser
    from llama_index.core.schema import TextNode
    
    # 1. 配置 OpenSearch 连接
    os_client = OpenSearch(...)
    vector_store = OpenSearchVectorStore(
        client=os_client, 
        index_name="knowledge_base_vector", 
        text_field="content",
        embedding_field="content_vector"
    )
    
    # LlamaIndex 需要一个 Index 对象来构建 Retriever
    index = VectorStoreIndex.from_vector_store(vector_store)
    
    # 2. 获取标准的 OpenSearch 检索器
    opensearch_retriever = index.as_retriever(similarity_top_k=5)
    
    # 3. 配置 Redis 连接
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
    
    # 4. 实例化我们的自定义混合检索器
    hybrid_retriever = HybridCacheRetriever(
        opensearch_retriever=opensearch_retriever,
        redis_client=redis_client,
        top_k=5
    )
    
    # 5. 构建查询引擎
    response_synthesizer = get_response_synthesizer()
    query_engine = RetrieverQueryEngine(
        retriever=hybrid_retriever,
        response_synthesizer=response_synthesizer,
    )
    
    # 6. 执行查询
    response = query_engine.query("What is Apache Hudi's Merge-on-Read table type?")
    print(response)

设计的权衡与考量:

  • 检索策略的复杂性: HybridCacheRetriever 的实现可以非常复杂。它可以并行查询,使用重排模型(reranker)对合并后的结果进行二次排序,或者根据查询的类型动态决定是只查缓存还是查全库。
  • LlamaIndex 版本兼容性: LlamaIndex API 迭代很快,自定义组件(如 BaseRetriever)的实现需要关注其版本变化。
  • 一致性问题: 这是一个最终一致性的系统。在 Hudi 提交变更和该变更反映到 OpenSearch/Redis 之间,存在一个短暂的延迟窗口(取决于 Spark 作业的调度频率)。应用层必须能容忍这种分钟级的延迟。

局限性与未来迭代路径

这个架构显著改善了数据新鲜度,从24小时缩短到了5分钟级别,但它并非银弹。首先,整个系统的复杂性增加了,引入了 Hudi 和一个额外的 Spark 作业,需要更细致的监控和运维。其次,Hudi MoR 表的小文件问题需要通过定期的 compactionclustering 表服务来管理,这为平台增加了一些维护开销。

未来的优化路径可以集中在几个方面。第一,可以探索使用 Flink 来替代 Spark 进行增量处理,这有望将端到端的延迟进一步降低到秒级。第二,当前的混合检索策略相对简单,可以引入一个轻量级的机器学习模型来做更智能的重排,提升最终结果的相关性。第三,对于删除信号的处理,可以增加一个延迟队列或双重确认机制,以防止意外删除操作导致向量库数据永久丢失,提高系统的鲁棒性。


  目录