xfeng

xfeng

Sporting | Reading | Technology | Recording
github
bilibili

Golang為並發而生

image

1. 概述#

Google 一開始寫 Golang 的時候就是為了解決 Google 內部業務的高並發需求,而且 Golang 的一大特點就是高並發,所以本文就介紹與 Golang 高並發相關的原理、概念以及技術點。

我會首先介紹一些概念,如:並行和並發,進程、線程和協程以及它們的區別,然後介紹 Golang 裡面的 goroutine 和 channel,它們是 Golang 實現高並發的關鍵,在聊一下 select、定時器、runtime 和同步鎖,最後介紹 Go 的並發優勢、並發模型和 Go 的調度器。

2. 並行、並發#

學過操作系統的話,應該對並行和並發不陌生

image

在同一時刻,有多條指令在多個處理器上同時執行
{{< /admonition >}}

在同一時刻只能有一條指令執行,但多個進程指令被快速輪換執行(根據不同的情況有不同的輪換算法)

並行與並發的區別:

  • 並行在多處理器系統中存在,而並發可以在單處理器和多處理器系統中存在
  • 並行要求程序能夠同時執行多個操作,而並發只要求程序假裝同時執行多個操作(一個時間片執行一個操作,再輪換多個操作)

3. 進程、線程、協程#

image

是包含計算機指令、用戶數據和系統數據的程序執行環境,以及包含其允許時獲得的其他類型資源

相對進程是更加小巧而輕量的實體,線程有進程創建且包含自己的控制流和棧,進程和線程的區別在於:進程是正在執行的二進制文件,而線程是進程的子集

協程(goroutine)是 Go 程序並發執行的最小單元,因 goroutine 不像 Unix 進行那樣是自治的實體,goroutine 主要優點是非常輕巧,輕鬆運行成千上萬個都沒問題,goroutine 比線程還輕量,goroutine 需要一個進程的環境才能存在,創建 goroutine 的時候,需要一個進程且這個進程至少有一個線程。協程是一種用戶態的輕量級線程,協程的調度完全由用戶控制,協程間的切換只需要保保存任務的上下文,沒有內核的開銷。線程棧空間通常是 2M,Goroutine 棧空間最小 2K

4. goroutine#

上面介紹了協程(下文統一用 goroutine)的概念,下面介紹一下 goroutine 的實際語法。

在 Go 語言中使用 go 關鍵字後跟函數名稱或定義完整的匿名函數即可開啟一個新的 goroutine,使用 go 關鍵字調用函數後會立即返回,該函數在後台作為 goroutine 運行,程序的其餘部分會繼續執行。

創建一個 goroutine

package main
import (
	"fmt"
	"time"
)

func main()  {
	go function()
	go func() {
		for i := 10; i < 20; i++ {
			fmt.Print(i, " ")
		}
	}()
	time.Sleep(1 * time.Second)
}

func function() {
	for i := 0; i < 10; i++ {
		fmt.Print(i)
	}
	fmt.Println()
}

你可能會發現上面的輸出不是固定的(main 函數可能會提前結束),我們可以用 sync 包來解決這個問題。

package main
import (
	"flag"
	"fmt"
	"sync"
)

func main() {
	n := flag.Int("n", 20, "Number of goroutines")
	flag.Parse()
	count := *n
	fmt.Printf("Going to create %d goroutines.\n", count)
	var waitGroup sync.WaitGroup //定義sync.WaitGroup類型的變量

	fmt.Printf("%#v\n", waitGroup)
	for i := 0; i < count; i++ { //使用for循環創建所需數量的goroutine
		waitGroup.Add(1) //每次調用都會增加sync.WaitGroup變量中的計數器,防止出現任何競爭條件
		go func(x int) {
			defer waitGroup.Done() //sync.WaitGroup變量減一
			fmt.Printf("%d ", x)
		}(i)
	}

	fmt.Printf("%#v\n", waitGroup)
	waitGroup.Wait() //sync.Wait調用將阻塞,直到sync.WaitGroup變量中的計數器為0,從而保證所有groutine能執行完成
	fmt.Println("\nExiting...")
}

