package consul import ( "fmt" "git.makemake.in/kzkzzzz/mycommon/myconf" "git.makemake.in/kzkzzzz/mycommon/mygrpc" "git.makemake.in/kzkzzzz/mycommon/mylog" "git.makemake.in/kzkzzzz/mycommon/myregistry" "github.com/hashicorp/consul/api" "github.com/rs/xid" "go.uber.org/zap" "net" "net/url" "os" "sync" "time" ) var ( _ myregistry.IRegister = (*Consul)(nil) defaultLog = mylog.NewLogger(&mylog.Config{ Level: mylog.DebugLevel, NeedLogFile: false, ConsoleWriter: os.Stdout, ZapOpt: []zap.Option{ zap.AddCaller(), zap.AddCallerSkip(1), }, }) ) type Consul struct { client *api.Client serviceIds map[string][]string serviceTags []string services []*api.AgentServiceRegistration } type Opt func(*Consul) func WithServiceTags(tags ...string) Opt { return func(c *Consul) { c.serviceTags = tags } } func (c *Consul) Name() string { return "consul" } func (c *Consul) Register(service *myregistry.ServiceInfo) error { // 健康检查 serviceId := xid.New().String() hostname, err := os.Hostname() if err != nil { defaultLog.Errorf("get hostname err: %s", err) } else { serviceId = fmt.Sprintf("%s-%s", hostname, serviceId) } c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId) check := &api.AgentServiceCheck{ CheckID: serviceId, TCP: fmt.Sprintf("%s:%d", service.Ip, service.Port), Timeout: "5s", // 超时时间 Interval: "30s", // 运行检查的频率 // 指定时间后自动注销不健康的服务节点 // 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。 DeregisterCriticalServiceAfter: "6m", Status: api.HealthPassing, } regTags := make([]string, len(c.serviceTags)) copy(regTags, c.serviceTags) if v := service.Extend["tag"]; v != "" { regTags = append(regTags, v) } svc := &api.AgentServiceRegistration{ ID: serviceId, // 服务唯一ID Name: service.ServiceName, // 服务名称 Tags: regTags, // 为服务打标签 Address: service.Ip, Port: service.Port, Check: check, } c.services = append(c.services, svc) err = c.client.Agent().ServiceRegister(svc) if err != nil { defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) return err } defaultLog.Infof("retry register service ok: %s", svc.Name) return nil } func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { for _, svcId := range c.serviceIds[service.ServiceName] { err := c.client.Agent().ServiceDeregister(svcId) if err != nil { defaultLog.Errorf("Failed to deregister service %s: %s\n", service, err) } } return nil } func MustNew(conf *myconf.Config, opts ...Opt) *Consul { consul, err := New(conf, opts...) if err != nil { panic(err) } return consul } func New(conf *myconf.Config, opts ...Opt) (*Consul, error) { client, err := NewClient(&ClientConfig{ Address: conf.GetString("addr"), Token: conf.GetString("token"), Username: conf.GetString("username"), Password: conf.GetString("password"), }) if err != nil { return nil, err } cl := &Consul{ client: client, serviceIds: make(map[string][]string), serviceTags: conf.GetStringSlice("serviceTags"), services: make([]*api.AgentServiceRegistration, 0), } for _, opt := range opts { opt(cl) } go cl.healthCheck() return cl, nil } type ClientConfig struct { Address string Token string Username string Password string AutoGrpcPrefix bool } var ( clientLock = &sync.Mutex{} clientMap = make(map[string]*api.Client) ) func NewClient(clientCfg *ClientConfig) (*api.Client, error) { clientLock.Lock() defer clientLock.Unlock() if clientCfg.Address == "" { return nil, fmt.Errorf("consul address is empty") } if client, ok := clientMap[clientCfg.Address]; ok { return client, nil } cfg := api.DefaultConfig() cfg.Address = clientCfg.Address if cfg.Address == "" { return nil, fmt.Errorf("consul address is empty") } cfg.Transport.DialContext = (&net.Dialer{ Timeout: 3 * time.Second, KeepAlive: 20 * time.Second, }).DialContext cfg.Token = clientCfg.Token username := clientCfg.Username password := clientCfg.Password if username != "" && password != "" { cfg.HttpAuth = &api.HttpBasicAuth{ Username: username, Password: password, } } client, err := api.NewClient(cfg) if err != nil { return nil, err } clientMap[clientCfg.Address] = client return client, nil } func (c *Consul) healthCheck() { var ( lastIndex uint64 isFirst = true err error meta *api.QueryMeta ) for { if isFirst == false { time.Sleep(time.Second * 2) // 错误的情况重新注册一次 if err != nil { var isRegisterErr bool for _, svc := range c.services { err := c.client.Agent().ServiceRegister(svc) if err != nil { defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) isRegisterErr = true break } else { defaultLog.Infof("retry register service ok: %s", svc.Name) } } if isRegisterErr { continue } } } isFirst = false _, meta, err = c.client.Catalog().Nodes(&api.QueryOptions{ WaitIndex: lastIndex, //WaitTime: time.Second * 5, }) if err != nil { defaultLog.Errorf("health check err: %s", err) continue } lastIndex = meta.LastIndex } } func (c *Consul) Client() *api.Client { return c.client } func GrpcUrl(serviceName string, conf *myconf.Config) string { return GrpcUrlWithTag("", serviceName, conf) } func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string { return GrpcUrlWithTagByConfig(tag, serviceName, &ClientConfig{ Address: conf.GetString("addr"), Token: conf.GetString("token"), Username: conf.GetString("username"), Password: conf.GetString("password"), AutoGrpcPrefix: true, }) } func GrpcUrlWithTagByConfig(tag string, serviceName string, conf *ClientConfig) string { if conf.AutoGrpcPrefix { serviceName = mygrpc.ServicePrefix + serviceName } u := &url.URL{ Scheme: schemeName, Host: conf.Address, Path: serviceName, } if u.Host == "" { panic("consul address is empty") } query := u.Query() query.Set("healthy", "true") if conf.Token != "" { query.Set("token", conf.Token) } if tag != "" { query.Set("tag", tag) } if conf.Username != "" && conf.Password != "" { u.User = url.UserPassword(conf.Username, conf.Password) } u.RawQuery = query.Encode() return u.String() }