通过Containerd-Shim启动container源码分析

Posted by     "Taction" on Tuesday, September 5, 2023

本文从源码的角度大概分析了一下,在执行ctr run的时候,ctr、containerd、shim程序中发生了什么流程。

背景介绍

简介

Containerd

Containerd是一个行业标准的容器运行时,它是一个daemon进程,可以管理主机上容器的全部生命周期和它的文件系统,包括:

  • 管理容器的生命周期(从创建容器到销毁容器)
  • 拉取/推送容器镜像
  • 存储管理(管理镜像及容器数据的存储)
  • 调用 runc 运行容器(与 runc 等容器运行时交互)
  • 管理容器网络接口及网络

Containerd-shim

Containerd-shim是containerd的一个子组件,它负责在宿主机上创建和管理容器。当用户通过containerd创建一个容器时,containerd-shim会被调用来实际创建和运行容器。

Containerd-shim是一个轻量级的组件,它的设计目标是尽可能简单和可靠。它可以与不同的容器运行时(如runc)配合使用,并且可以通过插件机制进行功能扩展。我们可以通过扩展containerd来实现对wasm容器的支持。

Ctr

ctr是containerd的命令行工具,是containerd的一个客户端,用于与containerd交互并管理容器。它提供了一组命令,用于执行各种容器操作,如创建容器、启动容器、停止容器、删除容器等。

ctr命令行工具可以通过与containerd的API进行通信来执行这些操作。它还支持与容器相关的元数据的查询和管理,包括容器状态、容器日志、容器元数据等。ctr还提供了对容器镜像的管理功能,包括拉取、推送、删除镜像等。

container在Containerd中代表的是一个容器的元数据,containerd中的Task用于获取容器对象并将它转换成在操作系统中可运行的进程,它代表的就是容器中可运行的对象。

容器启动一般流程

首先,使用ctr命令行工具执行ctr run命令来创建和启动容器。该命令指定容器的配置参数,例如容器镜像、容器名称、容器的资源限制等。ctr会与containerd进行通信,将容器的创建请求发送给containerd。

containerd收到ctr的容器创建请求后,会根据指定的容器配置来执行相应的操作。首先,containerd会检查本地是否存在指定的容器镜像。如果镜像不存在,containerd会与镜像仓库通信,拉取所需的镜像到本地。

containerd首先会启动containerd-shim程序,containerd-shim会再启动一个containerd-shim,此时先前containerd-shim退出。新的container-shim进程的父进程就变成了pid为1用户init程序。然后会向containerd-shim发送请求来实际创建和运行容器。containerd-shim负责设置容器的命名空间(Namespace)和控制组(Cgroup)等隔离机制,以及其他容器运行时所需的设置。

containerd-shim在创建容器的隔离环境后,会调用容器运行时(如runc)来启动容器进程。容器运行时根据containerd-shim提供的隔离环境配置,设置容器的文件系统、网络、进程等。在wasm场景下会配置wasm运行时并运行wasm程序。

在容器启动后,containerd-shim会监控容器进程的状态,并将相关的事件报告给containerd。containerd提供了API供用户查询和管理容器的状态、日志等信息。

接下来我们从代码角度来查看详细的流程,在这些组件中都发生了什么。

代码分析

ctr 分析

通过ctr run命令可以启动一个容器,让我们看下这个过程中所涉及到的源码:

简单总结下,run命令会先调用create创建一个容器,然 后调用start运行这个容器。在下面代码中我们可以看到,run命令先创建了一个containerd的client通过UDS与containerd进行通信。然后通过这个client发送NewContainer请求,然后通过task的api来start这个container。

