在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    抑制并發(fā)請(qǐng)求-singleflight

    抑制并發(fā)請(qǐng)求-singleflight

    背景

    高并發(fā)的場(chǎng)景下,經(jīng)常會(huì)出現(xiàn)并發(fā)重復(fù)請(qǐng)求資源的情況。

    比如說(shuō),緩存失效時(shí),我們?nèi)フ?qǐng)求db獲取最新的數(shù)據(jù),如果這個(gè)key是一個(gè)熱key,那么在緩存失效的瞬間,可能會(huì)有大量的并發(fā)請(qǐng)求訪問(wèn)到db,導(dǎo)致db訪問(wèn)量陡增,甚至是打崩db,這種場(chǎng)景也就是我們常說(shuō)的緩存擊穿。

    并發(fā)請(qǐng)求資源

    針對(duì)同一個(gè)key的并發(fā)請(qǐng)求,這些請(qǐng)求和響應(yīng)實(shí)際上都是一樣的。所以我們可以把這種并發(fā)請(qǐng)求優(yōu)化為:只進(jìn)行一次實(shí)際請(qǐng)求去訪問(wèn)資源,然后得到實(shí)際響應(yīng),所有的并發(fā)請(qǐng)求共享這個(gè)實(shí)際響應(yīng)的結(jié)果

    針對(duì)分布式場(chǎng)景,我們可以使用分布式鎖來(lái)實(shí)現(xiàn)

    針對(duì)單機(jī)場(chǎng)景,我們可以使用singleflight來(lái)實(shí)現(xiàn)

    singleflight

    singleflight

    singleflight是golang內(nèi)置的一個(gè)包,這個(gè)包提供了對(duì)重復(fù)函數(shù)調(diào)用的抑制功能,也就是保證并發(fā)請(qǐng)求只會(huì)有一個(gè)實(shí)際請(qǐng)求去訪問(wèn)資源,所有并發(fā)請(qǐng)求共享實(shí)際響應(yīng)。

    使用

    singleflight在golang sdk源碼中的路徑為:src/internal/singleflight

    但是internal是golang sdk內(nèi)部的包,所以我們不能直接去使用

    使用步驟:

  • 引入go mod
  • 使用singleflight包
  • 引入go mod

    go get golang.org/x/sync

    使用singleflight包

    singleflight包主要提供了三個(gè)方法

    // 方法作用:保證并發(fā)請(qǐng)求只會(huì)執(zhí)行一次函數(shù),并共享實(shí)際響應(yīng)// 請(qǐng)求參數(shù)// key:請(qǐng)求的唯一標(biāo)識(shí),相同的key會(huì)被視為并發(fā)請(qǐng)求// fn:實(shí)際需要執(zhí)行的函數(shù)// 響應(yīng)參數(shù)// v:實(shí)際執(zhí)行函數(shù)的返回值// err:實(shí)際執(zhí)行函數(shù)的錯(cuò)誤// shared:返回值v是否被共享,若存在并發(fā)請(qǐng)求,則為true;若不存在并發(fā)請(qǐng)求則為falsefunc (g *Group) Do(key string, fn func() (any, error)) (v any, err error, shared bool)// 方法作用:和Do類似,不過(guò)方法返回的是chanfunc (g *Group) DoChan(key string, fn func() (any, error)) (<-chan Result, bool)// 方法作用:刪除key,一般來(lái)說(shuō)不會(huì)直接使用這個(gè)方法func (g *Group) ForgetUnshared(key string) bool

    針對(duì)以上的三個(gè)方法,我們重點(diǎn)了解一下Do方法的使用即可

    沒(méi)有使用singleflight之前

    package mainimport ( “fmt” “sync” “testing” “time”)var ( mx sync.Mutex wg sync.WaitGroup cacheData = make(map[string]string, 0))func TestSingleFlight(t *testing.T) { // 添加10個(gè)任務(wù),模擬并發(fā)請(qǐng)求 wg.Add(10) for i := 0; i < 10; i++ { go getData("demo") } // 等待所有任務(wù)完成 wg.Wait()}func getData(key string) { data, _ := getDataFromCache(key) if len(data) == 0 { // 緩存沒(méi)有找到,則進(jìn)行回源 data, _ = getDataFromDB(key) // 設(shè)置緩存 mx.Lock() cacheData[key] = data mx.Unlock() } fmt.Println(data) // 任務(wù)完成 wg.Done()}func getDataFromCache(key string) (string, error) { return cacheData[key], nil}func getDataFromDB(key string) (string, error) { fmt.Println("getDataFromDB key: ", key) // 模擬訪問(wèn)db的耗時(shí) time.Sleep(10 * time.Millisecond) return "db data", nil}

    執(zhí)行TestSingleFlight函數(shù)后,會(huì)發(fā)現(xiàn)并發(fā)請(qǐng)求多次調(diào)用了getDataFromDB函數(shù)

    使用singleflight之后

    package mainimport ( “fmt” “golang.org/x/sync/singleflight” “sync” “testing” “time”)var ( mx sync.Mutex wg sync.WaitGroup g singleflight.Group cacheData = make(map[string]string, 0))func TestSingleFlight(t *testing.T) { // 添加10個(gè)任務(wù) wg.Add(10) for i := 0; i < 10; i++ { go getDataSingleWarp("demo") } // 等待所有任務(wù)完成 wg.Wait()}func getDataSingleWarp(key string) { data, _ := getDataFromCache(key) if len(data) == 0 { // 使用singleflight來(lái)避免并發(fā)請(qǐng)求,實(shí)際改動(dòng)就這一行 d, _, shared := g.Do(key, func() (interface{}, error) { return getDataFromDB(key) }) fmt.Println(shared) data = d.(string) // 設(shè)置緩存 mx.Lock() cacheData[key] = data mx.Unlock() } fmt.Println(data) wg.Done()}func getDataFromCache(key string) (string, error) { return cacheData[key], nil}func getDataFromDB(key string) (string, error) { fmt.Println("getDataFromDB key: ", key) // 模擬訪問(wèn)db的耗時(shí) time.Sleep(10 * time.Millisecond) return "db data", nil}

    執(zhí)行TestSingleFlight函數(shù)后,會(huì)發(fā)現(xiàn)只調(diào)用了一次getDataFromDB函數(shù)

    源碼分析

    • Group struct:封裝并發(fā)請(qǐng)求
    • call struct:每一個(gè)需要執(zhí)行的函數(shù),都會(huì)被封裝成一個(gè)call
    • func Do:對(duì)并發(fā)請(qǐng)求進(jìn)行控制的方法

    // Copyright 2013 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package singleflight provides a duplicate function call suppression// mechanism.package singleflight // import “golang.org/x/sync/singleflight”import ( “bytes” “errors” “fmt” “runtime” “runtime/debug” “sync”)// errGoexit indicates the runtime.Goexit was called in// the user given function.var errGoexit = errors.New(“runtime.Goexit was called”)// A panicError is an arbitrary value recovered from a panic// with the stack trace during the execution of given function.type panicError struct { value interface{} stack []byte}// Error implements error interface.func (p *panicError) Error() string { return fmt.Sprintf(“%v%s”, p.value, p.stack)}func newPanicError(v interface{}) error { stack := debug.Stack() // The first line of the stack trace is of the form “goroutine N [status]:” // but by the time the panic reaches Do the goroutine may no longer exist // and its status will have changed. Trim out the misleading line. if line := bytes.IndexByte(stack[:], ”); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack}}// call is an in-flight or completed singleflight.Do calltype call struct { // 保證相同key,只會(huì)進(jìn)行一次實(shí)際請(qǐng)求 // 相同key的并發(fā)請(qǐng)求會(huì)共享返回 wg sync.WaitGroup // These fields are written once before the WaitGroup is done // and are only read after the WaitGroup is done. // 實(shí)際執(zhí)行函數(shù)的返回值和錯(cuò)誤 val interface{} err error // forgotten indicates whether Forget was called with this call’s key // while the call was still in flight. // 是否已刪除當(dāng)前并發(fā)請(qǐng)求的key forgotten bool // These fields are read and written with the singleflight // mutex held before the WaitGroup is done, and are read but // not written after the WaitGroup is done. // 并發(fā)請(qǐng)求的次數(shù) dups int chans []chan 0}// DoChan is like Do but returns a channel that will receive the// results when they are ready.//// The returned channel will not be closed.func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan 0 { go panic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again } else { // Normal return for _, ch := range c.chans { ch 0} } } }() // 匿名函數(shù)立即執(zhí)行 func() { defer func() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // whether this is a panic or a runtime.Goexit. // // Unfortunately, the only way we can distinguish the two is to see // whether the recover stopped the goroutine from terminating, and by // the time we know that, the part of the stack trace relevant to the // panic has been discarded. if r := recover(); r != nil { c.err = newPanicError(r) } } }() // 執(zhí)行實(shí)際函數(shù) c.val, c.err = fn() // 正常返回 normalReturn = true }() if !normalReturn { recovered = true }}// Forget tells the singleflight to forget about a key. Future calls// to Do for this key will call the function rather than waiting for// an earlier call to complete.func (g *Group) Forget(key string) { g.mu.Lock() if c, ok := g.m[key]; ok { c.forgotten = true } delete(g.m, key) g.mu.Unlock()}

    鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場(chǎng),版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
    用戶投稿
    上一篇 2022年6月19日 18:21
    下一篇 2022年6月19日 21:05

    相關(guān)推薦

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息