佛山制作网站公司,深圳购物网站建设,公司品牌策划设计公司,三星网上商城退款问题描述
项目中遇到一个问题#xff0c;每当有节点变更时#xff0c;整个 gRPC 网络连接会重建
然后我对该问题做了下排查
最后发现是 gRPC Resolver 使用上的一个坑
问题代码
func (r *xxResolver) update(nodes []*registry.Node) {state : resolver.State{Addresses…问题描述
项目中遇到一个问题每当有节点变更时整个 gRPC 网络连接会重建
然后我对该问题做了下排查
最后发现是 gRPC Resolver 使用上的一个坑
问题代码
func (r *xxResolver) update(nodes []*registry.Node) {state : resolver.State{Addresses: make([]resolver.Address, 0, 10),}var grpcNodes []*registry.Node // grpc的节点for _, n : range nodes {if _, ok : n.ServicePorts[grpc]; ok {grpcNodes append(grpcNodes, n)}}for _, n : range grpcNodes {port : n.ServicePorts[grpc]addr : resolver.Address{Addr: fmt.Sprintf(%s:%d, n.Host, port), 问题代码在下面这行 Attributes: attributes.New(registry.NodeKey{}, n, registry.NodeNumber{}, len(grpcNodes)), 问题代码在这行 }state.Addresses append(state.Addresses, addr)}if r.cc ! nil {r.cc.UpdateState(state)}
}这段代码的意思是
xxResolver 是个 gRPC 名字解析器实现 名字解析器就是收集有哪些 ip 列表没接触过 gRPC Resolver 你可以把它看成是类似 DNS 的域名解析过程 nodes 里服务发现维护的节点集合从 nodes 中搜集相关 IP 列表最后更新 gRPC 连接器/平衡器cc的状态
在构建 resolver.Address 时字段 Attributes 添加了 NodeNumber 属性而这个 NodeNumber 值是变化的
发生 bug 的原因
项目基于 gRPC 1.410 版本。该版本本身就存在前后矛盾的 bug 代码
不同处的代码对 resolver.Address 对象的等于操作不一致
具体两处代码分别如下
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {// TODO: handle s.ResolverState.ServiceConfig?if logger.V(2) {logger.Info(base.baseBalancer: got new ClientConn state: , s)}// Successful resolution; clear resolver error and ensure we return nil.b.resolverErr nil// addrsSet is the set converted from addrs, its used for quick lookup of an address.addrsSet : make(map[resolver.Address]struct{})for _, a : range s.ResolverState.Addresses {// Strip attributes from addresses before using them as map keys. So// that when two addresses only differ in attributes pointers (but with// the same attribute content), they are considered the same address.//// Note that this doesnt handle the case where the attribute content is// different. So if users want to set different attributes to create// duplicate connections to the same backend, it doesnt work. This is// fine for now, because duplicate is done by setting Metadata today.//// TODO: read attributes to handle duplicate connections.aNoAttrs : aaNoAttrs.Attributes niladdrsSet[aNoAttrs] struct{}{}if scInfo, ok : b.subConns[aNoAttrs]; !ok {// a is a new address (not existing in b.subConns).//// When creating SubConn, the original address with attributes is// passed through. So that connection configurations in attributes// (like creds) will be used.sc, err : b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})if err ! nil {logger.Warningf(base.baseBalancer: failed to create new SubConn: %v, err)continue}b.subConns[aNoAttrs] subConnInfo{subConn: sc, attrs: a.Attributes}b.scStates[sc] connectivity.Idlesc.Connect()} else {// Always update the subconns address in case the attributes// changed.//// The SubConn does a reflect.DeepEqual of the new and old// addresses. So this is a noop if the current address is the same// as the old one (including attributes).scInfo.attrs a.Attributesb.subConns[aNoAttrs] scInfob.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})}}for a, scInfo : range b.subConns {// a was removed by resolver.if _, ok : addrsSet[a]; !ok {b.cc.RemoveSubConn(scInfo.subConn)delete(b.subConns, a)// Keep the state of this sc in b.scStates until scs state becomes Shutdown.// The entry will be deleted in UpdateSubConnState.}}// If resolver state contains no addresses, return an error so ClientConn// will trigger re-resolve. Also records this as an resolver error, so when// the overall state turns transient failure, the error message will have// the zero address information.if len(s.ResolverState.Addresses) 0 {b.ResolverError(errors.New(produced zero addresses))return balancer.ErrBadResolverState}return nil
}baseBalancer.UpdateClientConnState 比较 Addresses 对象事先处理了aNoAttrs.Attributes nil
即不考虑属性字段内容即只要 ip port 一样就是同个连接
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {ac.mu.Lock()defer ac.mu.Unlock()channelz.Infof(logger, ac.channelzID, addrConn: tryUpdateAddrs curAddr: %v, addrs: %v, ac.curAddr, addrs)if ac.state connectivity.Shutdown ||ac.state connectivity.TransientFailure ||ac.state connectivity.Idle {ac.addrs addrsreturn true}if ac.state connectivity.Connecting {return false}// ac.state is Ready, try to find the connected address.var curAddrFound boolfor _, a : range addrs {if reflect.DeepEqual(ac.curAddr, a) {curAddrFound truebreak}}channelz.Infof(logger, ac.channelzID, addrConn: tryUpdateAddrs curAddrFound: %v, curAddrFound)if curAddrFound {ac.addrs addrs}return curAddrFound
}addrConn.tryUpdateAddrs 内用的是 DeepEqual 这里 Addresses 字段又是起作用的。
导致连接先关闭后重建
问题解决
基于 gRPC 1.410 代码中的相互矛盾点然后看了最新的代码
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
//
// This method compares all fields of the address. When used to tell apart
// addresses during subchannel creation or connection establishment, it might be
// more appropriate for the caller to implement custom equality logic.
func (a Address) Equal(o Address) bool {return a.Addr o.Addr a.ServerName o.ServerName a.Attributes.Equal(o.Attributes) a.BalancerAttributes.Equal(o.BalancerAttributes) a.Metadata o.Metadata
}加了 Address.Equal 统一了 Address 等号操作的定义说明官方也发现了这个问题 |
但是很不幸Attributes 字段也必须相等才算地址相等
因此项目中的代码还是写错的