Knative Queue

Posted by Taction on Wednesday, December 1, 2021

简介

queue是knative在每个deployment都会为业务容器注入的"sidecar",负责其入口流量代理行为。并且会对请求进行计数,对外暴露metrice接口,autoscaler会定期拉取这些指标数据。

queue的几个端口表示如下:

• 8012: queue-proxy 代理的http端口,流量的入口都会到 8012 • 8013: http2 端口,用于grpc流量的转发 • 8022: queue-proxy 管理端口,如健康检查 • 9090: queue-proxy的监控端口,暴露指标供 autoscaler 采集,用于扩缩容 • 9091: prometheus 应用监控指标(请求数,响应时长等)

此外还有一个通过环境变量USER_PORT配置的用户容器端口,即业务实际暴露的服务端口,最初是在ksvc container port中配置的,然后一步一步带下来的。

指标上报

指标上报server

queue的指标上报是比较简单的一个逻辑,首先是在有请求的时候进行计数,然后暴露拉取接口。

queue会启动一个metrics server。同时支持protobuf和json格式的数据上报,原理都是一致的, 接下来以http reporter为例介绍。

func buildMetricsServer(promStatReporter *queue.PrometheusStatsReporter, protobufStatReporter *queue.ProtobufStatsReporter) *http.Server {
   metricsMux := http.NewServeMux()
   metricsMux.Handle("/metrics", queue.NewStatsHandler(promStatReporter, protobufStatReporter))
   return &http.Server{
      Addr:    ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort),
      Handler: metricsMux,
   }
}

reporter创建,首先检查指标上报labelnamespace、configuration、revision、pod是否配置,然后都是prometheus一些模式化的代码,将各个指标数据设置为自己的成员变量。

// NewPrometheusStatsReporter creates a reporter that collects and reports queue metrics.
func NewPrometheusStatsReporter(namespace, config, revision, pod string, reportingPeriod time.Duration) (*PrometheusStatsReporter, error) {
   if namespace == "" {
      return nil, errors.New("namespace must not be empty")
   }
   if config == "" {
      return nil, errors.New("config must not be empty")
   }
   if revision == "" {
      return nil, errors.New("revision must not be empty")
   }
   if pod == "" {
      return nil, errors.New("pod must not be empty")
   }

   registry := prometheus.NewRegistry()
   for _, gv := range []*prometheus.GaugeVec{
      requestsPerSecondGV, proxiedRequestsPerSecondGV,
      averageConcurrentRequestsGV, averageProxiedConcurrentRequestsGV,
      processUptimeGV} {
      if err := registry.Register(gv); err != nil {
         return nil, fmt.Errorf("register metric failed: %w", err)
      }
   }

   labels := prometheus.Labels{
      destinationNsLabel:     namespace,
      destinationConfigLabel: config,
      destinationRevLabel:    revision,
      destinationPodLabel:    pod,
   }

   return &PrometheusStatsReporter{
      handler:   promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
      startTime: time.Now(),

      reportingPeriodSeconds: reportingPeriod.Seconds(),

      requestsPerSecond:                requestsPerSecondGV.With(labels),
      proxiedRequestsPerSecond:         proxiedRequestsPerSecondGV.With(labels),
      averageConcurrentRequests:        averageConcurrentRequestsGV.With(labels),
      averageProxiedConcurrentRequests: averageProxiedConcurrentRequestsGV.With(labels),
      processUptime:                    processUptimeGV.With(labels),
   }, nil
}

请求数量的记录借助了一个中间的结构体RequestStats,在构建代理pod请求的server的时候会传入此结构体的一个实例,在请求到达时进行计数。然后会周期取出各项指标,设置到PrometheusStatsReporter中供拉取指标时使用。

指标记录

将proxyHandler路由中间件逻辑简化后如下所示,就是在流量到达和转发完成后分别触发stats HandleEvent来进行记录。

// ProxyHandler sends requests to the `next` handler at a rate controlled by
// the passed `breaker`, while recording stats to `stats`.
func ProxyHandler(breaker *Breaker, stats *network.RequestStats, tracingEnabled bool, next http.Handler) http.HandlerFunc {
   return func(w http.ResponseWriter, r *http.Request) {
      
      stats.HandleEvent(network.ReqEvent{Time: time.Now(), Type: in})
      defer func() {
         stats.HandleEvent(network.ReqEvent{Time: time.Now(), Type: out})
      }()
      // ......
      next.ServeHTTP(w, r)
   }
}

HandleEvent就是针对不同情况下的出入流量行为进行对应的计数。如果入请求就将并发数和请求数增加。如果是请求结束就将并发数减一。如果是activator代理过来的入请求就在以上基础上对将代理并发数和请求数加一,如果是代理请求结束就在以上基础上将代理并发减一。

// HandleEvent handles an incoming or outgoing request event and updates
// the state accordingly.
func (s *RequestStats) HandleEvent(event ReqEvent) {
   s.mux.Lock()
   defer s.mux.Unlock()

   s.compute(event.Time)

   switch event.Type {
   case ProxiedIn:
      s.proxiedConcurrency++
      s.proxiedCount++
      fallthrough
   case ReqIn:
      s.requestCount++
      s.concurrency++
   case ProxiedOut:
      s.proxiedConcurrency--
      fallthrough
   case ReqOut:
      s.concurrency--
   }
}

指标更新

指标更新是在main函数中定义的,定时从stats中取出指标,将其更新到http和protobuf的metrics server中。

func main() {
   protoStatReporter := queue.NewProtobufStatsReporter(env.ServingPod, reportingPeriod)

   reportTicker := time.NewTicker(reportingPeriod)
   defer reportTicker.Stop()

   stats := network.NewRequestStats(time.Now())
   go func() {
      for now := range reportTicker.C {
         stat := stats.Report(now)
         promStatReporter.Report(stat)
         protoStatReporter.Report(stat)
      }
   }()
}

对于http metrics server指标更新非常简单,由于这些指标项已经按prometheus要求初始化,只要对这些值进行设置就可以。

// Report captures request metrics.
func (r *PrometheusStatsReporter) Report(stats network.RequestStatsReport) {
   // Requests per second is a rate over time while concurrency is not.
   r.requestsPerSecond.Set(stats.RequestCount / r.reportingPeriodSeconds)
   r.proxiedRequestsPerSecond.Set(stats.ProxiedRequestCount / r.reportingPeriodSeconds)
   r.averageConcurrentRequests.Set(stats.AverageConcurrency)
   r.averageProxiedConcurrentRequests.Set(stats.AverageProxiedConcurrency)
   r.processUptime.Set(time.Since(r.startTime).Seconds())
}

如果你对这部分代码想要详细了解,但是对Prometheus收集指标不太了解的话,你可以在它的client项目获得更多了解。