构建基于 CDC 和 NATS 的多源异构数据库实时向量同步管道


业务需求的变化总是比系统架构演进快一步。最近遇到的一个典型场景是:产品团队希望在一个集成了AI能力的全新前端应用上,提供对全量商品数据的语义化搜索。挑战在于,这些商品数据并非存放在单一、现代化的数据库中,而是散落在两个核心系统里:一套是运行多年的Oracle 12c,承载着核心交易和商品信息;另一套是近几年构建的营销系统,使用MariaDB 10.6,存储着商品的扩展属性和用户评论。直接改造这两套生产数据库的方案,在立项之初就被DBA和系统负责人以“高风险、影响稳定性”为由坚决否决。

初步的构想是做一个夜间ETL,将数据统一抽取到新的数据仓库中再进行处理。但这无法满足产品对“近实时”的要求——商品信息(尤其是库存、价格)的变更必须在几分钟内反映到搜索结果中。轮询查询数据库变更的方案负载太高,且时效性差。最终,我们决定采用变更数据捕获(Change Data Capture, CDC)技术,构建一个非侵入式、低延迟的数据管道,将两个异构数据库的变更实时同步到向量数据库Qdrant中。

整个架构的设计思路是:

  1. 使用Debezium作为CDC工具,分别连接Oracle的LogMiner和MariaDB的binlog,捕获行级别的INSERT, UPDATE, DELETE操作。
  2. Debezium将捕获到的变更事件以JSON格式推送到一个高吞吐、低延迟的消息中间件。在这里,我们选择了NATS JetStream,而不是更重的Kafka,因为它运维更简单,性能对于当前场景绰绰有余。
  3. 一个独立的Go语言编写的消费服务订阅NATS主题,处理这些变更事件。
  4. 消费服务负责解析Debezium的事件格式,根据业务逻辑拼接来自不同数据源的字段,调用一个本地的文本嵌入模型(Embedding Model)生成向量,最后将结构化数据和向量数据一并写入(Upsert)Qdrant。
  5. 前端应用由Vite构建,它通过一个轻量级后端API查询Qdrant,实现语义搜索功能。

这个方案的核心在于解耦,源数据库只负责产生日志,完全无感知下游消费,保证了生产系统的绝对稳定。

graph TD
    subgraph Legacy Systems
        Oracle[Oracle 12c] -- XStream/LogMiner --> Debezium_ORA[Debezium Connector for Oracle]
        MariaDB[MariaDB 10.6] -- Binlog --> Debezium_MAR[Debezium Connector for MariaDB]
    end

    subgraph Kafka Connect Cluster
        Debezium_ORA -- JSON over HTTP --> KafkaConnect[Kafka Connect Worker]
        Debezium_MAR -- JSON over HTTP --> KafkaConnect
    end

    KafkaConnect -- Change Events --> NATS[NATS JetStream Server]

    subgraph Real-time Processing
        NATS -- Subscribes to 'db.changes' stream --> GoConsumer[Go Vectorization Service]
        GoConsumer -- Calls for embeddings --> EmbeddingModel[SentenceTransformer Model]
        EmbeddingModel -- Returns vector --> GoConsumer
        GoConsumer -- Upserts points --> Qdrant[Qdrant Vector DB]
    end

    subgraph User Facing Application
        User[User] -- Semantic Search Query --> ViteApp[Vite Frontend]
        ViteApp -- API Call --> BackendAPI[Backend for Frontend]
        BackendAPI -- Vector Search --> Qdrant
    end

第一步:配置Debezium Connectors

在生产环境中,我们使用一个独立的Kafka Connect集群来运行Debezium。这里的坑在于Oracle的配置。Debezium支持多种方式从Oracle捕获变更,考虑到数据库版本和DBA的接受度,我们选择了基于LogMiner的方案。这需要在数据库侧开启归档日志和补充日志(Supplemental Logging)。

为商品核心表PRODUCTS和价格表PRODUCT_PRICES开启补充日志:

-- 以DBA权限执行
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE MYSCHEMA.PRODUCTS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER TABLE MYSCHEMA.PRODUCT_PRICES ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Debezium Oracle Connector的配置 (oracle-products-connector.json) 相对复杂,关键在于数据库连接信息和LogMiner的配置项。

