From 2b704cc700c227b3683166cfb7d53e451f577a9b Mon Sep 17 00:00:00 2001 From: kzkzzzz Date: Mon, 15 Dec 2025 22:37:55 +0800 Subject: [PATCH] update --- myregistry/consul/consul.go | 89 ++++++++++++++++++++++++++++--------- 1 file changed, 67 insertions(+), 22 deletions(-) diff --git a/myregistry/consul/consul.go b/myregistry/consul/consul.go index 8f9c12a..22c4efe 100644 --- a/myregistry/consul/consul.go +++ b/myregistry/consul/consul.go @@ -31,9 +31,10 @@ var ( type Consul struct { client *api.Client - serviceIds map[string][]string serviceTags []string - services []*api.AgentServiceRegistration + services map[string]*api.AgentServiceRegistration + + deregisteredMap *sync.Map } type Opt func(*Consul) @@ -58,7 +59,9 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error { serviceId = fmt.Sprintf("%s-%s", hostname, serviceId) } - c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId) + if _, ok := c.services[service.ServiceName]; ok { + return fmt.Errorf("service [%s] already registered", service.ServiceName) + } check := &api.AgentServiceCheck{ CheckID: serviceId, @@ -87,26 +90,29 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error { Check: check, } - c.services = append(c.services, svc) + c.services[service.ServiceName] = svc err = c.client.Agent().ServiceRegister(svc) if err != nil { - defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) + defaultLog.Errorf("register service err: %s: %s", svc.Name, err) return err } - defaultLog.Infof("retry register service ok: %s", svc.Name) + defaultLog.Infof("register service ok: %s", svc.Name) return nil } func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { - for _, svcId := range c.serviceIds[service.ServiceName] { - err := c.client.Agent().ServiceDeregister(svcId) + c.deregisteredMap.Store(service.ServiceName, true) + + if svc, ok := c.services[service.ServiceName]; ok { + err := c.client.Agent().ServiceDeregister(svc.ID) if err != nil { - defaultLog.Errorf("Failed to deregister service %s: %s\n", service, err) + defaultLog.Errorf("deregister service %s err: %s", service, err) } } + return nil } @@ -130,10 +136,10 @@ func New(conf *myconf.Config, opts ...Opt) (*Consul, error) { } cl := &Consul{ - client: client, - serviceIds: make(map[string][]string), - serviceTags: conf.GetStringSlice("serviceTags"), - services: make([]*api.AgentServiceRegistration, 0), + client: client, + serviceTags: conf.GetStringSlice("serviceTags"), + services: make(map[string]*api.AgentServiceRegistration, 0), + deregisteredMap: &sync.Map{}, } for _, opt := range opts { @@ -203,11 +209,15 @@ func NewClient(clientCfg *ClientConfig) (*api.Client, error) { } func (c *Consul) healthCheck() { + time.Sleep(time.Second * 5) + mylog.Infof("start health check") + var ( - lastIndex uint64 - isFirst = true - err error - meta *api.QueryMeta + lastIndex uint64 + isFirst = true + err error + meta *api.QueryMeta + remoteServices map[string][]string ) for { @@ -218,14 +228,15 @@ func (c *Consul) healthCheck() { if err != nil { var isRegisterErr bool for _, svc := range c.services { - err := c.client.Agent().ServiceRegister(svc) + err = c.retryRegister(svc, false) if err != nil { - defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) + defaultLog.Errorf("retry register service %s err:: %s", svc.Name, err) isRegisterErr = true break } else { - defaultLog.Infof("retry register service ok: %s", svc.Name) + defaultLog.Infof("retry register service %s ok", svc.Name) } + } if isRegisterErr { @@ -237,18 +248,52 @@ func (c *Consul) healthCheck() { isFirst = false - _, meta, err = c.client.Catalog().Nodes(&api.QueryOptions{ + remoteServices, meta, err = c.client.Catalog().Services(&api.QueryOptions{ WaitIndex: lastIndex, - //WaitTime: time.Second * 5, + WaitTime: time.Second * 90, }) if err != nil { defaultLog.Errorf("health check err: %s", err) continue } lastIndex = meta.LastIndex + + //mylog.Debugf("%d - %+v\n", lastIndex, remoteServices) + + if isFirst == false { + for _, localSvc := range c.services { + if _, ok := remoteServices[localSvc.Name]; !ok { + //c.retryRegister(localSvc, true) + err = fmt.Errorf("need retry register service %s not exist [%+v]", localSvc.Name, remoteServices) + } + } + } + } } +func (c *Consul) retryRegister(svc *api.AgentServiceRegistration, needDelay bool) error { + if _, ok := c.deregisteredMap.Load(svc.Name); ok { + return nil + } + + if needDelay { + go func() { + time.Sleep(time.Second) + err := c.client.Agent().ServiceRegister(svc) + if err != nil { + defaultLog.Errorf("delay retry register service %s err: %s", svc.Name, err) + } else { + defaultLog.Infof("delay retry register service %s ok", svc.Name) + } + }() + + return nil + } + + return c.client.Agent().ServiceRegister(svc) +} + func (c *Consul) Client() *api.Client { return c.client }