main
kzkzzzz 2025-12-15 22:37:55 +08:00
parent 4553fc8466
commit 2b704cc700
1 changed files with 67 additions and 22 deletions

View File

@ -31,9 +31,10 @@ var (
type Consul struct { type Consul struct {
client *api.Client client *api.Client
serviceIds map[string][]string
serviceTags []string serviceTags []string
services []*api.AgentServiceRegistration services map[string]*api.AgentServiceRegistration
deregisteredMap *sync.Map
} }
type Opt func(*Consul) type Opt func(*Consul)
@ -58,7 +59,9 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error {
serviceId = fmt.Sprintf("%s-%s", hostname, serviceId) serviceId = fmt.Sprintf("%s-%s", hostname, serviceId)
} }
c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId) if _, ok := c.services[service.ServiceName]; ok {
return fmt.Errorf("service [%s] already registered", service.ServiceName)
}
check := &api.AgentServiceCheck{ check := &api.AgentServiceCheck{
CheckID: serviceId, CheckID: serviceId,
@ -87,26 +90,29 @@ func (c *Consul) Register(service *myregistry.ServiceInfo) error {
Check: check, Check: check,
} }
c.services = append(c.services, svc) c.services[service.ServiceName] = svc
err = c.client.Agent().ServiceRegister(svc) err = c.client.Agent().ServiceRegister(svc)
if err != nil { if err != nil {
defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) defaultLog.Errorf("register service err: %s: %s", svc.Name, err)
return err return err
} }
defaultLog.Infof("retry register service ok: %s", svc.Name) defaultLog.Infof("register service ok: %s", svc.Name)
return nil return nil
} }
func (c *Consul) Deregister(service *myregistry.ServiceInfo) error { func (c *Consul) Deregister(service *myregistry.ServiceInfo) error {
for _, svcId := range c.serviceIds[service.ServiceName] { c.deregisteredMap.Store(service.ServiceName, true)
err := c.client.Agent().ServiceDeregister(svcId)
if svc, ok := c.services[service.ServiceName]; ok {
err := c.client.Agent().ServiceDeregister(svc.ID)
if err != nil { if err != nil {
defaultLog.Errorf("Failed to deregister service %s: %s\n", service, err) defaultLog.Errorf("deregister service %s err: %s", service, err)
} }
} }
return nil return nil
} }
@ -131,9 +137,9 @@ func New(conf *myconf.Config, opts ...Opt) (*Consul, error) {
cl := &Consul{ cl := &Consul{
client: client, client: client,
serviceIds: make(map[string][]string),
serviceTags: conf.GetStringSlice("serviceTags"), serviceTags: conf.GetStringSlice("serviceTags"),
services: make([]*api.AgentServiceRegistration, 0), services: make(map[string]*api.AgentServiceRegistration, 0),
deregisteredMap: &sync.Map{},
} }
for _, opt := range opts { for _, opt := range opts {
@ -203,11 +209,15 @@ func NewClient(clientCfg *ClientConfig) (*api.Client, error) {
} }
func (c *Consul) healthCheck() { func (c *Consul) healthCheck() {
time.Sleep(time.Second * 5)
mylog.Infof("start health check")
var ( var (
lastIndex uint64 lastIndex uint64
isFirst = true isFirst = true
err error err error
meta *api.QueryMeta meta *api.QueryMeta
remoteServices map[string][]string
) )
for { for {
@ -218,14 +228,15 @@ func (c *Consul) healthCheck() {
if err != nil { if err != nil {
var isRegisterErr bool var isRegisterErr bool
for _, svc := range c.services { for _, svc := range c.services {
err := c.client.Agent().ServiceRegister(svc) err = c.retryRegister(svc, false)
if err != nil { if err != nil {
defaultLog.Errorf("retry register service err: %s: %s", svc.Name, err) defaultLog.Errorf("retry register service %s err:: %s", svc.Name, err)
isRegisterErr = true isRegisterErr = true
break break
} else { } else {
defaultLog.Infof("retry register service ok: %s", svc.Name) defaultLog.Infof("retry register service %s ok", svc.Name)
} }
} }
if isRegisterErr { if isRegisterErr {
@ -237,16 +248,50 @@ func (c *Consul) healthCheck() {
isFirst = false isFirst = false
_, meta, err = c.client.Catalog().Nodes(&api.QueryOptions{ remoteServices, meta, err = c.client.Catalog().Services(&api.QueryOptions{
WaitIndex: lastIndex, WaitIndex: lastIndex,
//WaitTime: time.Second * 5, WaitTime: time.Second * 90,
}) })
if err != nil { if err != nil {
defaultLog.Errorf("health check err: %s", err) defaultLog.Errorf("health check err: %s", err)
continue continue
} }
lastIndex = meta.LastIndex lastIndex = meta.LastIndex
//mylog.Debugf("%d - %+v\n", lastIndex, remoteServices)
if isFirst == false {
for _, localSvc := range c.services {
if _, ok := remoteServices[localSvc.Name]; !ok {
//c.retryRegister(localSvc, true)
err = fmt.Errorf("need retry register service %s not exist [%+v]", localSvc.Name, remoteServices)
} }
}
}
}
}
func (c *Consul) retryRegister(svc *api.AgentServiceRegistration, needDelay bool) error {
if _, ok := c.deregisteredMap.Load(svc.Name); ok {
return nil
}
if needDelay {
go func() {
time.Sleep(time.Second)
err := c.client.Agent().ServiceRegister(svc)
if err != nil {
defaultLog.Errorf("delay retry register service %s err: %s", svc.Name, err)
} else {
defaultLog.Infof("delay retry register service %s ok", svc.Name)
}
}()
return nil
}
return c.client.Agent().ServiceRegister(svc)
} }
func (c *Consul) Client() *api.Client { func (c *Consul) Client() *api.Client {