Wii
Wii
发布于 2025-08-29 / 12 阅读
0
0

性能优化系列(二):基于机型权重的负载均衡策略

背景

中小型公司很少有自建机房,多使用云商的 Spot 机器,为了保障能分配足够的机器,通常会选多种机型。不同机型因为制造商、代际的差异,相同核心和内存的情况下,性能差异也非常大。

且中小公司通常没有成建制的基础组件开发维护小组,甚至没有在维护的公共服务发现组件,需要在此背景下解决推理服务在不同机型下的性能差异带来的整体 CPU 使用率偏低的问题。

优化前的不同机型的 CPU 使用核数差异如下。

基于 DNS 的服务发现

在原有的代码实现中,并未借助 go gRPC 内置的 DNS Resolver,通过代码定时解析 DNS 的后端地址,并维护节点池,对每个节点调用 grpc.Dial 方法,获取调用 Client,这种实现方式相对繁琐。

在切换到 gRPC 框架的 DNS Resolver 时,发现不能及时的解析新增的节点,这跟框架的代码实现有关。在 DNS 解析成功之后,会阻塞在这里

			// Success resolving, wait for the next ResolveNow. However, also wait 30
			// seconds at the very least to prevent constantly re-resolving.
			backoffIndex = 1
			nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
			select {
			case <-d.ctx.Done():
				return
			case <-d.rn:  // <- 这里
			}

只有主动调用 ResolveNow 才能触发下次解析。

func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
	select {
	case d.rn <- struct{}{}:
	default:
	}
}

默认的 Resolve 周期非常长。我的解决方案是包装一下默认的 DNSResolver,新增一个定时器,每 N 秒调用一次 ResolverNow

func (b *xnsResolver) refresh() {
	defer b.wg.Done()
	ticker := time.NewTicker(ResolvingPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-b.ctx.Done():
			log.Println("xnsResolver: refresh stopped due to context cancel")
			return
		case <-ticker.C:
			log.Println("xnsResolver: periodic refresh triggered")
			b.dnsResolver.ResolveNow(resolver.ResolveNowOptions{})
		}
	}
}

并重新定义 schema 为 xns。

xns://{service-domain}:{port}

基于权重的负载均衡

由于没有服务发现组件,短时间内也很难搭建稳定并有人维护,从代码层面来解决这个问题或许会是短平快的方案。

权重维护器

首先需要定义一个权重维护器,来定时刷新节点权重。

type WeightManager struct {
	weights    sync.Map // map[string]int32
	client     *http.Client
	stopChan   chan struct{}
	updateLock sync.Mutex
}

基于权重的节点选择器

其次,需要基于权重进行节点选择

type WeightedPicker struct {
	nodes      []string
	weights    []int32
	cumulative []int32
	total      int32
	manager    WeightManagerIf
	rand       *rand.Rand
	mu         sync.RWMutex
	ctx        context.Context
}

目前实现的 Pick 逻辑相对简单,计算权重和,获取随机数,通过二分查找得到对应的节点。

func (wb *WeightedPicker) Pick() (string, error) {
	wb.mu.RLock()
	defer wb.mu.RUnlock()

	if len(wb.nodes) == 0 || wb.total == 0 {
		return "", fmt.Errorf("no available nodes in the balancer")
	}

	r := wb.rand.Int31n(wb.total)
	idx := sort.Search(len(wb.cumulative), func(i int) bool {
		return r < wb.cumulative[i]
	})
	if idx < len(wb.nodes) {
		return wb.nodes[idx], nil
	}
	randomIdx := int(r) % len(wb.nodes)
	return wb.nodes[randomIdx], nil
}

基于权重的负载均衡器

最后,需要实现基于权重的负载均衡器

const (
	Name = "x_weighted_random"
)

func init() {
	balancer.Register(base.NewBalancerBuilder(
		Name,
		&pickerBuilder{},
		base.Config{HealthCheck: true},
	))
}

type pickerBuilder struct {
	mu         sync.Mutex
	prevCancel context.CancelFunc
}

服务端实现权重上报接口

由于没有服务发现组件,服务端无法注册自己的权重,客户端不能直接获取机型信息,因此使用折中的方案,服务端自己暴露接口来提供权重信息。接口为 /rpc/meta

func fetchWeight(client *http.Client, addr string) (int, error) {
	resp, err := client.Get("http://" + addr + weightHTTPPath)
	if err != nil {
		return defaultWeight, err
	}

	body, err := io.ReadAll(resp.Body)
	defer func() {
		_ = resp.Body.Close()
	}()
	if err != nil {
		return defaultWeight, err
	}

	var result struct {
		Weight int `json:"weight"`
	}

	if err := json.Unmarshal(body, &result); err != nil {
		return defaultWeight, err
	}

	return result.Weight, nil
}

这样实现的话,服务端有比较自由的权重计算方案,除了机型,也可以把机器负载、CPU 使用率考虑进去。为此,实现了获取机器状态的 C++ 代码

使用

为了避免对 xns 的影响,实现了新的 wxns 来区分。

type wxnsResolver struct {
	dnsResolver resolver.Resolver
	ctx         context.Context
	cancel      context.CancelFunc
	wg          sync.WaitGroup

	cc         resolver.ClientConn
	lastAddrs  []resolver.Address
	stateMutex sync.Mutex
}

使用也相对简单。

func NewWeightedEchoClient(cfg *ClientConfig) (echo.EchoServiceClient, error) {
	opts := []grpc.DialOption{
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"x_weighted_random":{}}]}`),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    30 * time.Second,
			Timeout: cfg.Timeout,
		}),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	conn, err := grpc.Dial(cfg.Url, opts...)
	if err != nil {
		return nil, err
	}
	return echo.NewEchoServiceClient(conn), nil
}

cfg.Url 的格式如下。

wxns://{service-domain}:{port}

效果

效果还是非常符合预期的,可以节省 20% 左右的机器。


评论