5. channel#

channel(通道)是 Go 共的一種通信機制,允許 goroutine 之間進行數據傳輸。

一些明確的規定:

  • 每個 channel 只允許交換指定類型的數據,也就是通道的元素類型
  • 要是 channel 正常運行,需要保證通道有數據接受方法

使用 chan 關鍵字即可聲明一個 channel,可以使用 close () 函數來關閉通道

當使用 channel 作為函數時,可以指定其為單向 channel

image

5.1 channel 的寫入#

package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
}
func writeToChannel(c chan int, x int) {
	fmt.Println(x)
	c <- x
	close(c)
	fmt.Println(x)
}

5.2 從 channel 接受數據#

package main
import (
	"fmt"
	"time"
)
func main() {
	c := make(chan int)
	go writeToChannel(c, 10)
	time.Sleep(1 * time.Second)
	fmt.Println("Read:", <-c)
	time.Sleep(1 * time.Second)
	_, ok := <-c
	if ok {
		fmt.Println("Channel is open!")
	}else {
		fmt.Println("Channel is closed!")
	}
}
func writeToChannel(c chan int, x int) {
	fmt.Println("l", x)
	c <- x
	close(c)
	fmt.Println("2", x)
}

5.3 channel 作為函數參數傳遞#

package main

import (
	"fmt"
	//"time"
)
func main() {
	c := make(chan bool, 1)
	for i := 0; i < 10; i++ {
		go Go(c, i)
	}

	<-c
}
func Go(c chan bool, index int) {
	sum := 0
	for i := 0; i < 1000000; i++ {
		sum += i
	}
	fmt.Println(sum)
	c <- true
}

6. select#

Go 中 select 語句看起來像 channels 的 switch 語句,實際上,select 允許 goroutine 等待多個通信操作,因此,使用 select 的主要好處是:select 可以處理多個 channels,進行非阻塞操作。

注意:使用 channels 和 select 的最大問題是 死鎖 。為了解鎖死鎖問題,後面會介紹同步鎖。

package main
import(
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"
)
func main() {
	rand.Seed(time.Now().Unix())
	createNumber := make(chan int)
	end := make(chan bool)
	if len(os.Args) != 2 {
		fmt.Println("Please give me an integer!")
		return
	}
	n, _ := strconv.Atoi(os.Args[1])
	fmt.Printf("Going to create %d random numbers.\n", n)
	go gen(0, 2*n, createNumber, end)
	for i := 0; i < n; i++ {
		fmt.Printf("%d ", <-createNumber)
	}
    time.Sleep(5 * time.Second)  //給gen()函數中的time.After()函數足夠時間返回,從而激活select分支
	fmt.Println("Exting...")
    end <- true  //激活gen()裡面的select語句中的case->end 分支來終止程序並執行相關代碼
}
func gen(min, max int, createNumber chan int, end chan bool) {
	for {
		select {
		case createNumber <- rand.Intn(max-min) + min:
		case <- end:
			close(end)
			return
		case <- time.After(4 * time.Second): //time.After函數在指定時間過後返回,因此它將在其他channels被阻塞時解鎖select語句
			fmt.Println("\ntime.After()!") //可以把這個case當作default分支
		}
	}
}

注意:select 語句不需要 default 分支

select 語句不是按順序計算的,因為所有的 channels 都是同時檢查的

如果 select 語句中沒有 channels 是準備好的,那麼 select 語句就會 阻塞 ,直到有 channels 準備好,Go 運行時就會在這些準備好的 channels 之間做 隨機選擇 ,做到公平一致

select 最大的優點是:可以連接、編排、管理多個 channels

當 channels 連接 goroutine 的時候,select 連接那些連接 goroutine 的 channels

7. 定時器#

介紹 select 的時候也用到了定時器,那麼什麼是定時器呢?

定時器是一種通過設置一項任務,在未來的某個時刻執行該任務的機制

定時器有兩種:

  • 只執行一次的延時模式
  • 每隔一段時間執行一次的間隔模式

