update
This commit is contained in:
366
myhttp/fasthttpc/httpclient.go
Normal file
366
myhttp/fasthttpc/httpclient.go
Normal file
@@ -0,0 +1,366 @@
|
||||
package fasthttpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"errors"
|
||||
"fmt"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/valyala/fasthttp"
|
||||
"golang.org/x/time/rate"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultClient *HttpClient
|
||||
)
|
||||
|
||||
func init() {
|
||||
defaultClient = New()
|
||||
}
|
||||
|
||||
func ReInitDefault(timeout time.Duration) {
|
||||
defaultClient = New(WithTimout(timeout))
|
||||
}
|
||||
|
||||
func ReInitDefaultOpt(opts ...ConfigOpt) {
|
||||
defaultClient = New(opts...)
|
||||
}
|
||||
|
||||
func Client() *HttpClient {
|
||||
return defaultClient
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
ctx context.Context
|
||||
header http.Header
|
||||
body any
|
||||
mapQuery map[string]string
|
||||
urlQuery url.Values
|
||||
httpClient *HttpClient
|
||||
contentType string
|
||||
noWaitQps bool
|
||||
}
|
||||
|
||||
type HttpClient struct {
|
||||
config *Config
|
||||
client *fasthttp.Client
|
||||
qpsLimiter *rate.Limiter
|
||||
}
|
||||
|
||||
func New(opts ...ConfigOpt) *HttpClient {
|
||||
config := &Config{}
|
||||
for _, opt := range opts {
|
||||
opt(config)
|
||||
}
|
||||
|
||||
if config.timeout <= 0 {
|
||||
config.timeout = time.Second * 3
|
||||
}
|
||||
|
||||
if config.fasthttpTimeout <= 0 {
|
||||
config.fasthttpTimeout = time.Second * 10
|
||||
}
|
||||
|
||||
if config.maxConnPerHost <= 0 {
|
||||
config.maxConnPerHost = 10000
|
||||
}
|
||||
|
||||
if config.client == nil {
|
||||
|
||||
client := &fasthttp.Client{
|
||||
TLSConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
DialTimeout: (&fasthttp.TCPDialer{
|
||||
Concurrency: 0,
|
||||
DNSCacheDuration: time.Second * 600,
|
||||
}).DialTimeout,
|
||||
MaxConnsPerHost: config.maxConnPerHost,
|
||||
MaxIdleConnDuration: time.Second * 90,
|
||||
MaxIdemponentCallAttempts: 5,
|
||||
ReadTimeout: config.fasthttpTimeout + time.Second,
|
||||
MaxConnWaitTimeout: 0,
|
||||
RetryIfErr: func(req *fasthttp.Request, attempts int, err error) (bool, bool) {
|
||||
if errors.Is(err, fasthttp.ErrConnectionClosed) || errors.Is(err, io.EOF) {
|
||||
return false, true
|
||||
}
|
||||
return false, false
|
||||
},
|
||||
}
|
||||
|
||||
config.client = client
|
||||
}
|
||||
|
||||
hc := &HttpClient{
|
||||
config: config,
|
||||
client: config.client,
|
||||
}
|
||||
|
||||
if config.qpsLimiter != nil {
|
||||
hc.qpsLimiter = config.qpsLimiter
|
||||
} else if config.qps > 0 {
|
||||
hc.qpsLimiter = rate.NewLimiter(rate.Every(time.Second/time.Duration(config.qps)), config.qps)
|
||||
}
|
||||
|
||||
return hc
|
||||
}
|
||||
|
||||
func (h *HttpClient) RawClient() *fasthttp.Client {
|
||||
return h.client
|
||||
}
|
||||
|
||||
func (h *HttpClient) NewRequest(ctx context.Context) *Request {
|
||||
r := &Request{
|
||||
ctx: ctx,
|
||||
header: nil,
|
||||
mapQuery: nil,
|
||||
httpClient: h,
|
||||
urlQuery: url.Values{},
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (h *HttpClient) NewPcRequest(ctx context.Context) *Request {
|
||||
r := h.NewRequest(ctx)
|
||||
r.SetHeaderPcAgent()
|
||||
return r
|
||||
}
|
||||
|
||||
func (h *HttpClient) NewMobileRequest(ctx context.Context) *Request {
|
||||
r := h.NewRequest(ctx)
|
||||
r.SetHeaderMobileAgent()
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetContentType(contentType string) *Request {
|
||||
r.contentType = contentType
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetBody(body any) *Request {
|
||||
r.body = body
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetQueryParam(k string, v string) *Request {
|
||||
if r.mapQuery == nil {
|
||||
r.mapQuery = make(map[string]string)
|
||||
}
|
||||
r.mapQuery[k] = v
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetQueryParams(params map[string]string) *Request {
|
||||
for k, v := range params {
|
||||
r.SetQueryParam(k, v)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetUrlQueryParam(k string, v string) *Request {
|
||||
r.urlQuery.Add(k, v)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetHeader(k, v string) *Request {
|
||||
if r.header == nil {
|
||||
r.header = http.Header{}
|
||||
}
|
||||
r.header.Set(k, v)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetHeaders(headers map[string]string) *Request {
|
||||
for k, v := range headers {
|
||||
r.SetHeader(k, v)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) NoWaitQps() *Request {
|
||||
r.noWaitQps = true
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) Get(rawUrl string) (*Response, error) {
|
||||
return r.Do(http.MethodGet, rawUrl)
|
||||
}
|
||||
|
||||
func (r *Request) Post(rawUrl string) (*Response, error) {
|
||||
return r.Do(http.MethodPost, rawUrl)
|
||||
}
|
||||
|
||||
var QpsLimitError = fmt.Errorf("qps limit")
|
||||
|
||||
func (r *Request) DoTimeout(timeout time.Duration, method, rawUrl string) (*Response, error) {
|
||||
if timeout <= time.Millisecond {
|
||||
return nil, fmt.Errorf("timeout is too small <= 1 millisecond")
|
||||
}
|
||||
|
||||
if r.httpClient.config.useCtxTimeout == false {
|
||||
return r.doRequest(timeout, method, rawUrl)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(r.ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
type tmpRes struct {
|
||||
res *Response
|
||||
err error
|
||||
}
|
||||
|
||||
var resChan = make(chan tmpRes, 1)
|
||||
|
||||
go func() {
|
||||
res, err := r.doRequest(r.httpClient.config.fasthttpTimeout, method, rawUrl)
|
||||
if err != nil {
|
||||
resChan <- tmpRes{err: err}
|
||||
return
|
||||
}
|
||||
resChan <- tmpRes{res: res}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("request timeout: %s (%s)", timeout, ctx.Err())
|
||||
case res := <-resChan:
|
||||
return res.res, res.err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (r *Request) Do(method, rawUrl string) (*Response, error) {
|
||||
return r.DoTimeout(r.httpClient.config.timeout, method, rawUrl)
|
||||
}
|
||||
|
||||
func (r *Request) doRequest(timeout time.Duration, method, rawUrl string) (*Response, error) {
|
||||
if r.httpClient.qpsLimiter != nil {
|
||||
if r.noWaitQps {
|
||||
allow := r.httpClient.qpsLimiter.Allow()
|
||||
if !allow {
|
||||
return nil, QpsLimitError
|
||||
}
|
||||
} else {
|
||||
err := r.httpClient.qpsLimiter.Wait(r.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
req := fasthttp.AcquireRequest()
|
||||
resp := fasthttp.AcquireResponse()
|
||||
|
||||
defer func() {
|
||||
fasthttp.ReleaseRequest(req)
|
||||
fasthttp.ReleaseResponse(resp)
|
||||
}()
|
||||
|
||||
var reqBody []byte
|
||||
if r.body != nil {
|
||||
switch v := r.body.(type) {
|
||||
case string:
|
||||
reqBody = []byte(v)
|
||||
case []byte:
|
||||
reqBody = v
|
||||
|
||||
default:
|
||||
marshal, err := jsoniter.Marshal(r.body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reqBody = marshal
|
||||
}
|
||||
|
||||
req.SetBody(reqBody)
|
||||
}
|
||||
|
||||
reqUrl, err := url.Parse(rawUrl)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse url err: %s (%s)", err.Error(), rawUrl)
|
||||
}
|
||||
|
||||
urlQuery := reqUrl.Query()
|
||||
for k, v := range r.mapQuery {
|
||||
urlQuery.Add(k, v)
|
||||
}
|
||||
|
||||
for k, v := range r.urlQuery {
|
||||
urlQuery[k] = v
|
||||
}
|
||||
|
||||
if len(urlQuery) > 0 {
|
||||
reqUrl.RawQuery = urlQuery.Encode()
|
||||
}
|
||||
|
||||
req.SetRequestURI(reqUrl.String())
|
||||
|
||||
//if len(urlQuery) > 0 {
|
||||
// req.URI().SetQueryString(urlQuery.Encode())
|
||||
//}
|
||||
|
||||
if r.contentType != "" {
|
||||
req.Header.Set("Content-Type", r.contentType)
|
||||
}
|
||||
|
||||
for k := range r.header {
|
||||
req.Header.Set(k, r.header.Get(k))
|
||||
}
|
||||
req.Header.SetMethod(method)
|
||||
|
||||
err = r.httpClient.client.DoTimeout(req, resp, timeout)
|
||||
if err != nil {
|
||||
if errors.Is(err, fasthttp.ErrTimeout) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("req err: %s (%s)", err, reqUrl.String())
|
||||
}
|
||||
|
||||
tmpBody := resp.Body()
|
||||
body := make([]byte, len(tmpBody))
|
||||
copy(body, tmpBody)
|
||||
|
||||
if r.httpClient.config.noCheckStatus == false {
|
||||
if resp.StatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("status code err: %d (%s)", resp.StatusCode(), body)
|
||||
}
|
||||
}
|
||||
|
||||
copyHeader := &fasthttp.ResponseHeader{}
|
||||
resp.Header.CopyTo(copyHeader)
|
||||
|
||||
res := &Response{
|
||||
Header: copyHeader,
|
||||
body: body,
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (r *Request) SetHeaderPcAgent() *Request {
|
||||
r.SetHeader(HeaderUserAgent, PcUserAgent)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetHeaderMobileAgent() *Request {
|
||||
r.SetHeader(HeaderUserAgent, MobileUserAgent)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Request) SetHeaderAcceptHtml() *Request {
|
||||
r.SetHeader(HeaderAccept, AcceptHtml)
|
||||
return r
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Header *fasthttp.ResponseHeader
|
||||
body []byte
|
||||
//Response *http.Response
|
||||
}
|
||||
|
||||
func (r *Response) GetBody() []byte {
|
||||
return r.body
|
||||
}
|
||||
Reference in New Issue
Block a user