238 lines
5.1 KiB
Go
238 lines
5.1 KiB
Go
package consul
|
||
|
||
import (
|
||
"fmt"
|
||
"git.makemake.in/kzkzzzz/mycommon/myconf"
|
||
"git.makemake.in/kzkzzzz/mycommon/myregistry"
|
||
"github.com/hashicorp/consul/api"
|
||
"github.com/hashicorp/consul/api/watch"
|
||
"github.com/hashicorp/go-hclog"
|
||
"github.com/rs/xid"
|
||
"log"
|
||
"net"
|
||
"net/url"
|
||
"os"
|
||
"time"
|
||
)
|
||
|
||
// Compile-time assertion that *Consul implements myregistry.IRegister.
var _ myregistry.IRegister = (*Consul)(nil)
|
||
|
||
type Consul struct {
|
||
client *api.Client
|
||
serviceIds map[string][]string
|
||
serviceTags []string
|
||
services []*api.AgentServiceRegistration
|
||
}
|
||
|
||
// Opt is a functional option that mutates a Consul while New constructs it.
type Opt func(*Consul)
|
||
|
||
func WithServiceTags(tags ...string) Opt {
|
||
return func(c *Consul) {
|
||
c.serviceTags = tags
|
||
}
|
||
}
|
||
|
||
func (c *Consul) Name() string {
|
||
return "consul"
|
||
}
|
||
|
||
func (c *Consul) Register(service *myregistry.ServiceInfo) error {
|
||
// 健康检查
|
||
serviceId := xid.New().String()
|
||
hostname, err := os.Hostname()
|
||
if err != nil {
|
||
log.Printf("get hostname err: %s", err)
|
||
} else {
|
||
serviceId = fmt.Sprintf("%s-%s", hostname, serviceId)
|
||
}
|
||
|
||
c.serviceIds[service.ServiceName] = append(c.serviceIds[service.ServiceName], serviceId)
|
||
|
||
check := &api.AgentServiceCheck{
|
||
CheckID: serviceId,
|
||
TCP: fmt.Sprintf("%s:%d", service.Ip, service.Port),
|
||
Timeout: "5s", // 超时时间
|
||
Interval: "30s", // 运行检查的频率
|
||
// 指定时间后自动注销不健康的服务节点
|
||
// 最小超时时间为1分钟,收获不健康服务的进程每30秒运行一次,因此触发注销的时间可能略长于配置的超时时间。
|
||
DeregisterCriticalServiceAfter: "6m",
|
||
Status: "passing",
|
||
}
|
||
svc := &api.AgentServiceRegistration{
|
||
ID: serviceId, // 服务唯一ID
|
||
Name: service.ServiceName, // 服务名称
|
||
Tags: c.serviceTags, // 为服务打标签
|
||
Address: service.Ip,
|
||
Port: service.Port,
|
||
Check: check,
|
||
}
|
||
|
||
c.services = append(c.services, svc)
|
||
|
||
return c.client.Agent().ServiceRegister(svc)
|
||
}
|
||
|
||
func (c *Consul) Deregister(service *myregistry.ServiceInfo) error {
|
||
for _, svcId := range c.serviceIds[service.ServiceName] {
|
||
err := c.client.Agent().ServiceDeregister(svcId)
|
||
if err != nil {
|
||
log.Printf("Failed to deregister service %s: %s\n", service, err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func MustNew(conf *myconf.Config, opts ...Opt) *Consul {
|
||
consul, err := New(conf, opts...)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
return consul
|
||
}
|
||
|
||
func New(conf *myconf.Config, opts ...Opt) (*Consul, error) {
|
||
|
||
cfg := api.DefaultConfig()
|
||
cfg.Address = conf.GetString("addr")
|
||
|
||
if cfg.Address == "" {
|
||
return nil, fmt.Errorf("consul address is empty")
|
||
}
|
||
|
||
cfg.Transport.DialContext = (&net.Dialer{
|
||
Timeout: 3 * time.Second,
|
||
KeepAlive: 20 * time.Second,
|
||
DualStack: true,
|
||
}).DialContext
|
||
cfg.Token = conf.GetString("token")
|
||
|
||
username := conf.GetString("username")
|
||
password := conf.GetString("password")
|
||
|
||
if username != "" && password != "" {
|
||
cfg.HttpAuth = &api.HttpBasicAuth{
|
||
Username: username,
|
||
Password: password,
|
||
}
|
||
}
|
||
|
||
client, err := api.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
cl := &Consul{
|
||
client: client,
|
||
serviceIds: make(map[string][]string),
|
||
serviceTags: conf.GetStringSlice("serviceTags"),
|
||
services: make([]*api.AgentServiceRegistration, 0),
|
||
}
|
||
|
||
for _, opt := range opts {
|
||
opt(cl)
|
||
}
|
||
|
||
go cl.healthCheck()
|
||
|
||
return cl, nil
|
||
}
|
||
|
||
func (c *Consul) healthCheck() {
|
||
wlog := newWatchLogger()
|
||
|
||
wp, err := watch.Parse(map[string]any{
|
||
"type": "services",
|
||
})
|
||
if err != nil {
|
||
panic(fmt.Sprintf("parse watch err: %s", err))
|
||
}
|
||
|
||
wp.Handler = func(u uint64, raw any) {
|
||
if wlog.isWatchErr == true {
|
||
|
||
for _, svc := range c.services {
|
||
//c.client.Agent().ServiceDeregister(svc.ID)
|
||
err := c.client.Agent().ServiceRegister(svc)
|
||
if err != nil {
|
||
log.Printf("retry register service err: %s: %s", svc.Name, err)
|
||
} else {
|
||
log.Printf("retry register service ok: %s", svc.Name)
|
||
}
|
||
}
|
||
|
||
wlog.isWatchErr = false
|
||
}
|
||
//fmt.Println("watch", u, raw)
|
||
}
|
||
|
||
err = wp.RunWithClientAndHclog(c.client, wlog)
|
||
if err != nil {
|
||
log.Printf("watch err: %s", err)
|
||
}
|
||
|
||
}
|
||
|
||
func (c *Consul) Client() *api.Client {
|
||
return c.client
|
||
}
|
||
|
||
func GrpcUrl(serviceName string, conf *myconf.Config) string {
|
||
return GrpcUrlWithTag("", "grpc@"+serviceName, conf)
|
||
}
|
||
|
||
func GrpcUrlWithTag(tag string, serviceName string, conf *myconf.Config) string {
|
||
u := &url.URL{
|
||
Scheme: schemeName,
|
||
Host: conf.GetString("addr"),
|
||
Path: serviceName,
|
||
}
|
||
|
||
if u.Host == "" {
|
||
panic("consul address is empty")
|
||
}
|
||
|
||
query := u.Query()
|
||
query.Set("healthy", "true")
|
||
|
||
if v := conf.GetString("token"); v != "" {
|
||
query.Set("token", v)
|
||
}
|
||
|
||
if tag != "" {
|
||
query.Set("tag", tag)
|
||
}
|
||
|
||
username := conf.GetString("username")
|
||
password := conf.GetString("password")
|
||
|
||
if username != "" && password != "" {
|
||
u.User = url.UserPassword(username, password)
|
||
}
|
||
|
||
u.RawQuery = query.Encode()
|
||
|
||
return u.String()
|
||
}
|
||
|
||
type watchLogger struct {
|
||
hclog.Logger
|
||
isWatchErr bool
|
||
}
|
||
|
||
func newWatchLogger() *watchLogger {
|
||
return &watchLogger{Logger: hclog.New(&hclog.LoggerOptions{
|
||
Name: "watch",
|
||
Output: os.Stdout,
|
||
})}
|
||
}
|
||
|
||
func (l *watchLogger) Error(msg string, args ...interface{}) {
|
||
l.isWatchErr = true
|
||
log.Printf("is watch err: %s", msg)
|
||
l.Logger.Error(msg, args...)
|
||
}
|
||
|
||
func (l *watchLogger) Named(name string) hclog.Logger {
|
||
return l
|
||
}
|