Go 語言中的定時器比較完善,所有的 API 都在 time 包中

7.1 延時模式#

延遲執行有兩種:time.After 和 time.Sleep

7.1.1 time.After#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	timeAfterTrigger := time.After(1 * time.Second)
	<-timeAfterTrigger
	fmt.Println("2")
}

time 包提供了運算好的幾個 int 類型常量

const (
	Nanosecond  Duration = 1
	Microsecond          = 1000 * Nanosecond
	Millisecond          = 1000 * Microsecond
	Second               = 1000 * Millisecond
	Minute               = 60 * Second
	Hour                 = 60 * Minute
)

7.1.2 time.Sleep#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	time.Sleep(1 * time.Second)
	fmt.Println("2")
}

兩者的區別是:time.Sleep 是阻塞當前協程的,而 time.After 是基於 channel 實現的,可以在不同協程中傳遞

7.2 間隔模式#

間隔模式有分為兩種:一種是執行 N 次後結束,另一種是程序不停休的執行

7.2.1 time.NewTicker#

package main

import (
	"fmt"
	"time"
)
func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("每隔 1 秒輸出 2")
		count++
		if count >= 5 {
			timeTicker.Stop()
		}
	}
}

7.2.2 time.Tick#

package main

import (
	"fmt"
	"time"
)
func main() {
	t := time.Tick(1 * time.Second)
	for {
		<-t
		fmt.Println("每隔 1 秒輸出一次")
	}
}

7.3 控制定時器#

定時器提供了 Stop 方法和 Reset 方法

  • Stop 方法的作用是停止定時器
  • Reset 方法的作用是改變定時器的間隔時間

7.3.1 time.Stop#

package main

import (
	"fmt"
	"time"
)

func main() {
	timer := time.NewTimer(time.Second * 6)
	go func() {
		<-timer.C
		fmt.Println("時間到")
	}()
	timer.Stop()
}

7.3.2 time.Reset#

package main

import (
	"fmt"
	"time"
)

func main() {
	fmt.Println("1")
	count := 0
	timeTicker := time.NewTicker(1 * time.Second)
	for {
		<-timeTicker.C
		fmt.Println("2")
		count++
		if count >= 3 {
			timeTicker.Reset(2 * time.Second)
		}
	}
}

8. runtime#

runtime 是 Go 語言運行所需要的基礎設施,如:控制 goroutine 的功能,debug,pprof、trace、race 進行檢測的支持,內存分配,系統操作和 CPU 相關操作的封裝(信號處理、系統調用、寄存器操作、原子操作等),map、channel、string 等內置類型及反射的實現

與 Java、python 中的 runtime 不同,Java、python 的 runtime 是虛擬機的,而 Go 的 runtime 是和用戶代碼一起編譯到一個可執行文件中的

runtime 發展歷程:

image

9. 同步鎖#

上文提高 channels 和 select 的最大問題是 死鎖 ,這小節介紹解決死鎖的問題 -- 同步鎖

Go 語言同步鎖有兩種方式:原子鎖,互斥鎖

9.1 原子鎖#

可以借助某個信號向所有的 goroutine 發送消息

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

var (
	shotdown int64 // 該標誌向多個goroutine通知狀態
	wg       sync.WaitGroup
)

func main() {
	wg.Add(2)

	go doWork("A")
	go doWork("B")

	time.Sleep(1 * time.Second)

	atomic.StoreInt64(&shotdown, 1) // 修改
	wg.Wait()
}

func doWork(s string) {
	defer wg.Done()

	for {
		fmt.Printf("Doing homework %s\n", s)
		time.Sleep(2 * time.Second)

		if atomic.LoadInt64(&shotdown) == 1 { // 讀取
			fmt.Printf("Shotdown home work %s\n", s)
			break
		}
	}
}

9.2 互斥鎖#

通過 mutex ,能夠將一段臨界區間包含起來,只運行單個 goroutine 執行

package main

import (
	"fmt"
	"runtime"
	"sync"
)

