# goroutine和channel

# 问题

有一个品类的csv文件,大概有3w多条数据,里面只有品类名称这一列,现在需要读取csv中的内容,调用接口实现对该品类的打分,讲返回的数据写入新的csv文件,新的csv文件包括两个字段,品类和对应的分数。由于打分接口逻辑比较复杂,单次执行比较耗时,如果让你用go来实现,应该怎么做呢?

# 思考

对于上面的问题,可以将主要的功能分为三个函数,csv文件读取、调用接口打分、csv文件写入函数。其中csv文件读取和csv文件写入函数都比较简单,就不在这里多说了。对于调用品类打分接口,如果串行的话,速度太慢。我们可以启用多个goroutine来并行执行,并将执行的结果统一写入到数据收集的channel中。最后将数据收集的channel中的数据写入csv文件就好。下面,我将带着这个问题来回顾学习一下goroutine+channel,最后来解答上面的问题。

# 学习

# goroutine简介

goroutine 是 Go 语言的一个基础特性。它是一个轻量级的线程,进程内部的线程。

# goroutine 的主要特征:
  • 轻量级:goroutine 的开销很小,一个进程可以同时运行成百上千个 goroutine。
  • 并发:goroutine 可以同时运行,实现并发。
  • 共享内存:goroutine 之间可以访问共享内存,主要通过 channel 进行通信。
  • 轻量级调度:goroutine 之间的调度开销很小,可以快速在 goroutine 之间切换。
# goroutine 的用途:
  • 实现并发:可以同时运行多个 goroutine,实现并发操作。
  • 避免阻塞:当一个 goroutine 执行阻塞操作时,可以切换到其他 goroutine 继续执行,避免主线程阻塞。
  • 充分利用多核 CPU:可以启动与 CPU 核心数量相同的 goroutine 最大限度地利用 CPU 资源。
# goroutine 的实现原理:

Go 语言的运行时(runtime)管理着一个主 Goroutine(Logical Processor)和许多新的 Goroutines。 当主 goroutine 创建新的 goroutine 时,主 goroutine 会得到一个寄存器和栈空间,然后在不同的逻辑处理器上运行 goroutine,模拟出concurrency的效果。 Go 语言的调度器会在逻辑处理器间快速切换 goroutine,让我们感觉所有的 goroutine 是平行执行的,但实际上,任何时刻都只有一个处理器在运行。 goroutine 与线程的区别:

  • 线程是进程内的执行单元,goroutine 是 Go 语言的执行单元。
  • 线程的调度和切换由 OS 进行管理,goroutine 的调度和切换由 Go 语言的运行时(runtime)进行管理。
  • 线程有较大的资源开销, goroutine 的资源开销比较小。一个进程可以创建上万个 goroutine 但同等条件下只能创建有限数量的线程。
  • 线程的切换代价比较高,goroutine 的切换代价低廉。所以 goroutine 更适合用于高并发场景。 总之,goroutine 是 Go 语言实现高效的并发编程的关键所在。通过 goroutine 和 channel 的配合可以实现高性能的并发系统。
# 例子
package main

import (
	"fmt"
	"time"
)

func main() {

	// 直接调用
	f("direct")
	// go run goroutines.go
	// direct : 0
	// direct : 1
	// direct : 2

	// 在一个go的协程中调用这个函数,这个新的go协程会并行执行这个函数调用
	go f("goroutine")

	// 我们也可以定义一个匿名函数
	go func(from string) {
		f(from)
	}("goroutine1")
	// 由于是异步执行,所以需要等待执行结束
	// time.Sleep(time.Second)
	// 同时也可以监听用户输入事件,回车结束
	var input string
	fmt.Scanln(&input)
	fmt.Println("done")

	// direct : 0
	// direct : 1
	// direct : 2
	// goroutine1 : 0
	// goroutine : 0
	// goroutine : 1
	// goroutine1 : 1
	// goroutine1 : 2
	// goroutine : 2

	// done
}

// 定义一个函数,循环3次输出:"from:index"
func f(from string) {
	for i := 0; i < 3; i++ {
		// 为了能看出多个协程执行效果,每循环一次,休眠1s
		time.Sleep(time.Second)
		fmt.Println(from, ":", i)
	}
}

# channel简介

channel 是 Go 语言中用于 goroutine 之间通讯的重要工具。channel 可以使一个 goroutine 发送数据,另一个 goroutine 接收数据。

# channel 的主要特征:
  • 通讯机制:channel 可用于 goroutine 之间的数据传输通信
  • 先进先出:channel 会严格遵循先进先出原则讲数据传递给接收方。
  • 阻塞机制:当channel 没有数据时,接收方会阻塞。当channel已满时,发送方会阻塞。
  • 并发安全:channel 可以在并发环境下使用,内置了锁机制保证并发安全。
