package consul import ( "fmt" "github.com/hashicorp/consul/api" "github.com/pkg/errors" "google.golang.org/grpc/resolver" "sort" "strings" "time" ) const schemeName = "consul" func init() { resolver.Register(&builder{}) } var _ resolver.Builder = (*builder)(nil) type builder struct{} func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { tgt, err := parseURL(url.URL.String()) if err != nil { return nil, errors.Wrap(err, "Wrong consul URL") } client, err := NewClient(&ClientConfig{ Address: tgt.Addr, Token: tgt.Token, Username: tgt.User, Password: tgt.Password, }) if err != nil { return nil, errors.Wrap(err, "Couldn't connect to the Consul API") } rl := &consulResolver{ client: client, tgt: tgt, cc: cc, } go rl.watchService() return rl, nil } func (b *builder) Scheme() string { return schemeName } var _ resolver.Resolver = (*consulResolver)(nil) type consulResolver struct { client *api.Client tgt *target cc resolver.ClientConn } func (c *consulResolver) watchService() { var ( lastIndex uint64 isFirst = true ) for { if isFirst == false { time.Sleep(time.Second * 2) } isFirst = false endpoints, meta, err := c.client.Health().Service(c.tgt.Service, c.tgt.Tag, true, &api.QueryOptions{ WaitIndex: lastIndex, WaitTime: c.tgt.Wait, Datacenter: c.tgt.Dc, AllowStale: c.tgt.AllowStale, RequireConsistent: c.tgt.RequireConsistent, }) if err != nil { defaultLog.Errorf("watch service err: %s", err) continue } lastIndex = meta.LastIndex addrs := make([]string, 0, len(endpoints)) state := resolver.State{ Addresses: make([]resolver.Address, 0, len(endpoints)), } for _, endpoint := range endpoints { tmp := resolver.Address{ Addr: endpoint.Service.Address, } if tmp.Addr == "" { continue } tmp.Addr = fmt.Sprintf("%s:%d", tmp.Addr, endpoint.Service.Port) state.Addresses = append(state.Addresses, tmp) addrs = append(addrs, tmp.Addr) } if len(state.Addresses) == 0 { defaultLog.Warnf("%s services num == 0", c.tgt.String()) } sort.SliceStable(state.Addresses, func(i, j int) bool { return state.Addresses[i].Addr < state.Addresses[j].Addr }) err = c.cc.UpdateState(state) if err != nil { defaultLog.Errorf("%s update service state err: %s", err, c.tgt.String()) } else { defaultLog.Infof("%s update service num:%d (%s)", c.tgt.String(), len(addrs), strings.Join(addrs, ", ")) } } } func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) { } func (c *consulResolver) Close() { }