var Command = cli.Command{
	Name:           "run",
	Usage:          "Run a container",
	ArgsUsage:      "[flags] Image|RootFS ID [COMMAND] [ARG...]",
	SkipArgReorder: true,
	Flags: append([]cli.Flag{
		cli.BoolFlag{
			Name:  "rm",
			Usage: "Remove the container after running, cannot be used with --detach",
		},
		//...
	}, append(platformRunFlags,
		append(append(commands.SnapshotterFlags, []cli.Flag{commands.SnapshotterLabels}...),
			commands.ContainerFlags...)...)...),
	Action: func(context *cli.Context) error {
		//...
		client, ctx, cancel, err := commands.NewClient(context)
		if err != nil {
			return err
		}
		defer cancel()

		container, err := NewContainer(ctx, client, context)
		if err != nil {
			return err
		}
		if rm && !detach {
			defer container.Delete(ctx, containerd.WithSnapshotCleanup)
		}
		// ...
		opts := tasks.GetNewTaskOpts(context)
		ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
    // 发送 crate请求,返回一个封装的task用于发送对于这个container的task api请求
		task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
		if err != nil {
			return err
		}

		// ...
    // 发送start请求
		if err := task.Start(ctx); err != nil {
			return err
		}
		// ...
		return nil
	},
}

NewTask是一个包装,主要是根据平台不同调用不用的container.NewTask方法。

// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
  if r.SnapshotKey != "" {
		if r.Snapshotter == "" {
			return nil, fmt.Errorf("unable to resolve rootfs mounts without snapshotter on container: %w", errdefs.ErrInvalidArgument)
		}

		// ...
    // 通过containerd获取container详细定义
		spec, err := container.Spec(ctx)// 请求containerd unix:///run/containerd/containerd.sock
		if err != nil {
			return nil, err
		}
  }
	t, err := container.NewTask(ctx, ioCreator, opts...)
	if err != nil {
		return nil, err
	}
	stdinC.closer = func() {
		t.CloseIO(ctx, containerd.WithStdinCloser)
	}
	return t, nil
}

这里是unix平台的task创建函数,可以看到其主要作用就是封装了一个task结构体,它实现了Task的接口,用于向containerd发送task请求。并且在这个函数中,可以看到向containerd发送了Create请求。

func (c *container) NewTask(ctx context.Context, ioCreate cio.Creator, opts ...NewTaskOpts) (_ Task, err error) {
	
	request := &tasks.CreateTaskRequest{
		ContainerID: c.id,
		Terminal:    cfg.Terminal,
		Stdin:       cfg.Stdin,
		Stdout:      cfg.Stdout,
		Stderr:      cfg.Stderr,
	}
	r, err := c.get(ctx)
	// ...
	t := &task{
		client: c.client,
		io:     i,
		id:     c.id,
		c:      c,
	}
	
	response, err := c.client.TaskService().Create(ctx, request)
	if err != nil {
		return nil, errdefs.FromGRPC(err)
	}
	t.pid = response.Pid
	return t, nil
}

ctr run的主要逻辑就是这些,起到的是一个向containerd发送请求的客户端的作用。接下来我们看下在containerd中的相关函数。

containerd分析

service实现了GRPC的接口,启动grpc server处理请求。

type service struct {
	local api.TasksClient
	api.UnimplementedTasksServer
}

func (s *service) Register(server *grpc.Server) error {
	api.RegisterTasksServer(server, s)
	return nil
}

func (s *service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.CreateTaskResponse, error) {
	return s.local.Create(ctx, r)
}

func (s *service) Start(ctx context.Context, r *api.StartRequest) (*api.StartResponse, error) {
	return s.local.Start(ctx, r)
}
// ...

首先来看create流程,这里是task的creat接口,通过container id来create(到这里之前container已经创建过了)。

func (l *local) Create(ctx context.Context, r *api.CreateTaskRequest, _ ...grpc.CallOption) (*api.CreateTaskResponse, error) {
	container, err := l.getContainer(ctx, r.ContainerID)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	//...
  // 调用TaskManager创建container,进行启动shim的流程
	c, err := rtime.Create(ctx, r.ContainerID, opts)
	if err != nil {
		return nil, errdefs.ToGRPC(err)
	}
	labels := map[string]string{"runtime": container.Runtime.Name}
	if err := l.monitor.Monitor(c, labels); err != nil {
		return nil, fmt.Errorf("monitor task: %w", err)
	}
  // pid 实际调用的是shim connect方法
	pid, err := c.PID(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get task pid: %w", err)
	}
	return &api.CreateTaskResponse{
		ContainerID: r.ContainerID,
		Pid:         pid,
	}, nil
}