# channel 的主要用途:
  • 解耦:通过 channel 可以使得发送方和接收方解耦,松散耦合。
  • 安全通信:channel 是并发安全的,可以在并发环境下使用。
  • 同步:通过 channel 可以使得发送方和接收方同步。例如在接收方准备好之前,发送方一直阻塞。
  • 流控:通过 channel 可以控制数据流进出的速度,防止数据消耗过快导致接收方来不及处理。
# channel 的声明语法:
ch := make(chan type, buffSize)
  • type:channel 中传递的数据类型,必须是 gobject 兼容类型。
  • buffSize:可选,指定 channel 的缓冲区大小,默认为 0 表明该 channel 是无缓冲 channel。

总之,channel 是 Go 语言实现 goroutine 之间通信的重要工具,它可以使得多个 goroutine 之间保持同步和通讯。通过 channel 可以实现并发编程中常见的工作池、消息传递等模式。

# 例子
package main

import "fmt"

func main() {
	// 通道是连接多个go协程的管道,可以在一个go协程中发送数据到通道,在另外一个协程中接收数据

	// 使用 make(chan val-type)来创建一个新的通道。
	messages := make(chan string)
	// 使用channel <- 语法发送一个新值到通道,这里在协程中发送数据到messages通道
	go func() {
		messages <- "ping"
	}()
	// 使用<-channel语法从channel中接收一个值,并打印出来
	message := <-messages
	fmt.Println(message)

	// 默认发送和接收操作都是阻塞的,直到发送方和接收方都准备完毕,这个特性可以让我们在不使用其他的同步操作的情况下,在程序结尾的时候等待接收ping消息
}

# channel缓冲

channel 缓冲指 channel 中可存放数据的个数。channel 的缓冲大小可以通过第二个参数指定,语法如下:

ch := make(chan type, bufferSize)
  • type:channel 用于传递的数据类型。
  • bufferSize:指定 channel 可以存放的数据个数,如果省略默认为 0。
# 根据缓冲大小的不同,channel 可以分为:
  • 无缓冲 channel(bufferSize = 0):发送方发送数据前必须等待接收方接收数据,否则发送方会阻塞。
  • 有缓冲 channel(bufferSize > 0):发送方发送的数据先入队列,然后立即返回,直到队列已满后才会阻塞。 无缓冲 channel 和有缓冲 channel 的区别在于:
  • 无缓冲 channel:必须等接收方接收消息后才能继续发送消息,否则会阻塞。更像一种同步机制。
  • 有缓冲 channel:可以连续发送多个消息而不会阻塞(直到缓冲区满),更像一个队列。接收方可以从队列中接收消息。 下面我们通过例子演示无缓冲 channel 和有缓冲 channel 的区别:
# 有缓冲的channel例子
package main

import "fmt"

func main() {
	// 默认通道是没有缓冲的,这意味着只有在对于的接收通道准备接收数据时,才能进行发送操作。
	// 缓冲通道允许,在没有接收方的情况下,缓存一定数量的值

	// 创建一个可以缓冲两个值的通道
	messages := make(chan string, 2)

	// 向通道中发送2个值
	messages <- "buffered"
	messages <- "channel"

	fmt.Println(<-messages)
	fmt.Println(<-messages)
}
# 无缓冲的通道例子
package main

import "fmt"

func main() {
	// 默认通道是没有缓冲的,这意味着只有在对于的接收通道准备接收数据时,才能进行发送操作。
	// 缓冲通道允许,在没有接收方的情况下,缓存一定数量的值

	// 创建一个可以缓冲两个值的通道
	messages := make(chan string)

	// 开启新的goroutine向通道中发送2个值
	go func(){
        // 发送方发送消息,阻塞
        messages <- "buffered"
	    messages <- "channel"
    }()
    // 接收方接收消息,发送方解阻塞
	fmt.Println(<-messages)
	fmt.Println(<-messages)
}

# channel方向

# channel 有两种方向:
  • 单向 channel:只能在一个方向上发送和接收消息。
  • 双向 channel:可以在两个方向上发送和接收消息。 channel 的方向在定义 channel 时使用 <- 指定:
  • ch := <-chan int:接收用的单向 channel,只能接收不能发送。
  • ch := chan <-int:发送用的单向 channel,只能发送不能接收。
  • ch := chan int:双向 channel,可以发送和接收。
