// Package grpcc builds gRPC client connections with a set of default dial
// options and provides unary client interceptors for error wrapping and
// per-call timeouts.
package grpcc

import (
	"context"
	"fmt"
	"log"
	"time"

	"git.makemake.in/kzkzzzz/mycommon/myconf"
	"git.makemake.in/kzkzzzz/mycommon/mygrpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// ClientConf collects the settings that New turns into gRPC dial options.
// It is populated through Opt functions.
type ClientConf struct {
	// useDefaultBufferCfg enables the mygrpc default window/buffer sizes.
	useDefaultBufferCfg bool
	// conf is stored by WithConf; New, as written in this file, does not read it.
	conf *myconf.Config
	// grpcOpts are extra dial options appended after the built-in defaults.
	grpcOpts []grpc.DialOption
	// unaryMiddlewares are chained into a single unary client interceptor.
	unaryMiddlewares []grpc.UnaryClientInterceptor
}

// Opt mutates a ClientConf while a client is being constructed.
type Opt func(*ClientConf)

// UseDefaultBufferCfg toggles the mygrpc default window and read/write buffer
// sizes. New enables them unless this option is passed with false.
func UseDefaultBufferCfg(v bool) Opt {
	return func(c *ClientConf) {
		c.useDefaultBufferCfg = v
	}
}

// WithConf attaches a configuration object to the client settings.
func WithConf(v *myconf.Config) Opt {
	return func(c *ClientConf) {
		c.conf = v
	}
}

// WithGrpcOpts adds extra gRPC dial options. Options accumulate across
// repeated calls (consistent with WithUnaryMiddlewares) instead of the last
// call silently discarding the earlier ones.
func WithGrpcOpts(v ...grpc.DialOption) Opt {
	return func(c *ClientConf) {
		c.grpcOpts = append(c.grpcOpts, v...)
	}
}

// WithUnaryMiddlewares appends unary client interceptors. They are chained
// after the ones already configured, including the default WrapRequestError
// installed by New.
func WithUnaryMiddlewares(v ...grpc.UnaryClientInterceptor) Opt {
	return func(c *ClientConf) {
		c.unaryMiddlewares = append(c.unaryMiddlewares, v...)
	}
}

// MustNew is like New but panics if the client cannot be created.
func MustNew(grpcUrl string, opts ...Opt) *grpc.ClientConn {
	client, err := New(grpcUrl, opts...)
	if err != nil {
		panic(err)
	}
	return client
}

// New creates a gRPC client connection for grpcUrl.
//
// Defaults: keepalive pings, the round_robin load-balancing policy, insecure
// transport credentials, the mygrpc default buffer sizes, and the
// WrapRequestError interceptor. Each Opt in opts is applied in order on top of
// those defaults; nil entries are ignored.
//
// It returns the connection, or an error wrapping the grpc.NewClient failure.
func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) {
	log.Printf("new grpc client url: %s", grpcUrl)

	c := &ClientConf{
		useDefaultBufferCfg: true,
		// Error wrapping is installed by default.
		unaryMiddlewares: []grpc.UnaryClientInterceptor{WrapRequestError()},
	}
	for _, opt := range opts {
		if opt == nil {
			// A nil option would panic when invoked; skip it instead.
			continue
		}
		opt(c)
	}

	dialOpts := []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                time.Second * 20, // with no activity, send a ping every N seconds
			Timeout:             time.Second * 5,  // if the ping ack does not arrive within N seconds, the connection is considered broken
			PermitWithoutStream: true,             // allow pings even when there is no active stream
		}),
		// Sets the initial balancing policy to round robin; see
		// https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	// Apply the default buffer tuning.
	if c.useDefaultBufferCfg {
		dialOpts = append(dialOpts,
			grpc.WithInitialWindowSize(mygrpc.DefaultWindowSize),
			grpc.WithInitialConnWindowSize(mygrpc.DefaultWindowSize),
			grpc.WithReadBufferSize(mygrpc.DefaultReadBufferSize),
			grpc.WithWriteBufferSize(mygrpc.DefaultWriteBufferSize),
		)
	}

	if len(c.unaryMiddlewares) > 0 {
		dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...))
	}

	// Caller-supplied dial options go last, after all of the defaults above.
	dialOpts = append(dialOpts, c.grpcOpts...)

	conn, err := grpc.NewClient(grpcUrl, dialOpts...)
	if err != nil {
		return nil, fmt.Errorf("new grpc client %q: %w", grpcUrl, err)
	}
	return conn, nil
}

// WrapRequestError returns a unary client interceptor that prefixes the
// message of a failed call with the target service name.
//
// The service name is read from the outgoing metadata key
// mygrpc.HeaderServiceName. When the call fails with an error that
// status.FromError recognises and a service name is present, the returned
// error is rebuilt from the same status proto with only its message changed to
// "[<service>] - <message>". In every other case the invoker's result is
// returned unchanged.
func WrapRequestError() grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		// Extract the service name from the outgoing metadata.
		var serviceName string
		if md, ok := metadata.FromOutgoingContext(ctx); ok {
			if v := md.Get(mygrpc.HeaderServiceName); len(v) > 0 {
				serviceName = v[0]
			}
		}

		err := invoker(ctx, method, req, reply, cc, opts...)
		if err == nil {
			return nil
		}

		st, ok := status.FromError(err)
		if !ok || serviceName == "" {
			return err
		}

		sp := st.Proto()
		sp.Message = fmt.Sprintf("[%s] - %s", serviceName, sp.Message)
		return status.ErrorProto(sp)
	}
}

// Timeout returns a unary client interceptor that bounds each call with a
// client-side timeout.
//
// The call runs under context.WithTimeout(ctx, timeout). If the resulting
// error carries the gRPC status code DeadlineExceeded, it is replaced with
// mygrpc.GrpcClientTimeout("request timeout: %s", timeout); any other result
// is returned as is. Note that the check looks only at the status code, so a
// DeadlineExceeded caused by an earlier deadline already on ctx is mapped the
// same way.
func Timeout(timeout time.Duration) grpc.UnaryClientInterceptor {
	return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		tCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		err := invoker(tCtx, method, req, reply, cc, opts...)
		if v, ok := status.FromError(err); ok && v.Code() == codes.DeadlineExceeded {
			return mygrpc.GrpcClientTimeout("request timeout: %s", timeout)
		}
		return err
	}
}