// PID of the task
func (s *shimTask) PID(ctx context.Context) (uint32, error) {
	response, err := s.task.Connect(ctx, &task.ConnectRequest{
		ID: s.ID(),
	})
	if err != nil {
		return 0, errdefs.FromGRPC(err)
	}

	return response.TaskPid, nil
}

containerd启动containerd-shim,containerd-shim会再启动一个containerd-shim,此时先前containerd-shim退出。新的container-shim进程的父进程就变成了1用户init程序。向第二个shim发送create网络请求,创建失败就发送delete干掉。

// Create launches new shim instance and creates new task
func (m *TaskManager) Create(ctx context.Context, taskID string, opts runtime.CreateOpts) (runtime.Task, error) {
  // 启动shim,在后文中详细介绍
	shim, err := m.manager.Start(ctx, taskID, opts)
	if err != nil {
		return nil, fmt.Errorf("failed to start shim: %w", err)
	}

	// Cast to shim task and call task service to create a new container task instance.
	// This will not be required once shim service / client implemented.
	shimTask, err := newShimTask(shim)
	if err != nil {
		return nil, err
	}

  // 向第二个shim发送创建请求,通过unix socket /containerd-shim/b801ea1d328d2ece1fa8eb4ed5b547e8f1cdd54e8a3692ff47416924c44d0262.sock
	t, err := shimTask.Create(ctx, opts)
	if err != nil {
		// NOTE: ctx contains required namespace information.
		m.manager.shims.Delete(ctx, taskID)

		dctx, cancel := timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
		defer cancel()

		sandboxed := opts.SandboxID != ""
		_, errShim := shimTask.delete(dctx, sandboxed, func(context.Context, string) {})
		if errShim != nil {
			if errdefs.IsDeadlineExceeded(errShim) {
				dctx, cancel = timeout.WithContext(cleanup.Background(ctx), cleanupTimeout)
				defer cancel()
			}

			shimTask.Shutdown(dctx)
			shimTask.Close()
		}

		return nil, fmt.Errorf("failed to create shim task: %w", err)
	}

	return t, nil
}

接下来看下启动shim的详细流程

// Start launches a new shim instance
func (m *ShimManager) Start(ctx context.Context, id string, opts runtime.CreateOpts) (_ ShimInstance, retErr error) {
	bundle, err := NewBundle(ctx, m.root, m.state, id, opts.Spec)
	// ...

  // 启动shim
	shim, err := m.startShim(ctx, bundle, id, opts)
	if err != nil {
		return nil, err
	}
	// ...

	if err := m.shims.Add(ctx, shim); err != nil {
		return nil, fmt.Errorf("failed to add task: %w", err)
	}

	return shim, nil
}

启动shim的流程,首先根据runtime类型解析shim二进制地址。然后调用binary的start方法

func (m *ShimManager) startShim(ctx context.Context, bundle *Bundle, id string, opts runtime.CreateOpts) (*shim, error) {
	// ...

	runtimePath, err := m.resolveRuntimePath(opts.Runtime) // /usr/local/bin/containerd-shim-dapr-v1
  //... error handle
	b := shimBinary(bundle, shimBinaryConfig{
		runtime:      runtimePath,
		address:      m.containerdAddress,// /run/containerd/containerd.sock
		ttrpcAddress: m.containerdTTRPCAddress,// /run/containerd/containerd.sock.ttrpc
		schedCore:    m.schedCore,
	})
	shim, err := b.Start(ctx, protobuf.FromAny(topts), func() {
		log.G(ctx).WithField("id", id).Info("shim disconnected")

		cleanupAfterDeadShim(cleanup.Background(ctx), id, m.shims, m.events, b)
		// Remove self from the runtime task list. Even though the cleanupAfterDeadShim()
		// would publish taskExit event, but the shim.Delete() would always failed with ttrpc
		// disconnect and there is no chance to remove this dead task from runtime task lists.
		// Thus it's better to delete it here.
		m.shims.Delete(ctx, id)
	})
	if err != nil {
		return nil, fmt.Errorf("start failed: %w", err)
	}

	return shim, nil
}

binary的start会启动第一个shim(第一个shim会自动启动第二个shim),从stdout中获取到第二个shim交互方式

