PowerShell Parallel Execution Revisited: From Runspace to ForEach-Object -Parallel
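I previously covered Start-Job as a way to run work in parallel in PowerShell.
Start-Job launches a separate process behind the scenes to run the ScriptBlock. It is simple to write, but it has two obvious drawbacks:
- Spawning a whole process is not cheap, so Start-Job is noticeably heavier and slower, and it loses badly to a Runspace, which only starts another thread.
- Because the Start-Job code runs in a different process, arguments can only be passed after serialization. Variables and objects cannot be shared, so the parent script and the parallel jobs have a hard time talking to each other.
For these reasons, Runspaces are worth learning even though they are more complex, and this post is an exercise in running parallel work with them. (Note: a powerful new tool added in PowerShell 7+ is introduced later; the discussion starts with syntax that is compatible with PowerShell 5.1.)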
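To make the serialization drawback concrete, here is a minimal sketch (not from the original post). The hashtable `$state` and its `Total` key are hypothetical names chosen for illustration. The job receives a deserialized copy, so its change never reaches the parent:

```powershell
# Hypothetical shared state; Start-Job serializes it before handing it to the job.
$state = @{ Total = 0 }

$job = Start-Job -ScriptBlock {
    param ($copy)
    # This modifies the job's own deserialized copy, not the parent's hashtable.
    $copy.Total = 42
    $copy.Total
} -ArgumentList $state

$fromJob = $job | Wait-Job | Receive-Job
Remove-Job -Job $job

Write-Host "Job saw: $fromJob; parent still has: $($state.Total)"
```

The only way to get data back is through the job's output, which is why coordination between jobs is awkward with Start-Job.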
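Before starting, let's review the Start-Job approach. The simple example below generates 32 GUIDs in total, split into four groups that run in parallel. Each GUID is followed by a 450 to 500 ms wait, so each group of eight should finish in roughly 3.6 to 4 seconds.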
$groups = @(
    @(0..7),
    @(8..15),
    @(16..23),
    @(24..31)
)
$sw = [System.Diagnostics.Stopwatch]::StartNew()
# Run in parallel: one PSJob (a separate process) per group
$psJobPool = @()
$groups | ForEach-Object {
    $psJob = Start-Job -ScriptBlock {
        param ([object[]]$array)
        $array | ForEach-Object {
            $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
            Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
            return $guid
        }
    } -ArgumentList @(, $_)   # wrap the group so it arrives as a single array argument
    $psJobPool += $psJob
}
# Wait-Job waits for all PSJobs to finish
$psJobPool | Wait-Job | Out-Null
Write-Host "Duration: $($sw.ElapsedMilliseconds.ToString('n0'))ms"
# Receive-Job collects the results returned by each PSJob
$groupNo = 0
$psJobPool | ForEach-Object {
    $result = Receive-Job -Job $_
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    Write-Host ($result -join ',')
}
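Measured with the Start-Job overhead included, the run takes about 4.6 s.
Next, let's try running the same work with Runspaces.
To use a Runspace, first create one through RunspaceFactory, then create a PowerShell object that executes in that Runspace. Besides Invoke(), the PowerShell object offers BeginInvoke() for asynchronous calls. Start several PowerShell.BeginInvoke() calls at once, poll the IsCompleted property of the IAsyncResult each one returns until every PowerShell object has finished, then fetch the results with EndInvoke(), and you have parallel execution. (Reference: the official Runspace sample.)
Following this idea, here is the Start-Job version converted to Runspaces: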
$groups = @(
    @(0..7),
    @(8..15),
    @(16..23),
    @(24..31)
)
$scriptBlock = {
    param ([object[]]$array)
    $array | ForEach-Object {
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
        return $guid
    }
}
$jobs = @()
$sw = [System.Diagnostics.Stopwatch]::StartNew()
$groups | ForEach-Object {
    # One Runspace (thread) per group
    $rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
    $rs.Open()
    $ps = [System.Management.Automation.PowerShell]::Create()
    $ps.Runspace = $rs
    $ps.AddScript($scriptBlock).AddArgument($_) | Out-Null
    # Keep the IAsyncResult together with the objects that must be cleaned up later
    $jobs += [PSCustomObject]@{
        Job        = $ps.BeginInvoke()
        Runspace   = $rs
        PowerShell = $ps
    }
}
# Poll until every asynchronous invocation has completed
while ($jobs.Job.IsCompleted -contains $false) {
    Start-Sleep -Milliseconds 100
}
Write-Host "Duration: $($sw.ElapsedMilliseconds.ToString('n0'))ms"
$groupNo = 0
$jobs | ForEach-Object {
    # EndInvoke returns the output of the ScriptBlock
    $res = $_.PowerShell.EndInvoke($_.Job)
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    Write-Host ($res -join ',')
    $_.PowerShell.Dispose()
    $_.Runspace.Close()
    $_.Runspace.Dispose()
}
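The code is a bit more involved, but it runs faster and finishes in 3.9 s. This example starts only four parallel workers; the more workers you run, the more pronounced the performance gap between Start-Job and Runspace becomes.
What makes Runspaces even more valuable, though, is that we can pass objects in as arguments and share them with the ScriptBlock running in the Runspace. That lets each thread pull pending work from a shared queue, which is the classic producer-consumer pattern. It also solves the problem that a separately created PowerShell object cannot print progress to the terminal with Write-Host: add a queue of progress message strings.
Sharing objects between Runspaces means thinking about thread safety. Letting multiple threads update a collection at the same time invites all sorts of weird bugs. PowerShell has no lock statement like C#, but a Mutex makes thread-safe updates easy to implement. Here I chose something even simpler and borrowed .NET's ConcurrentQueue<object>.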
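For comparison, the Mutex route mentioned above might look like the following sketch. It is not the approach used in the rest of this post, and the names `$shared`, `Total`, and `$worker` are hypothetical. Four Runspaces each increment a shared counter 1000 times, taking the Mutex around every update:

```powershell
# Hypothetical shared state and lock; both are passed as live objects to each Runspace.
$shared = @{ Total = 0 }
$mutex  = [System.Threading.Mutex]::new()

$worker = {
    param ($state, $lock)
    foreach ($i in 1..1000) {
        [void]$lock.WaitOne()          # enter the critical section
        try {
            $state.Total = $state.Total + 1
        }
        finally {
            $lock.ReleaseMutex()       # always release, even if the update throws
        }
    }
}

# [PowerShell]::Create() without an explicit Runspace creates its own Runspace.
$jobs = foreach ($n in 1..4) {
    $ps = [PowerShell]::Create()
    [void]$ps.AddScript($worker).AddArgument($shared).AddArgument($mutex)
    [PSCustomObject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

foreach ($job in $jobs) {
    $job.PowerShell.EndInvoke($job.Handle) | Out-Null
    $job.PowerShell.Dispose()
}
$mutex.Dispose()

Write-Host "Total: $($shared.Total)"
```

With the Mutex in place, the read-modify-write on `Total` cannot interleave between threads. The concurrent collections avoid this boilerplate, which is why the post uses them instead.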
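When calling the Runspace ScriptBlock, this example passes the ConcurrentQueue objects by reference: wrap each argument in [ref] at the call site, and read it through the .Value property inside the ScriptBlock.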
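The [ref] and .Value mechanics can be tried in isolation first. The sketch below uses hypothetical names (`$queue`, `$block`) and runs the ScriptBlock in the current Runspace purely to show the syntax:

```powershell
$queue = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()

# The parameter is declared as [ref]; the real object is reached through .Value.
$block = {
    param ([ref]$q)
    $q.Value.Enqueue('from block')
}

# Wrap the argument in [ref] at the call site.
& $block ([ref]$queue)

# TryDequeue also takes a [ref] argument to receive the dequeued item.
$item = $null
[void]$queue.TryDequeue([ref]$item)
Write-Host "Dequeued: $item"
```

The same pattern appears in the full program: `[ref]` on the `param` block and on each `AddArgument` call, and `.Value` wherever the queue is used.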
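With that background, let's rework the program. The idea is to set up a to-do queue holding 32 work items and create four Runspaces that run concurrently. Each Runspace picks up work from the to-do queue on its own, and returns results through Write-Output or stores them in a results queue. To watch progress in real time, my solution is one more queue that collects progress message strings, which are displayed while waiting. The complete program: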
$ErrorActionPreference = 'Stop'
Add-Type -AssemblyName System.Collections.Concurrent
[PSCustomObject[]]$items = @(0..31) | ForEach-Object { [PSCustomObject]@{ Id = $_; Result = $null } }
$todoItems = [System.Collections.Concurrent.ConcurrentQueue[object]]::new($items)
$progressMsgs = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
$results = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
$scriptBlock = {
    param ([ref]$todo, [ref]$results, [ref]$progressMsgs)
    while (-not $todo.Value.IsEmpty) {
        $item = $null
        # Another thread may have taken the last item; re-check the queue if so
        if (-not $todo.Value.TryDequeue([ref]$item)) {
            continue
        }
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        Write-Output "$guid"
        # Besides returning results via Write-Output (received by EndInvoke()),
        # also demonstrate collecting results in a queue
        $item.Result = $guid
        $results.Value.Enqueue($item)
        # Report progress
        $progressMsgs.Value.Enqueue("Generated: $guid")
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
    }
}
(Measure-Command {
    $jobs = @()
    (1..4) | ForEach-Object {
        $rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
        $rs.Open()
        $ps = [System.Management.Automation.PowerShell]::Create()
        $ps.Runspace = $rs
        $ps.AddScript($scriptBlock).AddArgument([ref]$todoItems).AddArgument([ref]$results).AddArgument([ref]$progressMsgs) | Out-Null
        $jobs += [PSCustomObject]@{
            Job        = $ps.BeginInvoke()
            Runspace   = $rs
            PowerShell = $ps
        }
    }
    while ($jobs.Job.IsCompleted -contains $false) {
        # Drain and print pending progress messages while waiting
        while (-not $progressMsgs.IsEmpty) {
            $msg = $null
            if ($progressMsgs.TryDequeue([ref]$msg)) {
                Write-Host "$([DateTime]::Now.ToString('HH:mm:ss.fff')) $msg"
            }
        }
        Start-Sleep -Milliseconds 100
    }
}).TotalMilliseconds
$groupNo = 0
$jobs | ForEach-Object {
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    $res = $_.PowerShell.EndInvoke($_.Job)
    Write-Host ($res -join ',')
    $_.PowerShell.Dispose()
    $_.Runspace.Close()
    $_.Runspace.Dispose()
}
Write-Host "Results Queue: " -ForegroundColor Yellow
($results.ToArray() | ForEach-Object {
    $_.Result
}) -join ','
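With that, we have PowerShell parallel execution in which four consumers work through the to-do queue.
Besides fixing the number of parallel Runspaces up front, PowerShell also provides RunspacePool, which lets you reap the benefits of multithreading easily with a concept similar to ThreadPool.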
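A minimal RunspacePool sketch, under the assumption of eight trivial work items and a pool capped at four Runspaces (the names `$pool`, `$work`, and `$output` are hypothetical and not from the original post):

```powershell
# Pool with at least 1 and at most 4 Runspaces; extra work waits for a free one.
$pool = [RunspaceFactory]::CreateRunspacePool(1, 4)
$pool.Open()

$work = {
    param ($id)
    Start-Sleep -Milliseconds 100
    "Item $id done"
}

# Queue eight invocations; each PowerShell object borrows a Runspace from the pool.
$jobs = foreach ($id in 0..7) {
    $ps = [PowerShell]::Create()
    $ps.RunspacePool = $pool
    [void]$ps.AddScript($work).AddArgument($id)
    [PSCustomObject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

# Collect results in submission order, then clean up.
$output = foreach ($job in $jobs) {
    $job.PowerShell.EndInvoke($job.Handle)
    $job.PowerShell.Dispose()
}
$pool.Close()
$pool.Dispose()

$output | ForEach-Object { Write-Host $_ }
```

Compared with the hand-rolled version, you assign `RunspacePool` instead of `Runspace`, and the pool decides how many Runspaces actually run at once.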
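Finally, if you have upgraded to PowerShell 7+, the new ForEach-Object -Parallel feature makes things ten times easier. Keep the ForEach-Object syntax you already use, add -Parallel to turn on parallel processing, and add -ThrottleLimit to cap the number of threads. The existing ForEach-Object loop is upgraded to parallel execution on the spot, and Write-Host inside it displays normally, so there is no need to wire up the plumbing yourself. Simple enough, right?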
# https://devblogs.microsoft.com/powershell/powershell-foreach-object-parallel-feature/
$ErrorActionPreference = 'Stop'
[PSCustomObject[]]$todoItems = @(0..31) | ForEach-Object { [PSCustomObject]@{ Id = $_; Result = $null } }
(Measure-Command {
    $results = $todoItems | ForEach-Object -ThrottleLimit 8 -Parallel {
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        $_.Result = $guid
        Write-Output $_
        Write-Host "$([DateTime]::Now.ToString('HH:mm:ss.fff')) Generated: $guid"
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
    }
}).TotalMilliseconds
Write-Host "Results Queue: " -ForegroundColor Yellow
($results | ForEach-Object {
    $_.Result
}) -join ','
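If you still want a shared thread-safe collection with -Parallel, one option is the `$using:` scope modifier, which makes a variable from the calling scope available inside the parallel ScriptBlock. The sketch below is an illustration only, with hypothetical names (`$doubled`, `$joined`), and requires PowerShell 7+:

```powershell
# Shared thread-safe queue created in the calling scope.
$doubled = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()

0..7 | ForEach-Object -ThrottleLimit 4 -Parallel {
    # $using: gives the parallel ScriptBlock access to the caller's variable.
    $queue = $using:doubled
    $queue.Enqueue($_ * 2)
}

# Completion order is not guaranteed, so sort before printing.
$joined = ($doubled.ToArray() | Sort-Object) -join ','
Write-Host $joined
```

This prints `0,2,4,6,8,10,12,14`. For simple cases, collecting the pipeline output of ForEach-Object -Parallel, as in the example above, is usually enough.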
That wraps up this brief introduction to parallel execution in PowerShell 5.1 and PowerShell 7+. I hope it helps.
The blog post compares Start-Job and Runspaces for parallel execution in PowerShell, emphasizing Runspaces’ efficiency and flexibility. It provides detailed examples for both PowerShell 5.1 and 7+, showcasing how to use Runspaces for parallel tasks and introducing ForEach-Object -Parallel for simpler parallel processing in PowerShell 7+.