// mycommon/myregistry/consul/resolver.go
//
// 133 lines
// 2.7 KiB
// Go

package consul
import (
	"fmt"
	"sort"
	"strings"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/pkg/errors"
	"google.golang.org/grpc/resolver"
)
// schemeName is the URL scheme reported by builder.Scheme, i.e. the scheme
// under which this package's resolver builder is registered with gRPC.
const schemeName = "consul"
// init registers this package's resolver builder with the gRPC resolver
// registry as a side effect of importing the package.
func init() {
	resolver.Register(new(builder))
}
// builder is the stateless resolver.Builder implementation for Consul targets.
type builder struct{}

// Compile-time assertion that *builder implements resolver.Builder.
var _ resolver.Builder = (*builder)(nil)
// Build creates a Consul-backed resolver for the given gRPC target.
//
// The target URL is parsed with parseURL, a Consul client is created from the
// parsed address and credentials via NewClient, and a consulResolver is
// returned after its watchService loop has been started in a new goroutine.
// opts is accepted to satisfy resolver.Builder but is not used.
//
// An error is returned (wrapped with context) when the URL cannot be parsed or
// the Consul client cannot be created.
func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	parsed, parseErr := parseURL(url.URL.String())
	if parseErr != nil {
		return nil, errors.Wrap(parseErr, "Wrong consul URL")
	}

	cfg := &ClientConfig{
		Address:  parsed.Addr,
		Token:    parsed.Token,
		Username: parsed.User,
		Password: parsed.Password,
	}
	consulClient, clientErr := NewClient(cfg)
	if clientErr != nil {
		return nil, errors.Wrap(clientErr, "Couldn't connect to the Consul API")
	}

	watcher := &consulResolver{
		cc:     cc,
		tgt:    parsed,
		client: consulClient,
	}
	// The watch loop pushes address updates to cc in the background.
	go watcher.watchService()

	return watcher, nil
}
// Scheme returns the URL scheme handled by this builder (schemeName).
func (*builder) Scheme() string {
	return schemeName
}
// Compile-time assertion that *consulResolver implements resolver.Resolver.
var _ resolver.Resolver = (*consulResolver)(nil)

// consulResolver watches one Consul service and pushes the addresses of its
// instances to a gRPC ClientConn.
type consulResolver struct {
	client *api.Client         // Consul API client used for the health queries
	tgt    *target             // parsed target (service, tag, datacenter, query options)
	cc     resolver.ClientConn // receives the resolved address list

	// closed is 0 while the resolver is live and 1 once Close has been called.
	// It is accessed only through sync/atomic because Close and the
	// watchService goroutine touch it concurrently. The zero value means
	// "open", so struct literals that do not set it keep working.
	closed int32
}

// isClosed reports whether Close has been called on the resolver.
func (c *consulResolver) isClosed() bool {
	return atomic.LoadInt32(&c.closed) != 0
}

// watchService polls Consul for the instances of the target service and
// reports them to the ClientConn until the resolver is closed.
//
// Each round issues a health query that carries the index returned by the
// previous round (WaitIndex) and the target's wait time, converts every
// endpoint that has a non-empty service address into "address:port", sorts the
// result by address and hands it to cc.UpdateState. Every round after the
// first is preceded by a two-second pause. Query and update failures are
// logged and the loop carries on.
//
// The loop exits once Close has been called. The flag is checked before and
// after the query, so an in-flight query is allowed to finish first and its
// result is discarded.
func (c *consulResolver) watchService() {
	var lastIndex uint64

	for first := true; ; first = false {
		// Throttle every round except the very first one, including retries
		// after an error.
		if !first {
			time.Sleep(2 * time.Second)
		}
		if c.isClosed() {
			return
		}

		endpoints, meta, err := c.client.Health().Service(c.tgt.Service, c.tgt.Tag,
			true, // passingOnly: only instances whose health checks pass
			&api.QueryOptions{
				WaitIndex:         lastIndex,
				WaitTime:          c.tgt.Wait,
				Datacenter:        c.tgt.Dc,
				AllowStale:        c.tgt.AllowStale,
				RequireConsistent: c.tgt.RequireConsistent,
			})
		// The query may have been pending for a while; do not report anything
		// to a ClientConn whose resolver has been closed in the meantime.
		if c.isClosed() {
			return
		}
		if err != nil {
			defaultLog.Errorf("watch service err: %s", err)
			continue
		}
		lastIndex = meta.LastIndex

		// addrs keeps the addresses in Consul's order for the log line;
		// state.Addresses is sorted before being handed to gRPC.
		addrs := make([]string, 0, len(endpoints))
		state := resolver.State{
			Addresses: make([]resolver.Address, 0, len(endpoints)),
		}
		for _, endpoint := range endpoints {
			// Endpoints without a service address cannot be dialed; skip them.
			if endpoint.Service.Address == "" {
				continue
			}
			addr := fmt.Sprintf("%s:%d", endpoint.Service.Address, endpoint.Service.Port)
			state.Addresses = append(state.Addresses, resolver.Address{Addr: addr})
			addrs = append(addrs, addr)
		}
		if len(state.Addresses) == 0 {
			defaultLog.Warnf("%s services num == 0", c.tgt.String())
		}

		sort.SliceStable(state.Addresses, func(i, j int) bool {
			return state.Addresses[i].Addr < state.Addresses[j].Addr
		})

		if err := c.cc.UpdateState(state); err != nil {
			// Target first, error second, matching the order of the verbs.
			defaultLog.Errorf("%s update service state err: %s", c.tgt.String(), err)
			continue
		}
		defaultLog.Infof("%s update service num:%d (%s)", c.tgt.String(), len(addrs), strings.Join(addrs, ", "))
	}
}

// ResolveNow is a no-op: watchService already refreshes the address list
// continuously, so there is nothing extra to trigger.
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {
}

// Close marks the resolver as closed so that the watchService goroutine stops
// instead of running for the lifetime of the process. It is safe to call more
// than once. The goroutine exits at its next check, which may be only after
// the current pause or in-flight query has finished.
func (c *consulResolver) Close() {
	atomic.StoreInt32(&c.closed, 1)
}