// 创建单向channel
var ch1 chan<- int    // 只发送
var ch2 <-chan int    // 只接收
// 创建双向channel
var ch3 chan int // 接收+发送
# 单向 channel 的使用场景:
  • 当我们明确 channel 的使用方向时,使用单向 channel 可以避免误用。
  • 减少由于误用 channel 方向导致的 bug,提高程序的健壮性。
  • channel 的发送方和接收方的功能可以更清晰的划分,利于理解。 总之,当程序中 channel 的使用方向是明确的,我们应该使用单向 channel。否则使用双向 channel 会更加灵活。单向 channel 可以在一定程度上避免由于误使用 channel 方向导致的错误。
# 例子
package main

import "fmt"

func main() {
	// 双向channel
	pings := make(chan string, 1)
	pongs := make(chan string, 1)

	ping(pings, "passed message")
	pong(pings, pongs)
	fmt.Println(<-pongs)
}

// 当使用通道作为函数参数的时候,可以制定通道的方向,即是接收通道,还是发送通道

// ping 函数定义了一个只允许发送数据的通道。尝试使用这个通道来接收数据将会得到一个编译时错误。
func ping(pings chan<- string, msg string) {
	pings <- msg
}

// pong 函数允许通道(pings)来接收数据,另一通道(pongs)来发送数据。
func pong(pings <-chan string, pongs chan<- string) {
	// msg := <-pings
	// pongs <- msg
	pongs <- <-pings
}

# channel状态同步

channel 除了用于 goroutine 之间的数据传输外,还可以用于 goroutine 之间的同步。 channel 的同步特性体现在:

  • 发送方发送数据时如果没有接收方接收数据会阻塞,直到接收方接收数据才会解阻塞。
  • 接收方接收数据时如果没有发送方发送数据会阻塞,直到发送方发送数据才会解阻塞。 利用这个特性,我们可以实现 goroutine 之间的同步。
# 例子
package main

import (
	"fmt"
	"time"
)

func main() {

	// 使用channel来同步goroutine之间的执行状态
	done := make(chan bool, 1)
	// 运行一个 worker Go协程,并给予用于通知的通道。
	go worker(done)
	// 程序将在接收到通道中 worker 发出的通知前一直阻塞。
	<-done
	// 如果你把 <- done 这行代码从程序中移除,程序甚至会在 worker还没开始运行时就结束了。
}

func worker(done chan bool) {
	fmt.Println("working...")
	time.Sleep(time.Second)
	fmt.Println("done")
	// 发送一个值来通知我们已经完工啦。
	done <- true
}

上面代码中,如果 main 没有从 go worker 接收数据则 done 阻塞。这样就实现了 main 和 go worker 同步。 总之,channel 的阻塞特性可以用于不同 goroutine 之间的同步。当一个 goroutine Expects 另一个 goroutine 的操作结果或信号时,可以使用 channel 使其阻塞,等待另一个 goroutine 的操作。这是 channel 除了数据传输之外的另一个重要用途。

# channel选择器

在 Go 语言中,通道选择器可以用来监听多个通道上的操作并进行选择。它通过 select 关键字实现。

select 的基本语法如下:

select {
case <-chan1:
    // chan1 可读,执行该 case
case chan2 <- 1:
    // chan2 可写,执行该 case  
default:
    // 如果没有 case 可执行,执行 default
}

例如,这里是一个简单的例子:

func main() {
    // 在我们的例子中,我们将从两个通道中选择。
    c1 := make(chan string)
    c2 := make(chan string)

    // 开启第一个goroutine
    // 休眠1s 然后异步向c1通道输入"one"
    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()

    // 开启第二个goroutine
    // 休眠2s 然后异步向c2通道输入"two"
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    // 使用通道选择器监听两个通道,谁先接收到值就输出谁
    // 所以这个通道选择器一直接收到的是"one"
    select {
    case msg1 := <-c1:
        println(msg1)
    case msg2 := <-c2:
        println(msg2)
    }
    // 输出
    // one
}
# channel选择器的第一个用法:超时处理

通过使用time.After()函数来定义一个指定时间之后的执行的通道。我们很容易的实现超时处理

下面是一个使用超时处理的例子:

package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// 定义双向通道c1和c2
	c1 := make(chan string)
	c2 := make(chan string)

	// 开启goroutine 每个200ms向c1输入一个值 执行5次
	go func() {
		for i := 1; i <= 5; i++ {
			time.Sleep(time.Millisecond * 200)
			c1 <- "result " + strconv.Itoa(i)
		}
	}()

	// 开启goroutine 每个300ms向c2输入一个值 执行5次
	go func() {
		for i := 1; i <= 5; i++ {
			time.Sleep(time.Millisecond * 300)
			c2 <- "result " + strconv.Itoa(i)
		}
	}()

	// 定义一个独立的定时器
	timeout := time.After(time.Second * 1)

	// for循环可以一直启用select监听c1和c2通道
	for {
		select {
		case res1 := <-c1: // 输出c1通道接收到的值
			fmt.Println("received from chan c1: ", res1)
		case res2 := <-c2: // 输出c2通道接收到的值
			fmt.Println("received from chan c2: ", res2)
		case <-timeout: // 在1s后让通道选择器执行超时退出操作
			fmt.Println("timeout after 1s")
			return
		}
	}

	// 输出
	// received from chan c1:  result 1
	// received from chan c2:  result 1
	// received from chan c1:  result 2
	// received from chan c1:  result 3
	// received from chan c2:  result 2
	// received from chan c1:  result 4
	// received from chan c2:  result 3
	// timeout after 1s
}
# channel选择器的第二个用法:非阻塞channel

可以使用select的default语句来实现非阻塞通道的监听。

package main

import (
	"fmt"
	"strconv"
	"time"
)

// 常规的通过通道发送和接收数据是阻塞的。然而,我们可以使用带一个 default 子句的 select 来实现非阻塞 的发送、接收,甚至是非阻塞的多路 select。

func main() {
	// 定义一个messages通道
	messages := make(chan string)

	// 使用goroutine异步,每个500ms向messages写入一条消息
	go func() {
		for i := 1; i < 5; i++ {
			time.Sleep(time.Millisecond * 500)
			messages <- "msg " + strconv.Itoa(i)
		}
	}()

	// 定义一个超时处理器
	timeout := time.After(time.Second * 3)

	for {
		// 如果在 messages 中存在,然后 select 将这个值带入 <-messages case中。如果不是,就直接到 default 分支中。
		select {
		case msg := <-messages:
			fmt.Println(msg)
		case <-timeout:
			fmt.Println("timeout 1")
			return
		default:
			time.Sleep(time.Millisecond * 300)
			fmt.Println("no message received")
		}
	}
	// 输出
	// no message received
	// no message received
	// msg 1
	// no message received
	// no message received
	// msg 2
	// no message received
	// no message received
	// msg 3
	// no message received
	// no message received
	// msg 4
	// no message received
	// no message received
	// timeout 1
}

所以,select default的用来实现一个非阻塞的通道读写:

  • 尝试向通道写入数据,如果通道已满,则选择 default
  • 尝试从通道读取数据,如果通道为空,则选择 default 这种“非阻塞”的特性可以用于在不确定通道状态的情况下尝试读写,如果不能成功则实现默认行为,而不是阻塞当前 goroutine。

# channel的关闭

关闭 一个通道意味着不能再向这个通道发送值了。这个特性可以用来给这个通道的接收方传达工作已经完成的信息。通道需要关闭(close)的情况有两种:

  • 当不再使用这个通道发送数据时,需要关闭通道
  • 对方可以通过通道的关闭状态来判断发送方是否结束工作

关闭通道的语法很简单,使用 close 关键字:

c := make(chan int)
close(c)

关闭通道主要有一下几个作用:

  • 对已关闭的通道再发送数据会 panic,这可以确保不会向通道发送无效数据。
  • 对已关闭的通道进行接收(<-chan)不会阻塞,而是立即返回通道元素类型的零值。这可以用于正常关闭接收方 goroutine。
  • 通过 select 中的 case <-chan 语句,如果通道关闭,则会立即执行该 case,这可以用于检测通道是否关闭。
  • 可以通过通道接收时的第二个参数测试通道是否关闭,如果关闭则返回 true 。

例子:

package main

import "fmt"

func main() {
	// 定义一个工作数据接收通道
	jobs := make(chan int, 5)
	// 定义一个多goroutine之间状态同步的通道
	done := make(chan bool, 0)

	// 开启goroutine异步监听jobs通道
	// 如果有数据输出数据
	// 如果通道关闭向状态同步通道同步关闭状态,然后退出当前goroutine
	go func() {
		for {
			j, more := <-jobs
			if more {
				fmt.Println("received job", j)
			} else {
				fmt.Println("received all jobs")
				done <- true
				return
			}
		}
	}()

	// 主goroutine向jobs写入数据
	for i := 0; i < 3; i++ {
		jobs <- i
	}
	// 关闭通道
	close(jobs)
	fmt.Println("sent all jobs")

	// 阻塞主goroutine 等待同步的状态 然后退出
	<-done
}

# channel的遍历

Go 语言的通道(channel)可以使用 range 关键字进行遍历。遍历通道会一直阻塞到通道关闭。

package main

import "fmt"

