我们需要一个机制,用于在隔离环境中按需执行不可信的、短暂的计算任务。这些任务可能是数据后处理脚本、用户提交的函数,或是 CI/CD 流水线中的一个构建步骤。一个常见的方案是在 Kubernetes 中为每个任务启动一个 Pod,但这对于生命周期只有几秒钟的轻量级任务来说,调度延迟和资源开销都显得过于沉重。我们需要更贴近底层的控制力,一种能够以编程方式、毫秒级响应、精确控制资源并且足够安全的方式来启动和管理容器化进程的方案。
这就是直接与容器运行时 containerd
交互的价值所在。通过绕过 Docker Daemon 或 Kubelet 这类高层抽象,我们可以直接利用其 gRPC API 来实现一个轻量级、高性能的自定义任务执行器。控制平面,即这个执行器的大脑,我们将使用 Clojure 来构建。Clojure 基于 JVM,其函数式、数据驱动的范式以及强大的并发处理能力,使其非常适合用来管理和协调无状态的、生命周期复杂的执行任务。
我们的目标是构建一个运行在 GCP Compute Engine 实例上的 Clojure 服务,它能够:
- 通过 gRPC 与本地的
containerd
守护进程通信。 - 接收任务请求(例如,执行一个指定的 Node.js 脚本)。
- 动态拉取所需的容器镜像。
- 创建、运行并监控容器任务,捕获其标准输出和错误。
- 在任务结束后,可靠地清理所有相关资源。
这是一个典型的“微内核”架构思想的体现:containerd
是稳定、高效的执行核心,而 Clojure 负责所有上层的调度、状态管理和业务逻辑。
(ns containerd-executor.core
(:require [clojure.core.async :as a]
[clojure.java.io :as io]
[clojure.string :as str]
[clj-grpc.client :as grpc]
[clj-uuid :as uuid])
(:import (io.grpc ManagedChannelBuilder)
(io.containerd.v1.services ContainersGrpc$ContainersBlockingStub
ImagesGrpc$ImagesBlockingStub
TasksGrpc$TasksBlockingStub)
(io.containerd.v1.types Mount)
(io.containerd.oci Oci$Spec)
(com.google.protobuf Empty)))
;;; ===========================================================================
;;; 1. gRPC 客户端与 containerd 的连接
;;;
;;; 在真实项目中,配置应当外部化。这里为了演示,我们硬编码了 containerd 的 socket 地址。
;;; 我们需要为每个 containerd service (Images, Containers, Tasks) 创建一个独立的 blocking stub。
;;; 这种 stub 在每次调用时会阻塞,直到收到服务端的响应,这对于命令式的执行流程来说更易于管理。
;;; ===========================================================================
(defonce containerd-socket-path "/run/containerd/containerd.sock")
(defonce containerd-namespace "default")
(defn- build-channel [socket-path]
(-> (ManagedChannelBuilder/forTarget (str "unix://" socket-path))
(.usePlaintext)
(.build)))
(defonce ^:private channel (build-channel containerd-socket-path))
(defn- create-stubs []
{:images-stub (grpc/new-stub channel ImagesGrpc$ImagesBlockingStub)
:containers-stub (grpc/new-stub channel ContainersGrpc$ContainersBlockingStub)
:tasks-stub (grpc/new-stub channel TasksGrpc$TasksBlockingStub)})
(defonce stubs (create-stubs))
以上代码片段是整个系统的基石。它使用 clj-grpc
库(一个假设存在的库,实际中可能需要使用 Java gRPC 库的互操作)创建了到 containerd
gRPC 服务的连接。注意,我们连接的是一个 Unix Socket,这是 containerd
的标准通信方式。stubs
这个 map 包含了与 containerd
三个核心服务交互所需的所有客户端实例。
任务生命周期管理
任务的生命周期是整个执行器的核心逻辑。一个任务从被提交到最终被清理,会经历一系列状态转换。我们可以用一个 Mermaid 图来清晰地描述这个流程。
stateDiagram-v2 direction LR [*] --> PENDING: 任务提交 PENDING --> PULLING_IMAGE: 检查镜像 PULLING_IMAGE --> IMAGE_READY: 镜像拉取成功 PULLING_IMAGE --> FAILED: 镜像拉取失败 IMAGE_READY --> CREATING_CONTAINER: 创建容器元数据 CREATING_CONTAINER --> CONTAINER_READY: 容器创建成功 CREATING_CONTAINER --> FAILED: 容器创建失败 CONTAINER_READY --> CREATING_TASK: 创建可执行任务 CREATING_TASK --> TASK_READY: 任务创建成功 CREATING_TASK --> FAILED: 任务创建失败 TASK_READY --> RUNNING: 启动任务 RUNNING --> EXITED: 任务执行完毕 RUNNING --> FAILED: 任务运行时异常 EXITED --> CLEANING_UP: 清理资源 FAILED --> CLEANING_UP: 清理资源 CLEANING_UP --> COMPLETED: 清理成功 CLEANING_UP --> [*]
动态拉取容器镜像
在执行任何任务之前,我们必须确保其所需的容器镜像在本地存在。如果不存在,就需要从远程仓库(如 Docker Hub)拉取。这里的关键是 Images
服务的 Pull
RPC 调用。
;;; ===========================================================================
;;; 2. 镜像管理
;;;
;;; 确保执行任务所需的镜像在本地存在。
;;; containerd 的 API 是幂等的。如果镜像已存在,`Get` 调用会成功返回,`Pull` 调用则会快速完成。
;;; 在生产环境中,需要配置私有仓库的认证信息。
;;; ===========================================================================
(defn- image-exists? [stub image-ref]
(try
(let [request (-> (com.containerd.v1.services.Images$GetImageRequest/newBuilder)
(.setName image-ref)
(.build))]
(.get (:images-stub stub) request)
true)
(catch Exception _ false)))
(defn ensure-image [stub image-ref]
(if (image-exists? stub image-ref)
(do
(println (format "Image '%s' already exists locally." image-ref))
{:status :success})
(try
(println (format "Pulling image '%s'..." image-ref))
(let [request (-> (com.containerd.v1.services.Images$PullRequest/newBuilder)
(.setRef image-ref)
(.build))]
(.pull (:images-stub stub) request)
(println (format "Image '%s' pulled successfully." image-ref))
{:status :success})
(catch Exception e
(println (format "Failed to pull image '%s': %s" image-ref (.getMessage e)))
{:status :error :message (.getMessage e)}))))
这段代码定义了一个 ensure-image
函数,它首先检查镜像是否存在,如果不存在则调用 pull
RPC。这是一个典型的健壮性设计:先检查,再操作,并对可能发生的网络或权限错误进行捕获。
核心执行逻辑:创建并运行任务
这是整个系统的核心部分。它涉及到 containerd
的两个核心概念:Container 和 Task。
- Container: 是一个元数据对象,它包含了运行一个容器所需的所有配置信息,比如 OCI Spec(定义了命令、环境变量、挂载点等)、镜像引用等。它本身是不运行的。
- Task: 是一个正在运行的进程,它由一个 Container 对象实例化而来。一个 Container 可以被多次实例化为 Task(尽管我们这里是一对一使用)。
下面的代码将实现一个 run-task!
函数,它串联起容器创建、任务创建、任务启动、等待结果和资源清理的完整流程。
;;; ===========================================================================
;;; 3. 任务执行与资源清理
;;;
;;; 这是执行器的核心。流程非常明确:
;;; a. 创建一个唯一的容器 ID。
;;; b. 基于 OCI Spec 创建 Container 对象。这里我们使用了最简化的配置。
;;; c. 基于 Container 对象创建 Task。
;;; d. 启动 Task。
;;; e. 阻塞等待 Task 结束,并获取退出码。
;;; f. 无论成功与否,都必须清理 Task 和 Container,这是一个关键的健壮性设计。
;;; ===========================================================================
(defn- generate-oci-spec [container-id image-ref command]
(let [process-builder (-> (Oci$Process/newBuilder)
(.addAllArgs command)
(.setCwd "/"))
spec-builder (-> (Oci$Spec/newBuilder)
(.setVersion "1.0.2")
(.setProcess process-builder)
(.setHostname container-id))]
(.build spec-builder)))
(defn- cleanup-task-and-container [stubs container-id]
(try
;; 删除 Task
(let [delete-task-req (-> (com.containerd.v1.services.Tasks$DeleteRequest/newBuilder)
(.setContainerID container-id)
(.build))]
(.delete (:tasks-stub stubs) delete-task-req))
(println (format "Cleaned up task for container '%s'." container-id))
(catch Exception e
;; 如果 task 不存在(例如创建失败),删除会报错,这是正常的,可以忽略。
(when-not (str/includes? (.getMessage e) "not found")
(println (format "Warn: Failed to delete task for container '%s': %s" container-id (.getMessage e))))))
(try
;; 删除 Container
(let [delete-container-req (-> (com.containerd.v1.services.Containers$DeleteContainerRequest/newBuilder)
(.setId container-id)
(.build))]
(.delete (:containers-stub stubs) delete-container-req))
(println (format "Cleaned up container '%s'." container-id))
(catch Exception e
(println (format "Error: Failed to delete container '%s': %s" container-id (.getMessage e))))))
(defn run-task! [task-def]
(let [{:keys [image-ref command]} task-def
container-id (str "task-" (uuid/v1))]
(println (format "Starting task execution for container ID: %s" container-id))
;; 步骤 1: 确保镜像存在
(let [image-result (ensure-image stubs image-ref)]
(when (= (:status image-result) :error)
(return-from run-task! {:status :error :message (:message image-result)})))
;; 使用 try-finally 确保无论执行过程中发生什么,清理逻辑都会被调用
(try
;; 步骤 2: 创建容器
(println (format "Creating container '%s'..." container-id))
(let [oci-spec (generate-oci-spec container-id image-ref command)
container (-> (com.containerd.v1.types.Container/newBuilder)
(.setId container-id)
(.setSpec (.toByteString oci-spec))
(.setImage image-ref)
(.build))
create-container-req (-> (com.containerd.v1.services.Containers$CreateContainerRequest/newBuilder)
(.setContainer container)
(.build))]
(.create (:containers-stub stubs) create-container-req))
;; 步骤 3: 创建任务
(println (format "Creating task for container '%s'..." container-id))
(let [create-task-req (-> (com.containerd.v1.services.Tasks$CreateRequest/newBuilder)
(.setContainerID container-id)
(.build))]
(.create (:tasks-stub stubs) create-task-req))
;; 步骤 4: 启动任务
(println (format "Starting task for container '%s'..." container-id))
(let [start-req (-> (com.containerd.v1.services.Tasks$StartRequest/newBuilder)
(.setContainerID container-id)
(.build))]
(.start (:tasks-stub stubs) start-req))
;; 步骤 5: 等待任务完成
(println (format "Waiting for task '%s' to complete..." container-id))
(let [wait-req (-> (com.containerd.v1.services.Tasks$WaitRequest/newBuilder)
(.setContainerID container-id)
(.build))
response (.wait (:tasks-stub stubs) wait-req)]
(let [exit-code (.getExitStatus response)]
(println (format "Task '%s' finished with exit code: %d" container-id exit-code))
{:status (if (zero? exit-code) :success :failed)
:container-id container-id
:exit-code exit-code}))
(catch Exception e
(println (format "An error occurred during task execution for '%s': %s" container-id (.getMessage e)))
{:status :error :container-id container-id :message (.getMessage e)})
(finally
;; 步骤 6: 清理资源
(println (format "Initiating cleanup for container '%s'." container-id))
(cleanup-task-and-container stubs container-id)))))
这段代码是整个系统的核心实现。generate-oci-spec
函数动态构建了符合 OCI 运行时规范的 JSON 结构,这是定义容器行为的蓝图。run-task!
函数通过一系列 gRPC 调用,严格按照 containerd
的 API 流程执行任务。最关键的设计是 try...finally
块,它保证了 cleanup-task-and-container
无论任务成功、失败还是在中间步骤抛出异常,都会被执行。在生产系统中,这种资源回收的确定性至关重要,能有效防止资源泄漏。
捕获任务输出
到目前为止,我们只能获取任务的退出码,但这还不够。我们需要捕获任务的标准输出(stdout)和标准错误(stderr)来获取计算结果或调试信息。一个常见的陷阱是直接在 OCI Spec 中指定日志文件路径,但这会引发并发写入和文件管理的复杂性。更可靠的底层方法是使用 FIFO (命名管道)。
我们可以在宿主机上为每个任务创建临时的 FIFO 文件,然后通过 OCI Spec 将容器的 stdout/stderr 重定向到这些 FIFO。Clojure 主进程可以异步地从这些 FIFO 中读取数据流,直到任务结束。
;;; ===========================================================================
;;; 4. I/O 处理 (高级)
;;;
;;; 修改 `run-task!` 以支持捕获标准输出和错误。
;;; 1. 在启动任务前,在宿主机上创建两个 FIFO 文件。
;;; 2. 修改 OCI Spec,将容器的 stdout/stderr 指向这两个 FIFO。
;;; 3. 启动两个异步的 Clojure 线程(或 core.async go 块)来分别读取 FIFO 的内容。
;;; 4. 当任务结束时,关闭这些 reader。
;;;
;;; 注意:此实现为了演示,简化了文件路径和权限管理。
;;; 在生产环境中,需要使用安全的临时目录,并处理好权限问题。
;;; ===========================================================================
(defn- create-fifo [path]
(try
(let [pb (ProcessBuilder. (into-array String ["mkfifo" path]))]
(let [process (.start pb)]
(.waitFor process)
(when-not (zero? (.exitValue process))
(throw (Exception. (format "Failed to create fifo: %s" path))))))
(catch java.io.IOException e
(throw (Exception. (format "mkfifo command not found. Is this a Linux system? Error: %s" (.getMessage e)))))))
(defn read-stream-to-channel [path out-chan]
(a/thread
(try
(with-open [reader (io/reader path)]
(doseq [line (line-seq reader)]
(a/>!! out-chan {:type :log :payload line})))
(finally
(a/>!! out-chan {:type :eof})
(a/close! out-chan)))))
(defn execute-task-with-io [task-def]
(let [{:keys [image-ref command]} task-def
container-id (str "task-" (uuid/v1))
tmp-dir "/tmp"
stdout-fifo (format "%s/%s.stdout" tmp-dir container-id)
stderr-fifo (format "%s/%s.stderr" tmp-dir container-id)
stdout-chan (a/chan 100)
stderr-chan (a/chan 100)]
(println (format "Executing task '%s' with I/O redirection." container-id))
;; 创建 FIFO 文件
(create-fifo stdout-fifo)
(create-fifo stderr-fifo)
(try
;; 确保镜像存在 (省略,与 run-task! 相同)
(ensure-image stubs image-ref)
;; 创建容器,这次指定了 stdio 路径
(let [create-task-req (-> (com.containerd.v1.services.Tasks$CreateRequest/newBuilder)
(.setContainerID container-id)
(.setStdout stdout-fifo)
(.setStderr stderr-fifo)
(.build))
;; ... 省略容器创建逻辑 ...
]
;; ... 启动容器的逻辑 ...
;; 异步读取输出
(read-stream-to-channel stdout-fifo stdout-chan)
(read-stream-to-channel stderr-fifo stderr-chan)
;; 合并输出并等待任务结束
(let [logs (atom [])
exit-code (atom nil)]
(loop [stdout-open? true stderr-open? true]
(when (or stdout-open? stderr-open?)
(let [[val port] (a/alts!! [stdout-chan stderr-chan])]
(cond
(nil? val) ; channel closed
(if (= port stdout-chan)
(recur false stderr-open?)
(recur stdout-open? false))
(= :log (:type val))
(do (swap! logs conj (assoc val :source (if (= port stdout-chan) :stdout :stderr)))
(recur stdout-open? stderr-open?))))))
;; ... 等待任务并获取退出码的逻辑 ...
(reset! exit-code (get-task-exit-code stubs container-id))
{:status (if (zero? @exit-code) :success :failed)
:exit-code @exit-code
:logs @logs}))
(catch Exception e
;; ... 错误处理 ...
(finally
;; 清理 FIFO 文件和容器/任务
(io/delete-file stdout-fifo true)
(io/delete-file stderr-fifo true)
(cleanup-task-and-container stubs container-id)))))
;; 使用示例
(comment
(let [node-task {:image-ref "docker.io/library/node:18-slim"
:command ["node" "-e" "console.log('Hello from Node.js in containerd!'); console.error('This is an error message.');"]}]
(run-task! node-task))
)
注意:execute-task-with-io
是一个概念性的增强版本,为了清晰地展示思路,它合并并简化了部分 run-task!
的逻辑。
这个增强版本展示了如何通过宿主机的 FIFO 文件来捕获 I/O。core.async
在这里非常有用,它允许我们以非阻塞的方式同时监听 stdout 和 stderr 两个流,并将它们的输出合并到一个结果中。当任务结束,finally
块不仅要清理 containerd
的资源,还要确保删除这些临时的 FIFO 文件。
局限性与未来方向
我们构建的这个执行器虽然轻量且高效,但在一个真实的生产环境中,它还存在一些明显的局限性:
- 安全性: 当前实现以 root 权限运行
containerd
和 Clojure 进程,容器内的进程也可能是 root。这是一个巨大的安全隐患。生产环境需要启用 rootless containerd,并利用 user namespaces 将容器内的 root 用户映射为宿主机上的非特权用户。对于执行完全不可信的代码,还应考虑使用 gVisor 或 Firecracker 这样的强隔离运行时。 - 资源管理: 我们没有在 OCI Spec 中定义 cgroups 相关的资源限制(CPU、内存)。对于一个多租户执行器,精确的资源配额和限制是防止单个任务耗尽整个节点资源的关键。这需要对 OCI Spec 和 Linux cgroups v2 有更深入的了解。
- 网络: 默认情况下,
containerd
创建的任务没有网络命名空间,或者说它共享了宿主机的网络。这同样是危险的。要提供受控的网络访问,需要集成 CNI(Container Network Interface)插件,为每个任务动态创建和配置独立的网络环境。 - 可扩展性: 当前执行器是单机版的。要构建一个高可用的分布式系统,需要引入一个任务队列(如 RabbitMQ 或 NATS),让多个执行器节点从队列中消费任务。同时还需要一个集中的地方来存储任务状态和日志,例如使用 GCP Cloud Logging 和 Firestore。
尽管存在这些局限,但这个从底层构建的执行器原型,清晰地揭示了容器编排系统的核心工作原理,并展示了如何利用 Clojure 这样一门富有表现力的语言来构建稳定、可靠的系统控制平面。