關於 Go Channel 同步化行為
| | 0 | |
微軟 Go 教學有篇談可緩衝通道(Buffered Channel),一開始對緩衝區、阻塞行為有點迷惑,做了幾個實驗,總算有比較清楚的概念,寫篇筆記備忘。
在 Go 跑多執行緒程式是用 Goroutine,而 Goroutine 的並行處理精神是「不透過共用記憶體進行通訊;而是透過通訊來共用記憶體」,而通道 Channel 便是 Goroutine 採用的通訊機制。關於通道的基本概念可參考教學。
我們用 ch := make(chan string)
所建立的通道是不具備緩衝區的,意思是塞入一筆字串(ch <- "..."
)後,在資料被讀走(recv := <-ch
)之前,下一個寫入動作(ch <- "another"
)會被阻塞住(Blocked),等於所有寫入動作要排隊,一次只能執行一個。
所有平行作業最後都需要一個同步化程序,以確認所有作業都已執行完,好蒐集結果做後續處理或結束程式,而通道會阻塞執行的特性,剛好可以實現同步化。
第一個測試,迴圈跑 8 次使用 Gorountine 送出 [0] ~ [7] 這段會平行執行,故 Sending 訊息幾乎同時發生;讀取時採每兩秒消化一筆的速度,則我們可觀察到 Sent 以每兩秒一筆的速度出現,證明所有的 ch <- message
必須排隊,待通道空出來才能執行。
package main
import (
"fmt"
"time"
)
func send(ch chan string, message string) {
fmt.Println(time.Now().Format("15:04:05"), "Sending:", message)
ch <- message // 會等待 Channel 資料被讀走才能再寫入
// 注意:寫在這裡的程式需等待 Channel 讀取
fmt.Println(time.Now().Format("15:04:05"), "Sent:", message)
}
func main() {
n := 8
ch := make(chan string)
for i := 0; i < n; i++ {
go send(ch, fmt.Sprintf("[%d]", i))
}
time.Sleep(2 * time.Second)
for i := 0; i < n; i++ {
recv := <-ch
fmt.Println(time.Now().Format("15:04:05"), "Received:", recv)
time.Sleep(2 * time.Second) // 等待 2 秒再讀下一筆
}
fmt.Println("Done!")
}
結果顯示,一開始 0 ~ 7 的 Sending 同時出現,屬非同步平行處理;而 Sent 則每隔兩秒才顯示一筆,證明 ch <- message
會被 recv := <-ch
卡住,採同步化執行。
若要改善只能一個接一個的現象,可改用可緩衝通道。在建立通道時指定緩衝區 ch := make(chan string, bufferSize)
,想像成在通道加入等待空位,大小設 4 代表當通道內有三筆資料未讀取時仍可先寫入,後面的程式不會因此卡住。
程式只修改一行,變成 ch := make(chan string, 4)
,猜猜結果有什麼不同。
func main() {
n := 8
// 指定 Channel 的緩衝區大小為 4
ch := make(chan string, 4)
for i := 0; i < n; i++ {
go send(ch, fmt.Sprintf("[%d]", i))
}
time.Sleep(2 * time.Second)
for i := 0; i < n; i++ {
recv := <-ch
fmt.Println(time.Now().Format("15:04:05"), "Received:", recv)
time.Sleep(2 * time.Second) // 等待 2 秒再讀下一筆
}
fmt.Println("Done!")
}
開始執行時有四筆 Sent 瞬間完成,而總執行時間仍是 16 秒不變,最後四次 Received 沒有伴隨新的 Sent! 如果有猜對,就代表已懂原理,將來應可靈活運用。
【同場加映】
微軟 Go 教學沒提,這裡補上另一種同步化做法。若 Goroutine 執行完不需傳遞資料,則可透過 sync.WaitGroup 計數器來統計執行中的作業,例如以下的範例:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func send(wg *sync.WaitGroup, message string) {
defer wg.Done() // func 結束時計數器 -1
fmt.Println(time.Now().Format("15:04:05"), "Start:", message)
// 等待 1 ~ 5 秒
time.Sleep(time.Duration(rand.Intn(5)+1) * time.Second)
fmt.Println(time.Now().Format("15:04:05"), "End:", message)
}
func main() {
n := 8
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1) // 計數器 +1
go send(&wg, fmt.Sprintf("[%d]", i))
}
wg.Wait()
fmt.Println(time.Now().Format("15:04:05"), "All Done!")
}
Comments
Be the first to post a comment