{
  "name": "oracle-products-source",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "ora_server1",
    "database.hostname": "oracle.db.internal",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLPDB1",
    "database.history.kafka.bootstrap.servers": "kafka:9092", // Debezium内部使用,非我们业务流
    "database.history.kafka.topic": "dbhistory.oracle.products",
    "log.mining.strategy": "online_catalog",
    "table.include.list": "MYSCHEMA.PRODUCTS,MYSCHEMA.PRODUCT_PRICES",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "nats.db.oracle.$3" // 将topic路由到NATS主题
  }
}
  • log.mining.strategy: 明确使用在线日志挖掘。
  • table.include.list: 只捕获我们关心的表。
  • transforms: 这是个关键配置。unwrap将Debezium复杂的事件结构简化为只包含afterbefore数据的扁平JSON。route使用正则表达式将Kafka Connect内部的topic(如ora_server1.MYSCHEMA.PRODUCTS)重定向到我们规划的NATS主题(如nats.db.oracle.PRODUCTS)。

MariaDB的配置 (mariadb-ext-connector.json) 类似但更简单,因为它直接读取binlog。

{
  "name": "mariadb-ext-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mariadb.db.internal",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "maria_server1",
    "database.include.list": "marketing",
    "table.include.list": "marketing.product_reviews",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.mariadb.ext",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "nats.db.mariadb.$3"
  }
}

为了让Debezium能将消息发送到NATS,我们使用了 NATS Sink Connector for Kafka Connect。在Kafka Connect的worker配置中,需要指定这个connector的路径,并在启动时加载。

第二步:Go消费服务的实现

这是整个管道的核心处理逻辑。我们使用官方的nats.go库来连接NATS JetStream,并创建一个推式(Push-based)消费者。服务的目标是稳定、高效、可容错。

main.go:

package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"vector-sync-service/config"
	"vector-sync-service/internal/consumers"
	"vector-sync-service/internal/qdrant"
	"vector-sync-service/internal/vectorizer"

	"github.com/nats-io/nats.go"
)

func main() {
	// 1. 初始化配置和日志
	cfg := config.Load()
	logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))

	// 2. 初始化外部依赖
	qdrantClient, err := qdrant.NewClient(cfg.Qdrant.Host, cfg.Qdrant.Port, cfg.Qdrant.ApiKey)
	if err != nil {
		logger.Error("failed to initialize qdrant client", "error", err)
		os.Exit(1)
	}
	defer qdrantClient.Close()
	logger.Info("qdrant client initialized")

	// 确保集合存在
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := qdrantClient.EnsureCollection(ctx, cfg.Qdrant.CollectionName); err != nil {
		logger.Error("failed to ensure qdrant collection exists", "error", err)
		os.Exit(1)
	}

	textVectorizer := vectorizer.NewLocalVectorizer(cfg.Vectorizer.ModelPath)
	logger.Info("local vectorizer loaded")

	// 3. 连接NATS JetStream
	nc, err := nats.Connect(cfg.NATS.URL, nats.Timeout(10*time.Second), nats.RetryOnFailedConnect(true), nats.MaxReconnects(5), nats.ReconnectWait(time.Second))
	if err != nil {
		logger.Error("failed to connect to NATS", "error", err)
		os.Exit(1)
	}
	defer nc.Close()
	logger.Info("connected to NATS server", "url", cfg.NATS.URL)

	js, err := nc.JetStream()
	if err != nil {
		logger.Error("failed to create JetStream context", "error", err)
		os.Exit(1)
	}

	// 4. 启动消费者
	// 使用一个上下文来协调所有goroutine的关闭
	mainCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	handler := consumers.NewDebeziumEventHandler(logger, qdrantClient, textVectorizer, cfg.Qdrant.CollectionName)
	// 监听所有匹配的数据库变更主题
	// subjects: ["nats.db.oracle.>", "nats.db.mariadb.>"]
	consumerManager, err := consumers.NewManager(mainCtx, js, logger, handler, cfg.NATS.StreamName, cfg.NATS.Subjects)
	if err != nil {
		logger.Error("failed to start consumer manager", "error", err)
		os.Exit(1)
	}

	logger.Info("vector sync service is running...")

	// 5. 等待终止信号
	<-mainCtx.Done()

	logger.Info("shutting down service...")
	// 清理资源
	consumerManager.Shutdown()
	logger.Info("service stopped gracefully")
}

consumers/handler.go 是处理消息的核心逻辑。这里的一个实践要点是,我们不能简单地将收到的每一条数据直接写入Qdrant。例如,PRODUCTS表和PRODUCT_PRICES表是关联的,我们需要将它们的数据合并成一个文档再进行向量化。

