使用 Clojure 与 containerd API 构建一个运行在 GCP 上的多语言任务执行器


我们需要一个机制,用于在隔离环境中按需执行不可信的、短暂的计算任务。这些任务可能是数据后处理脚本、用户提交的函数,或是 CI/CD 流水线中的一个构建步骤。一个常见的方案是在 Kubernetes 中为每个任务启动一个 Pod,但这对于生命周期只有几秒钟的轻量级任务来说,调度延迟和资源开销都显得过于沉重。我们需要更贴近底层的控制力,一种能够以编程方式、毫秒级响应、精确控制资源并且足够安全的方式来启动和管理容器化进程的方案。

这就是直接与容器运行时 containerd 交互的价值所在。通过绕过 Docker Daemon 或 Kubelet 这类高层抽象,我们可以直接利用其 gRPC API 来实现一个轻量级、高性能的自定义任务执行器。控制平面,即这个执行器的大脑,我们将使用 Clojure 来构建。Clojure 基于 JVM,其函数式、数据驱动的范式以及强大的并发处理能力,使其非常适合用来管理和协调无状态的、生命周期复杂的执行任务。

我们的目标是构建一个运行在 GCP Compute Engine 实例上的 Clojure 服务,它能够:

  1. 通过 gRPC 与本地的 containerd 守护进程通信。
  2. 接收任务请求(例如,执行一个指定的 Node.js 脚本)。
  3. 动态拉取所需的容器镜像。
  4. 创建、运行并监控容器任务,捕获其标准输出和错误。
  5. 在任务结束后,可靠地清理所有相关资源。

这是一个典型的“微内核”架构思想的体现:containerd 是稳定、高效的执行核心,而 Clojure 负责所有上层的调度、状态管理和业务逻辑。

(ns containerd-executor.core
  (:require [clojure.core.async :as a]
            [clojure.java.io :as io]
            [clojure.string :as str]
            [clj-grpc.client :as grpc]
            [clj-uuid :as uuid])
  (:import (io.grpc ManagedChannelBuilder)
           (io.containerd.v1.services ContainersGrpc$ContainersBlockingStub
                                      ImagesGrpc$ImagesBlockingStub
                                      TasksGrpc$TasksBlockingStub)
           (io.containerd.v1.types Mount)
           (io.containerd.oci Oci$Spec)
           (com.google.protobuf Empty)))

;;; ===========================================================================
;;; 1. gRPC 客户端与 containerd 的连接
;;;
;;; 在真实项目中,配置应当外部化。这里为了演示,我们硬编码了 containerd 的 socket 地址。
;;; 我们需要为每个 containerd service (Images, Containers, Tasks) 创建一个独立的 blocking stub。
;;; 这种 stub 在每次调用时会阻塞,直到收到服务端的响应,这对于命令式的执行流程来说更易于管理。
;;; ===========================================================================
(defonce containerd-socket-path "/run/containerd/containerd.sock")
(defonce containerd-namespace "default")

(defn- build-channel [socket-path]
  (-> (ManagedChannelBuilder/forTarget (str "unix://" socket-path))
      (.usePlaintext)
      (.build)))

(defonce ^:private channel (build-channel containerd-socket-path))

(defn- create-stubs []
  {:images-stub (grpc/new-stub channel ImagesGrpc$ImagesBlockingStub)
   :containers-stub (grpc/new-stub channel ContainersGrpc$ContainersBlockingStub)
   :tasks-stub (grpc/new-stub channel TasksGrpc$TasksBlockingStub)})

(defonce stubs (create-stubs))

以上代码片段是整个系统的基石。它使用 clj-grpc 库(一个假设存在的库,实际中可能需要使用 Java gRPC 库的互操作)创建了到 containerd gRPC 服务的连接。注意,我们连接的是一个 Unix Socket,这是 containerd 的标准通信方式。stubs 这个 map 包含了与 containerd 三个核心服务交互所需的所有客户端实例。

任务生命周期管理

