From 9093bf2b7907115942ff5f533da1b6b44c650c9a Mon Sep 17 00:00:00 2001 From: lzf Date: Tue, 22 Jul 2025 17:39:34 +0800 Subject: [PATCH 1/5] update --- go.mod | 13 +- go.sum | 12 ++ myhttp/httpsr/middleware.go | 28 ++++ mymetric/prometheus.go | 97 +++++++++++ mymysql/batchwriter.go | 312 ++++++++++++++++++++++++++++++++++++ 5 files changed, 459 insertions(+), 3 deletions(-) create mode 100644 myhttp/httpsr/middleware.go create mode 100644 mymetric/prometheus.go create mode 100644 mymysql/batchwriter.go diff --git a/go.mod b/go.mod index 982fad9..0f94168 100644 --- a/go.mod +++ b/go.mod @@ -10,9 +10,12 @@ require ( github.com/go-playground/universal-translator v0.18.1 github.com/go-playground/validator/v10 v10.25.0 github.com/go-sql-driver/mysql v1.7.0 + github.com/goccy/go-json v0.10.5 github.com/google/uuid v1.6.0 github.com/hashicorp/consul/api v1.28.2 + github.com/hashicorp/go-hclog v1.5.0 github.com/jpillora/backoff v1.0.0 + github.com/json-iterator/go v1.1.12 github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/toml v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 @@ -21,6 +24,7 @@ require ( github.com/knadh/koanf/providers/posflag v0.1.0 github.com/knadh/koanf/v2 v2.1.2 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.4.0 github.com/redis/go-redis/v9 v9.7.3 github.com/rs/xid v1.6.0 github.com/spf13/cast v1.6.0 @@ -36,6 +40,7 @@ require ( require ( github.com/armon/go-metrics v0.4.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.13.2 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -47,10 +52,9 @@ require ( github.com/gabriel-vasile/mimetype v1.4.8 // indirect github.com/gin-contrib/sse v1.0.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect - github.com/goccy/go-json v0.10.5 // indirect + github.com/golang/protobuf v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect - github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect @@ -60,13 +64,13 @@ require ( github.com/hashicorp/serf v0.10.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -76,6 +80,9 @@ require ( github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.2.0 // indirect + github.com/prometheus/common v0.9.1 // indirect + github.com/prometheus/procfs v0.0.8 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect diff --git a/go.sum b/go.sum index 7f574d5..f6c2f11 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,7 @@ github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLj github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= @@ -80,11 +81,15 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4= github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -195,6 +200,7 @@ github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27k github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY= @@ -235,14 +241,18 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.4.0 h1:YVIb/fVcOTMSqtqZWSKnHpSLBxu8DKgxq8z6RuBZwqI= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X7U= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.0.8 h1:+fpWZdT24pJBiqJdAwYBjPSk+5YmQzYNPYzQsdzLkt8= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= @@ -364,6 +374,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/myhttp/httpsr/middleware.go b/myhttp/httpsr/middleware.go new file mode 100644 index 0000000..0b332e4 --- /dev/null +++ b/myhttp/httpsr/middleware.go @@ -0,0 +1,28 @@ +package httpsr + +import ( + "git.makemake.in/kzkzzzz/mycommon/mymetric" + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "time" +) + +func QPSCollect(svcName string) gin.HandlerFunc { + hs := mymetric.NewQPSHistogram(svcName, []string{ + "svc", "method", "route", "from", + }...) + + return func(ctx *gin.Context) { + st := time.Now() + + ctx.Next() + + hs.With(prometheus.Labels{ + "svc": svcName, + "method": ctx.Request.Method, + "route": ctx.Request.URL.Path, + "form": "", + }).Observe(float64(time.Since(st).Milliseconds())) + + } +} diff --git a/mymetric/prometheus.go b/mymetric/prometheus.go new file mode 100644 index 0000000..04e85b7 --- /dev/null +++ b/mymetric/prometheus.go @@ -0,0 +1,97 @@ +package mymetric + +import ( + "github.com/gin-gonic/gin" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "log" + "time" +) + +type ( + HistogramVec struct { + *prometheus.HistogramVec + Labels []string + } + CounterVec struct { + *prometheus.CounterVec + Labels []string + } +) + +func NewQPSHistogram(name string, labels ...string) *HistogramVec { + name = HistogramKey(name) + v := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: name, + Buckets: []float64{5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 95, 100, 200, 300, 500, 600, 700, 800, 900, 1000, 1500, 2000, 5000}, // 统计区间 单位毫秒 + + }, labels) + + wrapMetric := &HistogramVec{ + HistogramVec: v, + Labels: labels, + } + + RegisterPrometheus(wrapMetric) + + return wrapMetric +} + +func NewHistogram(name string, buckets []float64, labels ...string) *HistogramVec { + name = HistogramKey(name) + v := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: name, + Buckets: buckets, + }, labels) + + wrapMetric := &HistogramVec{ + HistogramVec: v, + Labels: labels, + } + + RegisterPrometheus(wrapMetric) + return wrapMetric +} + +func NewCounter(name string, labels ...string) *CounterVec { + name = CounterKey(name) + v := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: name, + }, labels) + + wrapMetric := &CounterVec{ + CounterVec: v, + Labels: labels, + } + + RegisterPrometheus(wrapMetric) + return wrapMetric +} + +const ( + MetricsRoute = "/metrics" +) + +// 监控指标路由 +func GinExport(engine *gin.Engine) { + engine.GET(MetricsRoute, gin.WrapH(promhttp.HandlerFor( + prometheus.DefaultGatherer, + promhttp.HandlerOpts{Timeout: time.Second * 3}, + ))) +} + +func HistogramKey(name string) string { + return name + "_h" +} + +func CounterKey(name string) string { + return name + "_c" +} + +func RegisterPrometheus(c prometheus.Collector) { + err := prometheus.Register(c) + if err != nil { + log.Printf("register err: %s", err) + } + +} diff --git a/mymysql/batchwriter.go b/mymysql/batchwriter.go new file mode 100644 index 0000000..aaa774b --- /dev/null +++ b/mymysql/batchwriter.go @@ -0,0 +1,312 @@ +package mymysql + +import ( + "context" + "git.makemake.in/kzkzzzz/mycommon" + "git.makemake.in/kzkzzzz/mycommon/mylog" + "github.com/goccy/go-json" + "github.com/rs/xid" + "gorm.io/gorm/clause" + "log" + "sync" + "time" +) + +const ( + defaultDataBuffer = 1e5 // channel缓冲区 + defaultBatchSize = 200 // 多少条数据写一次 + defaultIntervalTime = time.Second * 2 // 多久时间写一次 + defaultJobNum = 2 // 写入db 任务数量 + defaultAsyncWorkerNum = 20 // 异步执行写入事件的最大协程数量 +) + +type iWriterStop interface { + StopWriter() +} + +var ( + writerJobMap = &sync.Map{} +) + +type ( + batchData[T any] struct { + jobIndex int + dataList []T + } +) + +type BatchWriterConfig struct { + channelBuffer int + batchSize int + batchInterval time.Duration + jobNum int + asyncWorkerNum int + duplicateUpdate *clause.OnConflict + debug bool +} + +type BatchWriter[T any] struct { + db *MysqlDb + tableName string + jobName string + + uniqueId string + config *BatchWriterConfig + dataChan chan T + ctx context.Context + cancel context.CancelFunc + stopChan chan struct{} + + asyncWorkerLimitChan chan struct{} + asyncWorkerWg *sync.WaitGroup +} + +type BatchWriterOpt func(c *BatchWriterConfig) + +func WithWriteJobNum(v int) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.jobNum = v + } +} + +func WithWriteChannelBuffer(v int) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.channelBuffer = v + } +} + +func WithWriteBatchSize(v int) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.batchSize = v + } +} + +func WithWriteIntervalTime(v time.Duration) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.batchInterval = v + } +} + +func WithAsyncWorkerNum(v int) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.asyncWorkerNum = v + } +} + +func WithDuplicateUpdate(v *clause.OnConflict) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.duplicateUpdate = v + } +} + +func WithDebug(v bool) BatchWriterOpt { + return func(c *BatchWriterConfig) { + c.debug = v + } +} + +func NewBatchWrite[T any](db *MysqlDb, tableName, jobName string, opts ...BatchWriterOpt) *BatchWriter[T] { + config := &BatchWriterConfig{} + for _, opt := range opts { + opt(config) + } + + if config.batchInterval <= 0 { + config.batchInterval = defaultIntervalTime + } + + if config.channelBuffer <= 0 { + config.channelBuffer = defaultDataBuffer + } + + if config.jobNum <= 0 { + config.jobNum = defaultJobNum + } + + if config.asyncWorkerNum <= 0 { + config.asyncWorkerNum = defaultAsyncWorkerNum + } + + if config.batchSize <= 0 { + config.batchSize = defaultBatchSize + } + + bw := &BatchWriter[T]{ + db: db, + tableName: tableName, + jobName: jobName, + uniqueId: xid.New().String(), + config: config, + dataChan: make(chan T), + stopChan: make(chan struct{}, 1), + + asyncWorkerLimitChan: make(chan struct{}, config.asyncWorkerNum), + asyncWorkerWg: &sync.WaitGroup{}, + } + + bw.ctx, bw.cancel = context.WithCancel(context.Background()) + + // 记录实例, 便于退出程序的时候入库 + writerJobMap.Store(bw.uniqueId, bw) + + go func() { + bw.start() + }() + + return bw +} + +func (bw *BatchWriter[T]) Write(data ...T) { + if len(data) == 0 { + return + } + + if bw.ctx.Err() != nil { + b, _ := json.Marshal(data) + mylog.Errorf("[%s] save to db err: job is close, data: (%s)", bw.tableName, b) + return + } + + for _, v := range data { + bw.dataChan <- v + } +} + +func (bw *BatchWriter[T]) start() { + wg := &sync.WaitGroup{} + + for i := 0; i < bw.config.jobNum; i++ { + wg.Add(1) + go func(i0 int) { + defer wg.Done() + bw.startJob(i) + }(i) + } + + wg.Wait() + log.Printf("[table:%s - job:%s] batch write job stop", bw.tableName, bw.jobName) + + close(bw.stopChan) +} + +func (bw *BatchWriter[T]) startJob(jobIndex int) { + tkTime := bw.config.batchInterval + // 定时器增加随机时间差 + randN := float64(mycommon.RandRange(50, 350)) / float64(100) + + tkTime = tkTime + time.Duration(float64(time.Second)*randN) + + log.Printf("[table:%s - job:%s - %d] batch write job start, ticker time: %s", bw.tableName, bw.jobName, jobIndex, tkTime.String()) + + tk := time.NewTicker(tkTime) + defer tk.Stop() + + bd := &batchData[T]{ + jobIndex: jobIndex, + dataList: make([]T, 0, bw.config.batchSize), + } + +loop: + for { + select { + case <-bw.ctx.Done(): + break loop + + case <-tk.C: + bw.writeToDb(bd) + + case data, ok := <-bw.dataChan: + if !ok { + break loop + } + bd.dataList = append(bd.dataList, data) + + if len(bd.dataList) >= bw.config.batchSize { + bw.writeToDb(bd) + } + + } + } + + if len(bd.dataList) > 0 { + bw.writeToDb(bd) + } +} + +func (bw *BatchWriter[T]) writeToDb(bd *batchData[T]) { + if len(bd.dataList) == 0 { + return + } + + defer func() { + // 清空切片 + bd.dataList = bd.dataList[:0] + }() + + bw.asyncWorkerLimitChan <- struct{}{} + + // 复制一份数据, 异步写入 + copyDataList := make([]T, len(bd.dataList)) + copy(copyDataList, bd.dataList) + + bw.asyncWorkerWg.Add(1) + go func() { + defer func() { + <-bw.asyncWorkerLimitChan + bw.asyncWorkerWg.Done() + + }() + + bw.asyncWriteToDb(bd.jobIndex, copyDataList) + }() + +} + +func (bw *BatchWriter[T]) asyncWriteToDb(jobIndex int, copyDataList []T) { + if len(copyDataList) == 0 { + return + } + + query := bw.db.Table(bw.tableName) + + if bw.config.duplicateUpdate != nil { + query.Clauses(bw.config.duplicateUpdate) + } + + err := query.Create(copyDataList).Error + if err == nil { + return + } + + // 批量写入失败, 后续优化重试流程 + b, _ := json.Marshal(copyDataList) + mylog.Errorf("[%s - %s] save to db err: %s data: (%s)", bw.tableName, bw.jobName, err, b) + +} + +func (bw *BatchWriter[T]) StopWriter() { + if bw.ctx.Err() != nil { + return + } + + bw.cancel() + close(bw.dataChan) + + <-bw.stopChan + + bw.asyncWorkerWg.Wait() +} + +func StopAllBatchWriter() { + writerJobMap.Range(func(k, v interface{}) bool { + q := v.(iWriterStop) + q.StopWriter() + return true + }) + +} + +// Deprecated: 改成用 StopAllBatchWriter +func StopWriter() { + StopAllBatchWriter() +} From 063fc823383b0c64cf66b4a292cf624c89454e9d Mon Sep 17 00:00:00 2001 From: lzf Date: Tue, 22 Jul 2025 17:43:07 +0800 Subject: [PATCH 2/5] update --- myhttp/httpsr/middleware.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/myhttp/httpsr/middleware.go b/myhttp/httpsr/middleware.go index 0b332e4..d22649f 100644 --- a/myhttp/httpsr/middleware.go +++ b/myhttp/httpsr/middleware.go @@ -21,7 +21,7 @@ func QPSCollect(svcName string) gin.HandlerFunc { "svc": svcName, "method": ctx.Request.Method, "route": ctx.Request.URL.Path, - "form": "", + "from": "", }).Observe(float64(time.Since(st).Milliseconds())) } From 1eb10eb1af4f99d52b6163dacfe0bdefd607ea9a Mon Sep 17 00:00:00 2001 From: lzf Date: Tue, 22 Jul 2025 17:44:04 +0800 Subject: [PATCH 3/5] update --- myhttp/httpsr/middleware.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/myhttp/httpsr/middleware.go b/myhttp/httpsr/middleware.go index d22649f..4e00789 100644 --- a/myhttp/httpsr/middleware.go +++ b/myhttp/httpsr/middleware.go @@ -7,6 +7,8 @@ import ( "time" ) +const CtxCollectRequestFrom = "ctx_collect_request_from" + func QPSCollect(svcName string) gin.HandlerFunc { hs := mymetric.NewQPSHistogram(svcName, []string{ "svc", "method", "route", "from", @@ -21,7 +23,7 @@ func QPSCollect(svcName string) gin.HandlerFunc { "svc": svcName, "method": ctx.Request.Method, "route": ctx.Request.URL.Path, - "from": "", + "from": ctx.GetString(CtxCollectRequestFrom), }).Observe(float64(time.Since(st).Milliseconds())) } From 4ced8ae869064c4bba786cde69ecb40541a74541 Mon Sep 17 00:00:00 2001 From: lzf Date: Tue, 22 Jul 2025 17:48:02 +0800 Subject: [PATCH 4/5] update --- myhttp/httpsr/middleware.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/myhttp/httpsr/middleware.go b/myhttp/httpsr/middleware.go index 4e00789..86b76a6 100644 --- a/myhttp/httpsr/middleware.go +++ b/myhttp/httpsr/middleware.go @@ -4,6 +4,7 @@ import ( "git.makemake.in/kzkzzzz/mycommon/mymetric" "github.com/gin-gonic/gin" "github.com/prometheus/client_golang/prometheus" + "github.com/spf13/cast" "time" ) @@ -11,7 +12,7 @@ const CtxCollectRequestFrom = "ctx_collect_request_from" func QPSCollect(svcName string) gin.HandlerFunc { hs := mymetric.NewQPSHistogram(svcName, []string{ - "svc", "method", "route", "from", + "svc", "method", "route", "status", "from", }...) return func(ctx *gin.Context) { @@ -23,6 +24,7 @@ func QPSCollect(svcName string) gin.HandlerFunc { "svc": svcName, "method": ctx.Request.Method, "route": ctx.Request.URL.Path, + "status": cast.ToString(ctx.Writer.Status()), "from": ctx.GetString(CtxCollectRequestFrom), }).Observe(float64(time.Since(st).Milliseconds())) From a545116f522c2d215fbfe53d5845d3888a04238d Mon Sep 17 00:00:00 2001 From: lzf Date: Tue, 22 Jul 2025 17:58:46 +0800 Subject: [PATCH 5/5] update --- mymetric/prometheus.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/mymetric/prometheus.go b/mymetric/prometheus.go index 04e85b7..7bbd8aa 100644 --- a/mymetric/prometheus.go +++ b/mymetric/prometheus.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" "log" + "net/http" "time" ) @@ -73,11 +74,24 @@ const ( ) // 监控指标路由 -func GinExport(engine *gin.Engine) { - engine.GET(MetricsRoute, gin.WrapH(promhttp.HandlerFor( +func GinExport(authKey string) gin.HandlerFunc { + pm := promhttp.HandlerFor( prometheus.DefaultGatherer, promhttp.HandlerOpts{Timeout: time.Second * 3}, - ))) + ) + + return func(ctx *gin.Context) { + if authKey != "" { + value := ctx.Query("key") + if authKey != value { + ctx.String(http.StatusForbidden, "metric auth key is empty") + return + } + } + + pm.ServeHTTP(ctx.Writer, ctx.Request) + } + } func HistogramKey(name string) string {