真实项目中,这种合并逻辑会很复杂。一个常见的错误是试图在消费端做实时的跨流Join,这会引入状态管理和时序问题。更稳妥的方案是:

  • 物化视图(Materialized View)模式:在消费端维护一个轻量级的缓存(如Redis)来存储部分关联数据。例如,收到PRODUCT_PRICES变更时,去缓存中查找对应的PRODUCTS信息。如果找不到,可以短暂等待或标记为待处理。
  • 下游聚合:让Qdrant的payload存储范式化的数据。PRODUCTSPRODUCT_PRICES作为独立的字段存入同一个Point。这样虽然有数据冗余,但查询性能最好,且实现简单。我们选择了这种方式。
package consumers

import (
	"context"
	"encoding/json"
	"log/slog"
	"strconv"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/qdrant/go-client/qdrant"
)

type DebeziumMessage struct {
	// Debezium unwrapped format
	// 字段根据实际数据库表结构定义
	ProductID   *int    `json:"PRODUCT_ID"`
	ProductName *string `json:"PRODUCT_NAME"`
	Description *string `json:"DESCRIPTION"`
	Price       *float64 `json:"PRICE"`
	ReviewText  *string `json:"REVIEW_TEXT"`
	// ... other fields
}

// QdrantClient and Vectorizer are interfaces for testability
type QdrantClient interface {
	UpsertPoints(ctx context.Context, collectionName string, points []*qdrant.PointStruct) error
	DeletePoints(ctx context.Context, collectionName string, ids []uint64) error
}
type Vectorizer interface {
	Embed(ctx context.Context, text string) ([]float32, error)
}

type DebeziumEventHandler struct {
	logger         *slog.Logger
	qdrantClient   QdrantClient
	vectorizer     Vectorizer
	collectionName string
}

func NewDebeziumEventHandler(logger *slog.Logger, qc QdrantClient, v Vectorizer, collName string) *DebeziumEventHandler {
	return &DebeziumEventHandler{
		logger:         logger,
		qdrantClient:   qc,
		vectorizer:     v,
		collectionName: collName,
	}
}

// Handle is the core logic for processing a single NATS message
func (h *DebeziumEventHandler) Handle(msg *nats.Msg) {
	// 消息确认是关键,确保处理成功才ack
	defer func() {
		if err := msg.Ack(); err != nil {
			h.logger.Error("failed to ack message", "subject", msg.Subject, "error", err)
		}
	}()

	var event DebeziumMessage
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		h.logger.Error("failed to unmarshal debezium event", "data", string(msg.Data), "error", err)
		// 如果消息格式错误,直接ack,避免无限重试
		return
	}

	// 检查主键是否存在
	if event.ProductID == nil {
		h.logger.Warn("event received without a product ID, skipping", "subject", msg.Subject)
		return
	}
	pointID := uint64(*event.ProductID)

	// 如果消息体为空 (Debezium的DELETE事件),则删除对应的点
	if len(msg.Data) <= 2 { // check for empty json "{}"
		h.logger.Info("handling delete event", "product_id", pointID)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := h.qdrantClient.DeletePoints(ctx, h.collectionName, []uint64{pointID}); err != nil {
			h.logger.Error("failed to delete point from qdrant", "product_id", pointID, "error", err)
			// 对于失败的操作,我们不ack,让NATS重传
			msg.NakWithDelay(3 * time.Second)
		}
		return
	}
	
	// 构建用于向量化的文本
	var textToEmbed strings.Builder
	if event.ProductName != nil {
		textToEmbed.WriteString(*event.ProductName)
		textToEmbed.WriteString(". ")
	}
	if event.Description != nil {
		textToEmbed.WriteString(*event.Description)
		textToEmbed.WriteString(". ")
	}
	if event.ReviewText != nil {
		textToEmbed.WriteString("User review: ")
		textToEmbed.WriteString(*event.ReviewText)
	}

	if textToEmbed.Len() == 0 {
		h.logger.Warn("no text content to vectorize, skipping upsert", "product_id", pointID)
		return
	}
	
	// 生成向量
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	vector, err := h.vectorizer.Embed(ctx, textToEmbed.String())
	if err != nil {
		h.logger.Error("failed to generate embedding", "product_id", pointID, "error", err)
		msg.NakWithDelay(5 * time.Second) // 嵌入模型失败,可以重试
		return
	}

	// 构建Qdrant的Point
	payload := h.buildPayload(event)
	point := &qdrant.PointStruct{
		Id:      &qdrant.PointId{PointIdOptions: &qdrant.PointId_Num{Num: pointID}},
		Payload: payload,
		Vectors: &qdrant.Vectors{VectorsOptions: &qdrant.Vectors_Vector{Vector: &qdrant.Vector{Data: vector}}},
	}

	// Upsert到Qdrant
	upsertCtx, upsertCancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer upsertCancel()
	if err := h.qdrantClient.UpsertPoints(upsertCtx, h.collectionName, []*qdrant.PointStruct{point}); err != nil {
		h.logger.Error("failed to upsert point to qdrant", "product_id", pointID, "error", err)
		msg.NakWithDelay(3 * time.Second) // Qdrant写入失败,重试
		return
	}
	
	h.logger.Info("successfully processed event and upserted to qdrant", "product_id", pointID, "subject", msg.Subject)
}