func (b *binary) Start(ctx context.Context, opts *types.Any, onClose func()) (_ *shim, err error) {
	args := []string{"-id", b.bundle.ID}
	switch log.GetLevel() {
	case log.DebugLevel, log.TraceLevel:
		args = append(args, "-debug")
	}
	args = append(args, "start")

	cmd, err := client.Command(
		ctx,
		&client.CommandConfig{
			Runtime:      b.runtime,
			Address:      b.containerdAddress,
			TTRPCAddress: b.containerdTTRPCAddress,
			Path:         b.bundle.Path,
			Opts:         opts,
			Args:         args,
			SchedCore:    b.schedCore,
		})
	if err != nil {
		return nil, err
	}
	// Windows needs a namespace when openShimLog
	ns, _ := namespaces.Namespace(ctx)
	shimCtx, cancelShimLog := context.WithCancel(namespaces.WithNamespace(context.Background(), ns))
	// ...
  // 通过stdout与第一个shim进行交互,拿到第二个shim的sock地址(由第一个shim生成,启动第二个shim的时候传给它,同时输出到stdout,然后退出)
	out, err := cmd.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("%s: %w", out, err)
	}
	response := bytes.TrimSpace(out)

	onCloseWithShimLog := func() {
		onClose()
		cancelShimLog()
		f.Close()
	}
	// Save runtime binary path for restore.
	if err := os.WriteFile(filepath.Join(b.bundle.Path, "shim-binary-path"), []byte(b.runtime), 0600); err != nil {
		return nil, err
	}
  // 通过第一个shim返回信息解析出来第二个shim的交互address(和protocol,支持ttrpc或者grpc)
	params, err := parseStartResponse(ctx, response)
	if err != nil {
		return nil, err
	}

  // 建立连接,作为client
	conn, err := makeConnection(ctx, params, onCloseWithShimLog)
	if err != nil {
		return nil, err
	}

	return &shim{
		bundle: b.bundle,
		client: conn,
	}, nil
}

如果是使用的ttrpc协议,会通过下面函数发送创建请求。

func (c *taskClient) Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error) {
	var resp CreateTaskResponse
	if err := c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp); err != nil {
		return nil, err
	}
	return &resp, nil
}

在ctr run场景中,调用create成功后,ctr会继续发送start请求。对于start的流程来说就是向上一步创建的shim发送start请求。

通过debug可以更加方便直观的查看某些代码流程,你可以通过以下过程来调试程序。

构建debug二进制

git clone https://github.com/containerd/containerd.git
cd containerd
GODEBUG=1 make

debug containerd命令

dlv --listen=:40000 --headless=true --api-version=2 --accept-multiclient exec ./bin/containerd

debug ctr命令

dlv --listen=:40001 --headless=true --api-version=2 --accept-multiclient exec ./bin/ctr -- run --rm --runtime=io.containerd.wasm.v1 docker.io/docker4zc/dwhttp:v0.0.3 httpwasm

shim分析

containerd仓库中提供了shim SDK,对于开发来说只需要实现对应的接口,传入创建函数就可以,其中wasm.New函数的类型为 type Init func(context.Context, string, Publisher, func()) (Shim, error)

func main() {
	shim.Run("io.containerd.wasm.v1", wasm.New)
}

在shim的sdk

func run(id string, initFunc Init, config Config) error {
	// 调用initFunc,就是上面传入的`wasm.New`函数
	service, err := initFunc(ctx, idFlag, publisher, cancel)
	if err != nil {
		return err
	}
	switch action {
	case "delete":
		//...
	case "start":
    // 第一个启动的shim接收的action 就是 start。这里启动第二个shim。address是根据ns和id算出来的,会传递给第二个shim,第二个shim会以这个地址起一个server,同时会通过stdout发送给containerd(因为c启动的本进程,所以可以收到),这就是containerd和第二个shim交流的.sock地址。
		address, err := service.StartShim(ctx, idFlag, containerdBinaryFlag, addressFlag)
		if err != nil {
			return err
		}
		if _, err := os.Stdout.WriteString(address); err != nil {
			return err
		}
		return nil
	default:
    // 这里是第二shim会走到的流程,action会为空
		if !config.NoSetupLogger {
			if err := setLogger(ctx, idFlag); err != nil {
				return err
			}
		}
		client := NewShimClient(ctx, service, signals)
		if err := client.Serve(); err != nil {
			if err != context.Canceled {
				return err
			}
		}
		select {
		case <-publisher.Done():
			return nil
		case <-time.After(5 * time.Second):
			return errors.New("publisher not closed")
		}
	}
}

