背景
中小型公司很少有自建机房,多使用云商的 Spot 机器,为了保障能分配足够的机器,通常会选多种机型。不同机型因为制造商、代际的差异,相同核心和内存的情况下,性能差异也非常大。
且中小公司通常没有成建制的基础组件开发维护小组,甚至没有在维护的公共服务发现组件,需要在此背景下解决推理服务在不同机型下的性能差异带来的整体 CPU 使用率偏低的问题。
优化前的不同机型的 CPU 使用核数差异如下。

基于 DNS 的服务发现
在原有的代码实现中,并未借助 go gRPC 内置的 DNS Resolver,通过代码定时解析 DNS 的后端地址,并维护节点池,对每个节点调用 grpc.Dial 方法,获取调用 Client,这种实现方式相对繁琐。
在切换到 gRPC 框架的 DNS Resolver 时,发现不能及时的解析新增的节点,这跟框架的代码实现有关。在 DNS 解析成功之后,会阻塞在这里。
// Success resolving, wait for the next ResolveNow. However, also wait 30
// seconds at the very least to prevent constantly re-resolving.
backoffIndex = 1
nextResolutionTime = internal.TimeNowFunc().Add(MinResolutionInterval)
select {
case <-d.ctx.Done():
return
case <-d.rn: // <- 这里
}只有主动调用 ResolveNow 才能触发下次解析。
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
}
}默认的 Resolve 周期非常长。我的解决方案是包装一下默认的 DNSResolver,新增一个定时器,每 N 秒调用一次 ResolverNow。
func (b *xnsResolver) refresh() {
defer b.wg.Done()
ticker := time.NewTicker(ResolvingPeriod)
defer ticker.Stop()
for {
select {
case <-b.ctx.Done():
log.Println("xnsResolver: refresh stopped due to context cancel")
return
case <-ticker.C:
log.Println("xnsResolver: periodic refresh triggered")
b.dnsResolver.ResolveNow(resolver.ResolveNowOptions{})
}
}
}并重新定义 schema 为 xns。
xns://{service-domain}:{port}基于权重的负载均衡
由于没有服务发现组件,短时间内也很难搭建稳定并有人维护,从代码层面来解决这个问题或许会是短平快的方案。
权重维护器
首先需要定义一个权重维护器,来定时刷新节点权重。
type WeightManager struct {
weights sync.Map // map[string]int32
client *http.Client
stopChan chan struct{}
updateLock sync.Mutex
}基于权重的节点选择器
其次,需要基于权重进行节点选择。
type WeightedPicker struct {
nodes []string
weights []int32
cumulative []int32
total int32
manager WeightManagerIf
rand *rand.Rand
mu sync.RWMutex
ctx context.Context
}目前实现的 Pick 逻辑相对简单,计算权重和,获取随机数,通过二分查找得到对应的节点。
func (wb *WeightedPicker) Pick() (string, error) {
wb.mu.RLock()
defer wb.mu.RUnlock()
if len(wb.nodes) == 0 || wb.total == 0 {
return "", fmt.Errorf("no available nodes in the balancer")
}
r := wb.rand.Int31n(wb.total)
idx := sort.Search(len(wb.cumulative), func(i int) bool {
return r < wb.cumulative[i]
})
if idx < len(wb.nodes) {
return wb.nodes[idx], nil
}
randomIdx := int(r) % len(wb.nodes)
return wb.nodes[randomIdx], nil
}基于权重的负载均衡器
最后,需要实现基于权重的负载均衡器。
const (
Name = "x_weighted_random"
)
func init() {
balancer.Register(base.NewBalancerBuilder(
Name,
&pickerBuilder{},
base.Config{HealthCheck: true},
))
}
type pickerBuilder struct {
mu sync.Mutex
prevCancel context.CancelFunc
}服务端实现权重上报接口
由于没有服务发现组件,服务端无法注册自己的权重,客户端不能直接获取机型信息,因此使用折中的方案,服务端自己暴露接口来提供权重信息。接口为 /rpc/meta 。
func fetchWeight(client *http.Client, addr string) (int, error) {
resp, err := client.Get("http://" + addr + weightHTTPPath)
if err != nil {
return defaultWeight, err
}
body, err := io.ReadAll(resp.Body)
defer func() {
_ = resp.Body.Close()
}()
if err != nil {
return defaultWeight, err
}
var result struct {
Weight int `json:"weight"`
}
if err := json.Unmarshal(body, &result); err != nil {
return defaultWeight, err
}
return result.Weight, nil
}这样实现的话,服务端有比较自由的权重计算方案,除了机型,也可以把机器负载、CPU 使用率考虑进去。为此,实现了获取机器状态的 C++ 代码。
使用
为了避免对 xns 的影响,实现了新的 wxns 来区分。
type wxnsResolver struct {
dnsResolver resolver.Resolver
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
cc resolver.ClientConn
lastAddrs []resolver.Address
stateMutex sync.Mutex
}使用也相对简单。
func NewWeightedEchoClient(cfg *ClientConfig) (echo.EchoServiceClient, error) {
opts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"x_weighted_random":{}}]}`),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: cfg.Timeout,
}),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
conn, err := grpc.Dial(cfg.Url, opts...)
if err != nil {
return nil, err
}
return echo.NewEchoServiceClient(conn), nil
}cfg.Url 的格式如下。
wxns://{service-domain}:{port}效果
.png)
效果还是非常符合预期的,可以节省 20% 左右的机器。