我们团队维护的一个基于 RAG 的内部知识库系统遇到了一个棘手的瓶颈:数据新鲜度。最初的架构简单粗暴,每晚通过 Spark 作业全量读取源数据,计算 embedding,然后将数百万文档向量完全重建到 OpenSearch 索引中。这个过程耗时4-6小时,意味着白天的知识更新最快也要到第二天才能被检索到。对于一个需要处理实时工单、产品文档更新的系统,24小时的延迟是完全无法接受的。问题的核心在于,我们缺乏一个有效的方式来捕捉源数据的变更(CDC),只能被迫采用全量刷新的笨办法。
初步的构想是引入一个消息队列(如 Kafka)和 Debezium 来捕获上游数据库的变更。但这会引入一套全新的、复杂的流处理基础设施,运维成本很高,并且对于我们已经基于数据湖的分析体系来说,显得格格不入。我们需要一个能在数据湖层面解决这个问题的方案。Apache Hudi 的增量查询(Incremental Query)能力进入了我们的视野。它允许我们像查询流数据一样,拉取两次提交之间的所有变更数据(Inserts, Updates, Deletes)。这正是我们需要的。
最终的技术选型敲定为一套组合拳,目标是实现一个数据延迟在分钟级的近实时 RAG 管道:
- Apache Hudi (MoR Table): 作为知识库的“源真相”。选择 Merge-on-Read 类型是为了优化写入性能,同时其增量查询能力是整个方案的基石。
- OpenSearch: 作为主要的向量数据库。它提供强大的可扩展性和混合检索能力(向量+关键词)。
- Redis: 承担双重角色。首先,作为一级缓存,存储高频访问文档的 embedding,降低对 OpenSearch 的压力;其次,利用其内建的向量搜索功能,为特定场景(如最新文档)提供一个超低延迟的检索通道。
- LlamaIndex: 作为 RAG 的编排框架。它的可插拔设计允许我们轻松定制数据源、向量存储和检索逻辑,将这套混合系统无缝集成。
整个架构的目标是清晰的:当 Hudi 表发生变更时,一个下游的 Spark 作业能通过增量查询高效地捕获这些变更,然后将它们传播到 OpenSearch 和 Redis,最终通过 LlamaIndex 提供给大语言模型。
架构概览
在深入代码之前,我们先用 Mermaid 图来描绘一下整个数据流。
flowchart TD subgraph 数据源与湖仓 A[业务系统数据] --> B(Spark Ingestion Job); B --> C{Apache Hudi MoR Table}; end subgraph 增量处理与索引 C -- 1. 增量拉取变更数据 --> D(Incremental Spark Job); D -- 2. 计算 Embedding --> E[Embedding Service]; E -- 3. Upsert 向量与元数据 --> F(OpenSearch Index); E -- 4. Cache/Upsert 热点向量 --> G(Redis Vector Store); end subgraph RAG 应用层 H[User Query] --> I(LlamaIndex Application); I -- 5. 混合检索 --> F; I -- 6. 缓存/快速检索 --> G; F -- 7. 返回相关文档 --> I; G -- 8. 返回相关文档 --> I; I -- 9. 构建 Prompt --> J(Large Language Model); J -- 10. 返回答案 --> I; I -- 11. 最终结果 --> H; end style C fill:#f9f,stroke:#333,stroke-width:2px style D fill:#bbf,stroke:#333,stroke-width:2px style I fill:#9f9,stroke:#333,stroke-width:2px
步骤一:构建 Hudi 表作为知识源
一切的起点是一个可靠、可增量查询的 Hudi 表。我们选择 Merge-on-Read (MoR) 表类型,因为它将数据写入速度置于优先地位,仅将变更写入 log 文件,这对于需要快速摄取更新的场景非常理想。
以下是一个 PySpark 示例,用于初始化并持续写入数据到 Hudi 表。
# hudi_ingestion.py
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, SaveMode
def setup_spark_session():
"""配置支持 Hudi 的 Spark Session"""
return SparkSession.builder \
.appName("HudiKnowledgeBaseIngestion") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0") \
.getOrCreate()
def write_to_hudi(spark, df, table_name, db_name, record_key, precombine_key, partition_path):
"""向 Hudi 表写入数据"""
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field': record_key,
'hoodie.datasource.write.precombine.field': precombine_key,
'hoodie.datasource.write.partitionpath.field': partition_path,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.DefaultHoodieRecordPayload',
'hoodie.index.type': 'BLOOM',
'hoodie.bloom.index.update.partition.path': 'true',
'hoodie.finalize.write.parallelism': 2,
'hoodie.upsert.shuffle.parallelism': 2,
}
db_table_path = f"/path/to/lakehouse/{db_name}/{table_name}"
df.write.format("hudi") \
.options(**hudi_options) \
.mode(SaveMode.Append) \
.save(db_table_path)
if __name__ == "__main__":
spark = setup_spark_session()
# 模拟一批新的或更新的文档
docs_data = [
(1, "doc_001", "Apache Hudi 是一种...", "tech", "2023-11-20 10:00:00"),
(2, "doc_002", "LlamaIndex 是一个...", "ai", "2023-11-20 11:00:00"),
# 模拟一次更新
(3, "doc_001", "Apache Hudi is a streaming data lake platform...", "tech", "2023-11-21 14:00:00"),
]
columns = ["id", "doc_id", "content", "category", "updated_at"]
docs_df = spark.createDataFrame(docs_data, columns)
# 这里的 `updated_at` 作为 precombine key, 确保我们总能拿到最新的文档版本
write_to_hudi(
spark=spark,
df=docs_df,
table_name="knowledge_base",
db_name="rag",
record_key="doc_id",
precombine_key="updated_at",
partition_path="category"
)
spark.stop()
生产环境注意事项:
- Precombine Key (
precombine.field
): 必须选择一个能代表记录新旧顺序的字段,通常是更新时间戳。这是 Hudi 处理更新时解决冲突的关键。 - Record Key (
recordkey.field
): 业务上的唯一标识符,用于定位需要更新的记录。 - Partitioning: 合理的分区策略对性能至关重要。这里我们按
category
分区,可以加速特定类别的增量查询。
步骤二:实现增量拉取与向量化
这是整个管道的核心。我们将编写一个独立的 Spark 作业,定期运行(例如,通过 Airflow 每5分钟调度一次),它会检查 Hudi 表的最新提交,并拉取从上次处理点以来的所有变更。
# incremental_vectorizer.py
import os
import time
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
# 假设这是一个外部服务, 生产中应使用更健壮的客户端
EMBEDDING_SERVICE_URL = "http://localhost:8000/embed"
OPENSEARCH_HOST = "localhost"
OPENSEARCH_PORT = 9200
REDIS_HOST = "localhost"
REDIS_PORT = 6379
# 用于追踪处理进度的文件
CHECKPOINT_FILE = "/path/to/checkpoints/rag_kb_checkpoint.txt"
def get_last_commit_time():
"""从检查点文件读取上次成功处理的 Hudi commit time"""
if not os.path.exists(CHECKPOINT_FILE):
return "0" # 从头开始
with open(CHECKPOINT_FILE, 'r') as f:
return f.read().strip()
def save_commit_time(commit_time):
"""将最新的 Hudi commit time 保存到检查点文件"""
with open(CHECKPOINT_FILE, 'w') as f:
f.write(str(commit_time))
def setup_spark_session():
# 与 ingestion 作业相同的配置
return SparkSession.builder \
.appName("HudiIncrementalVectorizer") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.hive.convertMetastoreParquet", "false") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0") \
.getOrCreate()
def process_partition(iterator):
"""
这是在每个 Spark Executor 上运行的函数。
它处理一个分区的数据,计算 embedding 并写入 OpenSearch 和 Redis。
"""
from opensearchpy import OpenSearch, helpers
import redis
os_client = OpenSearch(
hosts=[{'host': OPENSEARCH_HOST, 'port': OPENSEARCH_PORT}],
http_auth=('user', 'password'), # 根据实际情况配置
use_ssl=True, verify_certs=False, ssl_show_warn=False
)
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
actions = []
for row in iterator:
doc_id = row['doc_id']
content = row['content']
# _hoodie_is_deleted 是 Hudi MoR 表增量查询时提供的特殊列
is_deleted = row.get('_hoodie_is_deleted', False)
if is_deleted:
# 处理删除操作
actions.append({'_op_type': 'delete', '_index': 'knowledge_base_vector', '_id': doc_id})
redis_client.delete(f"kb_hot:{doc_id}")
continue
try:
# 1. 获取 Embedding
response = requests.post(EMBEDDING_SERVICE_URL, json={'text': content})
response.raise_for_status()
vector = response.json()['embedding']
# 2. 准备 OpenSearch 的 upsert 操作
doc = {
'doc_id': doc_id,
'content': content,
'category': row['category'],
'updated_at': row['updated_at'],
'content_vector': vector
}
actions.append({'_op_type': 'index', '_index': 'knowledge_base_vector', '_id': doc_id, '_source': doc})
# 3. 写入 Redis (这里可以加入业务逻辑判断是否为热点数据)
# 在此简化为全部写入
redis_client.hset(f"kb_hot:{doc_id}", mapping={
"content": content,
"vector_blob": ",".join(map(str, vector)) # 简单序列化
})
except requests.RequestException as e:
# 生产级代码需要有更完善的日志和重试机制
print(f"Error processing doc_id {doc_id}: {e}")
continue
if actions:
try:
helpers.bulk(os_client, actions)
except Exception as e:
# 处理批量写入失败
print(f"Bulk indexing to OpenSearch failed: {e}")
if __name__ == "__main__":
spark = setup_spark_session()
table_path = "/path/to/lakehouse/rag/knowledge_base"
# 1. 获取 Hudi 表的所有提交记录
commits = spark.sql(f"SELECT distinct(_hoodie_commit_time) as commitTime FROM hudi.table_changes('rag.knowledge_base', 'latest') ORDER BY commitTime").collect()
commit_times = [row['commitTime'] for row in commits]
if not commit_times:
print("No new commits found.")
spark.stop()
exit()
# 2. 确定增量查询的范围
last_processed_commit = get_last_commit_time()
latest_commit = commit_times[-1]
if last_processed_commit == latest_commit:
print(f"Already processed up to commit {latest_commit}.")
spark.stop()
exit()
print(f"Processing commits from {last_processed_commit} to {latest_commit}")
# 3. 执行增量查询
incremental_df = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "incremental") \
.option("hoodie.datasource.read.begin.instanttime", last_processed_commit) \
.option("hoodie.datasource.read.end.instanttime", latest_commit) \
.load(table_path)
# 过滤掉仅元数据变更的行
valid_changes_df = incremental_df.filter(col("doc_id").isNotNull())
if valid_changes_df.rdd.isEmpty():
print("No actual data changes in the new commits.")
else:
# 4. 对每个分区进行处理
valid_changes_df.foreachPartition(process_partition)
# 5. 更新检查点
save_commit_time(latest_commit)
print(f"Successfully processed up to commit {latest_commit}.")
spark.stop()
这里的坑在于:
- Checkpoint管理: 必须可靠地存储上次处理的
commit_time
。如果失败,可能会导致数据重复处理或丢失。将 checkpoint 存储在 HDFS、S3 或一个可靠的数据库中是必要的。 - 处理删除: MoR 表的增量查询会为被删除的记录提供一个
_hoodie_is_deleted = true
的列。下游作业必须捕获这个信号,并在 OpenSearch 和 Redis 中执行相应的删除操作,否则向量库会充满脏数据。 - 空提交: Hudi 可能因为表服务(如compaction)产生没有数据变更的提交。增量查询结果可能为空,代码需要优雅地处理这种情况。
- Executor 端的依赖:
process_partition
函数在 executor 上执行,它需要的 Python 库(如opensearch-py
,redis
)必须在所有工作节点上都可用。
步骤三:在 LlamaIndex 中实现混合检索
现在数据已经近乎实时地同步到了 OpenSearch 和 Redis,最后一步是让 LlamaIndex 能够智能地利用这两个数据源。我们将创建一个自定义的检索器(Retriever),它会先查询 Redis 获取低延迟的候选结果,然后使用 OpenSearch 进行更精细的语义重排或补充。
# custom_retriever.py
import numpy as np
import json
from typing import List
from llama_index.core.schema import NodeWithScore, QueryBundle
from llama_index.core.retrievers import BaseRetriever
from llama_index.vector_stores.opensearch import OpenSearchVectorStore
from llama_index.core import VectorStoreIndex, StorageContext
from opensearchpy import OpenSearch
import redis
class HybridCacheRetriever(BaseRetriever):
"""
一个混合检索器:
1. 首先查询 Redis 快速获取可能的候选者 (例如,最新的N个文档)。
2. 然后使用 OpenSearch 对用户的原始查询进行深度语义搜索。
3. 合并并去重两个来源的结果。
"""
def __init__(self, opensearch_retriever, redis_client, top_k=5):
self._opensearch_retriever = opensearch_retriever
self._redis_client = redis_client
self._top_k = top_k
super().__init__()
def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
# 1. OpenSearch 深度语义搜索
opensearch_nodes = self._opensearch_retriever.retrieve(query_bundle)
# 2. Redis 快速通道 (这里的逻辑可以非常灵活)
# 示例:假设我们想总是包含最新的几个文档作为上下文
# 生产中,这里可以实现更复杂的逻辑,例如基于用户画像的个性化缓存召回
redis_nodes = []
try:
# 这是一个简化的示例,假设 Redis 中缓存了最新的文档
# 实际中,你可能需要一个独立的机制来维护这个"最新"列表
latest_doc_ids = self._redis_client.zrevrange("kb:latest", 0, 2)
for doc_id in latest_doc_ids:
cached_data = self._redis_client.hgetall(f"kb_hot:{doc_id}")
if cached_data:
node = NodeWithScore(
node=TextNode(text=cached_data['content'], id_=doc_id),
score=0.95 # 给缓存结果一个较高的固定分数
)
redis_nodes.append(node)
except redis.RedisError as e:
print(f"Redis retrieval failed: {e}")
# 3. 合并与去重
all_nodes = opensearch_nodes + redis_nodes
seen_ids = set()
unique_nodes = []
for node in sorted(all_nodes, key=lambda x: x.score, reverse=True):
if node.node.id_ not in seen_ids:
unique_nodes.append(node)
seen_ids.add(node.node.id_)
return unique_nodes[:self._top_k]
# --- 集成应用 ---
if __name__ == "__main__":
from llama_index.core import get_response_synthesizer
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core.schema import TextNode
# 1. 配置 OpenSearch 连接
os_client = OpenSearch(...)
vector_store = OpenSearchVectorStore(
client=os_client,
index_name="knowledge_base_vector",
text_field="content",
embedding_field="content_vector"
)
# LlamaIndex 需要一个 Index 对象来构建 Retriever
index = VectorStoreIndex.from_vector_store(vector_store)
# 2. 获取标准的 OpenSearch 检索器
opensearch_retriever = index.as_retriever(similarity_top_k=5)
# 3. 配置 Redis 连接
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)
# 4. 实例化我们的自定义混合检索器
hybrid_retriever = HybridCacheRetriever(
opensearch_retriever=opensearch_retriever,
redis_client=redis_client,
top_k=5
)
# 5. 构建查询引擎
response_synthesizer = get_response_synthesizer()
query_engine = RetrieverQueryEngine(
retriever=hybrid_retriever,
response_synthesizer=response_synthesizer,
)
# 6. 执行查询
response = query_engine.query("What is Apache Hudi's Merge-on-Read table type?")
print(response)
设计的权衡与考量:
- 检索策略的复杂性:
HybridCacheRetriever
的实现可以非常复杂。它可以并行查询,使用重排模型(reranker)对合并后的结果进行二次排序,或者根据查询的类型动态决定是只查缓存还是查全库。 - LlamaIndex 版本兼容性: LlamaIndex API 迭代很快,自定义组件(如
BaseRetriever
)的实现需要关注其版本变化。 - 一致性问题: 这是一个最终一致性的系统。在 Hudi 提交变更和该变更反映到 OpenSearch/Redis 之间,存在一个短暂的延迟窗口(取决于 Spark 作业的调度频率)。应用层必须能容忍这种分钟级的延迟。
局限性与未来迭代路径
这个架构显著改善了数据新鲜度,从24小时缩短到了5分钟级别,但它并非银弹。首先,整个系统的复杂性增加了,引入了 Hudi 和一个额外的 Spark 作业,需要更细致的监控和运维。其次,Hudi MoR 表的小文件问题需要通过定期的 compaction
和 clustering
表服务来管理,这为平台增加了一些维护开销。
未来的优化路径可以集中在几个方面。第一,可以探索使用 Flink 来替代 Spark 进行增量处理,这有望将端到端的延迟进一步降低到秒级。第二,当前的混合检索策略相对简单,可以引入一个轻量级的机器学习模型来做更智能的重排,提升最终结果的相关性。第三,对于删除信号的处理,可以增加一个延迟队列或双重确认机制,以防止意外删除操作导致向量库数据永久丢失,提高系统的鲁棒性。