1. 概述#
Google 一開始寫 Golang 的時候就是為了解決 Google 內部業務的高並發需求,而且 Golang 的一大特點就是高並發,所以本文就介紹與 Golang 高並發相關的原理、概念以及技術點。
我會首先介紹一些概念,如:並行和並發,進程、線程和協程以及它們的區別,然後介紹 Golang 裡面的 goroutine 和 channel,它們是 Golang 實現高並發的關鍵,在聊一下 select、定時器、runtime 和同步鎖,最後介紹 Go 的並發優勢、並發模型和 Go 的調度器。
2. 並行、並發#
學過操作系統的話,應該對並行和並發不陌生
在同一時刻,有多條指令在多個處理器上同時執行
{{< /admonition >}}
在同一時刻只能有一條指令執行,但多個進程指令被快速輪換執行(根據不同的情況有不同的輪換算法)
並行與並發的區別:
- 並行在多處理器系統中存在,而並發可以在單處理器和多處理器系統中存在
- 並行要求程序能夠同時執行多個操作,而並發只要求程序假裝同時執行多個操作(一個時間片執行一個操作,再輪換多個操作)
3. 進程、線程、協程#
是包含計算機指令、用戶數據和系統數據的程序執行環境,以及包含其允許時獲得的其他類型資源
相對進程是更加小巧而輕量的實體,線程有進程創建且包含自己的控制流和棧,進程和線程的區別在於:進程是正在執行的二進制文件,而線程是進程的子集
協程(goroutine)是 Go 程序並發執行的最小單元,因 goroutine 不像 Unix 進行那樣是自治的實體,goroutine 主要優點是非常輕巧,輕鬆運行成千上萬個都沒問題,goroutine 比線程還輕量,goroutine 需要一個進程的環境才能存在,創建 goroutine 的時候,需要一個進程且這個進程至少有一個線程。協程是一種用戶態的輕量級線程,協程的調度完全由用戶控制,協程間的切換只需要保保存任務的上下文,沒有內核的開銷。線程棧空間通常是 2M,Goroutine 棧空間最小 2K
4. goroutine#
上面介紹了協程(下文統一用 goroutine)的概念,下面介紹一下 goroutine 的實際語法。
在 Go 語言中使用 go 關鍵字後跟函數名稱或定義完整的匿名函數即可開啟一個新的 goroutine,使用 go 關鍵字調用函數後會立即返回,該函數在後台作為 goroutine 運行,程序的其餘部分會繼續執行。
創建一個 goroutine
package main
import (
"fmt"
"time"
)
func main() {
go function()
go func() {
for i := 10; i < 20; i++ {
fmt.Print(i, " ")
}
}()
time.Sleep(1 * time.Second)
}
func function() {
for i := 0; i < 10; i++ {
fmt.Print(i)
}
fmt.Println()
}
你可能會發現上面的輸出不是固定的(main 函數可能會提前結束),我們可以用 sync 包來解決這個問題。
package main
import (
"flag"
"fmt"
"sync"
)
func main() {
n := flag.Int("n", 20, "Number of goroutines")
flag.Parse()
count := *n
fmt.Printf("Going to create %d goroutines.\n", count)
var waitGroup sync.WaitGroup //定義sync.WaitGroup類型的變量
fmt.Printf("%#v\n", waitGroup)
for i := 0; i < count; i++ { //使用for循環創建所需數量的goroutine
waitGroup.Add(1) //每次調用都會增加sync.WaitGroup變量中的計數器,防止出現任何競爭條件
go func(x int) {
defer waitGroup.Done() //sync.WaitGroup變量減一
fmt.Printf("%d ", x)
}(i)
}
fmt.Printf("%#v\n", waitGroup)
waitGroup.Wait() //sync.Wait調用將阻塞,直到sync.WaitGroup變量中的計數器為0,從而保證所有groutine能執行完成
fmt.Println("\nExiting...")
}
5. channel#
channel(通道)是 Go 共的一種通信機制,允許 goroutine 之間進行數據傳輸。
一些明確的規定:
- 每個 channel 只允許交換指定類型的數據,也就是通道的元素類型
- 要是 channel 正常運行,需要保證通道有數據接受方法
使用 chan 關鍵字即可聲明一個 channel,可以使用 close () 函數來關閉通道
當使用 channel 作為函數時,可以指定其為單向 channel
5.1 channel 的寫入#
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
}
func writeToChannel(c chan int, x int) {
fmt.Println(x)
c <- x
close(c)
fmt.Println(x)
}
5.2 從 channel 接受數據#
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go writeToChannel(c, 10)
time.Sleep(1 * time.Second)
fmt.Println("Read:", <-c)
time.Sleep(1 * time.Second)
_, ok := <-c
if ok {
fmt.Println("Channel is open!")
}else {
fmt.Println("Channel is closed!")
}
}
func writeToChannel(c chan int, x int) {
fmt.Println("l", x)
c <- x
close(c)
fmt.Println("2", x)
}
5.3 channel 作為函數參數傳遞#
package main
import (
"fmt"
//"time"
)
func main() {
c := make(chan bool, 1)
for i := 0; i < 10; i++ {
go Go(c, i)
}
<-c
}
func Go(c chan bool, index int) {
sum := 0
for i := 0; i < 1000000; i++ {
sum += i
}
fmt.Println(sum)
c <- true
}
6. select#
Go 中 select 語句看起來像 channels 的 switch 語句,實際上,select 允許 goroutine 等待多個通信操作,因此,使用 select 的主要好處是:select 可以處理多個 channels,進行非阻塞操作。
注意:使用 channels 和 select 的最大問題是 死鎖 。為了解鎖死鎖問題,後面會介紹同步鎖。
package main
import(
"fmt"
"math/rand"
"os"
"strconv"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
createNumber := make(chan int)
end := make(chan bool)
if len(os.Args) != 2 {
fmt.Println("Please give me an integer!")
return
}
n, _ := strconv.Atoi(os.Args[1])
fmt.Printf("Going to create %d random numbers.\n", n)
go gen(0, 2*n, createNumber, end)
for i := 0; i < n; i++ {
fmt.Printf("%d ", <-createNumber)
}
time.Sleep(5 * time.Second) //給gen()函數中的time.After()函數足夠時間返回,從而激活select分支
fmt.Println("Exting...")
end <- true //激活gen()裡面的select語句中的case->end 分支來終止程序並執行相關代碼
}
func gen(min, max int, createNumber chan int, end chan bool) {
for {
select {
case createNumber <- rand.Intn(max-min) + min:
case <- end:
close(end)
return
case <- time.After(4 * time.Second): //time.After函數在指定時間過後返回,因此它將在其他channels被阻塞時解鎖select語句
fmt.Println("\ntime.After()!") //可以把這個case當作default分支
}
}
}
注意:select 語句不需要 default 分支
select 語句不是按順序計算的,因為所有的 channels 都是同時檢查的
如果 select 語句中沒有 channels 是準備好的,那麼 select 語句就會 阻塞 ,直到有 channels 準備好,Go 運行時就會在這些準備好的 channels 之間做 隨機選擇 ,做到公平一致
select 最大的優點是:可以連接、編排、管理多個 channels
當 channels 連接 goroutine 的時候,select 連接那些連接 goroutine 的 channels
7. 定時器#
介紹 select 的時候也用到了定時器,那麼什麼是定時器呢?
定時器是一種通過設置一項任務,在未來的某個時刻執行該任務的機制
定時器有兩種:
- 只執行一次的延時模式
- 每隔一段時間執行一次的間隔模式
Go 語言中的定時器比較完善,所有的 API 都在 time 包中
7.1 延時模式#
延遲執行有兩種:time.After 和 time.Sleep
7.1.1 time.After#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
timeAfterTrigger := time.After(1 * time.Second)
<-timeAfterTrigger
fmt.Println("2")
}
time 包提供了運算好的幾個 int 類型常量
const (
Nanosecond Duration = 1
Microsecond = 1000 * Nanosecond
Millisecond = 1000 * Microsecond
Second = 1000 * Millisecond
Minute = 60 * Second
Hour = 60 * Minute
)
7.1.2 time.Sleep#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
time.Sleep(1 * time.Second)
fmt.Println("2")
}
兩者的區別是:time.Sleep 是阻塞當前協程的,而 time.After 是基於 channel 實現的,可以在不同協程中傳遞
7.2 間隔模式#
間隔模式有分為兩種:一種是執行 N 次後結束,另一種是程序不停休的執行
7.2.1 time.NewTicker#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
count := 0
timeTicker := time.NewTicker(1 * time.Second)
for {
<-timeTicker.C
fmt.Println("每隔 1 秒輸出 2")
count++
if count >= 5 {
timeTicker.Stop()
}
}
}
7.2.2 time.Tick#
package main
import (
"fmt"
"time"
)
func main() {
t := time.Tick(1 * time.Second)
for {
<-t
fmt.Println("每隔 1 秒輸出一次")
}
}
7.3 控制定時器#
定時器提供了 Stop 方法和 Reset 方法
- Stop 方法的作用是停止定時器
- Reset 方法的作用是改變定時器的間隔時間
7.3.1 time.Stop#
package main
import (
"fmt"
"time"
)
func main() {
timer := time.NewTimer(time.Second * 6)
go func() {
<-timer.C
fmt.Println("時間到")
}()
timer.Stop()
}
7.3.2 time.Reset#
package main
import (
"fmt"
"time"
)
func main() {
fmt.Println("1")
count := 0
timeTicker := time.NewTicker(1 * time.Second)
for {
<-timeTicker.C
fmt.Println("2")
count++
if count >= 3 {
timeTicker.Reset(2 * time.Second)
}
}
}
8. runtime#
runtime 是 Go 語言運行所需要的基礎設施,如:控制 goroutine 的功能,debug,pprof、trace、race 進行檢測的支持,內存分配,系統操作和 CPU 相關操作的封裝(信號處理、系統調用、寄存器操作、原子操作等),map、channel、string 等內置類型及反射的實現
與 Java、python 中的 runtime 不同,Java、python 的 runtime 是虛擬機的,而 Go 的 runtime 是和用戶代碼一起編譯到一個可執行文件中的
runtime 發展歷程:
9. 同步鎖#
上文提高 channels 和 select 的最大問題是 死鎖 ,這小節介紹解決死鎖的問題 -- 同步鎖
Go 語言同步鎖有兩種方式:原子鎖,互斥鎖
9.1 原子鎖#
可以借助某個信號向所有的 goroutine 發送消息
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
shotdown int64 // 該標誌向多個goroutine通知狀態
wg sync.WaitGroup
)
func main() {
wg.Add(2)
go doWork("A")
go doWork("B")
time.Sleep(1 * time.Second)
atomic.StoreInt64(&shotdown, 1) // 修改
wg.Wait()
}
func doWork(s string) {
defer wg.Done()
for {
fmt.Printf("Doing homework %s\n", s)
time.Sleep(2 * time.Second)
if atomic.LoadInt64(&shotdown) == 1 { // 讀取
fmt.Printf("Shotdown home work %s\n", s)
break
}
}
}
9.2 互斥鎖#
通過 mutex ,能夠將一段臨界區間包含起來,只運行單個 goroutine 執行
package main
import (
"fmt"
"runtime"
"sync"
)
var (
counter int
wg sync.WaitGroup
mutex sync.Mutex // 定義臨界區
)
func main() {
wg.Add(2)
go incCount(1)
go incCount(2)
wg.Wait()
fmt.Printf("Final Counter: %d\n", counter)
}
func incCount(i int) {
defer wg.Done()
for count := 0; count < 2; count++ {
mutex.Lock()
{
value := counter
runtime.Gosched()
value++
counter = value
}
mutex.Unlock()
}
}
10. Go 並發優勢#
Go 語言為並發編程而內置的上層 API 基於 CSP (communicating sequential processes, 順序通信進程) 模型。這就意味著顯式鎖都是可以避免的,因為 Go 語言通過相册安全的通道發送和接受數據以實現同步,這大大地簡化了並發程序的編寫。
一般情況下,一個普通的桌面計算機跑十幾二十個線程就有點負載過大了,但是同樣這台機器卻可以輕鬆地讓成百上千甚至過萬個 goroutine 進行資源競爭
11. Go 並發模型#
Go 語言實現了兩種並發形式:
- 多線程共享內存(通過共享內存來通信)
- CSP(communicating sequential processes)並發模型(以通信的方式來共享內存)
Do not communicate by sharing memory; instead, share memory by communicating
Java、C++、python 它們的線程都是通過共享內存來通信的
Go 的 CSP 並發模型通過 goroutine 和 channel 來實現
goroutine 與 channel 結合使用案例:
package main
import (
"fmt"
)
//write Data
func writeData(intChan chan int) {
for i := 1; i <= 50; i++ {
//放入數據
intChan<- i //
fmt.Println("writeData ", i)
}
close(intChan) //關閉
}
//read data
func readData(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Printf("readData 讀到數據=%v\n", v)
}
//readData 讀取完數據後,即任務完成
exitChan<- true
close(exitChan)
}
func main() {
//創建兩個管道
intChan := make(chan int, 10)
exitChan := make(chan bool, 1)
go writeData(intChan)
go readData(intChan, exitChan)
for {
_, ok := <-exitChan
if !ok {
break
}
}
}
12 Go 調度器#
GO 語言的調度器使用了三種結構:
G 代表 goroutine,每個 Goroutine 對應一個 G 結構體,G 存儲 Goroutine 的運行堆棧、狀態以及任務函數,可重用
M 代表內核線程,代表著真正執行計算的資源,在綁定有效的 P 後,進入 schedule 循環;而 schedule 循環的機制大致是從 Global 隊列、P 的 Local 隊列以及 wait 隊列中獲取
P 代表邏輯處理器,表示調度的上下文。可以把它看作是一個局部的調度器,讓 Go 代碼跑在一個單獨的線程上。這是讓 Go 從一個 N:1 調度器映射到一個 M調度器的關鍵。
對 G 來說,P 相當於 CPU 核,G 只有綁定到 P 才能被調度。
對 M 來說,P 提供了相關的執行環境 (Context),如內存分配狀態 (mcache),任務隊列 (G) 等
P 的數量決定了系統內最大可並行的 G 的數量(前提:物理 CPU 核數 >= P 的數量)。
P 的數量由用戶設置的 GoMAXPROCS 決定,但是不論 GoMAXPROCS 設置為多大,P 的數量最大為 256
用經典的 地鼠推車搬磚 的模型來說明三者關係
地鼠的工作任務是:工地上有若干磚頭,地鼠借助小車把磚頭運送到火種上
13. 總結#
本文介紹了與 Golang 並發相關的一些知識,從最開始的一些基礎概念,包括:並行、並發和進程、線程、協程,到 Golang 並發的一些實際用法,包括:goroutine、channel、select、定時器和同步鎖,也簡單的介紹了 runtime,最後介紹了 Go 的調度器模型。