當代量產型製造業幾乎都是採用生產線作業,以汽車為例,會分成焊接、烤潻、組裝、測試等站,各站依處理時間調整人力與設備數量,以求站與站之間能無縫接軌,將閒置及等待時間降到最低,達到最大產能。而我們開發系統時,若遇到包含多個步驟的大量批次作業,每個步驟具有一定複雜度、涉及資源不一,此時就可借用生產線概念,實作成「生產者 vs 消費者」模式,追求最佳處理效能。

生產者消費者模式是電腦處理多步驟處理程序常用的軟體設計模式,也是作業系統課程一定會講到的觀念。在這個模式中,將處理動作分為「生產者」與「消費者」,前者會產出半成品或成品,由後者接手後續運用,例如:

任務生產者消費者
資料爬蟲關鍵字搜索、下載內容 (網路 I/O)內容分析 (CPU 運算)
照片匯入資料庫識別日期、尺寸縮放、產生縮圖 (CPU 運算)寫入資料庫 (磁碟 I/O)

在以上兩個案例中,性質相關的動作會歸在一起,方便資源調配,例如:下載內容常需等待網路傳回結果但不耗 CPU,我們可以多開幾條執行緒提高抓取速度;內容分析很吃 CPU,可依 CPU 核數決定執行緒數量使產能最大化。寫資料庫為循序作業假設每秒可寫入一筆,但圖檔處理吃 CPU 四秒才能完成一張,故生產者端可開四條執行緒平均每秒產生一張照片,以便與消費者的消化速度完美銜接。

生產者與消費者間需要一條 Queue,讓生產者將資料有效率且可靠地交給消費者接手處理,這條 Queue 最好能具備以下功能:

  1. 具有大小限制,當超出上限時阻擋生產者塞入資料
  2. 當 Queue 沒有資料時,讓消費者暫停動作
  3. 支援關機動作,當生產者終止時,告知消費者可以收包包回家了

關於這個議題,推薦 MVP 安德魯這篇生產者 vs 消費者 - BlockQueue 實作,裡面對生產者消費者概念有更詳細的說明,其中還自己實作了滿足上述理想的 BlockQueue 物件。

有個好消息,.NET Framework 4 之後,.NET 加入了 BlockingCollection<T>,實作生產者消費者模式不需再花功夫自己寫,用專為生產者消費者設計的 BlockingQueue 即可輕鬆搞定。

BlockingCollection 提供以下功能:(參考:BlockingCollection Overview)

  • Add 及 Take 方法均為 Thread-Safe,可安心在多執行緒環境使用
  • 可設定容量上限
  • 沒有資料及超過上限時會自動 Block 程式,不需自己寫判斷,程式超好寫
  • 支援 TryAdd、TryTake,可不被 Block 或有 Block 時限進行資料存取
  • 支援 forech 過程移除元素 (使用 GetConsumingEnumerable,參考)
  • 可指定採用 Queue (先進先出) 或是 Stack (後進先出) 規則 參考
  • 可建立多個 BlockingCollection 組成陣列,使用 TryAddToAny、TryTakeFromAny 由陣列中存取資料 參考

下面來段範例程式感受一下它的好用。

範例程式的模疑情境為:生產者處理圖檔交給消費者寫入資料庫,這裡不真的執行動作,而是用 Thread.Sleep 模擬圖檔處理及寫資料庫所耗用時間,預設圖檔處理耗時 4 秒(±20%)、寫資料庫 1 秒(±20%),作業耗時、生產者數量、BlockingCollection 數量上限(queueSize)均可調整,程式碼如下:

using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.Extensions.Configuration;

// 命令列參數使用參考 https://blog.darkthread.net/blog/-net6-args-parsing/
IConfiguration config = new ConfigurationBuilder()
    .AddCommandLine(args)
    .Build();
int photoCount = int.Parse(config.GetValue<string>("photoCount", "100"));
int queueSize = int.Parse(config.GetValue<string>("queueSize", "10"));
int producerCount = int.Parse(config.GetValue<string>("producerCount", "1"));
int imgProcTime =  int.Parse(config.GetValue<string>("imgProcTime", "4000"));;
int saveDbTime =  int.Parse(config.GetValue<string>("saveDbTime", "1000"));;

var photoIds = new ConcurrentQueue<int>(Enumerable.Range(1, photoCount));
var blockQueue = new BlockingCollection<SimuImgData>(queueSize);
var rnd = new Random();

var sw = new Stopwatch();
sw.Start();
var producerTasks = Enumerable.Range(1, producerCount)
    .Select(o => Task.Run(RunProducer)).ToArray();
var consumerTask = Task.Run(RunConsumer);

Task.WaitAll(producerTasks);
//生產者已完成所有工作,宣告 blockQueue 不再新增項目
blockQueue.CompleteAdding();
//等待消費者作業完成
consumerTask.Wait();

// 延遲指定時間(ms) 含 ±20% 隨機變化
void RandomDelay(int baseTime) {
    Thread.Sleep(baseTime * 4 / 5 + rnd.Next(baseTime * 2 / 5));
}
void Print(string msg, ConsoleColor color = ConsoleColor.White) {
    Console.ForegroundColor = color;
    Console.WriteLine(msg);
    Console.ResetColor();
}

