TL;DR 本文以实际案例介绍如何借助WebAssembly组件模型来自定义Wasm及开发对应的go语言SDK。
一 概述
Dapr是一个分布式运行时,那么我们能不能使用类似的abi定义,支持wasm程序能够获取dapr所支持的各种能力呢?在本文中我们将通过wit定义两个功能,网络调用http和状态存储state来探索如何实现。
二 HTTP
WIT定义
wit/http/http.wit 导出handle-http-request
,每当有网络请求到达时,host调用此方法将网络请求内容交由guest代码处理。这里将所有http语义信息放到了请求参数中,这是可以优化的,可以采用http-wasm
和proxy-wasm
所定义的方式采用guest主动获取参数方式,但是在初版实现中我会采用以下方式,更加简单一些。
导出函数是定义在wasm中的功能函数,可以供host调用。在http语义下,wasm需要导出http处理函数,接收请求数据,执行wasm业务逻辑。在接下来这个例子中我们可以看到导出函数是如何
package dapr:http
world http {
export http-handler
}
interface http-handler {
use http-types.{request, response}
// The entrypoint for an HTTP handler.
handle-http-request: func(req: request) -> response
}
wit/http/http-types.wit 这里时各种类型的详细定义。
package dapr:http
interface http-types {
// The HTTP status code.
// This is currently an unsigned 16-bit integer,
// but it could be represented as an enum containing
// all possible HTTP status codes.
type http-status = u16
// The HTTP body.
// Currently, this is a synchonous byte array, but it should be
// possible to have a stream for both request and response bodies.
type body = list<u8>
// The HTTP headers represented as a list of (name, value) pairs.
type headers = list<tuple<string, string>>
// The HTTP parameter queries, represented as a list of (name, value) pairs.
type params = list<tuple<string, string>>
// The HTTP URI of the current request.
type uri = string
// The HTTP method.
enum method {
get,
post,
put,
delete,
patch,
head,
options,
}
// An HTTP request.
record request {
method: method,
uri: uri,
headers: headers,
params: params,
body: option<body>,
}
// An HTTP response.
record response {
status: http-status,
headers: option<headers>,
body: option<body>,
}
}
Guest
通过命令wit-bindgen tiny-go --out-dir dapr/http wit/http
来生成guest sdk代码。,查看生成的目录,可以发现生成的是go文件和c文件。go文件通过cgo与c代码进行交互。
然后对于guest code来说,可以直接引用生成的guest sdk,实现所需要的接口,在接口函数中实现功能逻辑:
package main
import (
"github.com/taction/wit-dapr/dapr/http"
)
func init() {
a := &HostImpl{}
http.SetExportsDaprHttpHttpHandler(a)
}
type HostImpl struct {
}
func (h *HostImpl) HandleHttpRequest(req http.DaprHttpHttpTypesRequest) http.DaprHttpHttpTypesResponse {
headers := []http.DaprHttpHttpTypesTuple2StringStringT{{"Content-Type", "text/plain"}}
return http.DaprHttpHttpTypesResponse{
Status: 200,
Body: http.Some([]byte("Hello from WASM!")),
Headers: http.Some(headers),
}
}
func main() {}
通过tinygo build -o main.wasm --no-debug -target=wasi main.go
命令即可将其编译为wasm二进制文件。
Host
由于本例中没有导入函数,所以对于host来说就是需要实现一个go原生http server,当有请求的时候,将请求序列化为abi所能接受的参数格式,调用wasm对应的函数进行处理。然后解析wasm函数的处理结果,将其设置为网络请求的返回。
首先根据c代码中的函数,确定函数导出名称,以及入参出参,根据代码细节确定入参各个参数含义。
// 返回值大小,根据所有返回值计算出来的,铺平后所占用的空间
static uint8_t RET_AREA[28];
__attribute__((__export_name__("dapr:http/http-handler#handle-http-request")))
int32_t __wasm_export_exports_dapr_http_http_handler_handle_http_request(int32_t arg, int32_t arg0, int32_t arg1, int32_t arg2, int32_t arg3, int32_t arg4, int32_t arg5, int32_t arg6, int32_t arg7, int32_t arg8) {
// 将arg ~ arg8组装成arg9调用go中定义的函数。
dapr_http_http_handler_response_t ret;
exports_dapr_http_http_handler_handle_http_request(&arg9, &ret);
int32_t ptr = (int32_t) &RET_AREA;
// 将返回值封装到28字节数组中,返回其指针。
return ptr;
}
然后在处理网络请求的函数中,为每个请求实例化一个wasm instance,获取其导出函数,并封装调用参数,调用导出函数:
func main() {
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
err := handler(writer, request, wasmFile)
if err != nil {
writer.WriteHeader(500)
writer.Write([]byte(err.Error()))
}
})
}
func (w WasmServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
fn := w.Module.ExportedFunction("dapr:http/http-handler#handle-http-request")
if fn == nil {
res.WriteHeader(500)
res.Write([]byte("Handler not found"))
return
}
// 参数准备
methond := getMethod(req.Method)
urlPtr, urlLen, headerPtr, headerLen, paramPtr, paramLen, hasBody, bodyPtr, bodyLen, err := formatRequestAsArgs(w.Module, req)
// 函数调用,根据guest sdk中
resp, err := fn.Call(context.TODO(), uint64(methond), urlPtr, urlLen, headerPtr, headerLen, paramPtr, paramLen, hasBody, bodyPtr, bodyLen)
// 释放参数准备时申请的wasm中的内存,这些数据在wasm中不会再被用到了
freeHeader(w.Module, headerPtr, headerLen)
common.Free(context.TODO(), w.Module, urlPtr, headerPtr, paramPtr, bodyPtr) // free after used
if err != nil {
res.WriteHeader(500)
res.Write([]byte(err.Error()))
return
}
// 读取并解析返回值
data, ok := w.Module.Memory().Read(uint32(resp[0]), returnedResponseLen)
if !ok {
fmt.Println("Error reading headers.")
res.WriteHeader(500)
res.Write([]byte("Error reading response"))
return
}
response := decodeResponse(w.Module, data)
if response == nil {
res.WriteHeader(500)
res.Write([]byte("Error decoding response"))
return
}
if response.Headers != nil {
// todo handle headers properly
for k, v := range *response.Headers {
res.Header().Set(k, v[0])
}
}
res.WriteHeader(int(response.StatusCode))
if response.Body != nil {
res.Write(response.Body)
}
}
从参数准备过程中,我们可以发现的一点就是,在传复杂结构的时候,手动申请和释放内存会比较麻烦。申请内存所存放的内容是在wasm侧会用到的信息,而且这些内容在用不到的时候是需要手动释放的。我能想到的有2种方式来解决这个问题,一个是本例中所采用的方式,host知道或者规定在某个节点这些数据不再使用,进行手动释放;另一个是增加一个释放的abi函数,wasm在用不到对应数据的时候调用对应的函数释放对应的一系列资源,如果将body改为stream流,那么采用这种方式就会更好一点。
response解析
以response解析为例来看在传递response的时候具体的逻辑,从wit定义来看它具有如下定义:
// The HTTP body.
// Currently, this is a synchonous byte array, but it should be
// possible to have a stream for both request and response bodies.
type body = list<u8>
// The HTTP headers represented as a list of (name, value) pairs.
type headers = list<tuple<string, string>>
record response {
status: http-status,
headers: option<headers>,
body: option<body>,
}
在c guest sdk中,序列化response的代码如下:
static uint8_t RET_AREA[28];
__attribute__((__export_name__("dapr:http/http-handler#handle-http-request")))
int32_t __wasm_export_exports_dapr_http_http_handler_handle_http_request(int32_t arg, int32_t arg0, int32_t arg1, int32_t arg2, int32_t arg3, int32_t arg4, int32_t arg5, int32_t arg6, int32_t arg7, int32_t arg8) {
// ....
int32_t ptr = (int32_t) &RET_AREA;
*((int16_t*)(ptr + 0)) = (int32_t) ((ret).status);
if (((ret).headers).is_some) {
const dapr_http_http_types_headers_t *payload10 = &((ret).headers).val;
*((int8_t*)(ptr + 4)) = 1;
*((int32_t*)(ptr + 12)) = (int32_t) (*payload10).len;
*((int32_t*)(ptr + 8)) = (int32_t) (*payload10).ptr;
} else {
*((int8_t*)(ptr + 4)) = 0;
}
if (((ret).body).is_some) {
const dapr_http_http_types_body_t *payload12 = &((ret).body).val;
*((int8_t*)(ptr + 16)) = 1;
*((int32_t*)(ptr + 24)) = (int32_t) (*payload12).len;
*((int32_t*)(ptr + 20)) = (int32_t) (*payload12).ptr;
} else {
*((int8_t*)(ptr + 16)) = 0;
}
return ptr;
从中可以看到,response一共占用了28字节,所有的元素及其内存分布如下:
offset | actual bytes used | type | name |
---|---|---|---|
0 | 4 | int16 | status |
4 | 4 | int8 | headers.is_some |
8 | 4 | int32 | headers.ptr |
12 | 4 | int32 | headers.len |
16 | 4 | int8 | body.is_some |
20 | 4 | int32 | body.ptr |
24 | 4 | int32 | body.len |
从中我们可以看到任意元素无论其本身定义的时候类型是什么,在内存占用上都是4字节,这应该是代码生成bindgen工具的特性,可能是在展开response时将每个都作为一个独立元素处理的结果。
所以对应的go语言解析代码为:
func (w WasmServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
// ...
// 读取序列化后的response内容并解析
data, ok := w.Module.Memory().Read(uint32(resp[0]), returnedResponseLen)// returnedResponseLen=28
response := decodeResponse(w.Module, data)
// ...
}
// 解析Response
func decodeResponse(m api.Module, res []byte) *Response {
if len(res) != returnedResponseLen {
fmt.Printf("Error response length %d, expected %v\n", len(res), returnedResponseLen)
return nil
}
return &Response{
// [0-4) 字节是StatusCode
StatusCode: uint16(binary.LittleEndian.Uint32(res[0:4])),
// [4-16) 字节是header
Headers: decodeHeaders(m, res[4:16]),
// [16-28) 字节是body
Body: decodeBody(m, res[16:]),
}
}
// 解析body:0-4是是否存在,4-8是指针,8-14是数据长度
func decodeBody(mod api.Module, h []byte) []byte {
// 由于wasm采用小端存储,body是否有值只可能是0或者1,虽然使用前4个字节存储,因此只需要读第一字节就可以。
// uint8(h[0]) == uint8(binary.LittleEndian.Uint32(h[:4]))
if h[0] == 0 {
// nil
return nil
}
ptr := binary.LittleEndian.Uint32(h[4:8])
len := binary.LittleEndian.Uint32(h[8:12])
data, ok := mod.Memory().Read(ptr, len)
if !ok {
fmt.Println("Error reading header data")
return nil
}
return data
}
三 State
在state组件中,主要涉及是导入函数,wasm需要借助host函数来对存储数据进行增删改查。定义get、set、delete三个函数。
WIT定义
wit/state/state.wit
package dapr:state
world state {
import state-interface
}
interface state-interface {
use state-types.{store-name,get-request,get-response,set-request,delete-request,error}
get: func(name: store-name, req: get-request) -> result<get-response,error>
set: func(name: store-name, req: set-request) -> result<u32,error>
delete: func(name: store-name, req: delete-request) -> result<u32,error>
}
wit/state/state-types.wit
package dapr:state
interface state-types {
type error = string
type store-name = string
type key = string
type metadata = list<tuple<string, string>>
// type consistency = string // "eventual" | "strong"
enum consistency {
unspecified,
eventual,
strong,
}
record get-state-options {
consistency: consistency,
}
record get-request {
key: key,
metadata: option<metadata>,
options: get-state-options,
}
type data =list<u8>
type etag = option<string>
type content-type = option<string>
record get-response {
data: data,
etag: etag,
metadata: option<metadata>,
content-type: content-type,
}
enum concurrency {
unspecified,
first-write,
last-write,
}
record set-state-options {
concurrency: concurrency, // "first-write" | "last-write"
consistency: consistency,
}
record set-request {
key: key,
value: data,
etag: etag,
metadata: option<metadata>,
options: set-state-options,
content-type: content-type,
}
record delete-request {
key: key,
etag: etag,
metadata: option<metadata>,
options: set-state-options,
}
}
Guest
wit bindgen
可以通过wit-bindgen来生成guest的sdk。wit-bindgen tiny-go --out-dir dapr/state wit/state
。
类似的,对于guest code来说,可以直接引用生成的guest sdk,直接调用sdk中定义的函数来达成对host function的访问。
func main() {
state.DaprStateStateInterfaceSet("state", state.DaprStateStateTypesSetRequest{Key: "z", Value: []byte("value")})
res := state.DaprStateStateInterfaceGet("state", state.DaprStateStateTypesGetRequest{Key: "z"})
if res.IsOk() {
fmt.Println(string(res.Val.Data))
} else {
fmt.Println(res.Err)
}
}
通过tinygo build -o main.wasm --no-debug -target=wasi main.go
命令即可将其编译为wasm二进制文件。这次我们更进一步的,通过wasm2wat main.wasm -o main.wat
命令可以将其转化为text format,更容易阅读。
我们可以看到导入了两个在dapr:state/state-interface
命名空间下的host func,分别是get和set。其函数类型分别是0和20,在最上面可以分别看到其函数签名。
(module
(type (;0;) (func (param i32 i32)))
(type (;20;) (func (param i32 i32 i32 i32 i32 i32 i32 i32 i32)))
;; ...
(import "dapr:state/state-interface" "get" (func $__wasm_import_dapr_state_state_interface_get (type 20)))
(import "dapr:state/state-interface" "set" (func $__wasm_import_dapr_state_state_interface_set (type 0)))
;; ...
(table (;0;) 4 4 funcref)
(memory (;0;) 2)
(global $__stack_pointer (mut i32) (i32.const 65536))
(global (;1;) (mut i32) (i32.const 0))
(global (;2;) (mut i32) (i32.const 0))
(export "memory" (memory 0))
(export "malloc" (func $malloc))
(export "free" (func $free))
(export "calloc" (func $calloc))
(export "realloc" (func $realloc))
(export "_start" (func $_start))
(export "cabi_realloc" (func $cabi_realloc))
通过查看c sdk代码中函数的定义,可以看到函数签名的符合的,get函数具有9个int32参数,set函数具有2个int32参数。参数代表的具体含义同样可以进一步观察c sdk代码获得。
__attribute__((__import_module__("dapr:state/state-interface"), __import_name__("get")))
void __wasm_import_dapr_state_state_interface_get(int32_t, int32_t, int32_t, int32_t, int32_t, int32_t, int32_t, int32_t, int32_t);
__attribute__((__import_module__("dapr:state/state-interface"), __import_name__("set")))
void __wasm_import_dapr_state_state_interface_set(int32_t, int32_t);
Host
对于host sdk来说,需要实现对应的3个导入函数:
const ModuleName = "dapr:state/state-interface"
func Instantiate(ctx context.Context, rt wazero.Runtime) error {
s := State
_, err := rt.NewHostModuleBuilder(ModuleName).
NewFunctionBuilder().WithFunc(s.Get).Export("get").
NewFunctionBuilder().WithFunc(s.Set).Export("set").
NewFunctionBuilder().WithFunc(s.Delete).Export("delete").
Instantiate(ctx)
return err
}
以set函数为例,它具有两个参数,一个是方法调用所有参数的指针,另一个是方法调用完成后返回值可以写入的指针,其长度可以查看guest sdk获得。在此方法中,首先根据入参指针解析组件名称和set请求,然后根据名称获取对应的组件,调用组件的set方法,将结果写回到返回值指针中。
func (h *HostImpl) Set(ctx context.Context, m api.Module, inPtr, outPtr uint32) {
// read request
name, req, err := readSetRequest(ctx, m, inPtr)
// call state store
c, ok := h.getComponent(ctx, string(name))
if !ok {
handleGetError(ctx, m, outPtr, errors.New("get component failed"))
return
}
err = c.Set(ctx, req)
if err != nil {
handleSetError(ctx, m, outPtr, err)
return
}
// handle response
handleSetResponseOk(ctx, m, outPtr)
}
读取请求
func readSetRequest(ctx context.Context, m api.Module, ptr uint32) (string, *comstate.SetRequest, error) {
data, ok := common.ReadBytes(m, ptr, SetLen)
if !ok {
return "", nil, errors.New("read request failed")
}
// read name
namePtr, nameLen := binary.LittleEndian.Uint32(data[0:4]), binary.LittleEndian.Uint32(data[4:8])
name, ok := common.ReadString(m, namePtr, nameLen)
if !ok {
return "", nil, errors.New("read name failed")
}
keyPtr, keyLen := binary.LittleEndian.Uint32(data[8:12]), binary.LittleEndian.Uint32(data[12:16])
key, ok := common.ReadString(m, keyPtr, keyLen)
if !ok {
return "", nil, errors.New("read key failed")
}
valuePtr, valueLen := binary.LittleEndian.Uint32(data[16:20]), binary.LittleEndian.Uint32(data[20:24])
value, ok := common.ReadBytes(m, valuePtr, valueLen)
// todo handle etag and metadata
return name, &comstate.SetRequest{
Key: string(key),
Value: value,
}, nil
}
处理成功的请求,将请求结果序列化,并写入给定的指针。指针指向的区域肯定够大,因为在生成代码的时候计算过返回值“大小”。
func handleSetResponseOk(ctx context.Context, m api.Module, ptr uint32) {
resbytes := make([]byte, SetLen)
//binary.LittleEndian.PutUint32(resbytes[0:4], 0) // result<ok> is 0, so do not need to write
binary.LittleEndian.PutUint32(resbytes[4:8], 1) // write 1 to indicate ok
m.Memory().Write(ptr, resbytes)
}
忽略组件的注册过程,host运行wasm的代码如下,其中最主要的就是host_state.Instantiate(ctx, runtime)
导入定义的state host function:
func run(wasmFile string) error {
wasmName := filepath.Base(wasmFile)
wasmCode, err := os.ReadFile(wasmFile)
if err != nil {
return fmt.Errorf("could not read WASM file '%s': %w", wasmFile, err)
}
ctx := context.Background()
runtime := wazero.NewRuntime(ctx)
defer runtime.Close(ctx)
// todo detect wasm module valid
wasmModule, err := runtime.CompileModule(ctx, wasmCode)
if err != nil {
return err
}
defer wasmModule.Close(ctx)
wasi_snapshot_preview1.Instantiate(ctx, runtime)
host_state.Instantiate(ctx, runtime)
ins, err := runtime.InstantiateModule(ctx, wasmModule,
wazero.NewModuleConfig().WithName(wasmName).
WithRandSource(rand.Reader).
WithStdout(os.Stdout).
WithSysNanotime().
WithSysWalltime().
WithSysNanosleep())
if err != nil {
return err
}
ins.Close(ctx)
return nil
}
函数参数格式进一步说明
细心的同学会发现wit-bindgen会倾向于把把函数返回值“聚合”,把函数参数“铺平”。但是可以看到在state函数中set和get函数的参数数量差别很大,set函数参数是聚合的结构体(或者说是序列化后的指针),而get参数是正常每个参数都单独占用一个参数位置。由于这个会影响到调用方法时参数应该以何种方式传递,所以我追查了一下源码,为什么会产生这个“特例”。在生成导入和导出函数的时候,第一步都是调用计算函数签名方法wasm_signature
.在这个函数中,第一步就是将所有参数展开成wasm本身支持的数据结构能表达的格式;然后判断是否超过MAX_FLAT_PARAMS
(16)阈值,如果超过了,那么参数列表就太长了,就给它聚合成一个。
// import 和 export 函数,都会先计算函数签名
fn import(&mut self, interface_name: Option<&WorldKey>, func: &Function) {
self.docs(&func.docs, SourceType::HFns);
let sig = self.resolve.wasm_signature(AbiVariant::GuestImport, func);
// ...
}
fn export(&mut self, func: &Function, interface_name: Option<&WorldKey>) {
let sig = self.resolve.wasm_signature(AbiVariant::GuestExport, func);
// ...
}
pub fn wasm_signature(&self, variant: AbiVariant, func: &Function) -> WasmSignature {
const MAX_FLAT_PARAMS: usize = 16;
const MAX_FLAT_RESULTS: usize = 1;
// 会将参数展开,将一个参数展开成为多个wasm支持的参数(i32,i64,f32,f46)
let mut params = Vec::new();
let mut indirect_params = false;
for (_, param) in func.params.iter() {
self.push_flat(param, &mut params);
}
// 如果参数展开后超过了MAX_FLAT_PARAMS(16),就会将其压缩为一个参数
if params.len() > MAX_FLAT_PARAMS {
params.truncate(0);
params.push(WasmType::I32);
indirect_params = true;
}
// ...
}
你可以在这里看到所有的源码: https://github.com/Taction/wit-dapr