如何控制 Parallel.For/ForEach 使用幾條 Thread?
0 | 5,852 |
我有個迷思。Parallel.For 或 Parallel.ForEach 有個 ParallelOptions.MaxDegreeOfParallelism 參數,可用來控制並行運算的工作數量,避免 Thread 數過多效能反而變差(延伸閱讀:從 ThreadPool 翻船談起),我的理解是如果我將 MaxDegreeOfParallelism 設成 4,.NET 便會開四條 Task/Thread 消化工作,設成 6 就開六條 Task/Thread。(注意:這個概念是錯的)
跑一些簡單測試,結果似乎能印證這個事實:
Parallel.ForEach(Enumerable.Range(0, 10),
new ParallelOptions { MaxDegreeOfParallelism = 2 },
(i) =>
{
for (var j = 1; j < 3; j++)
{
Console.WriteLine($"{Task.CurrentId}: {i}-{j}");
}
}
);
MaxDegreeOfParallelism = 2 時,Task.CurrentId 出現 1, 2,前後兩條 Thread 搞定:
MaxDegreeOfParallelism = 4,Task.CurrentId 則為 1, 2, 3, 4,用到四條 Thread:
不過,只要在迴圈加上 Thread.Sleep() 延遲,這個推論馬上被推翻。如下圖,MaxDegreeOfParallelism 雖然設成 4,但因 for 迴圈延遲消化速度降低,前後用到 9 條 Thread。
其實本草綱目早有記載,又一個因為沒有 RTFM 形成的錯誤認知:
By default, For and ForEach will utilize however many threads the underlying scheduler provides, so changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.
For 及 ForEach 會透過底層的 Scheduler 用到很多條 Thread,MaxDegreeOfParallelism 控制的是允許幾條 Thread 同時執行,而非使用 Thread 數。
實際用到幾條 Thread 一般影響不大,但在 Func<TLocal>
、Func<TSource,ParallelLoopState,TLocal,TLocal>
、Action<TLocal>
區分 Task 啟用時、迴圈執行及 Task 結束作業三段邏輯的情境,有可能影響資源運用規劃。如下範例,我計劃每個 Task 只建一條服務連線,在處理不同 i 值時重複使用,結束時則 Dispose() 釋放資源。用幾組 Task 決定建幾次連線,實際次數將比 MaxDegreeOfParallelism 多。
Parallel.ForEach(Enumerable.Range(0, 100),
new ParallelOptions { MaxDegreeOfParallelism = 10 },
() => {
// 只有在 Task/Thread 建立時執行一次
// 建立網路服務連線
return new {
Conn = someSvcConnection
}; // 向後傳送的 taskLocal 參數
},
(i, loopState, taskLocal) =>
{
// Task 可重複使用,這段可能執行多次
var res = taskLocal.Conn.DoSomthing()
// ...
},
(taskLocal) => {
// Task/Thread 結束前執行一次
// 釋放資源
taskLocal.Conn.Dispose();
}
);
最後,我寫了一個小玩具,以實地驗證 MaxDegreeOfParallelism 控制並行 Task/Thread 數的能力。
程式是支單檔 WebForm,有三個可調參數,Parallel.ForEach 迴圈次數(c)、MaxDegreeOfParallelism(m)、迴圈過程延遲(d)。過程用 Task.CurrentId 識別 Task (不同數字代表不同 Task),記錄 Task 起始結束,以及迴圈執行過程,最後以 HTML Table 呈現各 Task 的生命周期及並存狀況:
<%@Page Language="C#"%>
<%@Import Namespace="System.Collections.Concurrent"%>
<%@Import Namespace="System.Threading.Tasks"%>
<%@Import Namespace="System.Threading"%>
<script runat="server">
void Page_Load(object sender, EventArgs e)
{
var count = int.Parse(Request["c"] ?? "10");
var max = int.Parse(Request["m"] ?? "2");
var delay = int.Parse(Request["d"] ?? "10");
var queue = new ConcurrentQueue<string>();
var st = DateTime.Now;
Action<string, string> log = (threadId, msg) => {
var t = (DateTime.Now - st).ToString("ss\\.fff");
queue.Enqueue(string.Format("{0} {1} {2}", t, threadId, msg));
};
Parallel.ForEach(Enumerable.Range(0, count),
new ParallelOptions {
MaxDegreeOfParallelism = max
},
//init
() => {
log(Task.CurrentId.ToString(), "Start");
return new {
Results = new List<string>()
};
},
//loop
(i, loopState, taskLocal) => {
for (var j = 1; j < 3; j++) {
log(Task.CurrentId.ToString(), string.Format("{0}-{1}", i, j));
Thread.Sleep(delay);
}
return taskLocal;
},
//final
(taskLocal) => {
log(Task.CurrentId.ToString(), "End");
});
//convert to table
var labels = new List<string>();
var threads = new List<string>();
var cells = new Dictionary<string, List<string>>();
foreach (var m in queue.ToArray()) {
var p = m.Split(' ');
var label = p[0];
if (!labels.Contains(label)) labels.Add(label);
var thread = p[1];
if (!threads.Contains(thread)) threads.Add(thread);
var msg = p[2];
var key = string.Format("{0}-{1}", label, thread);
if (!cells.ContainsKey(key)) cells.Add(key, new List<string>());
cells[key].Add(msg);
}
var sb = new StringBuilder();
sb.Append(@"
<style>
body {
font-size: 9pt; font-family: Tahoma;
}
table { border-collapse: collapse; font-size: 9pt; }
td { border: 1px solid #ccc; padding: 5px; }
td.exist { background-color: #eee; }
</style>
");
sb.AppendFormat("<p>Count: {0}, MaxDegreeOfParallelism: {1}, Delay: {2}</p>",
count, max, delay);
var existFlags = threads.ToDictionary(x => x, x => false);
sb.Append("<table border='1'>");
sb.Append("<tr><td></td>");
foreach (var thread in threads) {
sb.AppendFormat("<td>{0}</td>", thread);
}
sb.Append("</tr>");
foreach (var label in labels) {
sb.AppendFormat("<tr><td>{0}</td>", label);
foreach (var thread in threads) {
var key = string.Format("{0}-{1}", label, thread);
var msgs = cells.ContainsKey(key) ?
string.Join("<br/>", cells[key].ToArray()) : "";
if (msgs.Contains("Start")) existFlags[thread] = true;
sb.AppendFormat("<td class={1}>{0}</td>", msgs,
existFlags[thread] ? "exist" : "");
if (msgs.Contains("End")) existFlags[thread] = false;
}
sb.Append("</tr>");
}
sb.Append("</table>");
Response.Write(sb.ToString());
// Response.ContentType = "text/plain";
// Response.Write(string.Join("\n", queue.ToArray()));
}
</script>
m = 3,最多三個 Task 並存,每一列最多三個格子有數字:(576、577 則是建立後沒用到就結束)
m = 4,最多四個 Task 並存。
m = 6,最多六個 Task 並存。
2023-06-15 更新:忠成老師提醒,Task 與 Thread 並非一對一關係,有可能兩個 Task 排給同一條 Thread,為更精確觀察 Thread,稍微修改程式以 Thread.CurrentThread.ManagedThreadId 為觀察標對象,調整 log 寫入資訊:
Parallel.ForEach(Enumerable.Range(0, count),
new ParallelOptions {
MaxDegreeOfParallelism = max
},
//init
() => {
log(Thread.CurrentThread.ManagedThreadId.ToString(), Task.CurrentId.ToString() + ":Start");
return new {
Results = new List<string>()
};
},
//loop
(i, loopState, taskLocal) => {
for (var j = 1; j < 3; j++) {
log(Thread.CurrentThread.ManagedThreadId.ToString(), string.Format("{2}:{0}-{1}", i, j, Task.CurrentId));
Thread.Sleep(delay);
}
return taskLocal;
},
//final
(taskLocal) => {
log(Thread.CurrentThread.ManagedThreadId.ToString(), Task.CurrentId.ToString() + ":End");
});
//....
// 由於 Thread 可能被多個 Task 共用,Start/End 一次以上,調整產生 Table 時檢查是否結束的比對規則
if (msgs.Contains("End") && Regex.Matches(msgs, "(Start|End)").Cast<Match>().Last().Value == "End")
existFlags[thread] = false;
資訊複雜一些,可觀察到一條 Thread 被多個 Task 共用的狀況(例如 Thread 43 用來跑 Task 413/417/418、Thread 50 跑 Task 416/419/421),但並存數量仍受 MaxDegreeOfParallelism 嚴格控制,同一列上正在執行的 Thread 數量小於等於 MaxDegreeOfParallelism:
MaxDegreeOfParallelism = 4:
MaxDegreeOfParallelism = 6:
由此可知,Parallel.ForEach API 會靈活配置 Task/Thread 準備處理集合中的元素,而 MaxDegreeOfParallelism 可嚴格管控同時執行的 Task 數量上限,確保不要有太多 Thread 並存,以免過度耗用資源及影響效果。至此,我對 MaxDegreeOfParallelism 的效果有了清楚認識,又導正一個觀念。
Using a simple experiment to observe how MaxDegreeOfParallelism controle the threads of Parallel.For/ForEach.
Comments
Be the first to post a comment