任务的生命周期是整个执行器的核心逻辑。一个任务从被提交到最终被清理,会经历一系列状态转换。我们可以用一个 Mermaid 图来清晰地描述这个流程。

stateDiagram-v2
    direction LR
    [*] --> PENDING: 任务提交

    PENDING --> PULLING_IMAGE: 检查镜像
    PULLING_IMAGE --> IMAGE_READY: 镜像拉取成功
    PULLING_IMAGE --> FAILED: 镜像拉取失败

    IMAGE_READY --> CREATING_CONTAINER: 创建容器元数据
    CREATING_CONTAINER --> CONTAINER_READY: 容器创建成功
    CREATING_CONTAINER --> FAILED: 容器创建失败

    CONTAINER_READY --> CREATING_TASK: 创建可执行任务
    CREATING_TASK --> TASK_READY: 任务创建成功
    CREATING_TASK --> FAILED: 任务创建失败

    TASK_READY --> RUNNING: 启动任务
    RUNNING --> EXITED: 任务执行完毕
    RUNNING --> FAILED: 任务运行时异常

    EXITED --> CLEANING_UP: 清理资源
    FAILED --> CLEANING_UP: 清理资源

    CLEANING_UP --> COMPLETED: 清理成功
    CLEANING_UP --> [*]

动态拉取容器镜像

在执行任何任务之前,我们必须确保其所需的容器镜像在本地存在。如果不存在,就需要从远程仓库(如 Docker Hub)拉取。这里的关键是 Images 服务的 Pull RPC 调用。

;;; ===========================================================================
;;; 2. 镜像管理
;;;
;;; 确保执行任务所需的镜像在本地存在。
;;; containerd 的 API 是幂等的。如果镜像已存在,`Get` 调用会成功返回,`Pull` 调用则会快速完成。
;;; 在生产环境中,需要配置私有仓库的认证信息。
;;; ===========================================================================
(defn- image-exists? [stub image-ref]
  (try
    (let [request (-> (com.containerd.v1.services.Images$GetImageRequest/newBuilder)
                      (.setName image-ref)
                      (.build))]
      (.get (:images-stub stub) request)
      true)
    (catch Exception _ false)))

(defn ensure-image [stub image-ref]
  (if (image-exists? stub image-ref)
    (do
      (println (format "Image '%s' already exists locally." image-ref))
      {:status :success})
    (try
      (println (format "Pulling image '%s'..." image-ref))
      (let [request (-> (com.containerd.v1.services.Images$PullRequest/newBuilder)
                        (.setRef image-ref)
                        (.build))]
        (.pull (:images-stub stub) request)
        (println (format "Image '%s' pulled successfully." image-ref))
        {:status :success})
      (catch Exception e
        (println (format "Failed to pull image '%s': %s" image-ref (.getMessage e)))
        {:status :error :message (.getMessage e)}))))

这段代码定义了一个 ensure-image 函数,它首先检查镜像是否存在,如果不存在则调用 pull RPC。这是一个典型的健壮性设计:先检查,再操作,并对可能发生的网络或权限错误进行捕获。

核心执行逻辑:创建并运行任务

这是整个系统的核心部分。它涉及到 containerd 的两个核心概念:Container 和 Task。

  • Container: 是一个元数据对象,它包含了运行一个容器所需的所有配置信息,比如 OCI Spec(定义了命令、环境变量、挂载点等)、镜像引用等。它本身是不运行的。
  • Task: 是一个正在运行的进程,它由一个 Container 对象实例化而来。一个 Container 可以被多次实例化为 Task(尽管我们这里是一对一使用)。

下面的代码将实现一个 run-task! 函数,它串联起容器创建、任务创建、任务启动、等待结果和资源清理的完整流程。

;;; ===========================================================================
;;; 3. 任务执行与资源清理
;;;
;;; 这是执行器的核心。流程非常明确:
;;; a. 创建一个唯一的容器 ID。
;;; b. 基于 OCI Spec 创建 Container 对象。这里我们使用了最简化的配置。
;;; c. 基于 Container 对象创建 Task。
;;; d. 启动 Task。
;;; e. 阻塞等待 Task 结束,并获取退出码。
;;; f. 无论成功与否,都必须清理 Task 和 Container,这是一个关键的健壮性设计。
;;; ===========================================================================

