package consul import ( "fmt" "git.makemake.in/kzkzzzz/mycommon/myconf" "git.makemake.in/kzkzzzz/mycommon/mygrpc" "git.makemake.in/kzkzzzz/mycommon/mylog" "git.makemake.in/kzkzzzz/mycommon/myregistry" "github.com/hashicorp/consul/api" "github.com/rs/xid" "go.uber.org/zap" "log" "net" "net/url" "os" "sync" "time" ) var ( _ myregistry.IRegister = (*Consul)(nil) defaultLog = mylog.NewLogger(&mylog.Config{ Level: mylog.DebugLevel, NeedLogFile: false, ConsoleWriter: os.Stdout, ZapOpt: []zap.Option{ zap.AddCaller(), zap.AddCallerSkip(1), }, }) ) type Consul struct { client *api.Client serviceTags []string services map[string]*api.AgentServiceRegistration deregisteredMap *sync.Map } type Opt func(*Consul) func WithServiceTags(tags ...string) Opt { return func(c *Consul) { c.serviceTags = tags } } func (c *Consul) Name() string { return "consul" } func (c *Consul) Register(service *myregistry.ServiceInfo) error { // 健康检查 serviceId := xid.New().String() hostname, err := os.Hostname() if err != nil { defaultLog.Errorf("get hostname err: %s", err) } else { serviceId = fmt.Sprintf("%s-%s", hostname, serviceId) } if _, ok := c.services[service.ServiceName]; ok { return fmt.Errorf("service [%s] already registered", service.ServiceName) } check := &api.AgentServiceCheck{ CheckID: serviceId, TCP: fmt.Sprintf("%s:%d", service.Ip, service.Port), Timeout: "5s", // 超时时间 Interval: "30s", // 运行检查的频率 // 指定时间后自动注销不健康的服务节点 // 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。 DeregisterCriticalServiceAfter: "6m", Status: api.HealthPassing, } regTags := make([]string, len(c.serviceTags)) copy(regTags, c.serviceTags) if v := service.Extend["tag"]; v != "" { regTags = append(regTags, v) } svc := &api.AgentServiceRegistration{ ID: serviceId, // 服务唯一ID Name: service.ServiceName, // 服务名称 Tags: regTags, // 为服务打标签 Address: service.Ip, Port: service.Port, Check: check, } c.services[service.ServiceName] = svc err = c.client.Agent().ServiceRegister(svc) if err != nil { defaultLog.Errorf("register service err: %s: %s", svc.Name, err) return err } //defaultLog.Infof("register service ok: %s", svc.Name) log.Printf("register service ok: %s", svc.Name) return nil } func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { c.deregisteredMap.Store(service.ServiceName, true) if svc, ok := c.services[service.ServiceName]; ok { err := c.client.Agent().ServiceDeregister(svc.ID) if err != nil { defaultLog.Errorf("deregister service %s err: %s", service, err) } } return nil } func MustNew(conf *myconf.Config, opts ...Opt) *Consul { consul, err := New(conf, opts...) if err != nil { panic(err) } return consul } func New(conf *myconf.Config, opts ...Opt) (*Consul, error) { client, err := NewClient(&ClientConfig{ Address: conf.GetString("addr"), Token: conf.GetString("token"), Username: conf.GetString("username"), Password: conf.GetString("password"), }) if err != nil { return nil, err } cl := &Consul{ client: client, serviceTags: conf.GetStringSlice("serviceTags"), services: make(map[string]*api.AgentServiceRegistration, 0), deregisteredMap: &sync.Map{}, } for _, opt := range opts { opt(cl) } go cl.healthCheck() return cl, nil } type ClientConfig struct { Address string Token string Username string Password string AutoGrpcPrefix bool } var ( clientLock = &sync.Mutex{} clientMap = make(map[string]*api.Client) ) func NewClient(clientCfg *ClientConfig) (*api.Client, error) { clientLock.Lock() defer clientLock.Unlock() if clientCfg.Address == "" { return nil, fmt.Errorf("consul address is empty") } if client, ok := clientMap[clientCfg.Address]; ok { return client, nil } cfg := api.DefaultConfig() cfg.Address = clientCfg.Address if cfg.Address == "" { return nil, fmt.Errorf("consul address is empty") } cfg.Transport.DialContext = (&net.Dialer{ Timeout: 3 * time.Second, KeepAlive: 20 * time.Second, }).DialContext cfg.Token = clientCfg.Token username := clientCfg.Username password := clientCfg.Password if username != "" && password != "" { cfg.HttpAuth = &api.HttpBasicAuth{ Username: username, Password: password, } } client, err := api.NewClient(cfg) if err != nil { return nil, err } clientMap[clientCfg.Address] = client return client, nil } func (c *Consul) healthCheck() { time.Sleep(time.Second * 5) mylog.Infof("start health check") var ( lastIndex uint64 isFirst = true err error meta *api.QueryMeta remoteServices map[string][]string ) for { if isFirst == false { time.Sleep(time.Second * 2) // 错误的情况重新注册一次 if err != nil { var isRegisterErr bool for _, svc := range c.services { err = c.retryRegister(svc, false) if err != nil { defaultLog.Errorf("retry register service %s err:: %s", svc.Name, err) isRegisterErr = true break } else { defaultLog.Infof("retry register service %s ok", svc.Name) } } if isRegisterErr { continue } } } isFirst = false remoteServices, meta, err = c.client.Catalog().Services(&api.QueryOptions{ WaitIndex: lastIndex, WaitTime: time.Second * 90, }) if err != nil { defaultLog.Errorf("health check err: %s", err) continue } lastIndex = meta.LastIndex //mylog.Debugf("%d - %+v\n", lastIndex, remoteServices) if isFirst == false { for _, localSvc := range c.services { if _, ok := remoteServices[localSvc.Name]; !ok { //c.retryRegister(localSvc, true) err = fmt.Errorf("need retry register service %s not exist [%+v]", localSvc.Name, remoteServices) } } } } } func (c *Consul) retryRegister(svc *api.AgentServiceRegistration, needDelay bool) error { if _, ok := c.deregisteredMap.Load(svc.Name); ok { return nil } if needDelay { go func() { time.Sleep(time.Second) err := c.client.Agent().ServiceRegister(svc) if err != nil { defaultLog.Errorf("delay retry register service %s err: %s", svc.Name, err) } else { defaultLog.Infof("delay retry register service %s ok", svc.Name) } }() return nil } return c.client.Agent().ServiceRegister(svc) } func (c *Consul) Client() *api.Client { return c.client } func GrpcUrl(serviceName string, conf *myconf.Config) string { return GrpcUrlWithTag("", serviceName, conf) } func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string { return GrpcUrlWithTagByConfig(tag, serviceName, &ClientConfig{ Address: conf.GetString("addr"), Token: conf.GetString("token"), Username: conf.GetString("username"), Password: conf.GetString("password"), AutoGrpcPrefix: true, }) } func GrpcUrlWithTagByConfig(tag string, serviceName string, conf *ClientConfig) string { if conf.AutoGrpcPrefix { serviceName = mygrpc.ServicePrefix + serviceName } u := &url.URL{ Scheme: schemeName, Host: conf.Address, Path: serviceName, } if u.Host == "" { panic("consul address is empty") } query := u.Query() query.Set("healthy", "true") if conf.Token != "" { query.Set("token", conf.Token) } if tag != "" { query.Set("tag", tag) } if conf.Username != "" && conf.Password != "" { u.User = url.UserPassword(conf.Username, conf.Password) } u.RawQuery = query.Encode() return u.String() }