# goroutine和channel
# 问题
有一个品类的csv文件,大概有3w多条数据,里面只有品类名称这一列,现在需要读取csv中的内容,调用接口实现对该品类的打分,讲返回的数据写入新的csv文件,新的csv文件包括两个字段,品类和对应的分数。由于打分接口逻辑比较复杂,单次执行比较耗时,如果让你用go来实现,应该怎么做呢?
# 思考
对于上面的问题,可以将主要的功能分为三个函数,csv文件读取、调用接口打分、csv文件写入函数。其中csv文件读取和csv文件写入函数都比较简单,就不在这里多说了。对于调用品类打分接口,如果串行的话,速度太慢。我们可以启用多个goroutine来并行执行,并将执行的结果统一写入到数据收集的channel中。最后将数据收集的channel中的数据写入csv文件就好。下面,我将带着这个问题来回顾学习一下goroutine+channel,最后来解答上面的问题。
# 学习
# goroutine简介
goroutine 是 Go 语言的一个基础特性。它是一个轻量级的线程,进程内部的线程。
# goroutine 的主要特征:
- 轻量级:goroutine 的开销很小,一个进程可以同时运行成百上千个 goroutine。
- 并发:goroutine 可以同时运行,实现并发。
- 共享内存:goroutine 之间可以访问共享内存,主要通过 channel 进行通信。
- 轻量级调度:goroutine 之间的调度开销很小,可以快速在 goroutine 之间切换。
# goroutine 的用途:
- 实现并发:可以同时运行多个 goroutine,实现并发操作。
- 避免阻塞:当一个 goroutine 执行阻塞操作时,可以切换到其他 goroutine 继续执行,避免主线程阻塞。
- 充分利用多核 CPU:可以启动与 CPU 核心数量相同的 goroutine 最大限度地利用 CPU 资源。
# goroutine 的实现原理:
Go 语言的运行时(runtime)管理着一个主 Goroutine(Logical Processor)和许多新的 Goroutines。 当主 goroutine 创建新的 goroutine 时,主 goroutine 会得到一个寄存器和栈空间,然后在不同的逻辑处理器上运行 goroutine,模拟出concurrency的效果。 Go 语言的调度器会在逻辑处理器间快速切换 goroutine,让我们感觉所有的 goroutine 是平行执行的,但实际上,任何时刻都只有一个处理器在运行。 goroutine 与线程的区别:
- 线程是进程内的执行单元,goroutine 是 Go 语言的执行单元。
- 线程的调度和切换由 OS 进行管理,goroutine 的调度和切换由 Go 语言的运行时(runtime)进行管理。
- 线程有较大的资源开销, goroutine 的资源开销比较小。一个进程可以创建上万个 goroutine 但同等条件下只能创建有限数量的线程。
- 线程的切换代价比较高,goroutine 的切换代价低廉。所以 goroutine 更适合用于高并发场景。 总之,goroutine 是 Go 语言实现高效的并发编程的关键所在。通过 goroutine 和 channel 的配合可以实现高性能的并发系统。
# 例子
package main
import (
"fmt"
"time"
)
func main() {
// 直接调用
f("direct")
// go run goroutines.go
// direct : 0
// direct : 1
// direct : 2
// 在一个go的协程中调用这个函数,这个新的go协程会并行执行这个函数调用
go f("goroutine")
// 我们也可以定义一个匿名函数
go func(from string) {
f(from)
}("goroutine1")
// 由于是异步执行,所以需要等待执行结束
// time.Sleep(time.Second)
// 同时也可以监听用户输入事件,回车结束
var input string
fmt.Scanln(&input)
fmt.Println("done")
// direct : 0
// direct : 1
// direct : 2
// goroutine1 : 0
// goroutine : 0
// goroutine : 1
// goroutine1 : 1
// goroutine1 : 2
// goroutine : 2
// done
}
// 定义一个函数,循环3次输出:"from:index"
func f(from string) {
for i := 0; i < 3; i++ {
// 为了能看出多个协程执行效果,每循环一次,休眠1s
time.Sleep(time.Second)
fmt.Println(from, ":", i)
}
}
# channel简介
channel 是 Go 语言中用于 goroutine 之间通讯的重要工具。channel 可以使一个 goroutine 发送数据,另一个 goroutine 接收数据。
# channel 的主要特征:
- 通讯机制:channel 可用于 goroutine 之间的数据传输和通信。
- 先进先出:channel 会严格遵循先进先出原则讲数据传递给接收方。
- 阻塞机制:当channel 没有数据时,接收方会阻塞。当channel已满时,发送方会阻塞。
- 并发安全:channel 可以在并发环境下使用,内置了锁机制保证并发安全。
# channel 的主要用途:
- 解耦:通过 channel 可以使得发送方和接收方解耦,松散耦合。
- 安全通信:channel 是并发安全的,可以在并发环境下使用。
- 同步:通过 channel 可以使得发送方和接收方同步。例如在接收方准备好之前,发送方一直阻塞。
- 流控:通过 channel 可以控制数据流进出的速度,防止数据消耗过快导致接收方来不及处理。
# channel 的声明语法:
ch := make(chan type, buffSize)
- type:channel 中传递的数据类型,必须是 gobject 兼容类型。
- buffSize:可选,指定 channel 的缓冲区大小,默认为 0 表明该 channel 是无缓冲 channel。
总之,channel 是 Go 语言实现 goroutine 之间通信的重要工具,它可以使得多个 goroutine 之间保持同步和通讯。通过 channel 可以实现并发编程中常见的工作池、消息传递等模式。
# 例子
package main
import "fmt"
func main() {
// 通道是连接多个go协程的管道,可以在一个go协程中发送数据到通道,在另外一个协程中接收数据
// 使用 make(chan val-type)来创建一个新的通道。
messages := make(chan string)
// 使用channel <- 语法发送一个新值到通道,这里在协程中发送数据到messages通道
go func() {
messages <- "ping"
}()
// 使用<-channel语法从channel中接收一个值,并打印出来
message := <-messages
fmt.Println(message)
// 默认发送和接收操作都是阻塞的,直到发送方和接收方都准备完毕,这个特性可以让我们在不使用其他的同步操作的情况下,在程序结尾的时候等待接收ping消息
}
# channel缓冲
channel 缓冲指 channel 中可存放数据的个数。channel 的缓冲大小可以通过第二个参数指定,语法如下:
ch := make(chan type, bufferSize)
- type:channel 用于传递的数据类型。
- bufferSize:指定 channel 可以存放的数据个数,如果省略默认为 0。
# 根据缓冲大小的不同,channel 可以分为:
- 无缓冲 channel(bufferSize = 0):发送方发送数据前必须等待接收方接收数据,否则发送方会阻塞。
- 有缓冲 channel(bufferSize > 0):发送方发送的数据先入队列,然后立即返回,直到队列已满后才会阻塞。 无缓冲 channel 和有缓冲 channel 的区别在于:
- 无缓冲 channel:必须等接收方接收消息后才能继续发送消息,否则会阻塞。更像一种同步机制。
- 有缓冲 channel:可以连续发送多个消息而不会阻塞(直到缓冲区满),更像一个队列。接收方可以从队列中接收消息。 下面我们通过例子演示无缓冲 channel 和有缓冲 channel 的区别:
# 有缓冲的channel例子
package main
import "fmt"
func main() {
// 默认通道是没有缓冲的,这意味着只有在对于的接收通道准备接收数据时,才能进行发送操作。
// 缓冲通道允许,在没有接收方的情况下,缓存一定数量的值
// 创建一个可以缓冲两个值的通道
messages := make(chan string, 2)
// 向通道中发送2个值
messages <- "buffered"
messages <- "channel"
fmt.Println(<-messages)
fmt.Println(<-messages)
}
# 无缓冲的通道例子
package main
import "fmt"
func main() {
// 默认通道是没有缓冲的,这意味着只有在对于的接收通道准备接收数据时,才能进行发送操作。
// 缓冲通道允许,在没有接收方的情况下,缓存一定数量的值
// 创建一个可以缓冲两个值的通道
messages := make(chan string)
// 开启新的goroutine向通道中发送2个值
go func(){
// 发送方发送消息,阻塞
messages <- "buffered"
messages <- "channel"
}()
// 接收方接收消息,发送方解阻塞
fmt.Println(<-messages)
fmt.Println(<-messages)
}
# channel方向
# channel 有两种方向:
- 单向 channel:只能在一个方向上发送和接收消息。
- 双向 channel:可以在两个方向上发送和接收消息。 channel 的方向在定义 channel 时使用 <- 指定:
- ch := <-chan int:接收用的单向 channel,只能接收不能发送。
- ch := chan <-int:发送用的单向 channel,只能发送不能接收。
- ch := chan int:双向 channel,可以发送和接收。
// 创建单向channel
var ch1 chan<- int // 只发送
var ch2 <-chan int // 只接收
// 创建双向channel
var ch3 chan int // 接收+发送
# 单向 channel 的使用场景:
- 当我们明确 channel 的使用方向时,使用单向 channel 可以避免误用。
- 减少由于误用 channel 方向导致的 bug,提高程序的健壮性。
- channel 的发送方和接收方的功能可以更清晰的划分,利于理解。 总之,当程序中 channel 的使用方向是明确的,我们应该使用单向 channel。否则使用双向 channel 会更加灵活。单向 channel 可以在一定程度上避免由于误使用 channel 方向导致的错误。
# 例子
package main
import "fmt"
func main() {
// 双向channel
pings := make(chan string, 1)
pongs := make(chan string, 1)
ping(pings, "passed message")
pong(pings, pongs)
fmt.Println(<-pongs)
}
// 当使用通道作为函数参数的时候,可以制定通道的方向,即是接收通道,还是发送通道
// ping 函数定义了一个只允许发送数据的通道。尝试使用这个通道来接收数据将会得到一个编译时错误。
func ping(pings chan<- string, msg string) {
pings <- msg
}
// pong 函数允许通道(pings)来接收数据,另一通道(pongs)来发送数据。
func pong(pings <-chan string, pongs chan<- string) {
// msg := <-pings
// pongs <- msg
pongs <- <-pings
}
# channel状态同步
channel 除了用于 goroutine 之间的数据传输外,还可以用于 goroutine 之间的同步。 channel 的同步特性体现在:
- 发送方发送数据时如果没有接收方接收数据会阻塞,直到接收方接收数据才会解阻塞。
- 接收方接收数据时如果没有发送方发送数据会阻塞,直到发送方发送数据才会解阻塞。 利用这个特性,我们可以实现 goroutine 之间的同步。
# 例子
package main
import (
"fmt"
"time"
)
func main() {
// 使用channel来同步goroutine之间的执行状态
done := make(chan bool, 1)
// 运行一个 worker Go协程,并给予用于通知的通道。
go worker(done)
// 程序将在接收到通道中 worker 发出的通知前一直阻塞。
<-done
// 如果你把 <- done 这行代码从程序中移除,程序甚至会在 worker还没开始运行时就结束了。
}
func worker(done chan bool) {
fmt.Println("working...")
time.Sleep(time.Second)
fmt.Println("done")
// 发送一个值来通知我们已经完工啦。
done <- true
}
上面代码中,如果 main 没有从 go worker 接收数据则 done 阻塞。这样就实现了 main 和 go worker 同步。 总之,channel 的阻塞特性可以用于不同 goroutine 之间的同步。当一个 goroutine Expects 另一个 goroutine 的操作结果或信号时,可以使用 channel 使其阻塞,等待另一个 goroutine 的操作。这是 channel 除了数据传输之外的另一个重要用途。
# channel选择器
在 Go 语言中,通道选择器可以用来监听多个通道上的操作并进行选择。它通过 select 关键字实现。
select 的基本语法如下:
select {
case <-chan1:
// chan1 可读,执行该 case
case chan2 <- 1:
// chan2 可写,执行该 case
default:
// 如果没有 case 可执行,执行 default
}
例如,这里是一个简单的例子:
func main() {
// 在我们的例子中,我们将从两个通道中选择。
c1 := make(chan string)
c2 := make(chan string)
// 开启第一个goroutine
// 休眠1s 然后异步向c1通道输入"one"
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
// 开启第二个goroutine
// 休眠2s 然后异步向c2通道输入"two"
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
// 使用通道选择器监听两个通道,谁先接收到值就输出谁
// 所以这个通道选择器一直接收到的是"one"
select {
case msg1 := <-c1:
println(msg1)
case msg2 := <-c2:
println(msg2)
}
// 输出
// one
}
# channel选择器的第一个用法:超时处理
通过使用time.After()函数来定义一个指定时间之后的执行的通道。我们很容易的实现超时处理。
下面是一个使用超时处理的例子:
package main
import (
"fmt"
"strconv"
"time"
)
func main() {
// 定义双向通道c1和c2
c1 := make(chan string)
c2 := make(chan string)
// 开启goroutine 每个200ms向c1输入一个值 执行5次
go func() {
for i := 1; i <= 5; i++ {
time.Sleep(time.Millisecond * 200)
c1 <- "result " + strconv.Itoa(i)
}
}()
// 开启goroutine 每个300ms向c2输入一个值 执行5次
go func() {
for i := 1; i <= 5; i++ {
time.Sleep(time.Millisecond * 300)
c2 <- "result " + strconv.Itoa(i)
}
}()
// 定义一个独立的定时器
timeout := time.After(time.Second * 1)
// for循环可以一直启用select监听c1和c2通道
for {
select {
case res1 := <-c1: // 输出c1通道接收到的值
fmt.Println("received from chan c1: ", res1)
case res2 := <-c2: // 输出c2通道接收到的值
fmt.Println("received from chan c2: ", res2)
case <-timeout: // 在1s后让通道选择器执行超时退出操作
fmt.Println("timeout after 1s")
return
}
}
// 输出
// received from chan c1: result 1
// received from chan c2: result 1
// received from chan c1: result 2
// received from chan c1: result 3
// received from chan c2: result 2
// received from chan c1: result 4
// received from chan c2: result 3
// timeout after 1s
}
# channel选择器的第二个用法:非阻塞channel
可以使用select的default语句来实现非阻塞通道的监听。
package main
import (
"fmt"
"strconv"
"time"
)
// 常规的通过通道发送和接收数据是阻塞的。然而,我们可以使用带一个 default 子句的 select 来实现非阻塞 的发送、接收,甚至是非阻塞的多路 select。
func main() {
// 定义一个messages通道
messages := make(chan string)
// 使用goroutine异步,每个500ms向messages写入一条消息
go func() {
for i := 1; i < 5; i++ {
time.Sleep(time.Millisecond * 500)
messages <- "msg " + strconv.Itoa(i)
}
}()
// 定义一个超时处理器
timeout := time.After(time.Second * 3)
for {
// 如果在 messages 中存在,然后 select 将这个值带入 <-messages case中。如果不是,就直接到 default 分支中。
select {
case msg := <-messages:
fmt.Println(msg)
case <-timeout:
fmt.Println("timeout 1")
return
default:
time.Sleep(time.Millisecond * 300)
fmt.Println("no message received")
}
}
// 输出
// no message received
// no message received
// msg 1
// no message received
// no message received
// msg 2
// no message received
// no message received
// msg 3
// no message received
// no message received
// msg 4
// no message received
// no message received
// timeout 1
}
所以,select default的用来实现一个非阻塞的通道读写:
- 尝试向通道写入数据,如果通道已满,则选择 default
- 尝试从通道读取数据,如果通道为空,则选择 default 这种“非阻塞”的特性可以用于在不确定通道状态的情况下尝试读写,如果不能成功则实现默认行为,而不是阻塞当前 goroutine。
# channel的关闭
关闭 一个通道意味着不能再向这个通道发送值了。这个特性可以用来给这个通道的接收方传达工作已经完成的信息。通道需要关闭(close)的情况有两种:
- 当不再使用这个通道发送数据时,需要关闭通道
- 对方可以通过通道的关闭状态来判断发送方是否结束工作
关闭通道的语法很简单,使用 close 关键字:
c := make(chan int)
close(c)
关闭通道主要有一下几个作用:
- 对已关闭的通道再发送数据会 panic,这可以确保不会向通道发送无效数据。
- 对已关闭的通道进行接收(<-chan)不会阻塞,而是立即返回通道元素类型的零值。这可以用于正常关闭接收方 goroutine。
- 通过 select 中的 case <-chan 语句,如果通道关闭,则会立即执行该 case,这可以用于检测通道是否关闭。
- 可以通过通道接收时的第二个参数测试通道是否关闭,如果关闭则返回 true 。
例子:
package main
import "fmt"
func main() {
// 定义一个工作数据接收通道
jobs := make(chan int, 5)
// 定义一个多goroutine之间状态同步的通道
done := make(chan bool, 0)
// 开启goroutine异步监听jobs通道
// 如果有数据输出数据
// 如果通道关闭向状态同步通道同步关闭状态,然后退出当前goroutine
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
// 主goroutine向jobs写入数据
for i := 0; i < 3; i++ {
jobs <- i
}
// 关闭通道
close(jobs)
fmt.Println("sent all jobs")
// 阻塞主goroutine 等待同步的状态 然后退出
<-done
}
# channel的遍历
Go 语言的通道(channel)可以使用 range 关键字进行遍历。遍历通道会一直阻塞到通道关闭。
package main
import "fmt"
func main() {
// for range 来遍历通道
c := make(chan int)
go func() {
c <- 1
c <- 2
close(c)
}()
for v := range c {
fmt.Println(v)
}
// 输出
// 1
// 2
}
上面的通道,如果在非关闭状态下遍历,会产生死锁。后面我们将学习goroutine的死锁。
# 扩展
# 什么是goroutine的阻塞?
Goroutine 阻塞意味着 goroutine 被暂停执行,等待某个条件满足才继续执行。 Goroutine 可以因以下几个原因被阻塞:
- 通道阻塞:当从一个空通道接收数据时,goroutine 会被阻塞直到有数据发送到该通道。
- 互斥锁阻塞:当 goroutine 尝试锁定一个已经被其他 goroutine 锁定的互斥锁时,它会被阻塞直到互斥锁被解锁。
- 等待其他 goroutine:可以使用 WaitGroup 显式等待其他 goroutine 结束。goroutine 会被阻塞直到 WaitGroup 的计数减为 0。
- 系统调用阻塞:当 goroutine 执行系统调用发生 IO 或涉及同步的操作时,它会被阻塞。例如文件读取,数据库查询等。
- 定时器阻塞:goroutine 可以使用 time.Sleep 或 channel 和 select 实现定时器,在定时时间未到达前被阻塞。 阻塞的 goroutine 会被放入调度队列中,等待它所需要的资源或条件准备就绪后继续执行。
# 什么是goroutine的死锁?
Goroutine 死锁指的是两个或多个 goroutine 因互相等待对方释放资源而陷入无限等待的状态。 死锁的四个必要条件:
- 互斥:一个资源每次只能被一个 goroutine 占有。
- 占有且等待:goroutine 已经占有某资源,但还在等待其他被其他 goroutine 占有的资源。
- 不可剥夺:资源在被占有期间不能被 goroutine 外力剥夺。
- 循环等待:goroutine 形成了一个等待循环队列。
package main
import "fmt"
func main() {
// for range 来遍历通道
c := make(chan int)
go func() {
c <- 1
c <- 2
}()
for v := range c {
fmt.Println(v)
}
// 输出
// 1
// 2
}
如上,新开启的goroutine向通道c写入1,2两个值之后直接退出。但是main goroutine的for...range...还在等待通道c关闭或者继续往通道c里面写入数据。所以导致了死锁的情况。
# 阻塞与死锁的区别:
- 阻塞的 goroutine 会在条件满足时被唤醒继续执行,死锁的 goroutine 却无法继续执行。
- 阻塞通常是临时的,死锁发生后程序无法继续运行。
- 阻塞不会导致资源占用不释放,死锁会导致有限资源被永久占用无法释放。 总之,理解 goroutine 的阻塞机制和死锁的区别可以避免 Go 程序出现死锁问题,编写出健壮的并发程序。阻塞是控制 goroutine 执行顺序的一种重要手段。
# 解答
# 实现步骤
- 读取 CSV 文件,获取所有品类名称,存入 brands 数组。
- 创建结果文件 result.csv 和 waitGroup。
- 启动 goroutine,对 brands 数组中的每个品类名称调用接口打分,得到分数 score,并通过 channel 返回。
- 从 channel 中获取分数 score,并写入结果文件 result.csv。
- 使用 waitGroup 等待所有 goroutine 结束。
# 实现代码
package main
import (
"encoding/csv"
"os"
"sync"
)
var wg sync.WaitGroup
func main() {
// 读取 CSV 文件,获取所有品类名称
brands := readBrandsFromCSV("brands.csv")
// 创建结果文件和waitGroup
f, _ := os.Create("result.csv")
w := csv.NewWriter(f)
wg.Add(len(brands))
// 启动 goroutine 打分,并通过 channel 返回
scores := make(chan string)
for _, brand := range brands {
go getScore(brand, scores)
}
// 从channel获取分数并写入文件
for i := 0; i < len(brands); i++ {
score := <-scores
w.Write([]string{brands[i], score})
}
// 等待goroutine结束
wg.Wait()
// 关闭文件
w.Flush()
f.Close()
}
// 读取 CSV 文件获取品类
func readBrandsFromCSV(path string) []string {
// ...
}
// 调用接口打分,并通过channel返回
func getScore(brand string, scores chan string) {
// 调用接口,获取分数 score
score := getScoreFromAPI(brand)
scores <- score // 返回分数
wg.Done() // 通知waitGroup
}