(defn- generate-oci-spec [container-id image-ref command]
  (let [process-builder (-> (Oci$Process/newBuilder)
                            (.addAllArgs command)
                            (.setCwd "/"))
        spec-builder (-> (Oci$Spec/newBuilder)
                         (.setVersion "1.0.2")
                         (.setProcess process-builder)
                         (.setHostname container-id))]
    (.build spec-builder)))

(defn- cleanup-task-and-container [stubs container-id]
  (try
    ;; 删除 Task
    (let [delete-task-req (-> (com.containerd.v1.services.Tasks$DeleteRequest/newBuilder)
                              (.setContainerID container-id)
                              (.build))]
      (.delete (:tasks-stub stubs) delete-task-req))
    (println (format "Cleaned up task for container '%s'." container-id))
    (catch Exception e
      ;; 如果 task 不存在(例如创建失败),删除会报错,这是正常的,可以忽略。
      (when-not (str/includes? (.getMessage e) "not found")
        (println (format "Warn: Failed to delete task for container '%s': %s" container-id (.getMessage e))))))
  (try
    ;; 删除 Container
    (let [delete-container-req (-> (com.containerd.v1.services.Containers$DeleteContainerRequest/newBuilder)
                                   (.setId container-id)
                                   (.build))]
      (.delete (:containers-stub stubs) delete-container-req))
    (println (format "Cleaned up container '%s'." container-id))
    (catch Exception e
      (println (format "Error: Failed to delete container '%s': %s" container-id (.getMessage e))))))

(defn run-task! [task-def]
  (let [{:keys [image-ref command]} task-def
        container-id (str "task-" (uuid/v1))]
    (println (format "Starting task execution for container ID: %s" container-id))

    ;; 步骤 1: 确保镜像存在
    (let [image-result (ensure-image stubs image-ref)]
      (when (= (:status image-result) :error)
        (return-from run-task! {:status :error :message (:message image-result)})))

    ;; 使用 try-finally 确保无论执行过程中发生什么,清理逻辑都会被调用
    (try
      ;; 步骤 2: 创建容器
      (println (format "Creating container '%s'..." container-id))
      (let [oci-spec (generate-oci-spec container-id image-ref command)
            container (-> (com.containerd.v1.types.Container/newBuilder)
                          (.setId container-id)
                          (.setSpec (.toByteString oci-spec))
                          (.setImage image-ref)
                          (.build))
            create-container-req (-> (com.containerd.v1.services.Containers$CreateContainerRequest/newBuilder)
                                     (.setContainer container)
                                     (.build))]
        (.create (:containers-stub stubs) create-container-req))

      ;; 步骤 3: 创建任务
      (println (format "Creating task for container '%s'..." container-id))
      (let [create-task-req (-> (com.containerd.v1.services.Tasks$CreateRequest/newBuilder)
                                (.setContainerID container-id)
                                (.build))]
        (.create (:tasks-stub stubs) create-task-req))

      ;; 步骤 4: 启动任务
      (println (format "Starting task for container '%s'..." container-id))
      (let [start-req (-> (com.containerd.v1.services.Tasks$StartRequest/newBuilder)
                          (.setContainerID container-id)
                          (.build))]
        (.start (:tasks-stub stubs) start-req))

      ;; 步骤 5: 等待任务完成
      (println (format "Waiting for task '%s' to complete..." container-id))
      (let [wait-req (-> (com.containerd.v1.services.Tasks$WaitRequest/newBuilder)
                         (.setContainerID container-id)
                         (.build))
            response (.wait (:tasks-stub stubs) wait-req)]
        (let [exit-code (.getExitStatus response)]
          (println (format "Task '%s' finished with exit code: %d" container-id exit-code))
          {:status (if (zero? exit-code) :success :failed)
           :container-id container-id
           :exit-code exit-code}))

      (catch Exception e
        (println (format "An error occurred during task execution for '%s': %s" container-id (.getMessage e)))
        {:status :error :container-id container-id :message (.getMessage e)})

      (finally
        ;; 步骤 6: 清理资源
        (println (format "Initiating cleanup for container '%s'." container-id))
        (cleanup-task-and-container stubs container-id)))))

