package httpsr import ( "context" "fmt" "git.makemake.in/kzkzzzz/mycommon" "git.makemake.in/kzkzzzz/mycommon/graceful" "git.makemake.in/kzkzzzz/mycommon/myconf" "git.makemake.in/kzkzzzz/mycommon/myhttp" "git.makemake.in/kzkzzzz/mycommon/mylog" "git.makemake.in/kzkzzzz/mycommon/myregistry" "github.com/gin-gonic/gin" "github.com/spf13/pflag" "log" "net" "net/http" "runtime/debug" "time" ) var _ graceful.IRunner = (*Server)(nil) type Conf struct { Addr string Port int Ip string Log bool ReadTimeoutMs int WriteTimeoutMs int IdleTimeoutMs int ShutdownTimeoutMs int } type Opt func(server *Server) type Server struct { serviceId string serviceName string serverConf *Conf reg myregistry.IRegister logger mylog.ILogger useDefaultBufferCfg bool delayStopMs int serviceRegInfo *myregistry.ServiceInfo hs *http.Server ginEngine *gin.Engine shutdownTimeout time.Duration } func WithRegistry(serviceName string, reg myregistry.IRegister) Opt { return func(server *Server) { server.serviceName = myhttp.ServicePrefix + serviceName server.reg = reg } } func WithDelayStopMs(v int) Opt { return func(server *Server) { server.delayStopMs = v } } func SetFlag() { pflag.Int("http.port", 0, "listen port, 0 is random port") pflag.String("http.log", "true", "enable request log") } func New(cfg *myconf.Config, opts ...Opt) *Server { cf := &Conf{} err := cfg.UnmarshalKey("http", cf) if err != nil { panic(err) } // 命令行的参数覆盖一次, Unmarshal解析的时候, 不会用命令行的参数覆盖 https://github.com/spf13/viper/issues/190 cf.Port = cfg.GetInt(fmt.Sprintf("http.port")) cf.Log = cfg.GetBool(fmt.Sprintf("http.log")) return NewByConf(cf, opts...) } func NewByConf(conf *Conf, opts ...Opt) *Server { s := &Server{ serverConf: conf, useDefaultBufferCfg: true, } for _, opt := range opts { opt(s) } if s.logger == nil { s.logger = mylog.GetLoggerSkip(-1) } if s.reg != nil && s.serviceName == "" { panic("service name is empty") } s.ginEngine = gin.New() if s.serverConf.Log { //s.unaryMiddlewares = append(s.unaryMiddlewares, s.requestLog()) s.ginEngine.Use(s.requestLog()) } s.ginEngine.Use(s.httpRecover()) s.ginEngine.GET("/health", func(ctx *gin.Context) { ctx.String(http.StatusOK, "ok") }) s.hs = &http.Server{ Handler: s.ginEngine, ReadTimeout: time.Second * 5, WriteTimeout: time.Second * 10, IdleTimeout: time.Second * 180, } s.shutdownTimeout = time.Millisecond * 200 if conf.ReadTimeoutMs > 0 { s.hs.ReadTimeout = time.Millisecond * time.Duration(conf.ReadTimeoutMs) } if conf.WriteTimeoutMs > 0 { s.hs.WriteTimeout = time.Millisecond * time.Duration(conf.WriteTimeoutMs) } if conf.IdleTimeoutMs > 0 { s.hs.IdleTimeout = time.Millisecond * time.Duration(conf.IdleTimeoutMs) } if conf.ShutdownTimeoutMs > 0 { s.shutdownTimeout = time.Millisecond * time.Duration(conf.ShutdownTimeoutMs) } return s } func (s *Server) Engine() *gin.Engine { return s.ginEngine } func (s *Server) initServer() { } func (s *Server) Run(ctx context.Context) error { s.initServer() // 端口如果=0, 监听随机端口 addr0 := fmt.Sprintf("%s:%d", s.serverConf.Addr, s.serverConf.Port) lis, err := net.Listen("tcp", addr0) if err != nil { return err } // 获取监听的端口 port := lis.Addr().(*net.TCPAddr).Port var svcIp = s.serverConf.Ip if svcIp == "" { svcIp = mycommon.GetOutboundIP() } // 注册服务 if s.reg != nil { s.serviceRegInfo = &myregistry.ServiceInfo{ ServiceName: s.serviceName, Ip: svcIp, Port: port, } err = s.reg.Register(s.serviceRegInfo) if err != nil { return err } } addr := fmt.Sprintf("%s:%d", s.serverConf.Addr, port) log.Printf("http server listen on %s", addr) s.hs.Addr = addr err = s.hs.Serve(lis) if err != nil { log.Printf("start http server err: %s", err) return err } return nil } func (s *Server) Stop() { if s.reg != nil { err := s.reg.Deregister(s.serviceRegInfo) if err != nil { s.logger.Errorf("http server deregister err: %s", err) } } // 如果使用k8s service, 关闭pod和往service注销ip是同时进行的, 如果退出服务比注销ip先完成, 可能有流量继续进来, 导致请求失败 // 延迟一段时间, 确保服务已经注销ip, 再关闭服务 // 如何使用注册中心, 先从中心退出ip, 也延迟一段时间, 等上游网关更新ip完成(正常不会太久), 不会有流量进来旧服务, 再退出服务 if s.delayStopMs > 0 { delayTime := time.Millisecond * time.Duration(s.delayStopMs) log.Printf("http server delay stop: %s", delayTime) time.Sleep(delayTime) } tCtx, tCancel := context.WithTimeout(context.Background(), s.shutdownTimeout) defer tCancel() s.hs.Shutdown(tCtx) log.Printf("http server stop") } func (s *Server) httpRecover() gin.HandlerFunc { return func(ctx *gin.Context) { defer func() { if err := recover(); err != nil { log.Printf("%s - panic: %v\n%s", ctx.Request.RequestURI, err, debug.Stack()) ctx.JSON(http.StatusOK, gin.H{ "code": 1, "message": fmt.Sprintf("server err: %s", err), }) } }() ctx.Next() } } func (s *Server) requestLog() gin.HandlerFunc { return func(ctx *gin.Context) { start := time.Now() ctx.Next() s.logger.Infof( "%s - %d - %s - %s - %s", ctx.Request.Method, ctx.Writer.Status(), time.Since(start), ctx.ClientIP(), ctx.Request.RequestURI, ) } }