var (
	counter int
	wg      sync.WaitGroup
	mutex   sync.Mutex // 定義臨界區
)

func main() {
	wg.Add(2)

	go incCount(1)
	go incCount(2)

	wg.Wait()
	fmt.Printf("Final Counter: %d\n", counter)
}

func incCount(i int) {
	defer wg.Done()

	for count := 0; count < 2; count++ {
		mutex.Lock()
		{
			value := counter
			runtime.Gosched()
			value++
			counter = value
		}
		mutex.Unlock()
	}
}

10. Go 並發優勢#

Go 語言為並發編程而內置的上層 API 基於 CSP (communicating sequential processes, 順序通信進程) 模型。這就意味著顯式鎖都是可以避免的,因為 Go 語言通過相册安全的通道發送和接受數據以實現同步,這大大地簡化了並發程序的編寫。

一般情況下,一個普通的桌面計算機跑十幾二十個線程就有點負載過大了,但是同樣這台機器卻可以輕鬆地讓成百上千甚至過萬個 goroutine 進行資源競爭

11. Go 並發模型#

Go 語言實現了兩種並發形式:

  • 多線程共享內存(通過共享內存來通信)
  • CSP(communicating sequential processes)並發模型(以通信的方式來共享內存)

Do not communicate by sharing memory; instead, share memory by communicating

Java、C++、python 它們的線程都是通過共享內存來通信的

Go 的 CSP 並發模型通過 goroutine 和 channel 來實現

goroutine 與 channel 結合使用案例:

image

package main
import (
	"fmt"
)
 
 
//write Data
func writeData(intChan chan int) {
	for i := 1; i <= 50; i++ {
		//放入數據
		intChan<- i //
		fmt.Println("writeData ", i)
	}
	close(intChan) //關閉
}
 
//read data
func readData(intChan chan int, exitChan chan bool) {
 
	for {
		v, ok := <-intChan
		if !ok {
			break
		}
		fmt.Printf("readData 讀到數據=%v\n", v) 
	}
	//readData 讀取完數據後,即任務完成
	exitChan<- true
	close(exitChan)
 
}
 
func main() {
 
	//創建兩個管道
	intChan := make(chan int, 10)
	exitChan := make(chan bool, 1)
	
	go writeData(intChan)
	go readData(intChan, exitChan)
 
	for {
		_, ok := <-exitChan
		if !ok {
			break
		}
	}
}

12 Go 調度器#

GO 語言的調度器使用了三種結構:

G 代表 goroutine,每個 Goroutine 對應一個 G 結構體,G 存儲 Goroutine 的運行堆棧、狀態以及任務函數,可重用

M 代表內核線程,代表著真正執行計算的資源,在綁定有效的 P 後,進入 schedule 循環;而 schedule 循環的機制大致是從 Global 隊列、P 的 Local 隊列以及 wait 隊列中獲取

P 代表邏輯處理器,表示調度的上下文。可以把它看作是一個局部的調度器,讓 Go 代碼跑在一個單獨的線程上。這是讓 Go 從一個 N:1 調度器映射到一個 M調度器的關鍵。

對 G 來說,P 相當於 CPU 核,G 只有綁定到 P 才能被調度。

對 M 來說,P 提供了相關的執行環境 (Context),如內存分配狀態 (mcache),任務隊列 (G) 等

P 的數量決定了系統內最大可並行的 G 的數量(前提:物理 CPU 核數 >= P 的數量)。

P 的數量由用戶設置的 GoMAXPROCS 決定,但是不論 GoMAXPROCS 設置為多大,P 的數量最大為 256

用經典的 地鼠推車搬磚 的模型來說明三者關係

image

地鼠的工作任務是:工地上有若干磚頭,地鼠借助小車把磚頭運送到火種上

13. 總結#

本文介紹了與 Golang 並發相關的一些知識,從最開始的一些基礎概念,包括:並行、並發和進程、線程、協程,到 Golang 並發的一些實際用法,包括:goroutine、channel、select、定時器和同步鎖,也簡單的介紹了 runtime,最後介紹了 Go 的調度器模型。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。