本章节主要讲解go中的并发思想和如何创建goroutine并利用channel实现goroutine之间的通信和并发控制
概述
进程、线程、协程
进程与线程
想要了解并发就必须直到进程和线程的概念
进程是操作系统资源分配的最小单位
线程是CPU调度的基本单位
进程可以看作是一个运行着的程序,进程被分配一片系统资源。而线程可以看错进程执行函数的实体,是CPU上的执行单元。一个进程可以包含多个线程,线程之间共享进程资源
协程
协程可以认为是轻量级的线程,它的调度是通过用户态进行的,由于不需要切换到内核态所以操作系统不参与调度,相当于用户自己控制线程的切换和执行。比如Go语言就采用自己的协程调度器实现协程之间的调度
什么是并发?并行呢?
简单来说,并发就是多个程序之间交替执行,从而实现宏观上看起来多个程序在同时运行的效果
而并行就是真正意义上的两个程序同一时间都在执行,比如多核CPU可以同时并行多个任务
goroutine
go语言并发只需要关注goroutine即可,由于学习过Java和C++,初次接触到goroutine感觉无比的清爽。在Java和C++中,线程之间的上下文切换是需要消耗资源的,所以我们需要通过像Java的线程池来管理多个线程,但是goroutine所占用的初始空间很小,随着运行可以进行增大或者缩小,所以一次性创建很多goroutine是可行的。在Go语言中,我们需要并发执行任务的时候只需要通go
关键字去开启一个goroutine执行函数就可以了,不需要手动维护线程池和关心具体的调度,开发效率高代码清爽。
使用方法
对于开启一个goroutine,直接通go
关键字即可
go func...(...)
Go的程序入口是main,调用main的时候创建一个goroutine主协程,之后我们创建的goroutine都基于这个主协程
体现出主协程的一个很经典的例子就是:
func method(){
fmt.Println("goroutine_1")
}
func main(){
go method()
fmt.Println("goroutine_main")
}
最后只能打印goroutine_main"
,这是因为goroutine还没被开启,主goroutine就执行结束了
所以可以看出我们通过go创建的子协程是依附于这个主goroutine的
当然,为了看goroutine_1
的打印,我们可以主动让main中调time.Sleep(5*time.Second)
让主goroutine休眠5秒,这样就有足够的实现让子协程创建并执行任务了
通过go关键字创建的多个协程之间是互不干扰并发执行的
Channel
上面我们说到,通过go关键字可以开启一个协程,但是实际开发中经常会遇到的一个问题就是,协程之间需要通信,那么我们应该如何实现协程之间的通信呢?
channel使用方法
channel的声明方式如下:
var name chan type
var name [size]chan type
如果没有初始化分配空间,其默认值是nil,我们也可以通过make函数进行初始化
name:= make(chan type )
name:= make(chan type , size)
这里的size表示缓冲区的大小,如果没有传入这个参数默认是无缓冲区的channel
channel可以理解为一个传输数据的管道,那么我们可以想到一个常见的操作,那就是传入数据和传出数据
go语言中直接通<-
符号表示数据流动
ch := make(chan int)
go func() {
ch <- 1 // 发送操作放入 goroutine,不阻塞主线程
}()
v := <-ch
fmt.Println(v)
close(ch)
这里表示我们传递了一个int类型变量1进入管道
然后我们取出这个数据并初始化赋值给变量v,最后使用完毕关闭channel(关闭的管道仍然可以从其中获取数据,只不过获取到的数据永远都是零值)
需要注意的是,如果是无缓冲的channel,我们传递数据的时候会阻塞等待接收者
我们可以理解为无缓冲的管道是一个负责传递的工人,数据就是水通过一个水桶传递,工人拿着这个水桶,当水桶满了就只能等着有人来接水,只有把水倒出去才能继续工作,这里如果我们不使用一个goroutine做数据传输的工作,就会一直阻塞直到有人获取数据
v,ok := <-ch
当然我们也可以通过第二个变量ok作为判断,当ok为true表示还能读取到值
实际开发中,很多时候我们不知道具体的读取次数,我们就可以用for range进行循环的读取,直到不再有数据或者channel关闭
func main() {
ch := make(chan int, 5)
ch <- 1
ch <- 2
close(ch)
go func() {
for v := range ch {
fmt.Printf("v=%d\n", v)
}
}()
time.Sleep(2 * time.Second)
}
传入1和2之后关闭了channel,for range就会循环两次结束
单向channel
刚才我们使用的channel都是双向的,也就是可以传入数据也可以传出数据
实际上我们还可以规定单向的channel,具体表现为只允许读或写的channel
type Write = chan<- int
type Read = <-chan int
func main() {
var ch = make(chan int)
go func() {
var write Write = ch
fmt.Println("write: 100")
write <- 100
}()
go func() {
var read Read = ch
num := <- read
fmt.Printf("read: %d", num)
}()
time.Sleep(2*time.Second)
}
我们需要起两个别chan<-
表示只能写的channel<-chan
表示只能读的channel
通过一个双向channel来定义单向channel,比var write Write = ch
相当于限制了双向channel的特性,相当于对原本channel的一种封装,对channel的行为进行了限制
所以channel的思想就是不通过共享内存来通信,而是通过通信来共享内存。比如对于子协程的执行结果,父协程可以通过channel获取
channel如何解决并发问题
上面我们有提到有缓冲的channel和无缓冲的channel
无缓冲的channel最明显的特点就是如果执行了写入数据的操作就会阻塞等待接收者接收数据
如果我们都在主协程中执行传入和接收的操作就会阻塞,所以我们必须通过go一个协程实现传入和接收
而有缓冲的channel特点就是只要缓冲区没满就可以继续写入数据,如果缓冲区满了就会和无缓冲channel一样阻塞,这个缓冲区可以看作队列结构,先写入的数据最先出队,当缓冲队列满了就退化为无缓冲的channel
那么我们就可以利用这个特点实现并发的控制
func incr(ch chan int,v *int){
ch<-1
v = v+1
<-ch
}
func main(){
ch := make(chan int,1)
var v int
for i := 0; i < 100; i++{
go incr(ch,v)
}
time.Sleep(5)
fmt.Println(v)
}
打印结果是100,实际上我们通过channel实现了一个锁
当一个协程执行ch<-1的操作之后,由于缓冲区满了,另一个协程想要执行+1的操作就必须先传入数据
但是缓冲区满了只能阻塞,于是只能等待正在执行+1操作的协程执行完+1操作后执行<-ch才能执行相应操作,也就是实现了加锁和解锁的效果
由于变量v的+1操作不是原子操作,所以在并发场景下会出现数据不安全问题,通过channel实现了加锁解锁机制,解决了并发不安全问题
不过上面的例子只不过是增加大家的理解
这里引入一个比较复杂的例子,我们可以通过channel队列实现高并发的控制
在高并发场景中,多个 goroutine 同时操作共享数据(如 map、切片、计数器)时,必须通过加锁、原子操作或通道机制来保证安全。其中一种非常实用的模式是:**通过 Channel 构造任务队列 + 单线程串行处理任务”**,也称**顺序消费模型**
核心思想就是通过多个生产者并发的发送任务到channel,让消费者goroutine独占接收通道顺序处理,这样我们可以实现无锁安全操作共享资源
示例
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type task struct {
callback chan int // 用于返回领取的值
}
const workerNum = 4 // 启动 4 个通道/worker
var (
chTaskList = make([]chan task, workerNum)
numPool = [][]int{
{10, 20, 30},
{40, 50},
{60, 70},
{80, 90, 100},
}
)
func main() {
// 启动 worker
for i := 0; i < workerNum; i++ {
chTaskList[i] = make(chan task)
go consumeTask(i, chTaskList[i])
}
// 模拟并发请求
var wg sync.WaitGroup
for i := 0; i < 8; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
cb := make(chan int)
t := task{callback: cb}
// 简单 hash 映射分配到 worker
ch := chTaskList[i%workerNum]
ch <- t
val := <-cb
fmt.Printf("用户 %d 领取到:%d\n", i+1, val)
}(i)
}
wg.Wait()
}
// 单个 worker 处理自己的任务队列
func consumeTask(index int, ch chan task) {
for t := range ch {
if len(numPool[index]) == 0 {
t.callback <- 0
continue
}
// 随机取出一个
r := rand.New(rand.NewSource(time.Now().UnixNano()))
i := r.Intn(len(numPool[index]))
val := numPool[index][i]
// 安全移除
if i == 0 {
numPool[index] = numPool[index][1:]
} else if i == len(numPool[index])-1 {
numPool[index] = numPool[index][:i]
} else {
numPool[index] = append(numPool[index][:i], numPool[index][i+1:]...)
}
t.callback <- val
}
}
我们模拟这样一个场景:
有一组初始整数池
[]int{10, 20, 30, 40, 50}
;多个用户同时发起“领取一个数字”的请求;
系统使用多个 channel 构建任务队列,由多个 goroutine 处理任务;
每个任务处理完后通过回调 channel 返回领取到的值。
这样每个任务通过回调函数获取领取数字的结果,实现了并发安全控制
小结
关闭一个未初始化
channel
会产panic
channel
只能被关闭一次,对同一channel
重复关闭会产panic
向一个已关闭的
channel
发送消息会产生panic
m从一个已关闭
channel
读取消息不会发panic
,会一直读取所有数据,直到零值
channel
可以读端和写端都可有多goroutine
操作,在一端关channel
的时候,channel
读端的所goroutine
都会收channel
已关闭的消息
channel
是并发安全的,多goroutine
同时读channel
中的数据,不会产生并发安全问题