goroutine并发控制与通信

Golang进阶 · 2023-06-20

对于Golang来说,原生支持并发是它的一个强项。并且在项目业务中,很多时候需要用到goroutine去并发的处理业务,多个goroutine之间需要同步与通信,主goroutine需要控制它所属的子goroutine。对于这种情况其实还是很常见的,那么下面我们专门介绍实现多个goroutine间的同步与通信。

同步与通信解决方法

  • 全局共享变量(共享内存)
  • channel通信(CSP模型)
  • Context包

全局共享变量(共享内存)

通过一个变量控制所有的子goroutine开始和结束,并不断的去轮询这个变量检查是否有更新,如果子goroutine检测到这个变量有更新就会执行对应的业务逻辑。

对于同步与通信来说,共享变量是一个最简单的一个方案,但是在功能上确实有着显著的缺点:

  • 全局变量不是线程安全,需要处理竞争关系,所以我们需要对这个变量加锁处理,这样就增加的复杂度,并且过多的锁,容易死锁。排查起来也很困难
  • 全局变量传递的数据信息较小,所以它不适用于子goroutine间的通信
  • 因为单向通知的原因,主goroutine无法等待所有的子goroutine退出

示例:

var signal = true
func main() {
    execFunc := func() {
        for signal {
            fmt.Println("子goroutine收到信息,开始执行...")
            time.Sleep(1 * time.Second)
        }
        fmt.Println("执行结束...")
    }
    go execFunc()
    go execFunc()
    go execFunc()
    time.Sleep(2 * time.Second)
    signal = false
    fmt.Println("主goroutine执行完,退出...")
}

综上优缺点分析,全局变量共享这种方法适合一些比较简单的场景。

channel通信(CSP模型)

channel通信这种方式实现并发控制较为灵活,并且本身也是线程安全的所以不需要考虑锁的机制问题。
goalng中实现了 CSP 部分理论,goroutine 对应 CSP 中并发执行的实体。对于CSP这部分我们先不过多的去介绍,只需要知道CSP是一个并发编程模型,后续在详细的介绍CSP。

示例:

这里使用了一个存储10个bool类型的通道,执行成功向通道里写入true,执行失败向通道里写入false。
启动一个循环从通道读取数据,读取10次之后程序在打印最后的结果。

func Process(ch chan bool) {
    defer func() {
        if err := recover(); err != nil {
            ch <- false
        }
    }()
    time.Sleep(time.Second)
    ch <- true
}

func main() {
    channels := make([]chan bool, 10)
    for i := 0; i < 10; i++ {
        channels[i] = make(chan bool)
        go Process(channels[i])
    }
    for i, ch := range channels {
        fmt.Printf("goroutine %v quit! val = %v", i, <-ch)
    }
}

最终执行结果如下:

goroutine 0 quit! val = true
goroutine 1 quit! val = true
goroutine 2 quit! val = true
goroutine 3 quit! val = true
goroutine 4 quit! val = true
goroutine 5 quit! val = true
goroutine 6 quit! val = true
goroutine 7 quit! val = true
goroutine 8 quit! val = true
goroutine 9 quit! val = true

从执行结果可以看出成功执行打印出结果,虽然顺序是打乱的。不是顺序执行,这个就和Golang调度有关了,不做具体的详细说明了。

但是缺点就是我必须等待所有的子goroutine执行完数据写入通道后,主goroutine退出只能等待通道数据全部接收到后才能退出。
这种等待goroutine完全结束后才退出main goroutine实现方式总感觉不是很优雅。

有没有更优雅的方式呢?我们往下看。

sync(sync.WaitGroup)

sync.WaitGroup这个是标准库sync里的Waitgroup,直接可以使用。是一种控制并发的方式,可以实现多goroutine的等待。
WaitGroup对象内部有一个计数器,最初从0开始,它有三个方法:

  • Add(): 计数器增加N, 通过个方法来标记需要等待的子协程数量
  • Done(): 完成一个任务,计数器减少1
  • Wait(): 同步阻塞,计数器为0之后才继续向下执行

所以综上我们完全可以通过sync.WaitGroup来实现通道中等待子协程执行完毕的实现方法:

func main() {
    var wg sync.WaitGroup
    data := make([]int, 10)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(idx int, wg *sync.WaitGroup) {
            defer wg.Done()
            data[idx] = idx
        }(i, &wg)
    }
    wg.Wait()
    fmt.Println(data) // output: [0 1 2 3 4 5 6 7 8 9]
}

至此我们用sync.WaitGroup实现了多个协程间的同步,但是在使用sync.WaitGroup的时候需要注意以下几点:

  • WaitGroup对象不是一个引用类型,通过函数传值的时候需要使用他的指针类型,因为Go语言只有值传递,传递WaitGroup是值的话,就会导致发生panic
  • 调用Add方法与Wait方法的顺序,不能并发同时调用这两个方法,否则就会引发panic,同时在调用了wait方法在其没有释放前不要再次调用Add方法,这样也会引发panic

Context

在go1.7之后Context被引入到标准库中,开箱即用,1.7之前的go版本使用context需要安装 golang.org/x/net/context包,这里对Context的介绍不做详细的说明,后续专门一篇去详细的介绍。下面我们直接来看怎么实现并发和通信:

Context的创建与调用是一个链式的,有点类似于数据结构中的树,每一次调用就衍生一个叶子节点,根节点的创建可以通过context.Background创建,后面可以进行链式调用context包里的各种方法,context包里的所有方法。
我们这里只用WithCancelWithValue来实现控制与通信。

下面我们以多个goroutine处理一个Request来做为例子演示:

package main

import (
    "context"
    "crypto/md5"
    "fmt"
    "io/ioutil"
    "net/http"
    "sync"
    "time"
)

func main() {
    wg := &sync.WaitGroup{}
    urls := []string{"https://www.aiweimeng.top/", "https://www.baidu.com/"}
    ctx, cancel := context.WithCancel(context.Background())
    for _, url := range urls {
        wg.Add(1)
        childCtx := context.WithValue(ctx, "requestUrl", url)
        go reqURL(childCtx, wg)
    }
    go func() {
        time.Sleep(time.Second)
        cancel()
    }()
    wg.Wait()
    fmt.Println("执行完毕,主goroutine退出...")
}

func reqURL(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    url, _ := ctx.Value("requestUrl").(string)
    select {
    case <-ctx.Done():
        fmt.Printf("停止请求地址: %s \n", url)
        return
    default:
        response, err := http.Get(url)
        if response.StatusCode == http.StatusOK && err == nil {
            body, _ := ioutil.ReadAll(response.Body)
            childCtx := context.WithValue(ctx, "response", fmt.Sprintf("%s%x", url, md5.Sum(body)))
            wg.Add(1)
            // 单独启动一个协程(不阻塞当前goroutine)去处理后续的业务逻辑
            go execOther(childCtx, wg, url)
        }
        response.Body.Close()
    }
}

func execOther(ctx context.Context, wg *sync.WaitGroup, url string) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("节点关闭停止执行...%v\n", url)
            return
        default:
            fmt.Printf("response = %v \n", ctx.Value("response"))
            time.Sleep(time.Second * 1)
        }
    }
}

执行结果如下,之所以没有打印停止请求地址是因为我们没有去轮询context的状态变化,如果需要监听,我们可以通过for去轮询监听状态的变化即可。

response = https://www.baidu.com/8f1f3fef541f7dbb36a8755a9f0eff40
response = https://www.aiweimeng.top/182dc224af638ac65e7d1b4e2911c39f
节点关闭停止执行...https://www.baidu.com/
节点关闭停止执行...https://www.aiweimeng.top/
执行完毕,主goroutine退出...

上面的例子中我们通过context.Background()生成根节点,然后调用withCancel方法,传入根节点,withCancel返回新的子Context节点以及根节点的cancel方法(通知所有子节点结束运行),我们这里生成的所有子Context节点,都可以通过Done方法检测父节点调cancel方法通知子节点退出运行,并且所有的子节点都会收到被取消的状态消息。

其实web框架里面都会看到Context的应用,我们在net/http官方包中也能找到Context,Request中有一项Context。

并发编程 channel Golang 并发控制与通信
Theme Jasmine by Kent Liao