业务需求的变化总是比系统架构演进快一步。最近遇到的一个典型场景是:产品团队希望在一个集成了AI能力的全新前端应用上,提供对全量商品数据的语义化搜索。挑战在于,这些商品数据并非存放在单一、现代化的数据库中,而是散落在两个核心系统里:一套是运行多年的Oracle 12c,承载着核心交易和商品信息;另一套是近几年构建的营销系统,使用MariaDB 10.6,存储着商品的扩展属性和用户评论。直接改造这两套生产数据库的方案,在立项之初就被DBA和系统负责人以“高风险、影响稳定性”为由坚决否决。
初步的构想是做一个夜间ETL,将数据统一抽取到新的数据仓库中再进行处理。但这无法满足产品对“近实时”的要求——商品信息(尤其是库存、价格)的变更必须在几分钟内反映到搜索结果中。轮询查询数据库变更的方案负载太高,且时效性差。最终,我们决定采用变更数据捕获(Change Data Capture, CDC)技术,构建一个非侵入式、低延迟的数据管道,将两个异构数据库的变更实时同步到向量数据库Qdrant中。
整个架构的设计思路是:
- 使用Debezium作为CDC工具,分别连接Oracle的LogMiner和MariaDB的binlog,捕获行级别的
INSERT
,UPDATE
,DELETE
操作。 - Debezium将捕获到的变更事件以JSON格式推送到一个高吞吐、低延迟的消息中间件。在这里,我们选择了NATS JetStream,而不是更重的Kafka,因为它运维更简单,性能对于当前场景绰绰有余。
- 一个独立的Go语言编写的消费服务订阅NATS主题,处理这些变更事件。
- 消费服务负责解析Debezium的事件格式,根据业务逻辑拼接来自不同数据源的字段,调用一个本地的文本嵌入模型(Embedding Model)生成向量,最后将结构化数据和向量数据一并写入(Upsert)Qdrant。
- 前端应用由Vite构建,它通过一个轻量级后端API查询Qdrant,实现语义搜索功能。
这个方案的核心在于解耦,源数据库只负责产生日志,完全无感知下游消费,保证了生产系统的绝对稳定。
graph TD subgraph Legacy Systems Oracle[Oracle 12c] -- XStream/LogMiner --> Debezium_ORA[Debezium Connector for Oracle] MariaDB[MariaDB 10.6] -- Binlog --> Debezium_MAR[Debezium Connector for MariaDB] end subgraph Kafka Connect Cluster Debezium_ORA -- JSON over HTTP --> KafkaConnect[Kafka Connect Worker] Debezium_MAR -- JSON over HTTP --> KafkaConnect end KafkaConnect -- Change Events --> NATS[NATS JetStream Server] subgraph Real-time Processing NATS -- Subscribes to 'db.changes' stream --> GoConsumer[Go Vectorization Service] GoConsumer -- Calls for embeddings --> EmbeddingModel[SentenceTransformer Model] EmbeddingModel -- Returns vector --> GoConsumer GoConsumer -- Upserts points --> Qdrant[Qdrant Vector DB] end subgraph User Facing Application User[User] -- Semantic Search Query --> ViteApp[Vite Frontend] ViteApp -- API Call --> BackendAPI[Backend for Frontend] BackendAPI -- Vector Search --> Qdrant end
第一步:配置Debezium Connectors
在生产环境中,我们使用一个独立的Kafka Connect集群来运行Debezium。这里的坑在于Oracle的配置。Debezium支持多种方式从Oracle捕获变更,考虑到数据库版本和DBA的接受度,我们选择了基于LogMiner的方案。这需要在数据库侧开启归档日志和补充日志(Supplemental Logging)。
为商品核心表PRODUCTS
和价格表PRODUCT_PRICES
开启补充日志:
-- 以DBA权限执行
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE MYSCHEMA.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE MYSCHEMA.PRODUCT_PRICES ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Debezium Oracle Connector的配置 (oracle-products-connector.json
) 相对复杂,关键在于数据库连接信息和LogMiner的配置项。
{
"name": "oracle-products-source",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"database.server.name": "ora_server1",
"database.hostname": "oracle.db.internal",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "ORCLPDB1",
"database.history.kafka.bootstrap.servers": "kafka:9092", // Debezium内部使用,非我们业务流
"database.history.kafka.topic": "dbhistory.oracle.products",
"log.mining.strategy": "online_catalog",
"table.include.list": "MYSCHEMA.PRODUCTS,MYSCHEMA.PRODUCT_PRICES",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "nats.db.oracle.$3" // 将topic路由到NATS主题
}
}
-
log.mining.strategy
: 明确使用在线日志挖掘。 -
table.include.list
: 只捕获我们关心的表。 -
transforms
: 这是个关键配置。unwrap
将Debezium复杂的事件结构简化为只包含after
或before
数据的扁平JSON。route
使用正则表达式将Kafka Connect内部的topic(如ora_server1.MYSCHEMA.PRODUCTS
)重定向到我们规划的NATS主题(如nats.db.oracle.PRODUCTS
)。
MariaDB的配置 (mariadb-ext-connector.json
) 类似但更简单,因为它直接读取binlog。
{
"name": "mariadb-ext-source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mariadb.db.internal",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "maria_server1",
"database.include.list": "marketing",
"table.include.list": "marketing.product_reviews",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.mariadb.ext",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "unwrap,route",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "nats.db.mariadb.$3"
}
}
为了让Debezium能将消息发送到NATS,我们使用了 NATS Sink Connector for Kafka Connect。在Kafka Connect的worker配置中,需要指定这个connector的路径,并在启动时加载。
第二步:Go消费服务的实现
这是整个管道的核心处理逻辑。我们使用官方的nats.go
库来连接NATS JetStream,并创建一个推式(Push-based)消费者。服务的目标是稳定、高效、可容错。
main.go
:
package main
import (
"context"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"vector-sync-service/config"
"vector-sync-service/internal/consumers"
"vector-sync-service/internal/qdrant"
"vector-sync-service/internal/vectorizer"
"github.com/nats-io/nats.go"
)
func main() {
// 1. 初始化配置和日志
cfg := config.Load()
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// 2. 初始化外部依赖
qdrantClient, err := qdrant.NewClient(cfg.Qdrant.Host, cfg.Qdrant.Port, cfg.Qdrant.ApiKey)
if err != nil {
logger.Error("failed to initialize qdrant client", "error", err)
os.Exit(1)
}
defer qdrantClient.Close()
logger.Info("qdrant client initialized")
// 确保集合存在
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := qdrantClient.EnsureCollection(ctx, cfg.Qdrant.CollectionName); err != nil {
logger.Error("failed to ensure qdrant collection exists", "error", err)
os.Exit(1)
}
textVectorizer := vectorizer.NewLocalVectorizer(cfg.Vectorizer.ModelPath)
logger.Info("local vectorizer loaded")
// 3. 连接NATS JetStream
nc, err := nats.Connect(cfg.NATS.URL, nats.Timeout(10*time.Second), nats.RetryOnFailedConnect(true), nats.MaxReconnects(5), nats.ReconnectWait(time.Second))
if err != nil {
logger.Error("failed to connect to NATS", "error", err)
os.Exit(1)
}
defer nc.Close()
logger.Info("connected to NATS server", "url", cfg.NATS.URL)
js, err := nc.JetStream()
if err != nil {
logger.Error("failed to create JetStream context", "error", err)
os.Exit(1)
}
// 4. 启动消费者
// 使用一个上下文来协调所有goroutine的关闭
mainCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
handler := consumers.NewDebeziumEventHandler(logger, qdrantClient, textVectorizer, cfg.Qdrant.CollectionName)
// 监听所有匹配的数据库变更主题
// subjects: ["nats.db.oracle.>", "nats.db.mariadb.>"]
consumerManager, err := consumers.NewManager(mainCtx, js, logger, handler, cfg.NATS.StreamName, cfg.NATS.Subjects)
if err != nil {
logger.Error("failed to start consumer manager", "error", err)
os.Exit(1)
}
logger.Info("vector sync service is running...")
// 5. 等待终止信号
<-mainCtx.Done()
logger.Info("shutting down service...")
// 清理资源
consumerManager.Shutdown()
logger.Info("service stopped gracefully")
}
consumers/handler.go
是处理消息的核心逻辑。这里的一个实践要点是,我们不能简单地将收到的每一条数据直接写入Qdrant。例如,PRODUCTS
表和PRODUCT_PRICES
表是关联的,我们需要将它们的数据合并成一个文档再进行向量化。
真实项目中,这种合并逻辑会很复杂。一个常见的错误是试图在消费端做实时的跨流Join,这会引入状态管理和时序问题。更稳妥的方案是:
- 物化视图(Materialized View)模式:在消费端维护一个轻量级的缓存(如Redis)来存储部分关联数据。例如,收到
PRODUCT_PRICES
变更时,去缓存中查找对应的PRODUCTS
信息。如果找不到,可以短暂等待或标记为待处理。 - 下游聚合:让Qdrant的
payload
存储范式化的数据。PRODUCTS
和PRODUCT_PRICES
作为独立的字段存入同一个Point。这样虽然有数据冗余,但查询性能最好,且实现简单。我们选择了这种方式。
package consumers
import (
"context"
"encoding/json"
"log/slog"
"strconv"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/qdrant/go-client/qdrant"
)
type DebeziumMessage struct {
// Debezium unwrapped format
// 字段根据实际数据库表结构定义
ProductID *int `json:"PRODUCT_ID"`
ProductName *string `json:"PRODUCT_NAME"`
Description *string `json:"DESCRIPTION"`
Price *float64 `json:"PRICE"`
ReviewText *string `json:"REVIEW_TEXT"`
// ... other fields
}
// QdrantClient and Vectorizer are interfaces for testability
type QdrantClient interface {
UpsertPoints(ctx context.Context, collectionName string, points []*qdrant.PointStruct) error
DeletePoints(ctx context.Context, collectionName string, ids []uint64) error
}
type Vectorizer interface {
Embed(ctx context.Context, text string) ([]float32, error)
}
type DebeziumEventHandler struct {
logger *slog.Logger
qdrantClient QdrantClient
vectorizer Vectorizer
collectionName string
}
func NewDebeziumEventHandler(logger *slog.Logger, qc QdrantClient, v Vectorizer, collName string) *DebeziumEventHandler {
return &DebeziumEventHandler{
logger: logger,
qdrantClient: qc,
vectorizer: v,
collectionName: collName,
}
}
// Handle is the core logic for processing a single NATS message
func (h *DebeziumEventHandler) Handle(msg *nats.Msg) {
// 消息确认是关键,确保处理成功才ack
defer func() {
if err := msg.Ack(); err != nil {
h.logger.Error("failed to ack message", "subject", msg.Subject, "error", err)
}
}()
var event DebeziumMessage
if err := json.Unmarshal(msg.Data, &event); err != nil {
h.logger.Error("failed to unmarshal debezium event", "data", string(msg.Data), "error", err)
// 如果消息格式错误,直接ack,避免无限重试
return
}
// 检查主键是否存在
if event.ProductID == nil {
h.logger.Warn("event received without a product ID, skipping", "subject", msg.Subject)
return
}
pointID := uint64(*event.ProductID)
// 如果消息体为空 (Debezium的DELETE事件),则删除对应的点
if len(msg.Data) <= 2 { // check for empty json "{}"
h.logger.Info("handling delete event", "product_id", pointID)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := h.qdrantClient.DeletePoints(ctx, h.collectionName, []uint64{pointID}); err != nil {
h.logger.Error("failed to delete point from qdrant", "product_id", pointID, "error", err)
// 对于失败的操作,我们不ack,让NATS重传
msg.NakWithDelay(3 * time.Second)
}
return
}
// 构建用于向量化的文本
var textToEmbed strings.Builder
if event.ProductName != nil {
textToEmbed.WriteString(*event.ProductName)
textToEmbed.WriteString(". ")
}
if event.Description != nil {
textToEmbed.WriteString(*event.Description)
textToEmbed.WriteString(". ")
}
if event.ReviewText != nil {
textToEmbed.WriteString("User review: ")
textToEmbed.WriteString(*event.ReviewText)
}
if textToEmbed.Len() == 0 {
h.logger.Warn("no text content to vectorize, skipping upsert", "product_id", pointID)
return
}
// 生成向量
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
vector, err := h.vectorizer.Embed(ctx, textToEmbed.String())
if err != nil {
h.logger.Error("failed to generate embedding", "product_id", pointID, "error", err)
msg.NakWithDelay(5 * time.Second) // 嵌入模型失败,可以重试
return
}
// 构建Qdrant的Point
payload := h.buildPayload(event)
point := &qdrant.PointStruct{
Id: &qdrant.PointId{PointIdOptions: &qdrant.PointId_Num{Num: pointID}},
Payload: payload,
Vectors: &qdrant.Vectors{VectorsOptions: &qdrant.Vectors_Vector{Vector: &qdrant.Vector{Data: vector}}},
}
// Upsert到Qdrant
upsertCtx, upsertCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer upsertCancel()
if err := h.qdrantClient.UpsertPoints(upsertCtx, h.collectionName, []*qdrant.PointStruct{point}); err != nil {
h.logger.Error("failed to upsert point to qdrant", "product_id", pointID, "error", err)
msg.NakWithDelay(3 * time.Second) // Qdrant写入失败,重试
return
}
h.logger.Info("successfully processed event and upserted to qdrant", "product_id", pointID, "subject", msg.Subject)
}
func (h *DebeziumEventHandler) buildPayload(event DebeziumMessage) map[string]*qdrant.Value {
payload := map[string]*qdrant.Value{}
if event.ProductName != nil {
payload["product_name"] = &qdrant.Value{Kind: &qdrant.Value_StringValue{StringValue: *event.ProductName}}
}
if event.Price != nil {
payload["price"] = &qdrant.Value{Kind: &qdrant.Value_DoubleValue{DoubleValue: *event.Price}}
}
//... add other fields to payload for filtering
return payload
}
单元测试的思路是,通过接口模拟QdrantClient
和Vectorizer
,然后构造不同类型的nats.Msg
(创建、更新、删除、格式错误、空内容等)来调用Handle
方法,验证模拟对象的调用情况以及消息是否被正确Ack
或Nak
。
第三步:前端查询与展示
前端部分使用Vite + TypeScript + React。核心是一个搜索组件,它将用户的输入文本发送到后端API,后端再将请求转发给Qdrant进行向量搜索。
一个简化的前端API请求函数:
// src/services/searchApi.ts
import axios from 'axios';
interface SearchResult {
id: number;
score: number;
payload: {
product_name: string;
price: number;
// ... other fields
};
}
const API_BASE_URL = '/api'; // Using Vite's proxy
export const performSemanticSearch = async (
query: string,
limit: number = 10
): Promise<SearchResult[]> => {
if (!query.trim()) {
return [];
}
try {
// In a real app, the backend would handle embedding the query text
const response = await axios.post<{ results: SearchResult[] }>(
`${API_BASE_URL}/search`,
{
query_text: query,
limit,
}
);
return response.data.results;
} catch (error) {
console.error('Error performing semantic search:', error);
// Add user-facing error handling logic
return [];
}
};
后端API(例如用Go或Node.js实现)会接收这个请求,首先将query_text
用与消费服务相同的嵌入模型转换为向量,然后向Qdrant发起SearchPoints
请求。
方案的局限性与未来迭代
这套架构成功地解决了在不触碰核心系统的前提下,为其增加现代AI搜索能力的需求。然而,它并非银弹,也存在一些需要持续关注和优化的点。
首先,历史数据全量同步的问题。CDC只解决了增量数据的同步,对于已经存在于数据库中的数百万条历史数据,需要一个单独的、一次性的批处理任务来完成初始化。这个过程需要仔细设计,以避免对源数据库造成过大压力,比如可以分批次、在业务低峰期执行。
其次,Schema演进。如果源数据库的表结构发生变更(例如增加列),Debezium可以捕获到,但我们的消费服务需要相应的代码修改才能正确处理新的字段。这要求DBA的变更流程需要与应用开发团队紧密协同,否则可能导致数据管道中断或数据丢失。一个改进方向是让消费服务更具弹性,例如将不认识的字段以JSON格式存入Qdrant的payload中,而不是直接丢弃。
最后是端到端的数据一致性监控。虽然NATS JetStream提供了至少一次(At-Least-Once)的交付保证,加上我们消费端的幂等写入(Upsert),可以实现事实上的有效一次(Effectively-Once)处理。但在复杂的分布式系统中,总有意外发生。我们需要建立一套监控机制,定期抽样比对源数据库与Qdrant中的数据,以发现潜在的延迟或数据不一致问题,并触发告警。