void RunProducer() {
    while (photoIds.Any()) {
        if (photoIds.TryDequeue(out var id)) {
            //模擬照片縮放、縮圖作業延遲
            RandomDelay(imgProcTime);
            //若達到queueSize上限,Add動作會被Block住
            blockQueue.Add(new SimuImgData{
                Id = id,
                Image = Array.Empty<byte>()
            });
            Print($"{DateTime.Now:mm:ss} Photo {id} is processed", ConsoleColor.Yellow);
        }
    }
}

void RunConsumer()
{
    //IsComplete 表示生產者已不再新增資料,blockQueue 的資料也消化完畢
    while (!blockQueue.IsCompleted) 
    {
        SimuImgData data = null!;
        try {
            //若blockQueue沒有資料,Take動作會被Block住,有資料時再往下執行
            //若
            data = blockQueue.Take();
        } catch (InvalidOperationException) {}
        if (data != null) {
            //模擬寫入資料庫延遲
            RandomDelay(saveDbTime);
            Print($"{DateTime.Now:mm:ss} Photo {data.Id} is saved to DB", ConsoleColor.Cyan);
        }
    }
    sw.Stop();
    Console.ForegroundColor = ConsoleColor.White;
    Console.WriteLine($"Done {sw.ElapsedMilliseconds:n0}ms");
}

public class SimuImgData
{
    public int Id { get; set; }
    public byte[] Image { get; set; } = null!;
}

看似複雜的生產者消費者資料串接,在使用 BlockingCollection 後程式意外好寫。BlockingCollection 預設使用先進先出規則,在程式裡我叫它 blockQueue。在 ConcurrentQueue 中放入待處理照片代號,生產者取得代號,Thread.Sleep() 一段時間模擬圖形處理,呼叫 BlockingCollection.Add() 方法放入圖形處理後的結果,若 blockQueue 中排隊等待處理的數量達到上限,Add() 動作會被 Block 住直到 blockQueue 有空間容納時再繼續;消費者端則是在 blockQueue.IsCompleted != true 期間持續呼叫 BlockingCollection.Take() 取出生產者塞入的資料,當 blockQueue 沒有資料時,Take() 動作會被 Block 住,直到有資料進來才繼續往下執行,或是生產者呼叫 blockQueue.CompleteAdding() 表明不再有資料且 blockQueue 裡已無資料(發生在有多個消費者同時消化資料時),此時 Take() 會拋出 InvalidOperationException。如此,不需要處理惱人的 lock 鎖定、ManualResetEvent 同步,即可輕鬆完成多執行緒生產者與消費者作業的銜接。

簡單實測。一個生產者一個消費者處理 10 張照片耗時約 44 秒、四個生產者可縮短到 14 秒,生產者增加到 8 個會卡在消費者一秒只能消化一個,速度也不會再提高,都符合預期。驗證消費者的 Take() 會被 Block 住直到資料進來,不需自己寫檢查資料決定是否執行的程式;而 CompleteAdding() 及 IsCompleted 也成功讓程式結束。

再來驗證 Add() 在 blockQueue 數量達上限時會被 Block 住的行為(設定上限很重要,消費者端來不及消化通知生產端煞車,避免狂塞資料讓 Queue 記憶體爆量),我們設定四個生產者,但分別測試 queueSize 設為 2 及 1,可觀察到 Add() 在 blockQueue 容量達上限需排隊,queueSize = 1 時會呈現一筆生產者動作一筆消費者動作的穿插現象:

最後,介紹更簡潔的寫法 - 用 GetConsumingEnumerable() 配合 foreach 迴圈使用,跑到沒有資料自動結束。故上述程式可再簡化為:

//照片代號清單也改用 BlockingCollection
var photoIds = new BlockingCollection<int>();
for (int i = 1; i <= photoCount; i++) { photoIds.Add(i); }
photoIds.CompleteAdding();

//...略...

//生產者與消費者無腦用 GetConsumingEnumerable() 跑 foreach 即可
void RunProducer() {
    foreach (int id in photoIds.GetConsumingEnumerable()) 
    {
        //模擬照片縮放、縮圖作業延遲
        RandomDelay(imgProcTime);
        //若達到queueSize上限,Add動作會被Block住
        blockQueue.Add(new SimuImgData{
            Id = id,
            Image = Array.Empty<byte>()
        });
        Print($"{DateTime.Now:mm:ss} Photo {id} is processed", ConsoleColor.Yellow);
    }
}

void RunConsumer()
{
    foreach (var data in blockQueue.GetConsumingEnumerable())
    {
        //模擬寫入資料庫延遲
        RandomDelay(saveDbTime);
        Print($"{DateTime.Now:mm:ss} Photo {data.Id} is saved to DB", ConsoleColor.Cyan);
    }
    sw.Stop();
    Console.ForegroundColor = ConsoleColor.White;
    Console.WriteLine($"Done {sw.ElapsedMilliseconds:n0}ms");
}

執行結果完全相同,但程式更簡潔,用 C# 寫生產者消費者模式易如反掌。

程式碼已上傳 Github,共有兩個 Commit:Add()/Take() 展示GetConsumingEnumerable() 版本,需要的同學請自取。

Introduce to the BlockCollection class and how to use it to implementate producer-consumer pattern easily.


Comments

# by Ryan

黑大有考慮用`System.Threading.Channels`來寫生產消費者嗎? https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

# by Jeffrey

to Ryan, 打算之後再介紹。Channels 效能更好,但它完全架構在 async/await 之上,對一些開發者來說有小門檻要先跨過,BlockingCollection 相對較親民,故派它先發,Channels 已在牛棚暖身中。

Post a comment