微軟 Go 教學有篇談可緩衝通道(Buffered Channel),一開始對緩衝區、阻塞行為有點迷惑,做了幾個實驗,總算有比較清楚的概念,寫篇筆記備忘。

在 Go 跑多執行緒程式是用 Goroutine,而 Goroutine 的並行處理精神是「不透過共用記憶體進行通訊;而是透過通訊來共用記憶體」,而通道 Channel 便是 Goroutine 採用的通訊機制。關於通道的基本概念可參考教學

我們用 ch := make(chan string) 所建立的通道是不具備緩衝區的,意思是塞入一筆字串(ch <- "...")後,在資料被讀走(recv := <-ch)之前,下一個寫入動作(ch <- "another")會被阻塞住(Blocked),等於所有寫入動作要排隊,一次只能執行一個。

所有平行作業最後都需要一個同步化程序,以確認所有作業都已執行完,好蒐集結果做後續處理或結束程式,而通道會阻塞執行的特性,剛好可以實現同步化。

第一個測試,迴圈跑 8 次使用 Gorountine 送出 [0] ~ [7] 這段會平行執行,故 Sending 訊息幾乎同時發生;讀取時採每兩秒消化一筆的速度,則我們可觀察到 Sent 以每兩秒一筆的速度出現,證明所有的 ch <- message 必須排隊,待通道空出來才能執行。

package main

import (
	"fmt"
	"time"
)

func send(ch chan string, message string) {
	fmt.Println(time.Now().Format("15:04:05"), "Sending:", message)
	ch <- message // 會等待 Channel 資料被讀走才能再寫入
	// 注意:寫在這裡的程式需等待 Channel 讀取
	fmt.Println(time.Now().Format("15:04:05"), "Sent:", message)
}

func main() {
	n := 8
	ch := make(chan string)
	for i := 0; i < n; i++ {
		go send(ch, fmt.Sprintf("[%d]", i))
	}
	time.Sleep(2 * time.Second)
	for i := 0; i < n; i++ {
		recv := <-ch
		fmt.Println(time.Now().Format("15:04:05"), "Received:", recv)
		time.Sleep(2 * time.Second) // 等待 2 秒再讀下一筆
	}
	fmt.Println("Done!")
}

結果顯示,一開始 0 ~ 7 的 Sending 同時出現,屬非同步平行處理;而 Sent 則每隔兩秒才顯示一筆,證明 ch <- message 會被 recv := <-ch 卡住,採同步化執行。

若要改善只能一個接一個的現象,可改用可緩衝通道。在建立通道時指定緩衝區 ch := make(chan string, bufferSize),想像成在通道加入等待空位,大小設 4 代表當通道內有三筆資料未讀取時仍可先寫入,後面的程式不會因此卡住。

程式只修改一行,變成 ch := make(chan string, 4),猜猜結果有什麼不同。

func main() {
	n := 8
	// 指定 Channel 的緩衝區大小為 4
	ch := make(chan string, 4)
	for i := 0; i < n; i++ {
		go send(ch, fmt.Sprintf("[%d]", i))
	}
	time.Sleep(2 * time.Second)
	for i := 0; i < n; i++ {
		recv := <-ch
		fmt.Println(time.Now().Format("15:04:05"), "Received:", recv)
		time.Sleep(2 * time.Second) // 等待 2 秒再讀下一筆
	}
	fmt.Println("Done!")
}

開始執行時有四筆 Sent 瞬間完成,而總執行時間仍是 16 秒不變,最後四次 Received 沒有伴隨新的 Sent! 如果有猜對,就代表已懂原理,將來應可靈活運用。

【同場加映】

微軟 Go 教學沒提,這裡補上另一種同步化做法。若 Goroutine 執行完不需傳遞資料,則可透過 sync.WaitGroup 計數器來統計執行中的作業,例如以下的範例:

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func send(wg *sync.WaitGroup, message string) {
	defer wg.Done() // func 結束時計數器 -1 
	fmt.Println(time.Now().Format("15:04:05"), "Start:", message)
	// 等待 1 ~ 5 秒
	time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
	fmt.Println(time.Now().Format("15:04:05"), "End:", message)
}

func main() {
	n := 8
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1) // 計數器 +1
		go send(&wg, fmt.Sprintf("[%d]", i))
	}
	wg.Wait()
	fmt.Println(time.Now().Format("15:04:05"), "All Done!")
}


Comments

Be the first to post a comment

Post a comment