From 7e0bf824182e4164cd68bfd9d0ac57c9c1f50a6f Mon Sep 17 00:00:00 2001 From: kzkzzzz Date: Sat, 22 Mar 2025 01:24:57 +0800 Subject: [PATCH 1/2] update --- .DS_Store | Bin 0 -> 6148 bytes common.go | 64 +++ go.mod | 88 +++-- go.sum | 742 +++++++++++++---------------------- graceful/graeceful.go | 79 ++++ myconf/conf.go | 389 ++++++++++++++++-- myconf/test.yaml | 8 - mygrpc/error.go | 19 + mygrpc/grpc.go | 41 ++ mygrpc/grpcc/client.go | 154 ++++++++ mygrpc/grpcsr/server.go | 331 ++++++++++++++++ mylog/log.go | 72 +--- mymysql/mysql.go | 281 +++++++------ mymysql/mysql_test.go | 33 -- mymysql/option.go | 17 + myredis/option.go | 19 + myredis/redis.go | 369 ++++++++++------- myredis/redis_test.go | 40 -- myregistry/consul/builder.go | 189 +++++++++ myregistry/consul/consul.go | 153 ++++++++ myregistry/consul/target.go | 101 +++++ myregistry/reigster.go | 21 + 22 files changed, 2291 insertions(+), 919 deletions(-) create mode 100644 .DS_Store create mode 100644 graceful/graeceful.go delete mode 100644 myconf/test.yaml create mode 100644 mygrpc/error.go create mode 100644 mygrpc/grpc.go create mode 100644 mygrpc/grpcc/client.go create mode 100644 mygrpc/grpcsr/server.go delete mode 100644 mymysql/mysql_test.go create mode 100644 mymysql/option.go create mode 100644 myredis/option.go delete mode 100644 myredis/redis_test.go create mode 100644 myregistry/consul/builder.go create mode 100644 myregistry/consul/consul.go create mode 100644 myregistry/consul/target.go create mode 100644 myregistry/reigster.go diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..31513669f34176c1e2d97fbcdae44f88d29d41d4 GIT binary patch literal 6148 zcmeHKyH3L}6umA10!UE7KnEnUvLL!3u>}-X20AdXKnWD2gp~4-*zyU8m4OdIto;Nw z{)KaGE3umdHiVF`WFN=(*gidZxF#a8cOQ3&+C=1|FnaT-YJ&4zD$&wCYe2zsB!h16 zEK7$Krt5GHI0ycs1N`oKv`GWHqC4yT72D@#F3qB-pJaXX`AY}OtPk8bIP z5?jOKYn5T@cO_j&MC&zdTZ-x9~XKGhNSEz=Q z(1#D9R~Gt&BIN3rKag}1p+Z+X2b=@815@g<&HMj;^ZDN% 0 { + return v[0] + } + } + + p, ok := peer.FromContext(ctx) + if ok { + switch v := p.Addr.(type) { + case *net.TCPAddr: + return v.IP.String() + } + } + + return "" +} diff --git a/mygrpc/grpcc/client.go b/mygrpc/grpcc/client.go new file mode 100644 index 0000000..aadee83 --- /dev/null +++ b/mygrpc/grpcc/client.go @@ -0,0 +1,154 @@ +package grpcc + +import ( + "context" + "fmt" + "git.makemake.in/kzkzzzz/mycommon/myconf" + "git.makemake.in/kzkzzzz/mycommon/mygrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "log" + "time" +) + +type ClientConf struct { + useDefaultBufferCfg bool + conf *myconf.Config + grpcOpts []grpc.DialOption + unaryMiddlewares []grpc.UnaryClientInterceptor +} + +type Opt func(*ClientConf) + +func UseDefaultBufferCfg(v bool) Opt { + return func(c *ClientConf) { + c.useDefaultBufferCfg = v + } +} + +func WithConf(v *myconf.Config) Opt { + return func(c *ClientConf) { + c.conf = v + } +} + +func WithGrpcOpts(v ...grpc.DialOption) Opt { + return func(c *ClientConf) { + c.grpcOpts = v + } +} + +func WithUnaryMiddlewares(v ...grpc.UnaryClientInterceptor) Opt { + return func(c *ClientConf) { + c.unaryMiddlewares = append(c.unaryMiddlewares, v...) + } +} + +func MustNew(grpcUrl string, opts ...Opt) *grpc.ClientConn { + client, err := New(grpcUrl, opts...) + if err != nil { + panic(err) + } + return client +} + +func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) { + log.Printf("new grpc client url: %s", grpcUrl) + + c := &ClientConf{ + useDefaultBufferCfg: true, + unaryMiddlewares: []grpc.UnaryClientInterceptor{WrapRequestError()}, // 默认加上错误包装 + } + + for _, opt := range opts { + opt(c) + } + + dialOpts := []grpc.DialOption{ + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 20, // 如果没有activity,则每隔N s发送一个ping包 + Timeout: time.Second * 5, // 如果ping ack N s之内未返回则认为连接已断开 + PermitWithoutStream: true, // 如果没有active的stream,是否允许发送ping + }), + // 参考 https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing 设置轮训策略 + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy. + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + + // 使用默认的buffer调整配置 + if c.useDefaultBufferCfg { + dialOpts = append(dialOpts, + grpc.WithInitialWindowSize(mygrpc.DefaultWindowSize), + grpc.WithInitialConnWindowSize(mygrpc.DefaultWindowSize), + + grpc.WithReadBufferSize(mygrpc.DefaultReadBufferSize), + grpc.WithWriteBufferSize(mygrpc.DefaultWriteBufferSize), + + grpc.WithUnaryInterceptor(WrapRequestError()), + ) + } + + if len(c.unaryMiddlewares) > 0 { + grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...) + } + + if len(c.grpcOpts) > 0 { + dialOpts = append(dialOpts, c.grpcOpts...) + } + + conn, err := grpc.NewClient( + grpcUrl, + dialOpts..., + ) + if err != nil { + return nil, err + } + + return conn, nil +} + +func WrapRequestError() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + // 提取服务名称 + var serviceName string + if md, ok := metadata.FromOutgoingContext(ctx); ok { + if v := md.Get(mygrpc.HeaderServiceName); len(v) > 0 { + serviceName = v[0] + } + } + + err := invoker(ctx, method, req, reply, cc, opts...) + if err != nil { + + if serviceName != "" { + return fmt.Errorf("request grpc err: [%s - %s] %s", serviceName, method, err) + } + + return fmt.Errorf("request grpc err: [%s] %s", method, err) + } + + return nil + } +} + +// Timeout 客户端超时 +func Timeout(timeout time.Duration) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + tCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + err := invoker(tCtx, method, req, reply, cc, opts...) + + if v, ok := status.FromError(err); ok && v.Code() == codes.DeadlineExceeded { + //return status.Errorf(grpcserver.Timeout, "call %s timeout %s", method, timeout) + return mygrpc.GrpcClientTimeout("request timeout: %s", timeout) + } + + return err + } + +} diff --git a/mygrpc/grpcsr/server.go b/mygrpc/grpcsr/server.go new file mode 100644 index 0000000..13441ee --- /dev/null +++ b/mygrpc/grpcsr/server.go @@ -0,0 +1,331 @@ +package grpcsr + +import ( + "context" + "fmt" + "git.makemake.in/kzkzzzz/mycommon" + "git.makemake.in/kzkzzzz/mycommon/graceful" + "git.makemake.in/kzkzzzz/mycommon/myconf" + "git.makemake.in/kzkzzzz/mycommon/mygrpc" + "git.makemake.in/kzkzzzz/mycommon/mylog" + "git.makemake.in/kzkzzzz/mycommon/myregistry" + "github.com/spf13/pflag" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/status" + "log" + "net" + + "runtime/debug" + "time" +) + +//const DefaultInstanceName = "grpc" + +var _ graceful.IRunner = (*Server)(nil) + +type Conf struct { + Addr string + Port int + Ip string + Log bool +} + +type Opt func(server *Server) + +type Server struct { + gs *grpc.Server + serviceId string + serviceName string + serverConf *Conf + reg myregistry.IRegister + grpcOpts []grpc.ServerOption + logger mylog.ILogger + registerGrpcFn func(*grpc.Server) // 注册grpc服务, 使用函数延迟调用, 便于先初始化中间件等操作 + + unaryMiddlewares []grpc.UnaryServerInterceptor // grpc一元服务端中间件 + + useDefaultBufferCfg bool + delayStopMs int + + serviceRegInfo *myregistry.ServiceInfo +} + +func UseDefaultBufferCfg(v bool) Opt { + return func(server *Server) { + server.useDefaultBufferCfg = v + } +} + +func WithRegistry(serviceName string, reg myregistry.IRegister) Opt { + return func(server *Server) { + server.serviceName = serviceName + server.reg = reg + } +} + +func WithGrpcOpts(v ...grpc.ServerOption) Opt { + return func(server *Server) { + server.grpcOpts = v + } +} + +func WithDelayStopMs(v int) Opt { + return func(server *Server) { + server.delayStopMs = v + } +} + +func SetFlag() { + pflag.Int("grpc.port", 18082, "listen port") + pflag.String("grpc.log", "true", "enable request log") +} + +func New(cfg *myconf.Config, opts ...Opt) *Server { + cf := &Conf{} + err := cfg.UnmarshalKey("grpc", cf) + if err != nil { + panic(err) + } + + // 命令行的参数覆盖一次, Unmarshal解析的时候, 不会用命令行的参数覆盖 https://github.com/spf13/viper/issues/190 + cf.Port = cfg.GetInt(fmt.Sprintf("grpc.port")) + cf.Log = cfg.GetBool(fmt.Sprintf("grpc.log")) + + return NewByConf(cf, opts...) +} + +func NewByConf(conf *Conf, opts ...Opt) *Server { + s := &Server{ + serverConf: conf, + useDefaultBufferCfg: true, + } + for _, opt := range opts { + opt(s) + } + + if s.logger == nil { + s.logger = mylog.GetLogger() + } + + if s.reg != nil && s.serviceName == "" { + panic("service name is empty") + } + + s.unaryMiddlewares = []grpc.UnaryServerInterceptor{ + s.grpcRecover(), // 默认启用recover中间件 + } + + if s.serverConf.Log { + s.unaryMiddlewares = append(s.unaryMiddlewares, s.requestLog()) + } + + return s +} + +func (s *Server) Use(middlewares ...grpc.UnaryServerInterceptor) { + s.unaryMiddlewares = append(s.unaryMiddlewares, middlewares...) +} + +func (s *Server) RegisterGrpc(fn func(*grpc.Server)) { + s.registerGrpcFn = fn +} + +func (s *Server) initServer() { + grpcOpts := []grpc.ServerOption{ + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: time.Second * 5, // 如果客户端两次 ping 的间隔小于 N,则关闭连接 + PermitWithoutStream: true, // 即使没有 active stream, 也允许 ping + }), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: time.Hour * 2, // 空闲连接时间 + MaxConnectionAgeGrace: time.Second * 30, // 在强制关闭连接之间, 允许有 N 的时间完成 pending 的 rpc 请求 + Time: time.Second * 20, // 如果一个连接空闲超过 N, 则发送一个 ping 请求 + Timeout: time.Second * 5, // 如果 ping 请求 N 内未收到回复, 则认为该连接已断开 + }), + } + + if s.useDefaultBufferCfg { + grpcOpts = append(grpcOpts, + grpc.InitialWindowSize(mygrpc.DefaultWindowSize), + grpc.InitialConnWindowSize(mygrpc.DefaultWindowSize), + + grpc.ReadBufferSize(mygrpc.DefaultReadBufferSize), + grpc.WriteBufferSize(mygrpc.DefaultWriteBufferSize), + ) + } + + if len(s.unaryMiddlewares) > 0 { + grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(s.unaryMiddlewares...)) + } + + if len(s.grpcOpts) > 0 { + grpcOpts = append(grpcOpts, s.grpcOpts...) + } + + s.gs = grpc.NewServer(grpcOpts...) + + // 注册grpc服务 + s.registerGrpcFn(s.gs) +} + +func (s *Server) Run(ctx context.Context) error { + s.initServer() + + // 端口如果=0, 监听随机端口 + addr0 := fmt.Sprintf("%s:%d", s.serverConf.Addr, s.serverConf.Port) + lis, err := net.Listen("tcp", addr0) + if err != nil { + return err + } + + // 获取监听的端口 + port := lis.Addr().(*net.TCPAddr).Port + + // 健康服务 + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(s.gs, healthServer) + + // 服务反射, 方便调试 + reflection.Register(s.gs) + + var svcIp = s.serverConf.Ip + if svcIp == "" { + svcIp = mycommon.GetOutboundIP() + } + + // 注册服务 + if s.reg != nil { + s.serviceRegInfo = &myregistry.ServiceInfo{ + ServiceName: s.serviceName, + Ip: svcIp, + Port: port, + } + + err = s.reg.Register(s.serviceRegInfo) + if err != nil { + return err + } + } + + addr := fmt.Sprintf("%s:%d", s.serverConf.Addr, port) + log.Printf("grpc server listen on %s", addr) + + err = s.gs.Serve(lis) + if err != nil { + log.Printf("start grpc server err: %s", err) + return err + } + + return nil +} + +func (s *Server) Stop() { + if s.reg != nil { + err := s.reg.Deregister(s.serviceRegInfo) + if err != nil { + s.logger.Errorf("grpc server deregister err: %s", err) + } + } + + // 如果使用k8s service, 关闭pod和往service注销ip是同时进行的, 如果退出服务比注销ip先完成, 可能有流量继续进来, 导致请求失败 + // 延迟一段时间, 确保服务已经注销ip, 再关闭服务 + + // 如何使用注册中心, 先从中心退出ip, 也延迟一段时间, 等上游网关更新ip完成(正常不会太久), 不会有流量进来旧服务, 再退出服务 + if s.delayStopMs > 0 { + delayTime := time.Millisecond * time.Duration(s.delayStopMs) + log.Printf("grpc server delay stop: %s", delayTime) + time.Sleep(delayTime) + } + + s.gs.GracefulStop() + + log.Printf("grpc server stop") +} + +type handleResp struct { + resp interface{} + err error +} + +func (s *Server) requestLog() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + start := time.Now() + + resp, err := handler(ctx, req) + + var ( + code codes.Code + codeMsg = "OK" + ) + if err != nil { + fromError, ok := status.FromError(err) + if ok { + code = fromError.Code() + } else { + code = status.New(codes.Unknown, err.Error()).Code() + } + + codeMsg = fmt.Sprintf("Error Code: %s(%d)", code.String(), uint32(code)) + } + + s.logger.Infof( + "%s - %s - %s - %s", + codeMsg, time.Since(start), mygrpc.ClientIP(ctx), info.FullMethod, + ) + + return resp, err + } +} + +func (s *Server) grpcRecover() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + defer func() { + if err0 := recover(); err0 != nil { + log.Printf("%s - panic: %v\n%s", info.FullMethod, err0, debug.Stack()) + err = fmt.Errorf("server err: %s - system err: %s", info.FullMethod, err0) + } + }() + + return handler(ctx, req) + } +} + +// Timeout 控制服务端超时 +func Timeout(timeout time.Duration) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + tCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ch := make(chan *handleResp, 1) + + go func() { + defer func() { + if err0 := recover(); err0 != nil { + log.Printf("%s - panic: %v\n%s", info.FullMethod, err0, debug.Stack()) + ch <- &handleResp{nil, fmt.Errorf("server err: %s - system err: %s", info.FullMethod, err0)} + } + }() + //start := time.Now() + r := &handleResp{} + r.resp, r.err = handler(tCtx, req) + //log.Printf("rta server time: %s", time.Since(start)) + ch <- r + }() + + select { + case <-tCtx.Done(): + return nil, mygrpc.GrpcServerTimeout("server err: grpc handle timeout %s %s", timeout, info.FullMethod) + //return nil, status.Errorf(codes.DeadlineExceeded, "grpc handle timeout %s %s", timeout, info.FullMethod) + + case res := <-ch: + return res.resp, res.err + } + + //return nil, fmt.Errorf("handle err %s.%s", info.Server, info.FullMethod) + } +} diff --git a/mylog/log.go b/mylog/log.go index 27a52df..0d7d081 100644 --- a/mylog/log.go +++ b/mylog/log.go @@ -3,13 +3,18 @@ package mylog import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" - "gopkg.in/natefinch/lumberjack.v2" "io" "os" - "path/filepath" "strings" ) +type ILogger interface { + Debugf(format string, v ...any) + Infof(format string, v ...any) + Warnf(format string, v ...any) + Errorf(format string, v ...any) +} + const ( DebugLevel = "DEBUG" InfoLevel = "INFO" @@ -36,13 +41,7 @@ var ( ConsoleWriter: os.Stdout, } - DefaultLogFile = &LogFile{ - LogFilePath: "logs", - MaxSize: 200, - MaxAge: 0, - MaxBackups: 0, - } - globalLog = NewLogger("debug", DefaultConfig) + globalLog = NewLogger(DefaultConfig) ) type ( @@ -71,7 +70,7 @@ func SetLogLevel(level string) { } func Init() { - globalLog = NewLogger("app", &Config{ + globalLog = NewLogger(&Config{ Level: defaultLogLevel, NeedLogFile: false, ConsoleWriter: os.Stdout, @@ -79,11 +78,11 @@ func Init() { } // InitWithConfig 覆盖默认日志 -func InitWithConfig(serverName string, config *Config) { - globalLog = NewLogger(serverName, config) +func InitWithConfig(config *Config) { + globalLog = NewLogger(config) } -func NewLogger(serverName string, config *Config) *ZapLog { +func NewLogger(config *Config) *ZapLog { if config == nil { config = DefaultConfig } @@ -96,29 +95,15 @@ func NewLogger(serverName string, config *Config) *ZapLog { cores := make([]zapcore.Core, 0) // 使用控制台输出 - if config.ConsoleWriter != nil { - cfg := zap.NewProductionEncoderConfig() - cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder - cfg.ConsoleSeparator = " | " - // 指定日志时间格式 - cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000") - cfg.EncodeCaller = zapcore.ShortCallerEncoder - encoder := zapcore.NewConsoleEncoder(cfg) - core := zapcore.NewCore(encoder, zapcore.AddSync(config.ConsoleWriter), level) - cores = append(cores, core) - } - - if config.NeedLogFile { - cfg := zap.NewProductionEncoderConfig() - cfg.EncodeLevel = zapcore.CapitalLevelEncoder - cfg.ConsoleSeparator = " | " - // 指定日志时间格式 - cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000") - cfg.EncodeCaller = zapcore.ShortCallerEncoder - encoder := zapcore.NewConsoleEncoder(cfg) - core := zapcore.NewCore(encoder, zapcore.AddSync(getRollingFileWriter(serverName, config)), level) - cores = append(cores, core) - } + cfg := zap.NewProductionEncoderConfig() + cfg.EncodeLevel = zapcore.CapitalColorLevelEncoder + cfg.ConsoleSeparator = " | " + // 指定日志时间格式 + cfg.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000") + cfg.EncodeCaller = zapcore.ShortCallerEncoder + encoder := zapcore.NewConsoleEncoder(cfg) + core := zapcore.NewCore(encoder, zapcore.AddSync(config.ConsoleWriter), level) + cores = append(cores, core) opts := make([]zap.Option, 0) if config.ZapOpt != nil { @@ -134,21 +119,6 @@ func NewLogger(serverName string, config *Config) *ZapLog { } } -func getRollingFileWriter(serverName string, config *Config) *lumberjack.Logger { - if config.LogFile == nil { - config.LogFile = DefaultLogFile - } - - return &lumberjack.Logger{ - Filename: filepath.Join(config.LogFile.LogFilePath, serverName+".log"), - MaxSize: config.LogFile.MaxSize, - MaxAge: config.LogFile.MaxAge, - MaxBackups: config.LogFile.MaxBackups, - LocalTime: true, - Compress: false, - } -} - func (z *ZapLog) Debug(args ...interface{}) { z.sugarLog.Debug(args...) } diff --git a/mymysql/mysql.go b/mymysql/mysql.go index bf39564..0384f38 100644 --- a/mymysql/mysql.go +++ b/mymysql/mysql.go @@ -1,150 +1,203 @@ package mymysql import ( + "database/sql" "fmt" - "git.makemake.in/kzkzzzz/mycommon/mylog" + "git.makemake.in/kzkzzzz/mycommon/myconf" + driverMysql "github.com/go-sql-driver/mysql" + "github.com/google/uuid" "gorm.io/driver/mysql" "gorm.io/gorm" - gormLogger "gorm.io/gorm/logger" + "gorm.io/gorm/logger" + "log" + "os" + "sync" "time" ) -const DefaultKey = "default" +const ( + DefaultInstance = "mysql" +) + +type MysqlDb struct { + *gorm.DB + SqlDB *sql.DB + gormConfig *gorm.Config + disablePing bool +} + +type Conf struct { + Dsn string + MaxOpenConn int // 最大连接数 + MaxIdleConn int // 最大空闲连接数 + MaxIdleTime string // 空闲时间 + MaxLifeTime string // 连接最大有效时间 + Debug bool + LogSqlSlowTimeMs int + LogDisableColor bool +} var ( - DefaultConfig = &Config{ - Dsn: "root:root@tcp(127.0.0.1:3306)/?loc=Local&charset=utf8mb4&parseTime=true", - MaxOpenConn: 32, - MaxIdleConn: 8, - MaxLifeTime: "4h", - MaxIdleTime: "15m", - Debug: true, - GormLogger: gormLogger.Default.LogMode(gormLogger.Info), - } - - instanceMap = make(map[string]*gorm.DB) + instanceMap = &sync.Map{} ) -type ( - Config struct { - Dsn string - MaxOpenConn int - MaxIdleConn int - MaxIdleTime string - MaxLifeTime string - Debug bool - GormLogger gormLogger.Interface - } -) - -func DB(key ...string) *gorm.DB { - var key0 string - - if len(key) > 0 { - key0 = key[0] +func GetDb(name ...string) *MysqlDb { + var instanceName string + if len(name) > 0 { + instanceName = name[0] } else { - key0 = DefaultKey + instanceName = DefaultInstance } - instance, ok := instanceMap[key0] + v, ok := instanceMap.Load(instanceName) if !ok { - panic(fmt.Errorf("mysql %s not config", key0)) + panic(fmt.Errorf("mysql instance [%s] not init", instanceName)) } - return instance + + return v.(*MysqlDb) } -func InitDefault(config *Config) { - Init(DefaultKey, config) -} - -func Init(key string, config *Config) { - db, err := New(config) +// InitDb 初始化全局默认db +func InitDb(config *myconf.Config, opts ...Opt) { + client, err := NewDb(DefaultInstance, config, opts...) if err != nil { panic(err) } - instanceMap[key] = db + instanceMap.Store(DefaultInstance, client) } -func New(config *Config) (*gorm.DB, error) { - var ( - maxLifeTime, _ = time.ParseDuration(DefaultConfig.MaxLifeTime) - maxIdleTime, _ = time.ParseDuration(DefaultConfig.MaxIdleTime) - logger gormLogger.Interface - ) - - if config.MaxOpenConn <= 0 { - config.MaxOpenConn = DefaultConfig.MaxOpenConn - } - - if config.MaxIdleConn <= 0 { - config.MaxIdleConn = DefaultConfig.MaxIdleConn - } - - if config.MaxLifeTime != "" { - t, err := time.ParseDuration(config.MaxLifeTime) - if err != nil { - return nil, fmt.Errorf("parse MaxLifeTime err: %s\n", err) - - } - maxLifeTime = t - } - - if config.MaxIdleTime != "" { - t, err := time.ParseDuration(config.MaxIdleTime) - if err != nil { - return nil, fmt.Errorf("parse MaxIdleTime err: %s\n", err) - } - maxIdleTime = t - } - - if config.GormLogger == nil { - level := gormLogger.Warn - if config.Debug { - level = gormLogger.Info - } - logger = DefaultGormLogger(level) - } - - db, err := gorm.Open(mysql.Open(config.Dsn), &gorm.Config{ - SkipDefaultTransaction: true, - Logger: logger, - }) - +// InitDbInstance 初始化全局的db +func InitDbInstance(instanceName string, config *myconf.Config, opts ...Opt) { + client, err := NewDb(instanceName, config, opts...) if err != nil { - return nil, fmt.Errorf("connect mysql err: %s", err) + panic(err) } - sqlDb, _ := db.DB() + instanceMap.Store(instanceName, client) +} - sqlDb.SetMaxOpenConns(config.MaxOpenConn) - sqlDb.SetMaxIdleConns(config.MaxIdleConn) - sqlDb.SetConnMaxLifetime(maxLifeTime) - sqlDb.SetConnMaxIdleTime(maxIdleTime) +func NewDb(instanceName string, config *myconf.Config, opts ...Opt) (*MysqlDb, error) { + cf := &Conf{Debug: true} + err := config.UnmarshalKey(instanceName, &cf) + if err != nil { + return nil, err + } + db, err := NewDbFromConf(cf, opts...) + if err != nil { + return nil, err + } + + instanceMap.Store(instanceName, db) + return db, nil +} + +func NewDbFromConf(cf *Conf, opts ...Opt) (*MysqlDb, error) { + parseDsn, err := driverMysql.ParseDSN(cf.Dsn) + if err != nil { + return nil, fmt.Errorf("mysql parse dsn error: %s", err) + } + + db := &MysqlDb{} + for _, opt := range opts { + opt(db) + } + + if db.gormConfig == nil { + db.gormConfig = &gorm.Config{ + SkipDefaultTransaction: true, + } + + lCfg := logger.Config{ + SlowThreshold: 200 * time.Millisecond, + LogLevel: logger.Warn, + IgnoreRecordNotFoundError: false, + Colorful: true, + } + + if cf.LogSqlSlowTimeMs > 0 { + lCfg.SlowThreshold = time.Duration(cf.LogSqlSlowTimeMs) * time.Millisecond + } + + if cf.LogDisableColor { + lCfg.Colorful = false + } + + l := newGormLogger(lCfg) + + if cf.Debug { + db.gormConfig.Logger = l.LogMode(logger.Info) + } else { + db.gormConfig.Logger = l + } + } + + gormDB, err := gorm.Open(mysql.Open(cf.Dsn), db.gormConfig) + if err != nil { + return nil, err + } + + sqlDB, err := gormDB.DB() + if err != nil { + return nil, err + } + + if db.disablePing == false { + err = sqlDB.Ping() + if err != nil { + return nil, err + } + } + + if cf.MaxOpenConn <= 0 { + cf.MaxOpenConn = 1024 + } + + if cf.MaxIdleConn <= 0 { + // 默认最大空闲数等于最大连接数 + cf.MaxIdleConn = cf.MaxOpenConn + } + + if cf.MaxIdleTime == "" { + cf.MaxIdleTime = "10m" + } + + sqlDB.SetMaxOpenConns(cf.MaxOpenConn) + sqlDB.SetMaxIdleConns(cf.MaxIdleConn) + + if dv, err := time.ParseDuration(cf.MaxIdleTime); err != nil { + return nil, fmt.Errorf("parse MaxIdleTime err: %s", err) + } else { + sqlDB.SetConnMaxIdleTime(dv) + } + + // max life time默认暂不设置, 使用idle time控制即可 + if cf.MaxLifeTime != "" { + if dv, err := time.ParseDuration(cf.MaxLifeTime); err != nil { + return nil, fmt.Errorf("parse MaxLifeTime err: %s", err) + } else { + sqlDB.SetConnMaxLifetime(dv) + } + } + + db.DB = gormDB + + db.SqlDB = sqlDB + instanceMap.Store(uuid.New().String(), db) + + log.Printf("connect db success [addr:%s - db:%s]", parseDsn.Addr, parseDsn.DBName) return db, nil } -func DefaultGormLogger(level gormLogger.LogLevel) gormLogger.Interface { - return gormLogger.New(mylog.NewLogger("gorm", mylog.DefaultConfig), gormLogger.Config{ - SlowThreshold: time.Second * 2, - Colorful: true, - IgnoreRecordNotFoundError: false, - ParameterizedQueries: false, - LogLevel: level, +func CloseAll() { + instanceMap.Range(func(k, v any) bool { + db, err := (v.(*MysqlDb)).DB.DB() + if err != nil { + db.Close() + } + return true }) } -func NewGormLogger(writer gormLogger.Writer, gormLoggerConfig gormLogger.Config) gormLogger.Interface { - return gormLogger.New(writer, gormLoggerConfig) -} - -func CloseDB(key string) { - db, _ := DB(key).DB() - db.Close() -} - -func CloseAllDB() { - for _, v := range instanceMap { - db, _ := v.DB() - db.Close() - } +func newGormLogger(cfg logger.Config) logger.Interface { + return logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), cfg) } diff --git a/mymysql/mysql_test.go b/mymysql/mysql_test.go deleted file mode 100644 index 38b37ea..0000000 --- a/mymysql/mysql_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package mymysql - -import ( - "fmt" - "testing" -) - -func TestMysql(t *testing.T) { - err := InitDefault(&Config{ - Dsn: "root:Tqa129126@tcp(119.29.187.200:3306)/site?loc=Local&charset=utf8mb4&writeTimeout=3s&readTimeout=3s&timeout=2s&parseTime=true", - MaxOpenConn: 16, - MaxIdleConn: 4, - MaxIdleTime: "5m", - MaxLifeTime: "30m", - Debug: false, - GormLogger: nil, - }) - if err != nil { - fmt.Println(err) - return - } - - defer CloseAllDB() - - var res = make(map[string]interface{}) - err = DB().Table("image").Limit(1).Take(&res).Error - if err != nil { - fmt.Println(err) - return - } - - fmt.Printf("%+v\n", res) -} diff --git a/mymysql/option.go b/mymysql/option.go new file mode 100644 index 0000000..0695831 --- /dev/null +++ b/mymysql/option.go @@ -0,0 +1,17 @@ +package mymysql + +import "gorm.io/gorm" + +type Opt func(m *MysqlDb) + +func WithDisablePing(v bool) Opt { + return func(m *MysqlDb) { + m.disablePing = v + } +} + +func WithGormConfig(v *gorm.Config) Opt { + return func(m *MysqlDb) { + m.gormConfig = v + } +} diff --git a/myredis/option.go b/myredis/option.go new file mode 100644 index 0000000..0dc2c90 --- /dev/null +++ b/myredis/option.go @@ -0,0 +1,19 @@ +package myredis + +import ( + "github.com/redis/go-redis/v9" +) + +type Opt func(*Client) + +func WithRedisOpt(v *redis.Options) Opt { + return func(r *Client) { + r.redisOpt = v + } +} + +func WithDisablePing(v bool) Opt { + return func(r *Client) { + r.disablePing = v + } +} diff --git a/myredis/redis.go b/myredis/redis.go index 3783ee0..58b1aa5 100644 --- a/myredis/redis.go +++ b/myredis/redis.go @@ -3,178 +3,245 @@ package myredis import ( "context" "fmt" - "github.com/go-redis/redis/v8" - jsoniter "github.com/json-iterator/go" + "git.makemake.in/kzkzzzz/mycommon/myconf" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "log" + "sync" "time" ) -const DefaultKey = "default" +const ( + DefaultInstance = "redis" +) + +type Client struct { + *redis.Client + redisOpt *redis.Options + disablePing bool +} + +//type Conf struct { +// Addr string +// Password string +// Db int +// PoolSize int // 连接池数量 如果不够用会继续新建连接 可以用 MaxActiveConns 限制 +// MinIdleConn int // 最小空闲连接数 预热可能用到 +// MaxIdleConn int // 最大空闲连接数 0不限制 +// MaxActiveConn int // 0 不限制, 如果不限制, 超出pool size可以继续新建立连接, 但使用完之后不会放回连接池 +// PoolTimeout string // 等待连接池超时时间 Default is ReadTimeout + 1 second. +// DialTimeout string // 拨号超时时间 +// ReadTimeout string // 读取超时 +// WriteTimeout string // 写入超时 +// MaxRetries int // 重试次数 +// ConnMaxIdleTime string // 连接空闲时间 +//} var ( - DefaultConfig = &Config{ - Addr: "127.0.0.1:6379", - Password: "", - DB: 0, - PoolSize: 16, - MinIdleConn: 4, - MaxConnAge: "4h", - IdleTimeout: "15m", - } - - instanceMap = make(map[string]*MyRedis) + defaultClient *redis.Client + instanceMap = &sync.Map{} ) -type ( - MyRedis struct { - *redis.Client - } - - Config struct { - Addr string - Password string - DB int - PoolSize int - MinIdleConn int - MaxConnAge string - IdleTimeout string - } -) - -func DB(key ...string) *MyRedis { - var key0 string - - if len(key) > 0 { - key0 = key[0] +func GetClient(name ...string) *Client { + var instanceName string + if len(name) > 0 { + instanceName = name[0] } else { - key0 = DefaultKey + instanceName = DefaultInstance } - instance, ok := instanceMap[key0] + v, ok := instanceMap.Load(instanceName) if !ok { - panic(fmt.Errorf("redis %s not config", key0)) + panic(fmt.Errorf("redis instance [%s] not init", instanceName)) } - return instance + + return v.(*Client) } -func InitDefault(config *Config) error { - return Init(DefaultKey, config) -} - -func Init(key string, config *Config) error { - rd, err := New(config) +// InitClient 初始化全局默认client +func InitClient(config *myconf.Config, opts ...Opt) { + client, err := NewClient(DefaultInstance, config, opts...) if err != nil { - return err + panic(err) } - instanceMap[key] = rd + instanceMap.Store(DefaultInstance, client) +} + +// InitClientInstance 初始化全局的client +func InitClientInstance(instanceName string, config *myconf.Config, opts ...Opt) { + client, err := NewClient(instanceName, config, opts...) + if err != nil { + panic(err) + } + instanceMap.Store(instanceName, client) +} + +func NewClient(instanceName string, config *myconf.Config, opts ...Opt) (*Client, error) { + cf := &Conf{} + err := config.UnmarshalKey(instanceName, &cf) + if err != nil { + return nil, err + } + client, err := NewClientFromConf(cf, opts...) + if err != nil { + return nil, err + } + + instanceMap.Store(instanceMap, client) + return client, nil +} + +type Conf struct { + Addr string + Password string + Db int + PoolSize int // 连接池数量 如果不够用会继续新建连接 可以用 MaxActiveConns 限制 + MinIdleConn int // 最小空闲连接数 预热可能用到 + MaxIdleConn int // 最大空闲连接数 0不限制 + MaxActiveConn int // 0 不限制, 如果不限制, 超出pool size可以继续新建立连接, 但使用完之后不会放回连接池 + DialTimeout string // 拨号超时时间, 时间单位格式 10ms, 60s, 15m, 2h... + ReadTimeout string // 读取超时 + WriteTimeout string // 写入超时 + ConnMaxIdleTime string // 连接空闲时间 + PoolTimeout string // 等待连接池超时时间 Default is ReadTimeout + 1 second. + MaxRetries int // 重试次数 +} + +func NewClientFromConf(cf *Conf, opts ...Opt) (*Client, error) { + c := &Client{ + redisOpt: DefaultRedisOpt(), + } + + for _, opt := range opts { + opt(c) + } + + err := c.parseConfToRedisOpt(cf) + if err != nil { + return nil, err + } + + client := redis.NewClient(c.redisOpt) + + if c.disablePing == false { + _, err := client.Ping(context.Background()).Result() + if err != nil { + return nil, fmt.Errorf("redis ping err: %s", err) + } + } + + c.Client = client + + instanceMap.Store(uuid.New().String(), c) + + log.Printf("connect redis success [addr:%s - db:%d]", cf.Addr, cf.Db) + + return c, nil +} + +func DefaultRedisOpt() *redis.Options { + return &redis.Options{ + Addr: "", + Password: "", + DB: 0, + MaxRetries: 3, // 重试次数 + DialTimeout: time.Millisecond * 500, // 拨号超时时间 + ReadTimeout: time.Second, // 读取超时 + WriteTimeout: time.Second * 3, // 写入超时 + PoolSize: 1024, // 连接池数量 如果不够用会继续新建连接 可以用 MaxActiveConns 限制 + PoolTimeout: time.Second + time.Second, // 等待连接池超时时间 Default is ReadTimeout + 1 second. + MinIdleConns: 0, // 最小空闲连接数 预热可能用到 + MaxIdleConns: 0, // 最大空闲连接数 0不限制 + MaxActiveConns: 0, // 0 不限制, 如果不限制, 超出pool size可以继续新建立连接, 但使用完之后不会放回连接池 + ConnMaxIdleTime: time.Minute * 10, // 连接空闲时间 + ContextTimeoutEnabled: true, // context控制超时用到 + } +} + +func (c *Client) parseTime(v string) (time.Duration, error) { + if v == "" { + return 0, nil + } + + d, err := time.ParseDuration(v) + if err != nil { + return 0, fmt.Errorf("parse time %s err: %s", v, err) + } + + return d, nil +} + +func (c *Client) parseConfToRedisOpt(cf *Conf) error { + c.redisOpt.Addr = cf.Addr + c.redisOpt.Password = cf.Password + c.redisOpt.DB = cf.Db + + if v := cf.PoolSize; v > 0 { + c.redisOpt.PoolSize = v + } + + if v := cf.MinIdleConn; v > 0 { + c.redisOpt.MinIdleConns = v + } + + if v := cf.MaxIdleConn; v > 0 { + c.redisOpt.MaxIdleConns = v + } + + if v := cf.MaxActiveConn; v > 0 { + c.redisOpt.MaxActiveConns = v + } + + if v := cf.MaxRetries; v > 0 { + c.redisOpt.MaxRetries = v + } + + if v, err := c.parseTime(cf.DialTimeout); err != nil { + return err + } else if v > 0 { + c.redisOpt.DialTimeout = v + } + + if v, err := c.parseTime(cf.ReadTimeout); err != nil { + return err + } else if v > 0 { + c.redisOpt.ReadTimeout = v + } + + if v, err := c.parseTime(cf.WriteTimeout); err != nil { + return err + } else if v > 0 { + c.redisOpt.WriteTimeout = v + } + + if v, err := c.parseTime(cf.ConnMaxIdleTime); err != nil { + return err + } else if v > 0 { + c.redisOpt.ConnMaxIdleTime = v + } + + if v, err := c.parseTime(cf.PoolTimeout); err != nil { + return err + } else if v > 0 { + c.redisOpt.PoolTimeout = v + } + return nil } -func New(config *Config) (*MyRedis, error) { - var ( - maxConnAge, _ = time.ParseDuration(DefaultConfig.MaxConnAge) - idleTimeout, _ = time.ParseDuration(DefaultConfig.IdleTimeout) - ) +const luaSetOnce = `if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end` - if config.PoolSize <= 0 { - config.MinIdleConn = DefaultConfig.PoolSize +// SetOnce 设置一次并设置过期时间, key不存在则设置成功返回1, key已存在返回0 +func (c *Client) SetOnce(ctx context.Context, key, value string, t time.Duration) (int, error) { + if t < time.Second { + return 0, fmt.Errorf("time must >= 1s") } + return c.Client.Eval(ctx, luaSetOnce, []string{key}, value, t).Int() +} - if config.MinIdleConn <= 0 { - config.MinIdleConn = DefaultConfig.MinIdleConn - } - - if config.MaxConnAge != "" { - t, err := time.ParseDuration(config.MaxConnAge) - if err != nil { - return nil, fmt.Errorf("parse MaxConnAge err: %s\n", err) - - } - maxConnAge = t - } - - if config.IdleTimeout != "" { - t, err := time.ParseDuration(config.IdleTimeout) - if err != nil { - return nil, fmt.Errorf("parse IdleTimeout err: %s\n", err) - - } - idleTimeout = t - } - - client := redis.NewClient(&redis.Options{ - Addr: config.Addr, - Password: config.Password, - DB: config.DB, - PoolSize: config.PoolSize, - MinIdleConns: config.MinIdleConn, - MaxConnAge: maxConnAge, - IdleTimeout: idleTimeout, +func CloseAllClient() { + instanceMap.Range(func(k, v any) bool { + v.(*Client).Close() + return true }) - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) - defer cancel() - rd := &MyRedis{} - rd.Client = client - ping := rd.Client.Ping(ctx) - if ping.Err() != nil { - return nil, fmt.Errorf("connet redis err: %s", ping.Err()) - } - - return rd, nil -} - -// GetSimple 通用get -func (r *MyRedis) GetSimple(key string) (string, error) { - ctx := context.Background() - return r.Client.Get(ctx, key).Result() -} - -// SetSimple 通用set -func (r *MyRedis) SetSimple(key string, value interface{}, t ...time.Duration) (string, error) { - ctx := context.Background() - var t2 time.Duration - if len(t) > 0 { - t2 = t[0] - } - return r.Client.Set(ctx, key, value, t2).Result() -} - -// GetJson json序列化 -func (r *MyRedis) GetJson(key string, result interface{}) error { - ctx := context.Background() - res, err := r.Client.Get(ctx, key).Bytes() - if err != nil { - return err - } - - err = jsoniter.Unmarshal(res, &result) - if err != nil { - return fmt.Errorf("get key:%s 反序列化json失败(-2)", key) - } - return nil -} - -// SetJson json序列化set -func (r *MyRedis) SetJson(key string, value interface{}, t ...time.Duration) (string, error) { - ctx := context.Background() - - var t2 time.Duration - if len(t) > 0 { - t2 = t[0] - } - v, err := jsoniter.Marshal(value) - if err != nil { - return "", fmt.Errorf("set key:%s 序列化json失败", key) - } - return r.Client.Set(ctx, key, v, t2).Result() -} - -func CloseDB(key string) { - DB(key).Client.Close() -} - -func CloseAllDB() { - for _, v := range instanceMap { - v.Client.Close() - } } diff --git a/myredis/redis_test.go b/myredis/redis_test.go deleted file mode 100644 index 1c71f05..0000000 --- a/myredis/redis_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package myredis - -import ( - "context" - "fmt" - "testing" - "time" -) - -func TestRedis(t *testing.T) { - err := InitDefault(&Config{ - Addr: "192.168.244.128:6379", - Password: "", - DB: 15, - PoolSize: 16, - MinIdleConn: 4, - MaxConnAge: "1h", - IdleTimeout: "10m", - }) - if err != nil { - fmt.Println(err) - return - } - defer CloseAllDB() - - set, err := DB().Set(context.Background(), "name", "qwe123", time.Minute).Result() - if err != nil { - fmt.Println(err) - return - } - fmt.Println(set) - - get, err := DB().Get(context.Background(), "name").Result() - if err != nil { - fmt.Println(err) - return - } - - fmt.Println(get) -} diff --git a/myregistry/consul/builder.go b/myregistry/consul/builder.go new file mode 100644 index 0000000..cfcbf86 --- /dev/null +++ b/myregistry/consul/builder.go @@ -0,0 +1,189 @@ +package consul + +import ( + "context" + "fmt" + "github.com/jpillora/backoff" + "google.golang.org/grpc/grpclog" + "sort" + "time" + + "github.com/hashicorp/consul/api" + "github.com/pkg/errors" + "google.golang.org/grpc/resolver" +) + +// schemeName for the urls +// All target URLs like 'consul://.../...' will be resolved by this resolver +const schemeName = "consul" + +// builder implements resolver.Builder and use for constructing all consul resolvers +type builder struct{} + +func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + tgt, err := parseURL(url.URL.String()) + if err != nil { + return nil, errors.Wrap(err, "Wrong consul URL") + } + cli, err := api.NewClient(tgt.consulConfig()) + if err != nil { + return nil, errors.Wrap(err, "Couldn't connect to the Consul API") + } + + ctx, cancel := context.WithCancel(context.Background()) + pipe := make(chan []string) + go watchConsulService(ctx, cli.Health(), tgt, pipe) + go populateEndpoints(ctx, cc, pipe) + + return &resolvr{cancelFunc: cancel}, nil +} + +// Scheme returns the scheme supported by this resolver. +// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md. +func (b *builder) Scheme() string { + return schemeName +} + +// init function needs for auto-register in resolvers registry +func init() { + resolver.Register(&builder{}) +} + +// resolvr implements resolver.Resolver from the gRPC package. +// It watches for endpoints changes and pushes them to the underlying gRPC connection. +type resolvr struct { + cancelFunc context.CancelFunc +} + +// ResolveNow will be skipped due unnecessary in this case +func (r *resolvr) ResolveNow(resolver.ResolveNowOptions) {} + +// Close closes the resolver. +func (r *resolvr) Close() { + r.cancelFunc() +} + +//go:generate ./bin/moq -out mocks_test.go . servicer +type servicer interface { + Service(string, string, bool, *api.QueryOptions) ([]*api.ServiceEntry, *api.QueryMeta, error) +} + +func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<- []string) { + res := make(chan []string) + quit := make(chan struct{}) + bck := &backoff.Backoff{ + Factor: 2, + Jitter: true, + Min: 10 * time.Millisecond, + Max: tgt.MaxBackoff, + } + go func() { + var lastIndex uint64 + for { + ss, meta, err := s.Service( + tgt.Service, + tgt.Tag, + tgt.Healthy, + &api.QueryOptions{ + WaitIndex: lastIndex, + Near: tgt.Near, + WaitTime: tgt.Wait, + Datacenter: tgt.Dc, + AllowStale: tgt.AllowStale, + RequireConsistent: tgt.RequireConsistent, + }, + ) + if err != nil { + // No need to continue if the context is done/cancelled. + // We check that here directly because the check for the closed quit channel + // at the end of the loop is not reached when calling continue here. + select { + case <-quit: + return + default: + grpclog.Errorf("[Consul resolver] Couldn't fetch endpoints. target={%s}; error={%v}", tgt.String(), err) + time.Sleep(bck.Duration()) + continue + } + } + bck.Reset() + lastIndex = meta.LastIndex + grpclog.Infof("[Consul resolver] %d endpoints fetched in(+wait) %s for target={%s}", + len(ss), + meta.RequestTime, + tgt.String(), + ) + + ee := make([]string, 0, len(ss)) + for _, s := range ss { + address := s.Service.Address + if s.Service.Address == "" { + address = s.Node.Address + } + ee = append(ee, fmt.Sprintf("%s:%d", address, s.Service.Port)) + } + + if tgt.Limit != 0 && len(ee) > tgt.Limit { + ee = ee[:tgt.Limit] + } + select { + case res <- ee: + continue + case <-quit: + return + } + } + }() + + for { + // If in the below select both channels have values that can be read, + // Go picks one pseudo-randomly. + // But when the context is canceled we want to act upon it immediately. + if ctx.Err() != nil { + // Close quit so the goroutine returns and doesn't leak. + // Do NOT close res because that can lead to panics in the goroutine. + // res will be garbage collected at some point. + close(quit) + return + } + select { + case ee := <-res: + out <- ee + case <-ctx.Done(): + close(quit) + return + } + } +} + +func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []string) { + for { + select { + case cc := <-input: + connsSet := make(map[string]struct{}, len(cc)) + for _, c := range cc { + connsSet[c] = struct{}{} + } + conns := make([]resolver.Address, 0, len(connsSet)) + for c := range connsSet { + conns = append(conns, resolver.Address{Addr: c}) + } + sort.Sort(byAddressString(conns)) // Don't replace the same address list in the balancer + err := clientConn.UpdateState(resolver.State{Addresses: conns}) + if err != nil { + grpclog.Errorf("[Consul resolver] Couldn't update client connection. error={%v}", err) + continue + } + case <-ctx.Done(): + grpclog.Info("[Consul resolver] Watch has been finished") + return + } + } +} + +// byAddressString sorts resolver.Address by Address Field sorting in increasing order. +type byAddressString []resolver.Address + +func (p byAddressString) Len() int { return len(p) } +func (p byAddressString) Less(i, j int) bool { return p[i].Addr < p[j].Addr } +func (p byAddressString) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/myregistry/consul/consul.go b/myregistry/consul/consul.go new file mode 100644 index 0000000..ac44409 --- /dev/null +++ b/myregistry/consul/consul.go @@ -0,0 +1,153 @@ +package consul + +import ( + "fmt" + "git.makemake.in/kzkzzzz/mycommon/myconf" + "git.makemake.in/kzkzzzz/mycommon/myregistry" + "github.com/google/uuid" + api "github.com/hashicorp/consul/api" + "log" + "net" + "net/url" + "time" +) + +var _ myregistry.IRegister = (*Consul)(nil) + +type Consul struct { + client *api.Client + serviceIds map[string][]string + serviceTags []string +} + +func (c *Consul) Name() string { + return "consul" +} + +func (c *Consul) Register(service *myregistry.ServiceInfo) error { + // 健康检查 + serviceId := uuid.New().String() + + c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId) + + check := &api.AgentServiceCheck{ + CheckID: serviceId, + TCP: fmt.Sprintf("%s:%d", service.Ip, service.Port), + Timeout: "5s", // 超时时间 + Interval: "20s", // 运行检查的频率 + // 指定时间后自动注销不健康的服务节点 + // 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。 + DeregisterCriticalServiceAfter: "5m", + Status: "passing", + } + srv := &api.AgentServiceRegistration{ + ID: serviceId, // 服务唯一ID + Name: service.ServiceName, // 服务名称 + Tags: c.serviceTags, // 为服务打标签 + Address: service.Ip, + Port: service.Port, + Check: check, + } + + return c.client.Agent().ServiceRegister(srv) +} + +func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { + for _, svcId := range c.serviceIds[service.ServiceName] { + err := c.client.Agent().ServiceDeregister(svcId) + if err != nil { + log.Printf("Failed to deregister service %s: %s\n", service, err) + } + } + return nil +} + +type Conf struct { + Addr string + Token string +} + +func MustNew(conf *myconf.Config) *Consul { + consul, err := New(conf) + if err != nil { + panic(err) + } + return consul +} + +func New(conf *myconf.Config) (*Consul, error) { + cfg := api.DefaultConfig() + cfg.Address = conf.GetString("addr") + cfg.Transport.DialContext = (&net.Dialer{ + Timeout: 3 * time.Second, + KeepAlive: 20 * time.Second, + DualStack: true, + }).DialContext + cfg.Token = conf.GetString("token") + + username := conf.GetString("username") + password := conf.GetString("password") + + if username != "" && password != "" { + cfg.HttpAuth = &api.HttpBasicAuth{ + Username: username, + Password: password, + } + } + + client, err := api.NewClient(cfg) + if err != nil { + return nil, err + } + cl := &Consul{ + client: client, + serviceIds: make(map[string][]string), + serviceTags: make([]string, 0), + } + + if v := conf.GetStringSlice("serviceTags"); len(v) > 0 { + cl.serviceTags = v + } else { + cl.serviceTags = []string{} + } + + return cl, nil +} + +func (c *Consul) Client() *api.Client { + return c.client +} + +func GrpcUrl(serviceName string, conf *myconf.Config) string { + return GrpcUrlWithTag("", serviceName, conf) +} + +func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string { + u := &url.URL{ + Scheme: schemeName, + Host: conf.GetString("addr"), + Path: serviceName, + } + + query := u.Query() + query.Set("healthy", "true") + + if v := conf.GetString("token"); v != "" { + query.Set("token", v) + } + + if tag != "" { + query.Set("tag", tag) + } + + username := conf.GetString("username") + password := conf.GetString("password") + + if username != "" && password != "" { + u.User = url.UserPassword(username, password) + } + + u.RawQuery = query.Encode() + + return u.String() +} diff --git a/myregistry/consul/target.go b/myregistry/consul/target.go new file mode 100644 index 0000000..703bbad --- /dev/null +++ b/myregistry/consul/target.go @@ -0,0 +1,101 @@ +package consul + +import ( + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/go-playground/form" + "github.com/hashicorp/consul/api" + "github.com/pkg/errors" +) + +type target struct { + Addr string `form:"-"` + User string `form:"-"` + Password string `form:"-"` + Service string `form:"-"` + Wait time.Duration `form:"wait"` + Timeout time.Duration `form:"timeout"` + MaxBackoff time.Duration `form:"max-backoff"` + Tag string `form:"tag"` + Near string `form:"near"` + Limit int `form:"limit"` + Healthy bool `form:"healthy"` + TLSInsecure bool `form:"insecure"` + Token string `form:"token"` + Dc string `form:"dc"` + AllowStale bool `form:"allow-stale"` + RequireConsistent bool `form:"require-consistent"` + // TODO(mbobakov): custom parameters for the http-transport + // TODO(mbobakov): custom parameters for the TLS subsystem +} + +func (t *target) String() string { + return fmt.Sprintf("service='%s' healthy='%t' tag='%s'", t.Service, t.Healthy, t.Tag) +} + +// parseURL with parameters +// see README.md for the actual format +// URL schema will stay stable in the future for backward compatibility +func parseURL(u string) (target, error) { + rawURL, err := url.Parse(u) + if err != nil { + return target{}, errors.Wrap(err, "Malformed URL") + } + + if rawURL.Scheme != schemeName || + len(rawURL.Host) == 0 || len(strings.TrimLeft(rawURL.Path, "/")) == 0 { + return target{}, + errors.Errorf("Malformed URL('%s'). Must be in the next format: 'consul://[user:passwd]@host/service?param=value'", u) + } + + var tgt target + tgt.User = rawURL.User.Username() + tgt.Password, _ = rawURL.User.Password() + tgt.Addr = rawURL.Host + tgt.Service = strings.TrimLeft(rawURL.Path, "/") + decoder := form.NewDecoder() + decoder.RegisterCustomTypeFunc(func(vals []string) (interface{}, error) { + return time.ParseDuration(vals[0]) + }, time.Duration(0)) + + err = decoder.Decode(&tgt, rawURL.Query()) + if err != nil { + return target{}, errors.Wrap(err, "Malformed URL parameters") + } + if len(tgt.Near) == 0 { + tgt.Near = "_agent" + } + if tgt.MaxBackoff == 0 { + tgt.MaxBackoff = time.Second + } + return tgt, nil +} + +// consulConfig returns config based on the parsed target. +// It uses custom http-client. +func (t *target) consulConfig() *api.Config { + var creds *api.HttpBasicAuth + if len(t.User) > 0 && len(t.Password) > 0 { + creds = new(api.HttpBasicAuth) + creds.Password = t.Password + creds.Username = t.User + } + // custom http.Client + c := &http.Client{ + Timeout: t.Timeout, + } + return &api.Config{ + Address: t.Addr, + HttpAuth: creds, + WaitTime: t.Wait, + HttpClient: c, + TLSConfig: api.TLSConfig{ + InsecureSkipVerify: t.TLSInsecure, + }, + Token: t.Token, + } +} diff --git a/myregistry/reigster.go b/myregistry/reigster.go new file mode 100644 index 0000000..a5a4c22 --- /dev/null +++ b/myregistry/reigster.go @@ -0,0 +1,21 @@ +package myregistry + +import "fmt" + +type ServiceInfo struct { + ServiceName string + Ip string + Port int + Extend map[string]string +} + +func (s *ServiceInfo) String() string { + return fmt.Sprintf("%s - %s:%d", s.ServiceName, s.Ip, s.Port) +} + +// IRegister 注册中心 服务注册发现 +type IRegister interface { + Name() string + Register(service *ServiceInfo) error + Deregister(service *ServiceInfo) error +} From 9fd0eaadb84b7b03560a8a686514f0a7e761304a Mon Sep 17 00:00:00 2001 From: kzkzzzz Date: Sun, 23 Mar 2025 20:29:29 +0800 Subject: [PATCH 2/2] update --- go.mod | 46 ++++-- go.sum | 86 +++++++--- myconf/conf.go | 1 + mygrpc/grpc.go | 4 + mygrpc/grpcc/client.go | 15 +- mygrpc/grpcsr/server.go | 25 ++- mygrpc/mybalancer/random/random.go | 80 +++++++++ myhttp/httpsr/pprof.go | 78 +++++++++ myhttp/httpsr/server.go | 252 +++++++++++++++++++++++++++++ myhttp/myhttp.go | 5 + mylog/log.go | 10 ++ mymysql/mysql.go | 2 +- myregistry/consul/builder.go | 9 +- myregistry/consul/consul.go | 126 ++++++++++++--- 14 files changed, 661 insertions(+), 78 deletions(-) create mode 100644 mygrpc/mybalancer/random/random.go create mode 100644 myhttp/httpsr/pprof.go create mode 100644 myhttp/httpsr/server.go create mode 100644 myhttp/myhttp.go diff --git a/go.mod b/go.mod index 61f053d..982fad9 100644 --- a/go.mod +++ b/go.mod @@ -1,14 +1,18 @@ module git.makemake.in/kzkzzzz/mycommon -go 1.23 +go 1.23.0 require ( + github.com/gin-contrib/pprof v1.5.2 + github.com/gin-gonic/gin v1.10.0 + github.com/go-playground/form v3.1.4+incompatible github.com/go-playground/locales v0.14.1 github.com/go-playground/universal-translator v0.18.1 - github.com/go-playground/validator/v10 v10.20.0 + github.com/go-playground/validator/v10 v10.25.0 github.com/go-sql-driver/mysql v1.7.0 github.com/google/uuid v1.6.0 github.com/hashicorp/consul/api v1.28.2 + github.com/jpillora/backoff v1.0.0 github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/toml v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 @@ -16,13 +20,15 @@ require ( github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.1.2 + github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.7.3 + github.com/rs/xid v1.6.0 github.com/spf13/cast v1.6.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 go.uber.org/zap v1.24.0 - golang.org/x/sync v0.8.0 + golang.org/x/sync v0.12.0 google.golang.org/grpc v1.67.1 gorm.io/driver/mysql v1.5.7 gorm.io/gorm v1.25.12 @@ -30,54 +36,64 @@ require ( require ( github.com/armon/go-metrics v0.4.1 // indirect + github.com/bytedance/sonic v1.13.2 // indirect + github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/cloudwego/base64x v0.1.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.15.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/go-playground/form v3.1.4+incompatible // indirect + github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/gin-contrib/sse v1.0.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect + github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/jpillora/backoff v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect - github.com/mbobakov/grpc-consul-resolver v1.5.3 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml v1.9.5 // indirect - github.com/pelletier/go-toml/v2 v2.2.2 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/subosito/gotenv v1.6.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.12 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/crypto v0.27.0 // indirect + golang.org/x/arch v0.15.0 // indirect + golang.org/x/crypto v0.36.0 // indirect golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect - golang.org/x/net v0.29.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect - google.golang.org/protobuf v1.34.2 // indirect + google.golang.org/protobuf v1.36.5 // indirect + gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 34dea66..7f574d5 100644 --- a/go.sum +++ b/go.sum @@ -19,11 +19,19 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ= +github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4= +github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.4 h1:ZWCw4stuXUsn1/+zQDqeE7JKP+QO47tz7QCNan80NzY= +github.com/bytedance/sonic/loader v0.2.4/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/cloudwego/base64x v0.1.5 h1:XPciSp1xaq2VCSt6lF0phncD4koWyULpl5bUxbfCyP4= +github.com/cloudwego/base64x v0.1.5/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= +github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQPiEFhY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -33,16 +41,20 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= -github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= -github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= +github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= +github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= +github.com/gin-contrib/pprof v1.5.2 h1:Kcq5W2bA2PBcVtF0MqkQjpvCpwJr+pd7zxcQh2csg7E= +github.com/gin-contrib/pprof v1.5.2/go.mod h1:a1W4CDXwAPm2zql2AKdnT7OVCJdV/oFPhJXVOrDs5Ns= +github.com/gin-contrib/sse v1.0.0 h1:y3bT1mUWUxDpW4JLQg/HnTqV4rozuW4tC9eFKTxYI9E= +github.com/gin-contrib/sse v1.0.0/go.mod h1:zNuFdwarAygJBht0NTKiSi3jRf6RbqeILZ9Sp6Slhe0= +github.com/gin-gonic/gin v1.10.0 h1:nTuyha1TYqgedzytsKYqna+DfLos46nTv2ygFy86HFU= +github.com/gin-gonic/gin v1.10.0/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= @@ -55,13 +67,15 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.20.0 h1:K9ISHbSaI0lyB2eWMPJo+kOS/FBExVwjEviJTixqxL8= -github.com/go-playground/validator/v10 v10.20.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.25.0 h1:5Dh7cjvzR7BRZadnsVOzPhWsrwUr0nmsZJxEAnFLNO8= +github.com/go-playground/validator/v10 v10.25.0/go.mod h1:GGzBIJMuE98Ic/kJsBXbz1x/7cByt++cQ+YOuDM5wus= github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -109,8 +123,8 @@ github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/go-version v1.2.1 h1:zEfKbn2+PDgroKdiOzqiE8rsmLqU2uwi5PB5pBJ3TkI= -github.com/hashicorp/go-version v1.2.1/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= +github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= @@ -130,7 +144,12 @@ github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2E github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI= github.com/knadh/koanf/parsers/json v0.1.0 h1:dzSZl5pf5bBcW0Acnu20Djleto19T0CfHcvZ14NJ6fU= @@ -147,6 +166,7 @@ github.com/knadh/koanf/providers/posflag v0.1.0 h1:mKJlLrKPcAP7Ootf4pBZWJ6J+4wHY github.com/knadh/koanf/providers/posflag v0.1.0/go.mod h1:SYg03v/t8ISBNrMBRMlojH8OsKowbkXV7giIbBVgbz0= github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= +github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -176,8 +196,6 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/mbobakov/grpc-consul-resolver v1.5.3 h1:xL7nJm8qCvxgHMqlnF4naXruBUoHqfUWORl3UmwKByU= -github.com/mbobakov/grpc-consul-resolver v1.5.3/go.mod h1:0wN8+McBocuk5mO9xlAfrmBSothm7sps43bFGubg0m4= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= @@ -192,17 +210,20 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= -github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= +github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= +github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -227,6 +248,8 @@ github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0 github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sagikazarmark/locafero v0.4.0 h1:HApY1R9zGo4DBgr7dqsTH/JJxLTTsOt7u6keLGt6kNQ= github.com/sagikazarmark/locafero v0.4.0/go.mod h1:Pe1W6UlPYUk/+wc/6KFhbORCfqzgYEpgQ3O5fPuL3H4= @@ -255,15 +278,21 @@ github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= +github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -272,11 +301,13 @@ go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw= +golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/exp v0.0.0-20230905200255-921286631fa9/go.mod h1:S2oDrQGGwySpoQPVqRShND87VCbxmc6bL1Yd2oYrm6k= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -286,15 +317,15 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -316,15 +347,15 @@ golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -333,12 +364,14 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -353,3 +386,4 @@ gorm.io/driver/mysql v1.5.7/go.mod h1:sEtPWMiqiN1N1cMXoXmBbd8C6/l+TESwriotuRRpkD gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= +nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= diff --git a/myconf/conf.go b/myconf/conf.go index 192aa99..97d46e3 100644 --- a/myconf/conf.go +++ b/myconf/conf.go @@ -65,6 +65,7 @@ func WithLoadOverwrite(v bool) Opt { func init() { defaultConf = New(Default) log.SetOutput(os.Stdout) + log.SetFlags(log.LstdFlags | log.Lshortfile) // --config 指定配置文件 pflag.StringVar(&flagConfigFile, "config", "", "set config file") diff --git a/mygrpc/grpc.go b/mygrpc/grpc.go index 09501c0..b91e3db 100644 --- a/mygrpc/grpc.go +++ b/mygrpc/grpc.go @@ -15,6 +15,10 @@ const ( DefaultWindowSize = 16 * 1024 * 1024 ) +const ( + ServicePrefix = "grpc@" +) + const ( HeaderClientIP = "grpc-client-ip" HeaderServiceName = "grpc-service-name" diff --git a/mygrpc/grpcc/client.go b/mygrpc/grpcc/client.go index aadee83..7148416 100644 --- a/mygrpc/grpcc/client.go +++ b/mygrpc/grpcc/client.go @@ -75,7 +75,7 @@ func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) { PermitWithoutStream: true, // 如果没有active的stream,是否允许发送ping }), // 参考 https://github.com/grpc/grpc-go/tree/master/examples/features/load_balancing 设置轮训策略 - grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy. + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), // This sets the initial balancing policy. grpc.WithTransportCredentials(insecure.NewCredentials()), } @@ -87,13 +87,11 @@ func New(grpcUrl string, opts ...Opt) (*grpc.ClientConn, error) { grpc.WithReadBufferSize(mygrpc.DefaultReadBufferSize), grpc.WithWriteBufferSize(mygrpc.DefaultWriteBufferSize), - - grpc.WithUnaryInterceptor(WrapRequestError()), ) } if len(c.unaryMiddlewares) > 0 { - grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...) + dialOpts = append(dialOpts, grpc.WithChainUnaryInterceptor(c.unaryMiddlewares...)) } if len(c.grpcOpts) > 0 { @@ -123,12 +121,15 @@ func WrapRequestError() grpc.UnaryClientInterceptor { err := invoker(ctx, method, req, reply, cc, opts...) if err != nil { + st, ok := status.FromError(err) + if ok && serviceName != "" { + sp := st.Proto() + sp.Message = fmt.Sprintf("[%s] - %s", serviceName, sp.Message) - if serviceName != "" { - return fmt.Errorf("request grpc err: [%s - %s] %s", serviceName, method, err) + return status.ErrorProto(sp) } - return fmt.Errorf("request grpc err: [%s] %s", method, err) + return err } return nil diff --git a/mygrpc/grpcsr/server.go b/mygrpc/grpcsr/server.go index 13441ee..f6368cf 100644 --- a/mygrpc/grpcsr/server.go +++ b/mygrpc/grpcsr/server.go @@ -63,7 +63,7 @@ func UseDefaultBufferCfg(v bool) Opt { func WithRegistry(serviceName string, reg myregistry.IRegister) Opt { return func(server *Server) { - server.serviceName = serviceName + server.serviceName = mygrpc.ServicePrefix + serviceName server.reg = reg } } @@ -81,7 +81,7 @@ func WithDelayStopMs(v int) Opt { } func SetFlag() { - pflag.Int("grpc.port", 18082, "listen port") + pflag.Int("grpc.port", 0, "listen port, 0 is random port") pflag.String("grpc.log", "true", "enable request log") } @@ -109,7 +109,7 @@ func NewByConf(conf *Conf, opts ...Opt) *Server { } if s.logger == nil { - s.logger = mylog.GetLogger() + s.logger = mylog.GetLoggerSkip(-1) } if s.reg != nil && s.serviceName == "" { @@ -142,7 +142,7 @@ func (s *Server) initServer() { PermitWithoutStream: true, // 即使没有 active stream, 也允许 ping }), grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: time.Hour * 2, // 空闲连接时间 + MaxConnectionIdle: time.Minute * 15, // 空闲连接时间 MaxConnectionAgeGrace: time.Second * 30, // 在强制关闭连接之间, 允许有 N 的时间完成 pending 的 rpc 请求 Time: time.Second * 20, // 如果一个连接空闲超过 N, 则发送一个 ping 请求 Timeout: time.Second * 5, // 如果 ping 请求 N 内未收到回复, 则认为该连接已断开 @@ -210,6 +210,12 @@ func (s *Server) Run(ctx context.Context) error { if err != nil { return err } + + log.Printf("[%s] register service: %s - %s:%d", + s.reg.Name(), + s.serviceRegInfo.ServiceName, + s.serviceRegInfo.Ip, s.serviceRegInfo.Port, + ) } addr := fmt.Sprintf("%s:%d", s.serverConf.Addr, port) @@ -230,6 +236,13 @@ func (s *Server) Stop() { if err != nil { s.logger.Errorf("grpc server deregister err: %s", err) } + + log.Printf("[%s] deregister service: %s - %s:%d", + s.reg.Name(), + s.serviceRegInfo.ServiceName, + s.serviceRegInfo.Ip, s.serviceRegInfo.Port, + ) + } // 如果使用k8s service, 关闭pod和往service注销ip是同时进行的, 如果退出服务比注销ip先完成, 可能有流量继续进来, 导致请求失败 @@ -270,7 +283,7 @@ func (s *Server) requestLog() grpc.UnaryServerInterceptor { code = status.New(codes.Unknown, err.Error()).Code() } - codeMsg = fmt.Sprintf("Error Code: %s(%d)", code.String(), uint32(code)) + codeMsg = fmt.Sprintf("%s(%d)", code.String(), uint32(code)) } s.logger.Infof( @@ -287,7 +300,7 @@ func (s *Server) grpcRecover() grpc.UnaryServerInterceptor { defer func() { if err0 := recover(); err0 != nil { log.Printf("%s - panic: %v\n%s", info.FullMethod, err0, debug.Stack()) - err = fmt.Errorf("server err: %s - system err: %s", info.FullMethod, err0) + err = fmt.Errorf("server err %s - %s", info.FullMethod, err0) } }() diff --git a/mygrpc/mybalancer/random/random.go b/mygrpc/mybalancer/random/random.go new file mode 100644 index 0000000..3186d9e --- /dev/null +++ b/mygrpc/mybalancer/random/random.go @@ -0,0 +1,80 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package roundrobin defines a roundrobin balancer. Roundrobin balancer is +// installed as one of the default balancers in gRPC, users don't need to +// explicitly install this balancer. +package random + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/grpclog" + "log" + "math/rand" +) + +// Name is the name of round_robin balancer. +const Name = "my_random_robin" + +var logger = grpclog.Component("myrandomrobin") + +// newBuilder creates a new roundrobin balancer builder. +func newBuilder() balancer.Builder { + return base.NewBalancerBuilder(Name, &randomPickerBuilder{}, base.Config{HealthCheck: true}) +} + +func init() { + balancer.Register(newBuilder()) +} + +type randomPickerBuilder struct{} + +type subConnInfo struct { + conn balancer.SubConn + connInfo base.SubConnInfo +} + +func (r *randomPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { + logger.Infof("myrandomrobin Picker: Build called with info: %v", info) + log.Printf("myrandomrobin Picker: Build called with info: %v", info) + if len(info.ReadySCs) == 0 { + return base.NewErrPicker(balancer.ErrNoSubConnAvailable) + } + scs := make([]*subConnInfo, 0, len(info.ReadySCs)) + + for sc, scInfo := range info.ReadySCs { + scs = append(scs, &subConnInfo{ + conn: sc, + connInfo: scInfo, + }) + } + return &randomPicker{ + subConns: scs, + } +} + +type randomPicker struct { + subConns []*subConnInfo +} + +func (p *randomPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + sc := p.subConns[rand.Intn(len(p.subConns))] + log.Printf("randomPicker Pick: SubConn: %s", sc.connInfo.Address.String()) + return balancer.PickResult{SubConn: sc.conn}, nil +} diff --git a/myhttp/httpsr/pprof.go b/myhttp/httpsr/pprof.go new file mode 100644 index 0000000..c933125 --- /dev/null +++ b/myhttp/httpsr/pprof.go @@ -0,0 +1,78 @@ +package httpsr + +import ( + "context" + "fmt" + "git.makemake.in/kzkzzzz/mycommon/graceful" + "github.com/gin-contrib/pprof" + "github.com/gin-gonic/gin" + "log" + "net" + "net/http" + "time" +) + +var _ graceful.IRunner = (*PProf)(nil) + +type PProf struct { + conf *pprofConf + hs *http.Server +} +type pprofConf struct { + Port int +} + +type PProfOpt func(*pprofConf) + +func NewPProf(opts ...PProfOpt) *PProf { + engine := gin.Default() + pprof.Register(engine) + + p := &PProf{ + conf: &pprofConf{}, + hs: &http.Server{ + Handler: engine, + }, + } + for _, opt := range opts { + opt(p.conf) + } + + return p +} + +func (p *PProf) Run(ctx context.Context) error { + // 端口如果=0, 监听随机端口 + addr0 := fmt.Sprintf(":%d", p.conf.Port) + lis, err := net.Listen("tcp", addr0) + if err != nil { + return err + } + + // 获取监听的端口 + port := lis.Addr().(*net.TCPAddr).Port + + addr := fmt.Sprintf(":%d", port) + log.Printf("http pprof server listen on %s", addr) + + p.hs.Addr = addr + + err = p.hs.Serve(lis) + if err != nil { + log.Printf("start http pprof server err: %s", err) + return err + } + return nil +} + +func (p *PProf) Stop() { + tCtx, tCancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer tCancel() + p.hs.Shutdown(tCtx) +} + +func PProfPort(v int) PProfOpt { + return func(p *pprofConf) { + p.Port = v + } +} diff --git a/myhttp/httpsr/server.go b/myhttp/httpsr/server.go new file mode 100644 index 0000000..77b3713 --- /dev/null +++ b/myhttp/httpsr/server.go @@ -0,0 +1,252 @@ +package httpsr + +import ( + "context" + "fmt" + "git.makemake.in/kzkzzzz/mycommon" + "git.makemake.in/kzkzzzz/mycommon/graceful" + "git.makemake.in/kzkzzzz/mycommon/myconf" + "git.makemake.in/kzkzzzz/mycommon/myhttp" + "git.makemake.in/kzkzzzz/mycommon/mylog" + "git.makemake.in/kzkzzzz/mycommon/myregistry" + "github.com/gin-gonic/gin" + "github.com/spf13/pflag" + "log" + "net" + "net/http" + + "runtime/debug" + "time" +) + +var _ graceful.IRunner = (*Server)(nil) + +type Conf struct { + Addr string + Port int + Ip string + Log bool + ReadTimeoutMs int + WriteTimeoutMs int + IdleTimeoutMs int + ShutdownTimeoutMs int +} + +type Opt func(server *Server) + +type Server struct { + serviceId string + serviceName string + serverConf *Conf + reg myregistry.IRegister + logger mylog.ILogger + + useDefaultBufferCfg bool + delayStopMs int + + serviceRegInfo *myregistry.ServiceInfo + + hs *http.Server + ginEngine *gin.Engine + + shutdownTimeout time.Duration +} + +func WithRegistry(serviceName string, reg myregistry.IRegister) Opt { + return func(server *Server) { + server.serviceName = myhttp.ServicePrefix + serviceName + server.reg = reg + } +} + +func WithDelayStopMs(v int) Opt { + return func(server *Server) { + server.delayStopMs = v + } +} + +func SetFlag() { + pflag.Int("http.port", 0, "listen port, 0 is random port") + pflag.String("http.log", "true", "enable request log") +} + +func New(cfg *myconf.Config, opts ...Opt) *Server { + cf := &Conf{} + err := cfg.UnmarshalKey("http", cf) + if err != nil { + panic(err) + } + + // 命令行的参数覆盖一次, Unmarshal解析的时候, 不会用命令行的参数覆盖 https://github.com/spf13/viper/issues/190 + cf.Port = cfg.GetInt(fmt.Sprintf("http.port")) + cf.Log = cfg.GetBool(fmt.Sprintf("http.log")) + + return NewByConf(cf, opts...) +} + +func NewByConf(conf *Conf, opts ...Opt) *Server { + s := &Server{ + serverConf: conf, + useDefaultBufferCfg: true, + } + for _, opt := range opts { + opt(s) + } + + if s.logger == nil { + s.logger = mylog.GetLoggerSkip(-1) + } + + if s.reg != nil && s.serviceName == "" { + panic("service name is empty") + } + + s.ginEngine = gin.New() + + if s.serverConf.Log { + //s.unaryMiddlewares = append(s.unaryMiddlewares, s.requestLog()) + s.ginEngine.Use(s.requestLog()) + } + s.ginEngine.Use(s.httpRecover()) + + s.ginEngine.GET("/health", func(ctx *gin.Context) { + ctx.String(http.StatusOK, "ok") + }) + + s.hs = &http.Server{ + Handler: s.ginEngine, + ReadTimeout: time.Second * 5, + WriteTimeout: time.Second * 10, + IdleTimeout: time.Second * 180, + } + s.shutdownTimeout = time.Millisecond * 200 + + if conf.ReadTimeoutMs > 0 { + s.hs.ReadTimeout = time.Millisecond * time.Duration(conf.ReadTimeoutMs) + } + + if conf.WriteTimeoutMs > 0 { + s.hs.WriteTimeout = time.Millisecond * time.Duration(conf.WriteTimeoutMs) + } + + if conf.IdleTimeoutMs > 0 { + s.hs.IdleTimeout = time.Millisecond * time.Duration(conf.IdleTimeoutMs) + } + + if conf.ShutdownTimeoutMs > 0 { + s.shutdownTimeout = time.Millisecond * time.Duration(conf.ShutdownTimeoutMs) + } + + return s +} + +func (s *Server) Engine() *gin.Engine { + return s.ginEngine +} + +func (s *Server) initServer() { + +} + +func (s *Server) Run(ctx context.Context) error { + s.initServer() + + // 端口如果=0, 监听随机端口 + addr0 := fmt.Sprintf("%s:%d", s.serverConf.Addr, s.serverConf.Port) + lis, err := net.Listen("tcp", addr0) + if err != nil { + return err + } + + // 获取监听的端口 + port := lis.Addr().(*net.TCPAddr).Port + + var svcIp = s.serverConf.Ip + if svcIp == "" { + svcIp = mycommon.GetOutboundIP() + } + + // 注册服务 + if s.reg != nil { + s.serviceRegInfo = &myregistry.ServiceInfo{ + ServiceName: s.serviceName, + Ip: svcIp, + Port: port, + } + + err = s.reg.Register(s.serviceRegInfo) + if err != nil { + return err + } + } + + addr := fmt.Sprintf("%s:%d", s.serverConf.Addr, port) + log.Printf("http server listen on %s", addr) + + s.hs.Addr = addr + + err = s.hs.Serve(lis) + if err != nil { + log.Printf("start http server err: %s", err) + return err + } + + return nil +} + +func (s *Server) Stop() { + if s.reg != nil { + err := s.reg.Deregister(s.serviceRegInfo) + if err != nil { + s.logger.Errorf("http server deregister err: %s", err) + } + } + + // 如果使用k8s service, 关闭pod和往service注销ip是同时进行的, 如果退出服务比注销ip先完成, 可能有流量继续进来, 导致请求失败 + // 延迟一段时间, 确保服务已经注销ip, 再关闭服务 + + // 如何使用注册中心, 先从中心退出ip, 也延迟一段时间, 等上游网关更新ip完成(正常不会太久), 不会有流量进来旧服务, 再退出服务 + if s.delayStopMs > 0 { + delayTime := time.Millisecond * time.Duration(s.delayStopMs) + log.Printf("http server delay stop: %s", delayTime) + time.Sleep(delayTime) + } + + tCtx, tCancel := context.WithTimeout(context.Background(), s.shutdownTimeout) + defer tCancel() + s.hs.Shutdown(tCtx) + + log.Printf("http server stop") +} + +func (s *Server) httpRecover() gin.HandlerFunc { + return func(ctx *gin.Context) { + defer func() { + if err := recover(); err != nil { + log.Printf("%s - panic: %v\n%s", ctx.Request.RequestURI, err, debug.Stack()) + + ctx.JSON(http.StatusOK, gin.H{ + "code": 1, + "message": fmt.Sprintf("server err: %s", err), + }) + + } + }() + + ctx.Next() + } +} + +func (s *Server) requestLog() gin.HandlerFunc { + return func(ctx *gin.Context) { + start := time.Now() + + ctx.Next() + + s.logger.Infof( + "%s - %d - %s - %s - %s", + ctx.Request.Method, ctx.Writer.Status(), time.Since(start), + ctx.ClientIP(), ctx.Request.RequestURI, + ) + } +} diff --git a/myhttp/myhttp.go b/myhttp/myhttp.go new file mode 100644 index 0000000..cf18d69 --- /dev/null +++ b/myhttp/myhttp.go @@ -0,0 +1,5 @@ +package myhttp + +const ( + ServicePrefix = "http@" +) diff --git a/mylog/log.go b/mylog/log.go index 0d7d081..316901f 100644 --- a/mylog/log.go +++ b/mylog/log.go @@ -223,6 +223,16 @@ func GetLogger() *ZapLog { return globalLog } +func GetLoggerSkip(callSkip int) *ZapLog { + sg := globalLog.sugarLog.WithOptions( + zap.AddCallerSkip(callSkip), + ) + + return &ZapLog{ + sugarLog: sg, + } +} + func Flush() { globalLog.Sync() } diff --git a/mymysql/mysql.go b/mymysql/mysql.go index 0384f38..bdbdf40 100644 --- a/mymysql/mysql.go +++ b/mymysql/mysql.go @@ -41,7 +41,7 @@ var ( instanceMap = &sync.Map{} ) -func GetDb(name ...string) *MysqlDb { +func DB(name ...string) *MysqlDb { var instanceName string if len(name) > 0 { instanceName = name[0] diff --git a/myregistry/consul/builder.go b/myregistry/consul/builder.go index cfcbf86..b32a647 100644 --- a/myregistry/consul/builder.go +++ b/myregistry/consul/builder.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/jpillora/backoff" "google.golang.org/grpc/grpclog" + "log" "sort" "time" @@ -33,7 +34,7 @@ func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolv ctx, cancel := context.WithCancel(context.Background()) pipe := make(chan []string) go watchConsulService(ctx, cli.Health(), tgt, pipe) - go populateEndpoints(ctx, cc, pipe) + go populateEndpoints(ctx, cc, pipe, tgt) return &resolvr{cancelFunc: cancel}, nil } @@ -156,7 +157,7 @@ func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<- } } -func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []string) { +func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []string, tgt target) { for { select { case cc := <-input: @@ -168,7 +169,11 @@ func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, inpu for c := range connsSet { conns = append(conns, resolver.Address{Addr: c}) } + sort.Sort(byAddressString(conns)) // Don't replace the same address list in the balancer + + log.Printf("update conn: %s - %s", tgt.Service, conns) + err := clientConn.UpdateState(resolver.State{Addresses: conns}) if err != nil { grpclog.Errorf("[Consul resolver] Couldn't update client connection. error={%v}", err) diff --git a/myregistry/consul/consul.go b/myregistry/consul/consul.go index ac44409..9d35a4b 100644 --- a/myregistry/consul/consul.go +++ b/myregistry/consul/consul.go @@ -4,11 +4,14 @@ import ( "fmt" "git.makemake.in/kzkzzzz/mycommon/myconf" "git.makemake.in/kzkzzzz/mycommon/myregistry" - "github.com/google/uuid" - api "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/api/watch" + "github.com/hashicorp/go-hclog" + "github.com/rs/xid" "log" "net" "net/url" + "os" "time" ) @@ -18,6 +21,15 @@ type Consul struct { client *api.Client serviceIds map[string][]string serviceTags []string + services []*api.AgentServiceRegistration +} + +type Opt func(*Consul) + +func WithServiceTags(tags ...string) Opt { + return func(c *Consul) { + c.serviceTags = tags + } } func (c *Consul) Name() string { @@ -26,7 +38,13 @@ func (c *Consul) Name() string { func (c *Consul) Register(service *myregistry.ServiceInfo) error { // 健康检查 - serviceId := uuid.New().String() + serviceId := xid.New().String() + hostname, err := os.Hostname() + if err != nil { + log.Printf("get hostname err: %s", err) + } else { + serviceId = fmt.Sprintf("%s-%s", hostname, serviceId) + } c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId) @@ -34,13 +52,13 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error { CheckID: serviceId, TCP: fmt.Sprintf("%s:%d", service.Ip, service.Port), Timeout: "5s", // 超时时间 - Interval: "20s", // 运行检查的频率 + Interval: "30s", // 运行检查的频率 // 指定时间后自动注销不健康的服务节点 // 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。 - DeregisterCriticalServiceAfter: "5m", + DeregisterCriticalServiceAfter: "6m", Status: "passing", } - srv := &api.AgentServiceRegistration{ + svc := &api.AgentServiceRegistration{ ID: serviceId, // 服务唯一ID Name: service.ServiceName, // 服务名称 Tags: c.serviceTags, // 为服务打标签 @@ -49,7 +67,9 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error { Check: check, } - return c.client.Agent().ServiceRegister(srv) + c.services = append(c.services, svc) + + return c.client.Agent().ServiceRegister(svc) } func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { @@ -62,22 +82,23 @@ func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { return nil } -type Conf struct { - Addr string - Token string -} - -func MustNew(conf *myconf.Config) *Consul { - consul, err := New(conf) +func MustNew(conf *myconf.Config, opts ...Opt) *Consul { + consul, err := New(conf, opts...) if err != nil { panic(err) } return consul } -func New(conf *myconf.Config) (*Consul, error) { +func New(conf *myconf.Config, opts ...Opt) (*Consul, error) { + cfg := api.DefaultConfig() cfg.Address = conf.GetString("addr") + + if cfg.Address == "" { + return nil, fmt.Errorf("consul address is empty") + } + cfg.Transport.DialContext = (&net.Dialer{ Timeout: 3 * time.Second, KeepAlive: 20 * time.Second, @@ -99,27 +120,64 @@ func New(conf *myconf.Config) (*Consul, error) { if err != nil { return nil, err } + cl := &Consul{ client: client, serviceIds: make(map[string][]string), - serviceTags: make([]string, 0), + serviceTags: conf.GetStringSlice("serviceTags"), + services: make([]*api.AgentServiceRegistration, 0), } - if v := conf.GetStringSlice("serviceTags"); len(v) > 0 { - cl.serviceTags = v - } else { - cl.serviceTags = []string{} + for _, opt := range opts { + opt(cl) } + go cl.healthCheck() + return cl, nil } +func (c *Consul) healthCheck() { + wlog := newWatchLogger() + + wp, err := watch.Parse(map[string]any{ + "type": "services", + }) + if err != nil { + panic(fmt.Sprintf("parse watch err: %s", err)) + } + + wp.Handler = func(u uint64, raw any) { + if wlog.isWatchErr == true { + + for _, svc := range c.services { + //c.client.Agent().ServiceDeregister(svc.ID) + err := c.client.Agent().ServiceRegister(svc) + if err != nil { + log.Printf("retry register service err: %s: %s", svc.Name, err) + } else { + log.Printf("retry register service ok: %s", svc.Name) + } + } + + wlog.isWatchErr = false + } + //fmt.Println("watch", u, raw) + } + + err = wp.RunWithClientAndHclog(c.client, wlog) + if err != nil { + log.Printf("watch err: %s", err) + } + +} + func (c *Consul) Client() *api.Client { return c.client } func GrpcUrl(serviceName string, conf *myconf.Config) string { - return GrpcUrlWithTag("", serviceName, conf) + return GrpcUrlWithTag("", "grpc@"+serviceName, conf) } func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string { @@ -129,6 +187,10 @@ func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string Path: serviceName, } + if u.Host == "" { + panic("consul address is empty") + } + query := u.Query() query.Set("healthy", "true") @@ -151,3 +213,25 @@ func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string return u.String() } + +type watchLogger struct { + hclog.Logger + isWatchErr bool +} + +func newWatchLogger() *watchLogger { + return &watchLogger{Logger: hclog.New(&hclog.LoggerOptions{ + Name: "watch", + Output: os.Stdout, + })} +} + +func (l *watchLogger) Error(msg string, args ...interface{}) { + l.isWatchErr = true + log.Printf("is watch err: %s", msg) + l.Logger.Error(msg, args...) +} + +func (l *watchLogger) Named(name string) hclog.Logger { + return l +}