这里有一个有意思地方是第二个containerd如何获取自身作为server的socket地址。从代码上看之前是通过socket flag来传递的,但是现在是通过把套接字转换成文件描述符传递给第二个shim,然后第二个shim再还原成listener实现的。

func (s *service) StartShim(ctx context.Context, id, containerdBinary, containerdAddress string) (string, error) {
	cmd, err := newCommand(ctx, id, containerdBinary, containerdAddress)
	address, err := shim.SocketAddress(ctx, grouping)
	if err != nil {
		return "", err
	}
	socket, err := shim.NewSocket(address)
	f, err := socket.File()
	if err != nil {
		return "", err
	}
	defer f.Close()
	// 这里通过标准库的cmd.ExtraFiles将文件传递了下去
	cmd.ExtraFiles = append(cmd.ExtraFiles, f)

	if err := cmd.Start(); err != nil {
		return "", err
	}

启动shimserver的时候会通过 net.FileListener(os.NewFile(3, "socket"))来获取listener。这里之所以是3,是因为0,1,2默认是stdin、stdout、stderr,新添加的这个就刚好是3。

// client.Serve()会最终调用这个函数来启动server
func serveListener(path string) (net.Listener, error) {
	var (
		l   net.Listener
		err error
	)
	if path == "" {
		l, err = net.FileListener(os.NewFile(3, "socket"))
		path = "[inherited from parent]"
	} else {
		if len(path) > 106 {
			return nil, errors.Errorf("%q: unix socket path too long (> 106)", path)
		}
		l, err = net.Listen("unix", "\x00"+path)
	}
	if err != nil {
		return nil, err
	}
	logrus.WithField("socket", path).Debug("serving api on abstract socket")
	return l, nil
}

shim在启动server之后,containerd就可以通过ttrpc(或者grpc)协议与shim进行交互。shim提供TaskService interface来对容器进行管理,这也是我们自己实现的shim需要实现的功能接口。

type TaskService interface {
	State(ctx context.Context, req *StateRequest) (*StateResponse, error)
	Create(ctx context.Context, req *CreateTaskRequest) (*CreateTaskResponse, error)
	Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
	Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error)
	Pids(ctx context.Context, req *PidsRequest) (*PidsResponse, error)
	Pause(ctx context.Context, req *PauseRequest) (*types1.Empty, error)
	Resume(ctx context.Context, req *ResumeRequest) (*types1.Empty, error)
	Checkpoint(ctx context.Context, req *CheckpointTaskRequest) (*types1.Empty, error)
	Kill(ctx context.Context, req *KillRequest) (*types1.Empty, error)
	Exec(ctx context.Context, req *ExecProcessRequest) (*types1.Empty, error)
	ResizePty(ctx context.Context, req *ResizePtyRequest) (*types1.Empty, error)
	CloseIO(ctx context.Context, req *CloseIORequest) (*types1.Empty, error)
	Update(ctx context.Context, req *UpdateTaskRequest) (*types1.Empty, error)
	Wait(ctx context.Context, req *WaitRequest) (*WaitResponse, error)
	Stats(ctx context.Context, req *StatsRequest) (*StatsResponse, error)
	Connect(ctx context.Context, req *ConnectRequest) (*ConnectResponse, error)
	Shutdown(ctx context.Context, req *ShutdownRequest) (*types1.Empty, error)
}

参考

https://www.51cto.com/article/744467.html

https://juejin.cn/post/7223593286698074167

https://www.alibabacloud.com/blog/getting-started-with-kubernetes-%7C-understanding-kubernetes-runtimeclass-and-using-multiple-container-runtimes_596341

https://www.qikqiak.com/post/containerd-usage/

https://cloud.tencent.com/developer/article/2154031

https://www.jianshu.com/p/d8f6c40280f8

https://jiajunhuang.com/articles/2017_10_25-golang_graceful_restart.md.html