156 lines
4.0 KiB
Go
156 lines
4.0 KiB
Go
package grpcc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"git.makemake.in/kzkzzzz/mycommon/myconf"
|
|
"git.makemake.in/kzkzzzz/mycommon/mygrpc"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
"log"
|
|
"time"
|
|
)
|
|
|
|
type ClientConf struct {
|
|
useDefaultBufferCfg bool
|
|
conf *myconf.Config
|
|
grpcOpts []grpc.DialOption
|
|
unaryMiddlewares []grpc.UnaryClientInterceptor
|
|
}
|
|
|
|
// Opt is a functional option that mutates a ClientConf; New applies each
// supplied Opt, in order, before building the dial options.
type Opt func(*ClientConf)
|
|
|
|
func UseDefaultBufferCfg(v bool) Opt {
|
|
return func(c *ClientConf) {
|
|
c.useDefaultBufferCfg = v
|
|
}
|
|
}
|
|
|
|
func WithConf(v *myconf.Config) Opt {
|
|
return func(c *ClientConf) {
|
|
c.conf = v
|
|
}
|
|
}
|
|
|
|
func WithGrpcOpts(v ...grpc.DialOption) Opt {
|
|
return func(c *ClientConf) {
|
|
c.grpcOpts = v
|
|
}
|
|
}
|
|
|
|
func WithUnaryMiddlewares(v ...grpc.UnaryClientInterceptor) Opt {
|
|
return func(c *ClientConf) {
|
|
c.unaryMiddlewares = append(c.unaryMiddlewares, v...)
|
|
}
|
|
}
|
|
|
|
func MustNew(grpcUrl string, opts ...Opt) *grpc.ClientConn {
|
|
client, err := New(grpcUrl, opts...)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return client
|
|
}
|
|
|
|
func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) {
|
|
log.Printf("new grpc client url: %s", grpcUrl)
|
|
|
|
c := &ClientConf{
|
|
useDefaultBufferCfg: true,
|
|
unaryMiddlewares: []grpc.UnaryClientInterceptor{WrapRequestError()}, // 默认加上错误包装
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(c)
|
|
}
|
|
|
|
dialOpts := []grpc.DialOption{
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: time.Second * 20, // 如果没有activity,则每隔N s发送一个ping包
|
|
Timeout: time.Second * 5, // 如果ping ack N s之内未返回则认为连接已断开
|
|
PermitWithoutStream: true, // 如果没有active的stream,是否允许发送ping
|
|
}),
|
|
// 参考 https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing 设置轮训策略
|
|
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), // This sets the initial balancing policy.
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
}
|
|
|
|
// 使用默认的buffer调整配置
|
|
if c.useDefaultBufferCfg {
|
|
dialOpts = append(dialOpts,
|
|
grpc.WithInitialWindowSize(mygrpc.DefaultWindowSize),
|
|
grpc.WithInitialConnWindowSize(mygrpc.DefaultWindowSize),
|
|
|
|
grpc.WithReadBufferSize(mygrpc.DefaultReadBufferSize),
|
|
grpc.WithWriteBufferSize(mygrpc.DefaultWriteBufferSize),
|
|
)
|
|
}
|
|
|
|
if len(c.unaryMiddlewares) > 0 {
|
|
dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...))
|
|
}
|
|
|
|
if len(c.grpcOpts) > 0 {
|
|
dialOpts = append(dialOpts, c.grpcOpts...)
|
|
}
|
|
|
|
conn, err := grpc.NewClient(
|
|
grpcUrl,
|
|
dialOpts...,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func WrapRequestError() grpc.UnaryClientInterceptor {
|
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
// 提取服务名称
|
|
var serviceName string
|
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
|
if v := md.Get(mygrpc.HeaderServiceName); len(v) > 0 {
|
|
serviceName = v[0]
|
|
}
|
|
}
|
|
|
|
err := invoker(ctx, method, req, reply, cc, opts...)
|
|
if err != nil {
|
|
st, ok := status.FromError(err)
|
|
if ok && serviceName != "" {
|
|
sp := st.Proto()
|
|
sp.Message = fmt.Sprintf("[%s] - %s", serviceName, sp.Message)
|
|
|
|
return status.ErrorProto(sp)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Timeout 客户端超时
|
|
func Timeout(timeout time.Duration) grpc.UnaryClientInterceptor {
|
|
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
tCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
err := invoker(tCtx, method, req, reply, cc, opts...)
|
|
|
|
if v, ok := status.FromError(err); ok && v.Code() == codes.DeadlineExceeded {
|
|
//return status.Errorf(grpcserver.Timeout, "call %s timeout %s", method, timeout)
|
|
return mygrpc.GrpcClientTimeout("request timeout: %s", timeout)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
}
|