This commit is contained in:
kzkzzzz
2025-03-23 20:29:29 +08:00
parent 7e0bf82418
commit 9fd0eaadb8
14 changed files with 661 additions and 78 deletions

View File

@@ -15,6 +15,10 @@ const (
DefaultWindowSize = 16 * 1024 * 1024
)
const (
ServicePrefix = "grpc@"
)
const (
HeaderClientIP = "grpc-client-ip"
HeaderServiceName = "grpc-service-name"

View File

@@ -75,7 +75,7 @@ func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) {
PermitWithoutStream: true, // 如果没有active的stream,是否允许发送ping
}),
// 参考 https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing 设置轮训策略
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), // This sets the initial balancing policy.
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
@@ -87,13 +87,11 @@ func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) {
grpc.WithReadBufferSize(mygrpc.DefaultReadBufferSize),
grpc.WithWriteBufferSize(mygrpc.DefaultWriteBufferSize),
grpc.WithUnaryInterceptor(WrapRequestError()),
)
}
if len(c.unaryMiddlewares) > 0 {
grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...)
dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...))
}
if len(c.grpcOpts) > 0 {
@@ -123,12 +121,15 @@ func WrapRequestError() grpc.UnaryClientInterceptor {
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
st, ok := status.FromError(err)
if ok && serviceName != "" {
sp := st.Proto()
sp.Message = fmt.Sprintf("[%s] - %s", serviceName, sp.Message)
if serviceName != "" {
return fmt.Errorf("request grpc err: [%s - %s] %s", serviceName, method, err)
return status.ErrorProto(sp)
}
return fmt.Errorf("request grpc err: [%s] %s", method, err)
return err
}
return nil

View File

@@ -63,7 +63,7 @@ func UseDefaultBufferCfg(v bool) Opt {
func WithRegistry(serviceName string, reg myregistry.IRegister) Opt {
return func(server *Server) {
server.serviceName = serviceName
server.serviceName = mygrpc.ServicePrefix + serviceName
server.reg = reg
}
}
@@ -81,7 +81,7 @@ func WithDelayStopMs(v int) Opt {
}
func SetFlag() {
pflag.Int("grpc.port", 18082, "listen port")
pflag.Int("grpc.port", 0, "listen port, 0 is random port")
pflag.String("grpc.log", "true", "enable request log")
}
@@ -109,7 +109,7 @@ func NewByConf(conf *Conf, opts ...Opt) *Server {
}
if s.logger == nil {
s.logger = mylog.GetLogger()
s.logger = mylog.GetLoggerSkip(-1)
}
if s.reg != nil && s.serviceName == "" {
@@ -142,7 +142,7 @@ func (s *Server) initServer() {
PermitWithoutStream: true, // 即使没有 active stream, 也允许 ping
}),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: time.Hour * 2, // 空闲连接时间
MaxConnectionIdle: time.Minute * 15, // 空闲连接时间
MaxConnectionAgeGrace: time.Second * 30, // 在强制关闭连接之间, 允许有 N 的时间完成 pending 的 rpc 请求
Time: time.Second * 20, // 如果一个连接空闲超过 N, 则发送一个 ping 请求
Timeout: time.Second * 5, // 如果 ping 请求 N 内未收到回复, 则认为该连接已断开
@@ -210,6 +210,12 @@ func (s *Server) Run(ctx context.Context) error {
if err != nil {
return err
}
log.Printf("[%s] register service: %s - %s:%d",
s.reg.Name(),
s.serviceRegInfo.ServiceName,
s.serviceRegInfo.Ip, s.serviceRegInfo.Port,
)
}
addr := fmt.Sprintf("%s:%d", s.serverConf.Addr, port)
@@ -230,6 +236,13 @@ func (s *Server) Stop() {
if err != nil {
s.logger.Errorf("grpc server deregister err: %s", err)
}
log.Printf("[%s] deregister service: %s - %s:%d",
s.reg.Name(),
s.serviceRegInfo.ServiceName,
s.serviceRegInfo.Ip, s.serviceRegInfo.Port,
)
}
// 如果使用k8s service, 关闭pod和往service注销ip是同时进行的, 如果退出服务比注销ip先完成, 可能有流量继续进来, 导致请求失败
@@ -270,7 +283,7 @@ func (s *Server) requestLog() grpc.UnaryServerInterceptor {
code = status.New(codes.Unknown, err.Error()).Code()
}
codeMsg = fmt.Sprintf("Error Code: %s(%d)", code.String(), uint32(code))
codeMsg = fmt.Sprintf("%s(%d)", code.String(), uint32(code))
}
s.logger.Infof(
@@ -287,7 +300,7 @@ func (s *Server) grpcRecover() grpc.UnaryServerInterceptor {
defer func() {
if err0 := recover(); err0 != nil {
log.Printf("%s - panic: %v\n%s", info.FullMethod, err0, debug.Stack())
err = fmt.Errorf("server err: %s - system err: %s", info.FullMethod, err0)
err = fmt.Errorf("server err %s - %s", info.FullMethod, err0)
}
}()

View File

@@ -0,0 +1,80 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin defines a roundrobin balancer. Roundrobin balancer is
// installed as one of the default balancers in gRPC, users don't need to
// explicitly install this balancer.
package random
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"log"
"math/rand"
)
// Name is the name of round_robin balancer.
const Name = "my_random_robin"
var logger = grpclog.Component("myrandomrobin")
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &randomPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {
balancer.Register(newBuilder())
}
type randomPickerBuilder struct{}
type subConnInfo struct {
conn balancer.SubConn
connInfo base.SubConnInfo
}
func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("myrandomrobin Picker: Build called with info: %v", info)
log.Printf("myrandomrobin Picker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
scs := make([]*subConnInfo, 0, len(info.ReadySCs))
for sc, scInfo := range info.ReadySCs {
scs = append(scs, &subConnInfo{
conn: sc,
connInfo: scInfo,
})
}
return &randomPicker{
subConns: scs,
}
}
type randomPicker struct {
subConns []*subConnInfo
}
func (p *randomPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
sc := p.subConns[rand.Intn(len(p.subConns))]
log.Printf("randomPicker Pick: SubConn: %s", sc.connInfo.Address.String())
return balancer.PickResult{SubConn: sc.conn}, nil
}