go并发编程
go的GMP并发模型,让go天然支持高并发,先了解一下GMP模型吧
GMP
G协程,M工作线程、P处理器,M必须持有P才可以执行G
P维护着一个协程G队列,P依次将G调度到M中运行
if M0中G0发生系统调用,M0将释放P,冗余的M1获取P,继续执行P队列中剩余的G。(只要P不空闲就充分利用了CPU)
G0系统调用结束后,如果有空闲的P,则获取P继续执行G0,否则将G0放入全局队列,M0进入缓存池睡眠。(全局队列中的G主要来自从系统调用中恢复的G)
下面介绍一下编程常用的同步(synchronize)原语
互斥锁 mutex
rwmutex
,要了解自旋和饥饿模式
自旋最多4次,cpu核数要大于1,Processor大于1
饥饿模式阻塞超过1ms,饥饿模式下不会启动自旋过程
channel
go的精华所在,注意不能关闭已经关闭的channel,不能向已关闭的channel写数据,会panic
Once
多用于实现单例模式
饿汉模式,一般是直接创建一个包级变量直接使用即可,注意既然是单例模式,就不能让他人随意创建,类型要是私有的,使用接口暴露方法,让外部获得私有变量
懒汉模式,在第一次使用时创建,这里需要注意并发安全,可以使用sync.Once
来保证并发安全
type Singleton interface {
Work() string
}
type singleton2 struct{}
func (s *singleton2) Work() string {
return "singleton2 is working"
}
func newSingleton2() *singleton2 {
return &singleton2{}
}
var (
instance *singleton2
once sync.Once
)
// GetSingleton2 用于获取单例模式对象
func GetSingleton2() Singleton {
once.Do(func() {
instance = newSingleton2()
})
return instance
}
Pool
sync.Pool 是 Go 语言标准库中的一个并发安全的对象池,用于缓存和重用临时对象,以提高性能。通常用于减少内存分配和垃圾回收的开销。PS:一般将变量Put回Pool中,需要将变量重置
var studentPool = sync.Pool{
New: func() any {
return new(Student)
},
}
func BenchmarkUnmarshalWithPool(b *testing.B) {
for n := 0; n < b.N; n++ {
stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
}
}
Cond
Cond是条件变量,可以让一组goroutine等待某个条件的发生。当条件发生时,调用Broadcast或者Signal来通知所有等待的goroutine继续执行。
需要注意的是 sync.Cond 都要在构造的时候绑定一个 sync.Mutex。Wait() 和 Signal() 函数必须在锁保护下的临界区中执行。Wait()一般放在for循环中,因为可能会出现虚假唤醒
var mu = &sync.Mutex{}
var done = false
func read(name string, c *sync.Cond) {
c.L.Lock()
for !done {
c.Wait()
}
log.Println(name, "reading")
c.L.Unlock()
}
func write(name string, c *sync.Cond) {
time.Sleep(1 * time.Second)
c.L.Lock()
done = true
c.L.Unlock()
log.Println(name, "writting finish")
c.Broadcast()
}
func main() {
cond := sync.NewCond(mu)
go read("reader1", cond)
go read("reader2", cond)
go read("reader3", cond)
write("writer", cond)
time.Sleep(time.Second * 3)
}
WaitGroup
注意Wait后不能再Add,否则会panic
// WaitGroup 完美写法
func main() {
taskNum := 3
ch := make(chan any)
go func() {
group := &sync.WaitGroup{}
for i := 1; i <= taskNum; i++ {
group.Add(1)
go func(i int) {
defer group.Done()
ch <- i
}(i)
}
// 确保所有取数据的协程都完成了工作,才关闭 ch
group.Wait()
close(ch)
}()
for i := range ch {
log.Println("goroutine ", i)
}
log.Println("finish")
}
扩展:errgroup,可以捕获goroutine中的panic,只要有一个goroutine出错,就会取消所有的goroutine
atomic
原子操作,用于解决并发问题,比如计数器,锁等
func main() {
// 定义一个共享的计数器,使用 int64 类型
var counter int64
// 使用 WaitGroup 来等待所有 goroutine 完成
var wg sync.WaitGroup
// 启动多个 goroutine 来增加计数器的值
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
// 在每个 goroutine 中原子地增加计数器的值
for j := 0; j < 100000; j++ {
atomic.AddInt64(&counter, 1)
}
wg.Done()
}()
}
// 等待所有 goroutine 完成
wg.Wait()
// 输出最终的计数器值
fmt.Println("Final Counter Value:", atomic.LoadInt64(&counter))
}
context
WithCancel
context.WithCancel()
创建可取消的 Context 对象,即可以主动通知子协程退出。
func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println("stop", name)
return
default:
fmt.Println(name, "send request")
time.Sleep(1 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go reqTask(ctx, "worker1")
time.Sleep(3 * time.Second)
cancel()
time.Sleep(3 * time.Second)
}
WithValue
如果需要往子协程中传递参数,可以使用 context.WithValue()
。
type Options struct{ Interval time.Duration }
func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
fmt.Println("stop", name)
return
default:
fmt.Println(name, "send request")
op := ctx.Value("options").(*Options)
time.Sleep(op.Interval * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
vCtx := context.WithValue(ctx, "options", &Options{1})
go reqTask(vCtx, "worker1")
go reqTask(vCtx, "worker2")
time.Sleep(3 * time.Second)
cancel()
time.Sleep(3 * time.Second)
}
WithTimeout
如果需要控制子协程的执行时间,可以使用 context.WithTimeout
创建具有超时通知机制的 Context 对象。
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
WithDeadline
与 WithTimeout
类似,不同的是 WithDeadline
可以指定一个具体的时间点。
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
semaphore
用于限制对一组共享资源的访问。也可以手动用channel实现
// 创建一个具有初始计数为 2 的信号量
sem := semaphore.NewWeighted(2)
// 启动多个 goroutine 模拟并发访问
for i := 1; i <= 5; i++ {
go func(id int) {
fmt.Printf("Goroutine %d is trying to acquire the semaphore\n", id)
// 尝试获取信号量,如果已经达到限制,则阻塞
err := sem.Acquire(context.Background(), 1)
if err != nil {
fmt.Printf("Error acquiring semaphore in goroutine %d: %v\n", id, err)
return
}
fmt.Printf("Goroutine %d has acquired the semaphore\n", id)
time.Sleep(2 * time.Second)
// 释放信号量
sem.Release(1)
fmt.Printf("Goroutine %d has released the semaphore\n", id)
}(i)
}
// 等待所有 goroutine 完成
time.Sleep(10 * time.Second)
singleflight
golang.org/x/sync/singleflight
防止缓存击穿
var sf singleflight.Group
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 以 "key" 为参数调用 Do 方法
val, err, _ := sf.Do("key", func() (interface{}, error) {
fmt.Printf("Goroutine %d is performing the operation\n", id)
time.Sleep(2 * time.Second)
return fmt.Sprintf("Result from goroutine %d", id), nil
})
if err != nil {
fmt.Printf("Goroutine %d encountered an error: %v\n", id, err)
} else {
fmt.Printf("Goroutine %d got result: %v\n", id, val)
}
}(i)
}
wg.Wait()