func main() {
	// for range 来遍历通道
	c := make(chan int)
	go func() {
		c <- 1
		c <- 2
        close(c)
	}()

	for v := range c {
		fmt.Println(v)
	}
	// 输出
	// 1
	// 2
}

上面的通道,如果在非关闭状态下遍历,会产生死锁。后面我们将学习goroutine的死锁。

# 扩展

# 什么是goroutine的阻塞?

Goroutine 阻塞意味着 goroutine 被暂停执行,等待某个条件满足才继续执行。 Goroutine 可以因以下几个原因被阻塞:

  • 通道阻塞:当从一个空通道接收数据时,goroutine 会被阻塞直到有数据发送到该通道。
  • 互斥锁阻塞:当 goroutine 尝试锁定一个已经被其他 goroutine 锁定的互斥锁时,它会被阻塞直到互斥锁被解锁。
  • 等待其他 goroutine:可以使用 WaitGroup 显式等待其他 goroutine 结束。goroutine 会被阻塞直到 WaitGroup 的计数减为 0。
  • 系统调用阻塞:当 goroutine 执行系统调用发生 IO 或涉及同步的操作时,它会被阻塞。例如文件读取,数据库查询等。
  • 定时器阻塞:goroutine 可以使用 time.Sleep 或 channel 和 select 实现定时器,在定时时间未到达前被阻塞。 阻塞的 goroutine 会被放入调度队列中,等待它所需要的资源或条件准备就绪后继续执行。

# 什么是goroutine的死锁?

Goroutine 死锁指的是两个或多个 goroutine 因互相等待对方释放资源而陷入无限等待的状态。 死锁的四个必要条件:

  • 互斥:一个资源每次只能被一个 goroutine 占有。
  • 占有且等待:goroutine 已经占有某资源,但还在等待其他被其他 goroutine 占有的资源。
  • 不可剥夺:资源在被占有期间不能被 goroutine 外力剥夺。
  • 循环等待:goroutine 形成了一个等待循环队列。
package main

import "fmt"

func main() {
	// for range 来遍历通道
	c := make(chan int)
	go func() {
		c <- 1
		c <- 2
	}()

	for v := range c {
		fmt.Println(v)
	}
	// 输出
	// 1
	// 2
}

如上,新开启的goroutine向通道c写入1,2两个值之后直接退出。但是main goroutine的for...range...还在等待通道c关闭或者继续往通道c里面写入数据。所以导致了死锁的情况。

# 阻塞与死锁的区别:

  • 阻塞的 goroutine 会在条件满足时被唤醒继续执行,死锁的 goroutine 却无法继续执行。
  • 阻塞通常是临时的,死锁发生后程序无法继续运行。
  • 阻塞不会导致资源占用不释放,死锁会导致有限资源被永久占用无法释放。 总之,理解 goroutine 的阻塞机制和死锁的区别可以避免 Go 程序出现死锁问题,编写出健壮的并发程序。阻塞是控制 goroutine 执行顺序的一种重要手段。

# 解答

# 实现步骤

  1. 读取 CSV 文件,获取所有品类名称,存入 brands 数组。
  2. 创建结果文件 result.csv 和 waitGroup。
  3. 启动 goroutine,对 brands 数组中的每个品类名称调用接口打分,得到分数 score,并通过 channel 返回。
  4. 从 channel 中获取分数 score,并写入结果文件 result.csv。
  5. 使用 waitGroup 等待所有 goroutine 结束。

# 实现代码

package main

import (
    "encoding/csv"
    "os"
    "sync"
)

var wg sync.WaitGroup

func main() {
    // 读取 CSV 文件,获取所有品类名称
    brands := readBrandsFromCSV("brands.csv")

    // 创建结果文件和waitGroup
    f, _ := os.Create("result.csv")
    w := csv.NewWriter(f)
    wg.Add(len(brands))

    // 启动 goroutine 打分,并通过 channel 返回
    scores := make(chan string)
    for _, brand := range brands {
        go getScore(brand, scores)
    }

    // 从channel获取分数并写入文件
    for i := 0; i < len(brands); i++ {
        score := <-scores
        w.Write([]string{brands[i], score})
    }

    // 等待goroutine结束
    wg.Wait()  

    // 关闭文件
    w.Flush()
    f.Close()
}

// 读取 CSV 文件获取品类  
func readBrandsFromCSV(path string) []string {
    // ...
}

// 调用接口打分,并通过channel返回    
func getScore(brand string, scores chan string) {
    // 调用接口,获取分数 score 
    score := getScoreFromAPI(brand)  

    scores <- score  // 返回分数
    wg.Done()       // 通知waitGroup
}