这段代码是整个系统的核心实现。generate-oci-spec 函数动态构建了符合 OCI 运行时规范的 JSON 结构,这是定义容器行为的蓝图。run-task! 函数通过一系列 gRPC 调用,严格按照 containerd 的 API 流程执行任务。最关键的设计是 try...finally 块,它保证了 cleanup-task-and-container 无论任务成功、失败还是在中间步骤抛出异常,都会被执行。在生产系统中,这种资源回收的确定性至关重要,能有效防止资源泄漏。

捕获任务输出

到目前为止,我们只能获取任务的退出码,但这还不够。我们需要捕获任务的标准输出(stdout)和标准错误(stderr)来获取计算结果或调试信息。一个常见的陷阱是直接在 OCI Spec 中指定日志文件路径,但这会引发并发写入和文件管理的复杂性。更可靠的底层方法是使用 FIFO (命名管道)。

我们可以在宿主机上为每个任务创建临时的 FIFO 文件,然后通过 OCI Spec 将容器的 stdout/stderr 重定向到这些 FIFO。Clojure 主进程可以异步地从这些 FIFO 中读取数据流,直到任务结束。

;;; ===========================================================================
;;; 4. I/O 处理 (高级)
;;;
;;; 修改 `run-task!` 以支持捕获标准输出和错误。
;;; 1. 在启动任务前,在宿主机上创建两个 FIFO 文件。
;;; 2. 修改 OCI Spec,将容器的 stdout/stderr 指向这两个 FIFO。
;;; 3. 启动两个异步的 Clojure 线程(或 core.async go 块)来分别读取 FIFO 的内容。
;;; 4. 当任务结束时,关闭这些 reader。
;;;
;;; 注意:此实现为了演示,简化了文件路径和权限管理。
;;; 在生产环境中,需要使用安全的临时目录,并处理好权限问题。
;;; ===========================================================================

(defn- create-fifo [path]
  (try
    (let [pb (ProcessBuilder. (into-array String ["mkfifo" path]))]
      (let [process (.start pb)]
        (.waitFor process)
        (when-not (zero? (.exitValue process))
          (throw (Exception. (format "Failed to create fifo: %s" path))))))
    (catch java.io.IOException e
      (throw (Exception. (format "mkfifo command not found. Is this a Linux system? Error: %s" (.getMessage e)))))))

(defn read-stream-to-channel [path out-chan]
  (a/thread
    (try
      (with-open [reader (io/reader path)]
        (doseq [line (line-seq reader)]
          (a/>!! out-chan {:type :log :payload line})))
      (finally
        (a/>!! out-chan {:type :eof})
        (a/close! out-chan)))))

