本文从源码的角度大概分析了一下,在执行ctr run的时候,ctr、containerd、shim程序中发生了什么流程。
背景介绍
简介
Containerd
Containerd是一个行业标准的容器运行时,它是一个daemon进程,可以管理主机上容器的全部生命周期和它的文件系统,包括:
- 管理容器的生命周期(从创建容器到销毁容器)
- 拉取/推送容器镜像
- 存储管理(管理镜像及容器数据的存储)
- 调用 runc 运行容器(与 runc 等容器运行时交互)
- 管理容器网络接口及网络
Containerd-shim
Containerd-shim是containerd的一个子组件,它负责在宿主机上创建和管理容器。当用户通过containerd创建一个容器时,containerd-shim会被调用来实际创建和运行容器。
Containerd-shim是一个轻量级的组件,它的设计目标是尽可能简单和可靠。它可以与不同的容器运行时(如runc)配合使用,并且可以通过插件机制进行功能扩展。我们可以通过扩展containerd来实现对wasm容器的支持。
Ctr
ctr是containerd的命令行工具,是containerd的一个客户端,用于与containerd交互并管理容器。它提供了一组命令,用于执行各种容器操作,如创建容器、启动容器、停止容器、删除容器等。
ctr命令行工具可以通过与containerd的API进行通信来执行这些操作。它还支持与容器相关的元数据的查询和管理,包括容器状态、容器日志、容器元数据等。ctr还提供了对容器镜像的管理功能,包括拉取、推送、删除镜像等。
container在Containerd中代表的是一个容器的元数据,containerd中的Task用于获取容器对象并将它转换成在操作系统中可运行的进程,它代表的就是容器中可运行的对象。
容器启动一般流程
首先,使用ctr命令行工具执行ctr run
命令来创建和启动容器。该命令指定容器的配置参数,例如容器镜像、容器名称、容器的资源限制等。ctr会与containerd进行通信,将容器的创建请求发送给containerd。
containerd收到ctr的容器创建请求后,会根据指定的容器配置来执行相应的操作。首先,containerd会检查本地是否存在指定的容器镜像。如果镜像不存在,containerd会与镜像仓库通信,拉取所需的镜像到本地。
containerd首先会启动containerd-shim程序,containerd-shim会再启动一个containerd-shim,此时先前containerd-shim退出。新的container-shim进程的父进程就变成了pid为1用户init程序。然后会向containerd-shim发送请求来实际创建和运行容器。containerd-shim负责设置容器的命名空间(Namespace)和控制组(Cgroup)等隔离机制,以及其他容器运行时所需的设置。
containerd-shim在创建容器的隔离环境后,会调用容器运行时(如runc)来启动容器进程。容器运行时根据containerd-shim提供的隔离环境配置,设置容器的文件系统、网络、进程等。在wasm场景下会配置wasm运行时并运行wasm程序。
在容器启动后,containerd-shim会监控容器进程的状态,并将相关的事件报告给containerd。containerd提供了API供用户查询和管理容器的状态、日志等信息。
接下来我们从代码角度来查看详细的流程,在这些组件中都发生了什么。
代码分析
ctr 分析
通过ctr run
命令可以启动一个容器,让我们看下这个过程中所涉及到的源码:
简单总结下,run命令会先调用create创建一个容器,然 后调用start运行这个容器。在下面代码中我们可以看到,run命令先创建了一个containerd的client通过UDS与containerd进行通信。然后通过这个client发送NewContainer请求,然后通过task的api来start这个container。
var Command = cli.Command{
Name: "run",
Usage: "Run a container",
ArgsUsage: "[flags] Image|RootFS ID [COMMAND] [ARG...]",
SkipArgReorder: true,
Flags: append([]cli.Flag{
cli.BoolFlag{
Name: "rm",
Usage: "Remove the container after running, cannot be used with --detach",
},
//...
}, append(platformRunFlags,
append(append(commands.SnapshotterFlags, []cli.Flag{commands.SnapshotterLabels}...),
commands.ContainerFlags...)...)...),
Action: func(context *cli.Context) error {
//...
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
container, err := NewContainer(ctx, client, context)
if err != nil {
return err
}
if rm && !detach {
defer container.Delete(ctx, containerd.WithSnapshotCleanup)
}
// ...
opts := tasks.GetNewTaskOpts(context)
ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
// 发送 crate请求,返回一个封装的task用于发送对于这个container的task api请求
task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
if err != nil {
return err
}
// ...
// 发送start请求
if err := task.Start(ctx); err != nil {
return err
}
// ...
return nil
},
}
NewTask是一个包装,主要是根据平台不同调用不用的container.NewTask
方法。
// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
if r.SnapshotKey != "" {
if r.Snapshotter == "" {
return nil, fmt.Errorf("unable to resolve rootfs mounts without snapshotter on container: %w", errdefs.ErrInvalidArgument)
}
// ...
// 通过containerd获取container详细定义
spec, err := container.Spec(ctx)// 请求containerd unix:///run/containerd/containerd.sock
if err != nil {
return nil, err
}
}
t, err := container.NewTask(ctx, ioCreator, opts...)
if err != nil {
return nil, err
}
stdinC.closer = func() {
t.CloseIO(ctx, containerd.WithStdinCloser)
}
return t, nil
}
这里是unix平台的task创建函数,可以看到其主要作用就是封装了一个task结构体,它实现了Task的接口,用于向containerd发送task请求。并且在这个函数中,可以看到向containerd发送了Create请求。
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
request := &tasks.CreateTaskRequest{
ContainerID: c.id,
Terminal: cfg.Terminal,
Stdin: cfg.Stdin,
Stdout: cfg.Stdout,
Stderr: cfg.Stderr,
}
r, err := c.get(ctx)
// ...
t := &task{
client: c.client,
io: i,
id: c.id,
c: c,
}
response, err := c.client.TaskService().Create(ctx, request)
if err != nil {
return nil, errdefs.FromGRPC(err)
}
t.pid = response.Pid
return t, nil
}
ctr run的主要逻辑就是这些,起到的是一个向containerd发送请求的客户端的作用。接下来我们看下在containerd中的相关函数。
containerd分析
service实现了GRPC的接口,启动grpc server处理请求。
type service struct {
local api.TasksClient
api.UnimplementedTasksServer
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterTasksServer(server, s)
return nil
}
func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
return s.local.Create(ctx, r)
}
func (s *service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) {
return s.local.Start(ctx, r)
}
// ...
首先来看create流程,这里是task的creat接口,通过container id来create(到这里之前container已经创建过了)。
func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
container, err := l.getContainer(ctx, r.ContainerID)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
//...
// 调用TaskManager创建container,进行启动shim的流程
c, err := rtime.Create(ctx, r.ContainerID, opts)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
labels := map[string]string{"runtime": container.Runtime.Name}
if err := l.monitor.Monitor(c, labels); err != nil {
return nil, fmt.Errorf("monitor task: %w", err)
}
// pid 实际调用的是shim connect方法
pid, err := c.PID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get task pid: %w", err)
}
return &api.CreateTaskResponse{
ContainerID: r.ContainerID,
Pid: pid,
}, nil
}
// PID of the task
func (s *shimTask) PID(ctx context.Context) (uint32, error) {
response, err := s.task.Connect(ctx, &task.ConnectRequest{
ID: s.ID(),
})
if err != nil {
return 0, errdefs.FromGRPC(err)
}
return response.TaskPid, nil
}
containerd启动containerd-shim,containerd-shim会再启动一个containerd-shim,此时先前containerd-shim退出。新的container-shim进程的父进程就变成了1用户init程序。向第二个shim发送create网络请求,创建失败就发送delete干掉。
// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
// 启动shim,在后文中详细介绍
shim, err := m.manager.Start(ctx, taskID, opts)
if err != nil {
return nil, fmt.Errorf("failed to start shim: %w", err)
}
// Cast to shim task and call task service to create a new container task instance.
// This will not be required once shim service / client implemented.
shimTask, err := newShimTask(shim)
if err != nil {
return nil, err
}
// 向第二个shim发送创建请求,通过unix socket /containerd-shim/b801ea1d328d2ece1fa8eb4ed5b547e8f1cdd54e8a3692ff47416924c44d0262.sock
t, err := shimTask.Create(ctx, opts)
if err != nil {
// NOTE: ctx contains required namespace information.
m.manager.shims.Delete(ctx, taskID)
dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
sandboxed := opts.SandboxID != ""
_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
if errShim != nil {
if errdefs.IsDeadlineExceeded(errShim) {
dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
defer cancel()
}
shimTask.Shutdown(dctx)
shimTask.Close()
}
return nil, fmt.Errorf("failed to create shim task: %w", err)
}
return t, nil
}
接下来看下启动shim的详细流程
// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
// ...
// 启动shim
shim, err := m.startShim(ctx, bundle, id, opts)
if err != nil {
return nil, err
}
// ...
if err := m.shims.Add(ctx, shim); err != nil {
return nil, fmt.Errorf("failed to add task: %w", err)
}
return shim, nil
}
启动shim的流程,首先根据runtime类型解析shim二进制地址。然后调用binary的start方法
func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
// ...
runtimePath, err := m.resolveRuntimePath(opts.Runtime) // /usr/local/bin/containerd-shim-dapr-v1
//... error handle
b := shimBinary(bundle, shimBinaryConfig{
runtime: runtimePath,
address: m.containerdAddress,// /run/containerd/containerd.sock
ttrpcAddress: m.containerdTTRPCAddress,// /run/containerd/containerd.sock.ttrpc
schedCore: m.schedCore,
})
shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
log.G(ctx).WithField("id", id).Info("shim disconnected")
cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
// disconnect and there is no chance to remove this dead task from runtime task lists.
// Thus it's better to delete it here.
m.shims.Delete(ctx, id)
})
if err != nil {
return nil, fmt.Errorf("start failed: %w", err)
}
return shim, nil
}
binary的start会启动第一个shim(第一个shim会自动启动第二个shim),从stdout中获取到第二个shim交互方式
func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
args := []string{"-id", b.bundle.ID}
switch log.GetLevel() {
case log.DebugLevel, log.TraceLevel:
args = append(args, "-debug")
}
args = append(args, "start")
cmd, err := client.Command(
ctx,
&client.CommandConfig{
Runtime: b.runtime,
Address: b.containerdAddress,
TTRPCAddress: b.containerdTTRPCAddress,
Path: b.bundle.Path,
Opts: opts,
Args: args,
SchedCore: b.schedCore,
})
if err != nil {
return nil, err
}
// Windows needs a namespace when openShimLog
ns, _ := namespaces.Namespace(ctx)
shimCtx, cancelShimLog := context.WithCancel(namespaces.WithNamespace(context.Background(), ns))
// ...
// 通过stdout与第一个shim进行交互,拿到第二个shim的sock地址(由第一个shim生成,启动第二个shim的时候传给它,同时输出到stdout,然后退出)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("%s: %w", out, err)
}
response := bytes.TrimSpace(out)
onCloseWithShimLog := func() {
onClose()
cancelShimLog()
f.Close()
}
// Save runtime binary path for restore.
if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil {
return nil, err
}
// 通过第一个shim返回信息解析出来第二个shim的交互address(和protocol,支持ttrpc或者grpc)
params, err := parseStartResponse(ctx, response)
if err != nil {
return nil, err
}
// 建立连接,作为client
conn, err := makeConnection(ctx, params, onCloseWithShimLog)
if err != nil {
return nil, err
}
return &shim{
bundle: b.bundle,
client: conn,
}, nil
}
如果是使用的ttrpc协议,会通过下面函数发送创建请求。
func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
var resp CreateTaskResponse
if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {
return nil, err
}
return &resp, nil
}
在ctr run场景中,调用create成功后,ctr会继续发送start请求。对于start的流程来说就是向上一步创建的shim发送start请求。
通过debug可以更加方便直观的查看某些代码流程,你可以通过以下过程来调试程序。
构建debug二进制
git clone https://github.com/containerd/containerd.git
cd containerd
GODEBUG=1 make
debug containerd命令
dlv --listen=:40000 --headless=true --api-version=2 --accept-multiclient exec ./bin/containerd
debug ctr命令
dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec ./bin/ctr -- run --rm --runtime=io.containerd.wasm.v1 docker.io/docker4zc/dwhttp:v0.0.3 httpwasm
shim分析
containerd仓库中提供了shim SDK,对于开发来说只需要实现对应的接口,传入创建函数就可以,其中wasm.New
函数的类型为 type Init func(context.Context, string, Publisher, func()) (Shim, error)
。
func main() {
shim.Run("io.containerd.wasm.v1", wasm.New)
}
在shim的sdk
func run(id string, initFunc Init, config Config) error {
// 调用initFunc,就是上面传入的`wasm.New`函数
service, err := initFunc(ctx, idFlag, publisher, cancel)
if err != nil {
return err
}
switch action {
case "delete":
//...
case "start":
// 第一个启动的shim接收的action 就是 start。这里启动第二个shim。address是根据ns和id算出来的,会传递给第二个shim,第二个shim会以这个地址起一个server,同时会通过stdout发送给containerd(因为c启动的本进程,所以可以收到),这就是containerd和第二个shim交流的.sock地址。
address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag)
if err != nil {
return err
}
if _, err := os.Stdout.WriteString(address); err != nil {
return err
}
return nil
default:
// 这里是第二shim会走到的流程,action会为空
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
return err
}
}
client := NewShimClient(ctx, service, signals)
if err := client.Serve(); err != nil {
if err != context.Canceled {
return err
}
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
}
}
}
这里有一个有意思地方是第二个containerd如何获取自身作为server的socket地址。从代码上看之前是通过socket
flag来传递的,但是现在是通过把套接字转换成文件描述符传递给第二个shim,然后第二个shim再还原成listener实现的。
func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
address, err := shim.SocketAddress(ctx, grouping)
if err != nil {
return "", err
}
socket, err := shim.NewSocket(address)
f, err := socket.File()
if err != nil {
return "", err
}
defer f.Close()
// 这里通过标准库的cmd.ExtraFiles将文件传递了下去
cmd.ExtraFiles = append(cmd.ExtraFiles, f)
if err := cmd.Start(); err != nil {
return "", err
}
启动shimserver的时候会通过 net.FileListener(os.NewFile(3, "socket"))
来获取listener。这里之所以是3,是因为0,1,2默认是stdin、stdout、stderr,新添加的这个就刚好是3。
// client.Serve()会最终调用这个函数来启动server
func serveListener(path string) (net.Listener, error) {
var (
l net.Listener
err error
)
if path == "" {
l, err = net.FileListener(os.NewFile(3, "socket"))
path = "[inherited from parent]"
} else {
if len(path) > 106 {
return nil, errors.Errorf("%q: unix socket path too long (> 106)", path)
}
l, err = net.Listen("unix", "\x00"+path)
}
if err != nil {
return nil, err
}
logrus.WithField("socket", path).Debug("serving api on abstract socket")
return l, nil
}
shim在启动server之后,containerd就可以通过ttrpc(或者grpc)协议与shim进行交互。shim提供TaskService interface来对容器进行管理,这也是我们自己实现的shim需要实现的功能接口。
type TaskService interface {
State(ctx context.Context, req *StateRequest) (*StateResponse, error)
Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error)
Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error)
Pids(ctx context.Context, req *PidsRequest) (*PidsResponse, error)
Pause(ctx context.Context, req *PauseRequest) (*types1.Empty, error)
Resume(ctx context.Context, req *ResumeRequest) (*types1.Empty, error)
Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*types1.Empty, error)
Kill(ctx context.Context, req *KillRequest) (*types1.Empty, error)
Exec(ctx context.Context, req *ExecProcessRequest) (*types1.Empty, error)
ResizePty(ctx context.Context, req *ResizePtyRequest) (*types1.Empty, error)
CloseIO(ctx context.Context, req *CloseIORequest) (*types1.Empty, error)
Update(ctx context.Context, req *UpdateTaskRequest) (*types1.Empty, error)
Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error)
Stats(ctx context.Context, req *StatsRequest) (*StatsResponse, error)
Connect(ctx context.Context, req *ConnectRequest) (*ConnectResponse, error)
Shutdown(ctx context.Context, req *ShutdownRequest) (*types1.Empty, error)
}
参考
https://www.51cto.com/article/744467.html
https://juejin.cn/post/7223593286698074167
https://www.qikqiak.com/post/containerd-usage/
https://cloud.tencent.com/developer/article/2154031
https://www.jianshu.com/p/d8f6c40280f8
https://jiajunhuang.com/articles/2017_10_25-golang_graceful_restart.md.html