package consul

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/hashicorp/consul/api"
	"github.com/jpillora/backoff"
	"github.com/pkg/errors"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/resolver"
)

// schemeName is the URL scheme this resolver handles.
// Every target URL of the form 'consul://.../...' is resolved by it.
const schemeName = "consul"

// builder implements resolver.Builder and is used to construct all Consul resolvers.
type builder struct{}

func (b *builder) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	tgt, err := parseURL(url.URL.String())
	if err != nil {
		return nil, errors.Wrap(err, "wrong consul URL")
	}
	cli, err := api.NewClient(tgt.consulConfig())
	if err != nil {
		return nil, errors.Wrap(err, "couldn't create a Consul API client")
	}

	ctx, cancel := context.WithCancel(context.Background())
	pipe := make(chan []string)
	go watchConsulService(ctx, cli.Health(), tgt, pipe)
	go populateEndpoints(ctx, cc, pipe, tgt)

	return &resolvr{cancelFunc: cancel}, nil
}

// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
func (b *builder) Scheme() string {
	return schemeName
}

// init auto-registers this builder in the gRPC resolver registry,
// so importing the package is enough to make the scheme available.
func init() {
	resolver.Register(&builder{})
}
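
// A minimal usage sketch (illustrative only, not part of this package): after
// a blank import of this package has triggered the registration above, a gRPC
// client can dial through the "consul" scheme. The import path, service name,
// and query parameter below are placeholders; the parameters actually
// supported are whatever parseURL accepts.
//
//	import (
//		"google.golang.org/grpc"
//
//		_ "path/to/this/package" // hypothetical import path
//	)
//
//	conn, err := grpc.Dial(
//		"consul://127.0.0.1:8500/my-service?healthy=true",
//		grpc.WithInsecure(),
//	)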

// resolvr implements resolver.Resolver from the gRPC package.
// It watches for endpoint changes and pushes them to the underlying gRPC connection.
type resolvr struct {
	cancelFunc context.CancelFunc
}

// ResolveNow is a no-op: the background watcher already keeps the endpoint
// list up to date, so there is nothing extra to do on demand.
func (r *resolvr) ResolveNow(resolver.ResolveNowOptions) {}

// Close closes the resolver.
func (r *resolvr) Close() {
	r.cancelFunc()
}

// servicer captures the part of the Consul health API used by the watcher,
// so it can be mocked in tests via the go:generate directive below.
//
//go:generate ./bin/moq -out mocks_test.go . servicer
type servicer interface {
	Service(string, string, bool, *api.QueryOptions) ([]*api.ServiceEntry, *api.QueryMeta, error)
}
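
// In tests, the moq-generated mock (mocks_test.go) can stand in for the real
// Consul health client. A sketch, assuming moq's usual <Method>Func field
// convention; ctx, tgt, and out are placeholders the test would set up:
//
//	m := &servicerMock{
//		ServiceFunc: func(string, string, bool, *api.QueryOptions) ([]*api.ServiceEntry, *api.QueryMeta, error) {
//			return []*api.ServiceEntry{}, &api.QueryMeta{LastIndex: 1}, nil
//		},
//	}
//	go watchConsulService(ctx, m, tgt, out)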

func watchConsulService(ctx context.Context, s servicer, tgt target, out chan<- []string) {
	res := make(chan []string)
	quit := make(chan struct{})
	bck := &backoff.Backoff{
		Factor: 2,
		Jitter: true,
		Min:    10 * time.Millisecond,
		Max:    tgt.MaxBackoff,
	}
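	// With these settings the delay between failed fetches roughly doubles
	// each time (10ms, ~20ms, ~40ms, ... with jitter) up to tgt.MaxBackoff,
	// and is reset after every successful fetch.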
	go func() {
		var lastIndex uint64
		for {
			// A Consul blocking query: with WaitIndex set, the call only
			// returns once the service's index advances past lastIndex or
			// WaitTime elapses, so this loop long-polls instead of spinning.
			ss, meta, err := s.Service(
				tgt.Service,
				tgt.Tag,
				tgt.Healthy,
				&api.QueryOptions{
					WaitIndex:         lastIndex,
					Near:              tgt.Near,
					WaitTime:          tgt.Wait,
					Datacenter:        tgt.Dc,
					AllowStale:        tgt.AllowStale,
					RequireConsistent: tgt.RequireConsistent,
				},
			)
			if err != nil {
				// No need to continue if the context is done/cancelled.
				// We check that here directly because the check for the closed
				// quit channel at the end of the loop is not reached when
				// calling continue here.
				select {
				case <-quit:
					return
				default:
					grpclog.Errorf("[Consul resolver] Couldn't fetch endpoints. target={%s}; error={%v}", tgt.String(), err)
					time.Sleep(bck.Duration())
					continue
				}
			}
			bck.Reset()
			lastIndex = meta.LastIndex
			grpclog.Infof("[Consul resolver] %d endpoints fetched in(+wait) %s for target={%s}",
				len(ss),
				meta.RequestTime,
				tgt.String(),
			)

			ee := make([]string, 0, len(ss))
			for _, s := range ss {
				// Prefer the service address; fall back to the node address.
				address := s.Service.Address
				if address == "" {
					address = s.Node.Address
				}
				ee = append(ee, fmt.Sprintf("%s:%d", address, s.Service.Port))
			}

			if tgt.Limit != 0 && len(ee) > tgt.Limit {
				ee = ee[:tgt.Limit]
			}
			select {
			case res <- ee:
				continue
			case <-quit:
				return
			}
		}
	}()

	for {
		// If both channels in the select below are ready to read, Go picks
		// one pseudo-randomly, but on cancellation we want to act immediately,
		// so check the context first.
		if ctx.Err() != nil {
			// Close quit so the goroutine returns and doesn't leak.
			// Do NOT close res because that can lead to panics in the goroutine.
			// res will be garbage collected at some point.
			close(quit)
			return
		}
		select {
		case ee := <-res:
			out <- ee
		case <-ctx.Done():
			close(quit)
			return
		}
	}
}

func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []string, tgt target) {
	for {
		select {
		case cc := <-input:
			// Deduplicate the incoming endpoints.
			connsSet := make(map[string]struct{}, len(cc))
			for _, c := range cc {
				connsSet[c] = struct{}{}
			}
			conns := make([]resolver.Address, 0, len(connsSet))
			for c := range connsSet {
				conns = append(conns, resolver.Address{Addr: c})
			}

			// Sort so that an unchanged set of endpoints produces an
			// identical update and the balancer is not churned needlessly.
			sort.Sort(byAddressString(conns))

			grpclog.Infof("[Consul resolver] Updating endpoints for %s: %v", tgt.Service, conns)

			err := clientConn.UpdateState(resolver.State{Addresses: conns})
			if err != nil {
				grpclog.Errorf("[Consul resolver] Couldn't update client connection. error={%v}", err)
				continue
			}
		case <-ctx.Done():
			grpclog.Info("[Consul resolver] Watch has finished")
			return
		}
	}
}

// byAddressString sorts []resolver.Address by the Addr field in increasing order.
type byAddressString []resolver.Address

func (p byAddressString) Len() int           { return len(p) }
func (p byAddressString) Less(i, j int) bool { return p[i].Addr < p[j].Addr }
func (p byAddressString) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }