用 .NET 寫生產者消費者模式的好物 - BlockingCollection
| | 2 | | ![]() |
當代量產型製造業幾乎都是採用生產線作業,以汽車為例,會分成焊接、烤潻、組裝、測試等站,各站依處理時間調整人力與設備數量,以求站與站之間能無縫接軌,將閒置及等待時間降到最低,達到最大產能。而我們開發系統時,若遇到包含多個步驟的大量批次作業,每個步驟具有一定複雜度、涉及資源不一,此時就可借用生產線概念,實作成「生產者 vs 消費者」模式,追求最佳處理效能。
生產者消費者模式是電腦處理多步驟處理程序常用的軟體設計模式,也是作業系統課程一定會講到的觀念。在這個模式中,將處理動作分為「生產者」與「消費者」,前者會產出半成品或成品,由後者接手後續運用,例如:
任務 | 生產者 | 消費者 |
---|---|---|
資料爬蟲 | 關鍵字搜索、下載內容 (網路 I/O) | 內容分析 (CPU 運算) |
照片匯入資料庫 | 識別日期、尺寸縮放、產生縮圖 (CPU 運算) | 寫入資料庫 (磁碟 I/O) |
在以上兩個案例中,性質相關的動作會歸在一起,方便資源調配,例如:下載內容常需等待網路傳回結果但不耗 CPU,我們可以多開幾條執行緒提高抓取速度;內容分析很吃 CPU,可依 CPU 核數決定執行緒數量使產能最大化。寫資料庫為循序作業假設每秒可寫入一筆,但圖檔處理吃 CPU 四秒才能完成一張,故生產者端可開四條執行緒平均每秒產生一張照片,以便與消費者的消化速度完美銜接。
生產者與消費者間需要一條 Queue,讓生產者將資料有效率且可靠地交給消費者接手處理,這條 Queue 最好能具備以下功能:
- 具有大小限制,當超出上限時阻擋生產者塞入資料
- 當 Queue 沒有資料時,讓消費者暫停動作
- 支援關機動作,當生產者終止時,告知消費者可以收包包回家了
關於這個議題,推薦 MVP 安德魯這篇生產者 vs 消費者 - BlockQueue 實作,裡面對生產者消費者概念有更詳細的說明,其中還自己實作了滿足上述理想的 BlockQueue 物件。
有個好消息,.NET Framework 4 之後,.NET 加入了 BlockingCollection<T>,實作生產者消費者模式不需再花功夫自己寫,用專為生產者消費者設計的 BlockingQueue 即可輕鬆搞定。
BlockingCollection 提供以下功能:(參考:BlockingCollection Overview)
- Add 及 Take 方法均為 Thread-Safe,可安心在多執行緒環境使用
- 可設定容量上限
- 沒有資料及超過上限時會自動 Block 程式,不需自己寫判斷,程式超好寫
- 支援 TryAdd、TryTake,可不被 Block 或有 Block 時限進行資料存取
- 支援 forech 過程移除元素 (使用 GetConsumingEnumerable,參考)
- 可指定採用 Queue (先進先出) 或是 Stack (後進先出) 規則 參考
- 可建立多個 BlockingCollection 組成陣列,使用 TryAddToAny、TryTakeFromAny 由陣列中存取資料 參考
下面來段範例程式感受一下它的好用。
範例程式的模疑情境為:生產者處理圖檔交給消費者寫入資料庫,這裡不真的執行動作,而是用 Thread.Sleep 模擬圖檔處理及寫資料庫所耗用時間,預設圖檔處理耗時 4 秒(±20%)、寫資料庫 1 秒(±20%),作業耗時、生產者數量、BlockingCollection 數量上限(queueSize)均可調整,程式碼如下:
using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.Extensions.Configuration;
// 命令列參數使用參考 https://blog.darkthread.net/blog/-net6-args-parsing/
IConfiguration config = new ConfigurationBuilder()
.AddCommandLine(args)
.Build();
int photoCount = int.Parse(config.GetValue<string>("photoCount", "100"));
int queueSize = int.Parse(config.GetValue<string>("queueSize", "10"));
int producerCount = int.Parse(config.GetValue<string>("producerCount", "1"));
int imgProcTime = int.Parse(config.GetValue<string>("imgProcTime", "4000"));;
int saveDbTime = int.Parse(config.GetValue<string>("saveDbTime", "1000"));;
var photoIds = new ConcurrentQueue<int>(Enumerable.Range(1, photoCount));
var blockQueue = new BlockingCollection<SimuImgData>(queueSize);
var rnd = new Random();
var sw = new Stopwatch();
sw.Start();
var producerTasks = Enumerable.Range(1, producerCount)
.Select(o => Task.Run(RunProducer)).ToArray();
var consumerTask = Task.Run(RunConsumer);
Task.WaitAll(producerTasks);
//生產者已完成所有工作,宣告 blockQueue 不再新增項目
blockQueue.CompleteAdding();
//等待消費者作業完成
consumerTask.Wait();
// 延遲指定時間(ms) 含 ±20% 隨機變化
void RandomDelay(int baseTime) {
Thread.Sleep(baseTime * 4 / 5 + rnd.Next(baseTime * 2 / 5));
}
void Print(string msg, ConsoleColor color = ConsoleColor.White) {
Console.ForegroundColor = color;
Console.WriteLine(msg);
Console.ResetColor();
}
void RunProducer() {
while (photoIds.Any()) {
if (photoIds.TryDequeue(out var id)) {
//模擬照片縮放、縮圖作業延遲
RandomDelay(imgProcTime);
//若達到queueSize上限,Add動作會被Block住
blockQueue.Add(new SimuImgData{
Id = id,
Image = Array.Empty<byte>()
});
Print($"{DateTime.Now:mm:ss} Photo {id} is processed", ConsoleColor.Yellow);
}
}
}
void RunConsumer()
{
//IsComplete 表示生產者已不再新增資料,blockQueue 的資料也消化完畢
while (!blockQueue.IsCompleted)
{
SimuImgData data = null!;
try {
//若blockQueue沒有資料,Take動作會被Block住,有資料時再往下執行
//若
data = blockQueue.Take();
} catch (InvalidOperationException) {}
if (data != null) {
//模擬寫入資料庫延遲
RandomDelay(saveDbTime);
Print($"{DateTime.Now:mm:ss} Photo {data.Id} is saved to DB", ConsoleColor.Cyan);
}
}
sw.Stop();
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine($"Done {sw.ElapsedMilliseconds:n0}ms");
}
public class SimuImgData
{
public int Id { get; set; }
public byte[] Image { get; set; } = null!;
}
看似複雜的生產者消費者資料串接,在使用 BlockingCollection 後程式意外好寫。BlockingCollection 預設使用先進先出規則,在程式裡我叫它 blockQueue。在 ConcurrentQueue 中放入待處理照片代號,生產者取得代號,Thread.Sleep() 一段時間模擬圖形處理,呼叫 BlockingCollection.Add() 方法放入圖形處理後的結果,若 blockQueue 中排隊等待處理的數量達到上限,Add() 動作會被 Block 住直到 blockQueue 有空間容納時再繼續;消費者端則是在 blockQueue.IsCompleted != true 期間持續呼叫 BlockingCollection.Take() 取出生產者塞入的資料,當 blockQueue 沒有資料時,Take() 動作會被 Block 住,直到有資料進來才繼續往下執行,或是生產者呼叫 blockQueue.CompleteAdding() 表明不再有資料且 blockQueue 裡已無資料(發生在有多個消費者同時消化資料時),此時 Take() 會拋出 InvalidOperationException。如此,不需要處理惱人的 lock 鎖定、ManualResetEvent 同步,即可輕鬆完成多執行緒生產者與消費者作業的銜接。
簡單實測。一個生產者一個消費者處理 10 張照片耗時約 44 秒、四個生產者可縮短到 14 秒,生產者增加到 8 個會卡在消費者一秒只能消化一個,速度也不會再提高,都符合預期。驗證消費者的 Take() 會被 Block 住直到資料進來,不需自己寫檢查資料決定是否執行的程式;而 CompleteAdding() 及 IsCompleted 也成功讓程式結束。
再來驗證 Add() 在 blockQueue 數量達上限時會被 Block 住的行為(設定上限很重要,消費者端來不及消化通知生產端煞車,避免狂塞資料讓 Queue 記憶體爆量),我們設定四個生產者,但分別測試 queueSize 設為 2 及 1,可觀察到 Add() 在 blockQueue 容量達上限需排隊,queueSize = 1 時會呈現一筆生產者動作一筆消費者動作的穿插現象:
最後,介紹更簡潔的寫法 - 用 GetConsumingEnumerable() 配合 foreach 迴圈使用,跑到沒有資料自動結束。故上述程式可再簡化為:
//照片代號清單也改用 BlockingCollection
var photoIds = new BlockingCollection<int>();
for (int i = 1; i <= photoCount; i++) { photoIds.Add(i); }
photoIds.CompleteAdding();
//...略...
//生產者與消費者無腦用 GetConsumingEnumerable() 跑 foreach 即可
void RunProducer() {
foreach (int id in photoIds.GetConsumingEnumerable())
{
//模擬照片縮放、縮圖作業延遲
RandomDelay(imgProcTime);
//若達到queueSize上限,Add動作會被Block住
blockQueue.Add(new SimuImgData{
Id = id,
Image = Array.Empty<byte>()
});
Print($"{DateTime.Now:mm:ss} Photo {id} is processed", ConsoleColor.Yellow);
}
}
void RunConsumer()
{
foreach (var data in blockQueue.GetConsumingEnumerable())
{
//模擬寫入資料庫延遲
RandomDelay(saveDbTime);
Print($"{DateTime.Now:mm:ss} Photo {data.Id} is saved to DB", ConsoleColor.Cyan);
}
sw.Stop();
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine($"Done {sw.ElapsedMilliseconds:n0}ms");
}
執行結果完全相同,但程式更簡潔,用 C# 寫生產者消費者模式易如反掌。
程式碼已上傳 Github,共有兩個 Commit:Add()/Take() 展示 與 GetConsumingEnumerable() 版本,需要的同學請自取。
Introduce to the BlockCollection class and how to use it to implementate producer-consumer pattern easily.
Comments
# by Ryan
黑大有考慮用`System.Threading.Channels`來寫生產消費者嗎? https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/
# by Jeffrey
to Ryan, 打算之後再介紹。Channels 效能更好,但它完全架構在 async/await 之上,對一些開發者來說有小門檻要先跨過,BlockingCollection 相對較親民,故派它先發,Channels 已在牛棚暖身中。