func (h *DebeziumEventHandler) buildPayload(event DebeziumMessage) map[string]*qdrant.Value {
    payload := map[string]*qdrant.Value{}
    if event.ProductName != nil {
        payload["product_name"] = &qdrant.Value{Kind: &qdrant.Value_StringValue{StringValue: *event.ProductName}}
    }
    if event.Price != nil {
		payload["price"] = &qdrant.Value{Kind: &qdrant.Value_DoubleValue{DoubleValue: *event.Price}}
	}
    //... add other fields to payload for filtering
    return payload
}

单元测试的思路是,通过接口模拟QdrantClientVectorizer,然后构造不同类型的nats.Msg(创建、更新、删除、格式错误、空内容等)来调用Handle方法,验证模拟对象的调用情况以及消息是否被正确AckNak

第三步:前端查询与展示

前端部分使用Vite + TypeScript + React。核心是一个搜索组件,它将用户的输入文本发送到后端API,后端再将请求转发给Qdrant进行向量搜索。

一个简化的前端API请求函数:

// src/services/searchApi.ts
import axios from 'axios';

interface SearchResult {
  id: number;
  score: number;
  payload: {
    product_name: string;
    price: number;
    // ... other fields
  };
}

const API_BASE_URL = '/api'; // Using Vite's proxy

export const performSemanticSearch = async (
  query: string,
  limit: number = 10
): Promise<SearchResult[]> => {
  if (!query.trim()) {
    return [];
  }
  try {
    // In a real app, the backend would handle embedding the query text
    const response = await axios.post<{ results: SearchResult[] }>(
      `${API_BASE_URL}/search`,
      {
        query_text: query,
        limit,
      }
    );
    return response.data.results;
  } catch (error) {
    console.error('Error performing semantic search:', error);
    // Add user-facing error handling logic
    return [];
  }
};

后端API(例如用Go或Node.js实现)会接收这个请求,首先将query_text用与消费服务相同的嵌入模型转换为向量,然后向Qdrant发起SearchPoints请求。

方案的局限性与未来迭代

这套架构成功地解决了在不触碰核心系统的前提下,为其增加现代AI搜索能力的需求。然而,它并非银弹,也存在一些需要持续关注和优化的点。

首先,历史数据全量同步的问题。CDC只解决了增量数据的同步,对于已经存在于数据库中的数百万条历史数据,需要一个单独的、一次性的批处理任务来完成初始化。这个过程需要仔细设计,以避免对源数据库造成过大压力,比如可以分批次、在业务低峰期执行。

其次,Schema演进。如果源数据库的表结构发生变更(例如增加列),Debezium可以捕获到,但我们的消费服务需要相应的代码修改才能正确处理新的字段。这要求DBA的变更流程需要与应用开发团队紧密协同,否则可能导致数据管道中断或数据丢失。一个改进方向是让消费服务更具弹性,例如将不认识的字段以JSON格式存入Qdrant的payload中,而不是直接丢弃。

最后是端到端的数据一致性监控。虽然NATS JetStream提供了至少一次(At-Least-Once)的交付保证,加上我们消费端的幂等写入(Upsert),可以实现事实上的有效一次(Effectively-Once)处理。但在复杂的分布式系统中,总有意外发生。我们需要建立一套监控机制,定期抽样比对源数据库与Qdrant中的数据,以发现潜在的延迟或数据不一致问题,并触发告警。


  目录