133 lines
2.7 KiB
Go
133 lines
2.7 KiB
Go
package consul
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/hashicorp/consul/api"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc/resolver"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const schemeName = "consul"
|
|
|
|
func init() {
|
|
resolver.Register(&builder{})
|
|
}
|
|
|
|
var _ resolver.Builder = (*builder)(nil)
|
|
|
|
type builder struct{}
|
|
|
|
func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
|
|
tgt, err := parseURL(url.URL.String())
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Wrong consul URL")
|
|
}
|
|
client, err := NewClient(&ClientConfig{
|
|
Address: tgt.Addr,
|
|
Token: tgt.Token,
|
|
Username: tgt.User,
|
|
Password: tgt.Password,
|
|
})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "Couldn't connect to the Consul API")
|
|
}
|
|
|
|
rl := &consulResolver{
|
|
client: client,
|
|
tgt: tgt,
|
|
cc: cc,
|
|
}
|
|
|
|
go rl.watchService()
|
|
return rl, nil
|
|
}
|
|
|
|
func (b *builder) Scheme() string {
|
|
return schemeName
|
|
}
|
|
|
|
var _ resolver.Resolver = (*consulResolver)(nil)
|
|
|
|
type consulResolver struct {
|
|
client *api.Client
|
|
tgt *target
|
|
cc resolver.ClientConn
|
|
}
|
|
|
|
func (c *consulResolver) watchService() {
|
|
var (
|
|
lastIndex uint64
|
|
isFirst = true
|
|
)
|
|
|
|
for {
|
|
if isFirst == false {
|
|
time.Sleep(time.Second * 2)
|
|
}
|
|
isFirst = false
|
|
|
|
endpoints, meta, err := c.client.Health().Service(c.tgt.Service, c.tgt.Tag,
|
|
true,
|
|
&api.QueryOptions{
|
|
WaitIndex: lastIndex,
|
|
WaitTime: c.tgt.Wait,
|
|
Datacenter: c.tgt.Dc,
|
|
AllowStale: c.tgt.AllowStale,
|
|
RequireConsistent: c.tgt.RequireConsistent,
|
|
})
|
|
if err != nil {
|
|
defaultLog.Errorf("watch service err: %s", err)
|
|
continue
|
|
}
|
|
lastIndex = meta.LastIndex
|
|
|
|
addrs := make([]string, 0, len(endpoints))
|
|
|
|
state := resolver.State{
|
|
Addresses: make([]resolver.Address, 0, len(endpoints)),
|
|
}
|
|
for _, endpoint := range endpoints {
|
|
tmp := resolver.Address{
|
|
Addr: endpoint.Service.Address,
|
|
}
|
|
|
|
if tmp.Addr == "" {
|
|
continue
|
|
}
|
|
|
|
tmp.Addr = fmt.Sprintf("%s:%d", tmp.Addr, endpoint.Service.Port)
|
|
|
|
state.Addresses = append(state.Addresses, tmp)
|
|
addrs = append(addrs, tmp.Addr)
|
|
}
|
|
|
|
if len(state.Addresses) == 0 {
|
|
defaultLog.Warnf("%s services num == 0", c.tgt.String())
|
|
}
|
|
|
|
sort.SliceStable(state.Addresses, func(i, j int) bool {
|
|
return state.Addresses[i].Addr < state.Addresses[j].Addr
|
|
})
|
|
|
|
err = c.cc.UpdateState(state)
|
|
if err != nil {
|
|
defaultLog.Errorf("%s update service state err: %s", err, c.tgt.String())
|
|
} else {
|
|
defaultLog.Infof("%s update service num:%d (%s)", c.tgt.String(), len(addrs), strings.Join(addrs, ", "))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
func (c *consulResolver) ResolveNow(options resolver.ResolveNowOptions) {
|
|
}
|
|
|
|
func (c *consulResolver) Close() {
|
|
|
|
}
|