main
kzkzzzz 2025-07-27 18:10:06 +08:00
parent 95880712b8
commit 510629a424
2 changed files with 72 additions and 44 deletions

View File

@ -6,7 +6,9 @@ import (
"git.makemake.in/kzkzzzz/mycommon/mylog"
"github.com/goccy/go-json"
"github.com/rs/xid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"log"
"sync"
"time"
@ -43,6 +45,7 @@ type BatchWriterConfig struct {
asyncWorkerNum int
clauseExpr []clause.Expression
debug bool
noPrepare bool
}
type BatchWriter[T any] struct {
@ -63,48 +66,6 @@ type BatchWriter[T any] struct {
type BatchWriterOpt func(c *BatchWriterConfig)
func WithWriteJobNum(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.jobNum = v
}
}
func WithWriteChannelBuffer(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.channelBuffer = v
}
}
func WithWriteBatchSize(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.batchSize = v
}
}
func WithWriteIntervalTime(v time.Duration) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.batchInterval = v
}
}
func WithAsyncWorkerNum(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.asyncWorkerNum = v
}
}
func WithClause(v ...clause.Expression) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.clauseExpr = v
}
}
func WithDebug(v bool) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.debug = v
}
}
func NewBatchWrite[T any](db *MysqlDb, tableName, jobName string, opts ...BatchWriterOpt) *BatchWriter[T] {
config := &BatchWriterConfig{}
for _, opt := range opts {
@ -262,6 +223,8 @@ func (bw *BatchWriter[T]) writeToDb(bd *batchData[T]) {
}
var disableGormLog = logger.Default.LogMode(logger.Silent)
func (bw *BatchWriter[T]) asyncWriteToDb(jobIndex int, copyDataList []T) {
if len(copyDataList) == 0 {
return
@ -273,7 +236,20 @@ func (bw *BatchWriter[T]) asyncWriteToDb(jobIndex int, copyDataList []T) {
query.Clauses(bw.config.clauseExpr...)
}
err := query.Create(copyDataList).Error
var err error
if bw.config.noPrepare {
_sql := query.ToSQL(func(tx *gorm.DB) *gorm.DB {
return tx.Session(&gorm.Session{
Logger: disableGormLog,
}).Create(&copyDataList)
})
err = bw.db.Table(bw.tableName).Exec(_sql).Error
} else {
err = query.Create(copyDataList).Error
}
if err == nil {
return
}

View File

@ -1,6 +1,10 @@
package mymysql
import "gorm.io/gorm"
import (
"gorm.io/gorm"
"gorm.io/gorm/clause"
"time"
)
type Opt func(m *MysqlDb)
@ -15,3 +19,51 @@ func WithGormConfig(v *gorm.Config) Opt {
m.gormConfig = v
}
}
func WithWriteJobNum(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.jobNum = v
}
}
func WithWriteChannelBuffer(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.channelBuffer = v
}
}
func WithWriteBatchSize(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.batchSize = v
}
}
func WithWriteIntervalTime(v time.Duration) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.batchInterval = v
}
}
func WithAsyncWorkerNum(v int) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.asyncWorkerNum = v
}
}
func WithClause(v ...clause.Expression) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.clauseExpr = v
}
}
func WithDebug(v bool) BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.debug = v
}
}
func WithNoPrepare() BatchWriterOpt {
return func(c *BatchWriterConfig) {
c.noPrepare = true
}
}