我有個迷思。Parallel.For 或 Parallel.ForEach 有個 ParallelOptions.MaxDegreeOfParallelism 參數,可用來控制並行運算的工作數量,避免 Thread 數過多效能反而變差(延伸閱讀:從 ThreadPool 翻船談起),我的理解是如果我將 MaxDegreeOfParallelism 設成 4,.NET 便會開四條 Task/Thread 消化工作,設成 6 就開六條 Task/Thread。(注意:這個概念是錯的)

跑一些簡單測試,結果似乎能印證這個事實:

Parallel.ForEach(Enumerable.Range(0, 10),
    new ParallelOptions { MaxDegreeOfParallelism = 2 },
    (i) =>
    {
        for (var j = 1; j < 3; j++)
        {
            Console.WriteLine($"{Task.CurrentId}: {i}-{j}");
        }
    }
);

MaxDegreeOfParallelism = 2 時,Task.CurrentId 出現 1, 2,前後兩條 Thread 搞定:

MaxDegreeOfParallelism = 4,Task.CurrentId 則為 1, 2, 3, 4,用到四條 Thread:

不過,只要在迴圈加上 Thread.Sleep() 延遲,這個推論馬上被推翻。如下圖,MaxDegreeOfParallelism 雖然設成 4,但因 for 迴圈延遲消化速度降低,前後用到 9 條 Thread。

其實本草綱目早有記載,又一個因為沒有 RTFM 形成的錯誤認知:

By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.
For 及 ForEach 會透過底層的 Scheduler 用到很多條 Thread,MaxDegreeOfParallelism 控制的是允許幾條 Thread 同時執行,而非使用 Thread 數。

實際用到幾條 Thread 一般影響不大,但在 Func<TLocal>Func<TSource,ParallelLoopState,TLocal,TLocal>Action<TLocal> 區分 Task 啟用時、迴圈執行及 Task 結束作業三段邏輯的情境,有可能影響資源運用規劃。如下範例,我計劃每個 Task 只建一條服務連線,在處理不同 i 值時重複使用,結束時則 Dispose() 釋放資源。用幾組 Task 決定建幾次連線,實際次數將比 MaxDegreeOfParallelism 多。

Parallel.ForEach(Enumerable.Range(0, 100),
    new ParallelOptions { MaxDegreeOfParallelism = 10 },
    () => { 
        // 只有在 Task/Thread 建立時執行一次
        // 建立網路服務連線
        return new {
            Conn = someSvcConnection
        }; // 向後傳送的 taskLocal 參數
    },
    (i, loopState, taskLocal) =>
    {
        // Task 可重複使用,這段可能執行多次
        var res = taskLocal.Conn.DoSomthing()
        // ... 
    },
    (taskLocal) => {
        // Task/Thread 結束前執行一次
        // 釋放資源
        taskLocal.Conn.Dispose();
    }
);

最後,我寫了一個小玩具,以實地驗證 MaxDegreeOfParallelism 控制並行 Task/Thread 數的能力。

程式是支單檔 WebForm,有三個可調參數,Parallel.ForEach 迴圈次數(c)、MaxDegreeOfParallelism(m)、迴圈過程延遲(d)。過程用 Task.CurrentId 識別 Task (不同數字代表不同 Task),記錄 Task 起始結束,以及迴圈執行過程,最後以 HTML Table 呈現各 Task 的生命周期及並存狀況:

<%@Page Language="C#"%>
<%@Import Namespace="System.Collections.Concurrent"%>
<%@Import Namespace="System.Threading.Tasks"%>
<%@Import Namespace="System.Threading"%>
<script runat="server">
void Page_Load(object sender, EventArgs e)
{
	var count = int.Parse(Request["c"] ?? "10");
	var max = int.Parse(Request["m"] ?? "2");
	var delay = int.Parse(Request["d"] ?? "10");
	var queue = new ConcurrentQueue<string>();
	var st = DateTime.Now;
	Action<string, string> log = (threadId, msg) => {
		var t = (DateTime.Now - st).ToString("ss\\.fff");
		queue.Enqueue(string.Format("{0} {1} {2}", t, threadId, msg));
	};
	Parallel.ForEach(Enumerable.Range(0, count), 
	new ParallelOptions { 
        MaxDegreeOfParallelism = max
    }, 
	//init
	() => {
		log(Task.CurrentId.ToString(), "Start");
		return new {
			Results = new List<string>()
		};
	},
	//loop
	(i, loopState, taskLocal) => {
		for (var j = 1; j < 3; j++) {
			log(Task.CurrentId.ToString(), string.Format("{0}-{1}", i, j));
			Thread.Sleep(delay);
		}
		return taskLocal;
	},
	//final 
	(taskLocal) => {
		log(Task.CurrentId.ToString(), "End");
	});
	//convert to table
	var labels = new List<string>();
	var threads = new List<string>();
	var cells = new Dictionary<string, List<string>>();
	foreach (var m in queue.ToArray()) {
		var p = m.Split(' ');
		var label = p[0];
		if (!labels.Contains(label)) labels.Add(label);
		var thread = p[1];
		if (!threads.Contains(thread)) threads.Add(thread);
		var msg = p[2];
		var key = string.Format("{0}-{1}", label, thread);
		if (!cells.ContainsKey(key)) cells.Add(key, new List<string>());
		cells[key].Add(msg);
	}
	var sb = new StringBuilder();
	sb.Append(@"
	<style>		
		body {
			font-size: 9pt; font-family: Tahoma;
		}
		table { border-collapse: collapse; font-size: 9pt; }
		td { border: 1px solid #ccc; padding: 5px; }
		td.exist { background-color: #eee; }
	</style>
	");
	sb.AppendFormat("<p>Count: {0}, MaxDegreeOfParallelism: {1}, Delay: {2}</p>", 
		count, max, delay);
	var existFlags = threads.ToDictionary(x => x, x => false);
	sb.Append("<table border='1'>");
	sb.Append("<tr><td></td>");
	foreach (var thread in threads) {
		sb.AppendFormat("<td>{0}</td>", thread);
	}
	sb.Append("</tr>");
	foreach (var label in labels) {
		sb.AppendFormat("<tr><td>{0}</td>", label);
		foreach (var thread in threads) {
			var key = string.Format("{0}-{1}", label, thread);
			var msgs = cells.ContainsKey(key) ? 
				string.Join("<br/>", cells[key].ToArray()) : "";
			if (msgs.Contains("Start")) existFlags[thread] = true;
			sb.AppendFormat("<td class={1}>{0}</td>", msgs, 
				existFlags[thread] ? "exist" : "");
			if (msgs.Contains("End")) existFlags[thread] = false;
		}
		sb.Append("</tr>");
	}
	sb.Append("</table>");
	Response.Write(sb.ToString());
	// Response.ContentType = "text/plain";
	// Response.Write(string.Join("\n", queue.ToArray()));
}
</script>

m = 3,最多三個 Task 並存,每一列最多三個格子有數字:(576、577 則是建立後沒用到就結束)

m = 4,最多四個 Task 並存。

m = 6,最多六個 Task 並存。

2023-06-15 更新:忠成老師提醒,Task 與 Thread 並非一對一關係,有可能兩個 Task 排給同一條 Thread,為更精確觀察 Thread,稍微修改程式以 Thread.CurrentThread.ManagedThreadId 為觀察標對象,調整 log 寫入資訊:

	Parallel.ForEach(Enumerable.Range(0, count), 
	new ParallelOptions { 
        MaxDegreeOfParallelism = max
    }, 
	//init
	() => {
		log(Thread.CurrentThread.ManagedThreadId.ToString(), Task.CurrentId.ToString() + ":Start");
		return new {
			Results = new List<string>()
		};
	},
	//loop
	(i, loopState, taskLocal) => {
		for (var j = 1; j < 3; j++) {
			log(Thread.CurrentThread.ManagedThreadId.ToString(), string.Format("{2}:{0}-{1}", i, j, Task.CurrentId));
			Thread.Sleep(delay);
		}
		return taskLocal;
	},
	//final 
	(taskLocal) => {
		log(Thread.CurrentThread.ManagedThreadId.ToString(), Task.CurrentId.ToString() + ":End");
	});
	//....
	// 由於 Thread 可能被多個 Task 共用,Start/End 一次以上,調整產生 Table 時檢查是否結束的比對規則
	if (msgs.Contains("End") && Regex.Matches(msgs, "(Start|End)").Cast<Match>().Last().Value == "End") 
	    existFlags[thread] = false;

資訊複雜一些,可觀察到一條 Thread 被多個 Task 共用的狀況(例如 Thread 43 用來跑 Task 413/417/418、Thread 50 跑 Task 416/419/421),但並存數量仍受 MaxDegreeOfParallelism 嚴格控制,同一列上正在執行的 Thread 數量小於等於 MaxDegreeOfParallelism:

MaxDegreeOfParallelism = 4:

MaxDegreeOfParallelism = 6:

由此可知,Parallel.ForEach API 會靈活配置 Task/Thread 準備處理集合中的元素,而 MaxDegreeOfParallelism 可嚴格管控同時執行的 Task 數量上限,確保不要有太多 Thread 並存,以免過度耗用資源及影響效果。至此,我對 MaxDegreeOfParallelism 的效果有了清楚認識,又導正一個觀念。

Using a simple experiment to observe how MaxDegreeOfParallelism controle the threads of Parallel.For/ForEach.


Comments

Be the first to post a comment

Post a comment