(defn execute-task-with-io [task-def]
  (let [{:keys [image-ref command]} task-def
        container-id (str "task-" (uuid/v1))
        tmp-dir "/tmp"
        stdout-fifo (format "%s/%s.stdout" tmp-dir container-id)
        stderr-fifo (format "%s/%s.stderr" tmp-dir container-id)
        stdout-chan (a/chan 100)
        stderr-chan (a/chan 100)]

    (println (format "Executing task '%s' with I/O redirection." container-id))

    ;; 创建 FIFO 文件
    (create-fifo stdout-fifo)
    (create-fifo stderr-fifo)
    
    (try
      ;; 确保镜像存在 (省略,与 run-task! 相同)
      (ensure-image stubs image-ref)

      ;; 创建容器,这次指定了 stdio 路径
      (let [create-task-req (-> (com.containerd.v1.services.Tasks$CreateRequest/newBuilder)
                                (.setContainerID container-id)
                                (.setStdout stdout-fifo)
                                (.setStderr stderr-fifo)
                                (.build))
            ;; ... 省略容器创建逻辑 ...
            ]

        ;; ... 启动容器的逻辑 ...

        ;; 异步读取输出
        (read-stream-to-channel stdout-fifo stdout-chan)
        (read-stream-to-channel stderr-fifo stderr-chan)

        ;; 合并输出并等待任务结束
        (let [logs (atom [])
              exit-code (atom nil)]
          (loop [stdout-open? true stderr-open? true]
            (when (or stdout-open? stderr-open?)
              (let [[val port] (a/alts!! [stdout-chan stderr-chan])]
                (cond
                  (nil? val) ; channel closed
                  (if (= port stdout-chan)
                    (recur false stderr-open?)
                    (recur stdout-open? false))

                  (= :log (:type val))
                  (do (swap! logs conj (assoc val :source (if (= port stdout-chan) :stdout :stderr)))
                      (recur stdout-open? stderr-open?))))))
          
          ;; ... 等待任务并获取退出码的逻辑 ...
          (reset! exit-code (get-task-exit-code stubs container-id))

          {:status (if (zero? @exit-code) :success :failed)
           :exit-code @exit-code
           :logs @logs}))

      (catch Exception e
         ;; ... 错误处理 ...
      (finally
        ;; 清理 FIFO 文件和容器/任务
        (io/delete-file stdout-fifo true)
        (io/delete-file stderr-fifo true)
        (cleanup-task-and-container stubs container-id)))))

;; 使用示例
(comment
  (let [node-task {:image-ref "docker.io/library/node:18-slim"
                   :command ["node" "-e" "console.log('Hello from Node.js in containerd!'); console.error('This is an error message.');"]}]
    (run-task! node-task))
  )

注意:execute-task-with-io 是一个概念性的增强版本,为了清晰地展示思路,它合并并简化了部分 run-task! 的逻辑。
这个增强版本展示了如何通过宿主机的 FIFO 文件来捕获 I/O。core.async 在这里非常有用,它允许我们以非阻塞的方式同时监听 stdout 和 stderr 两个流,并将它们的输出合并到一个结果中。当任务结束,finally 块不仅要清理 containerd 的资源,还要确保删除这些临时的 FIFO 文件。

局限性与未来方向

我们构建的这个执行器虽然轻量且高效,但在一个真实的生产环境中,它还存在一些明显的局限性:

  1. 安全性: 当前实现以 root 权限运行 containerd 和 Clojure 进程,容器内的进程也可能是 root。这是一个巨大的安全隐患。生产环境需要启用 rootless containerd,并利用 user namespaces 将容器内的 root 用户映射为宿主机上的非特权用户。对于执行完全不可信的代码,还应考虑使用 gVisor 或 Firecracker 这样的强隔离运行时。
  2. 资源管理: 我们没有在 OCI Spec 中定义 cgroups 相关的资源限制(CPU、内存)。对于一个多租户执行器,精确的资源配额和限制是防止单个任务耗尽整个节点资源的关键。这需要对 OCI Spec 和 Linux cgroups v2 有更深入的了解。
  3. 网络: 默认情况下,containerd 创建的任务没有网络命名空间,或者说它共享了宿主机的网络。这同样是危险的。要提供受控的网络访问,需要集成 CNI(Container Network Interface)插件,为每个任务动态创建和配置独立的网络环境。
  4. 可扩展性: 当前执行器是单机版的。要构建一个高可用的分布式系统,需要引入一个任务队列(如 RabbitMQ 或 NATS),让多个执行器节点从队列中消费任务。同时还需要一个集中的地方来存储任务状态和日志,例如使用 GCP Cloud Logging 和 Firestore。

尽管存在这些局限,但这个从底层构建的执行器原型,清晰地揭示了容器编排系统的核心工作原理,并展示了如何利用 Clojure 这样一门富有表现力的语言来构建稳定、可靠的系统控制平面。


  目录