On the topic of running work in parallel in PowerShell, I previously introduced Start-Job.

Start-Job launches a separate process in the background to run a ScriptBlock. It is simple to write but has two obvious drawbacks:

  1. Spinning up a whole process is not cheap, so Start-Job is noticeably heavier and slower, and it loses badly to Runspaces, which run the work on a new thread instead.

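  2. Because Start-Job code runs in another process, arguments can only be passed after serialization. Variables and objects cannot be shared, so the parent script and the parallel jobs have a hard time communicating with each other.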
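The second drawback is easy to see for yourself. The following is a minimal sketch (the variable names `$state` and `$table` are made up for illustration): a hashtable is handed to Start-Job, the job changes it, and the parent's copy stays untouched because the job only ever received a deserialized copy.

```powershell
# A live object handed to Start-Job is serialized, so the job works on a copy.
$state = @{ Counter = 0 }

$job = Start-Job -ScriptBlock {
    param ($table)
    $table.Counter = 42      # changes only the job's deserialized copy
    $table.Counter           # emit the value as seen inside the job
} -ArgumentList $state

$insideJob = $job | Wait-Job | Receive-Job
Remove-Job -Job $job

Write-Host "Inside job: $insideJob"
Write-Host "In parent : $($state.Counter)"
```

The only way to get data back from the job is through its output stream (Receive-Job), which is exactly the communication limit described above.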

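For these reasons, Runspaces are worth learning even though they are more complex, so this post practices running parallel work with them. (Note: the powerful new tool added in PowerShell 7+ is covered at the end; we start with an approach that is compatible with PowerShell 5.1.)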

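Before starting, let's review the Start-Job approach. The simple example below generates 32 GUIDs in four parallel groups, waiting 450 to 500 ms after each one. With eight items per group, the sleeps alone add up to roughly 3.6 to 4 s.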

$groups = @(
    @(0..7),
    @(8..15),
    @(16..23),
    @(24..31)
)
$sw = [System.Diagnostics.Stopwatch]::StartNew()
# Run in parallel
$psJobPool = @()
$groups | ForEach-Object {
    $psJob = Start-Job -ScriptBlock {
        param ([object[]]$array)
        $array | ForEach-Object {
            $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
            Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
            return $guid
        }
    } -ArgumentList @(, $_) 
    $psJobPool += $psJob
}
# Wait-Job waits for all PSJobs to finish
$psJobPool | Wait-Job | Out-Null
Write-Host "Duration: $($sw.ElapsedMilliseconds.ToString('n0'))ms"
# Receive-Job collects the results returned by each PSJob
$groupNo = 0
$psJobPool | ForEach-Object {
    $result = Receive-Job -Job $_
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    Write-Host ($result -join ',')
} 

In my test, with Start-Job's overhead included, it took about 4.6 s.

Next, let's try running it with Runspaces.

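To use a Runspace, first create one through RunspaceFactory, then create a PowerShell object and have it execute in that Runspace. Besides Invoke(), the PowerShell object offers BeginInvoke() for asynchronous calls. Start several PowerShell.BeginInvoke() calls at once, poll the IsCompleted property of the IAsyncResult each one returns until every PowerShell object has finished, then fetch the results with EndInvoke(), and you have parallel execution. (Reference: the official Runspace sample.)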
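To make the difference between Invoke() and BeginInvoke() concrete, here is a minimal single-Runspace sketch. The doubling script and the variable names are illustrative only; the same PowerShell object is reused for both calls after clearing its command list.

```powershell
# Create and open one Runspace, then bind a PowerShell object to it.
$rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
$rs.Open()
$ps = [System.Management.Automation.PowerShell]::Create()
$ps.Runspace = $rs

# Synchronous: Invoke() blocks until the script has finished.
[void]$ps.AddScript({ param ($n) $n * 2 }).AddArgument(21)
$syncResult = $ps.Invoke()

# Asynchronous: BeginInvoke() returns immediately with an IAsyncResult.
$ps.Commands.Clear()
[void]$ps.AddScript({ param ($n) Start-Sleep -Milliseconds 200; $n * 2 }).AddArgument(10)
$handle = $ps.BeginInvoke()
while (-not $handle.IsCompleted) {
    Start-Sleep -Milliseconds 50   # the caller is free to do other work here
}
$asyncResult = $ps.EndInvoke($handle)

Write-Host "Sync : $($syncResult[0])"
Write-Host "Async: $($asyncResult[0])"

$ps.Dispose()
$rs.Close()
$rs.Dispose()
```

Parallelism comes from starting several such BeginInvoke() calls, each on its own Runspace, before waiting on any of them.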

Based on this idea, here is the Start-Job version converted to Runspaces:

$groups = @(
    @(0..7),
    @(8..15),
    @(16..23),
    @(24..31)
)

$scriptBlock = {
    param ([object[]]$array)
    $array | ForEach-Object {
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
        return $guid
    }
}

$jobs = @()
$sw = [System.Diagnostics.Stopwatch]::StartNew()
$groups | ForEach-Object {
    $rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
    $rs.Open()    
    $ps = [System.Management.Automation.PowerShell]::Create()
    $ps.Runspace = $rs
    $ps.AddScript($scriptBlock).AddArgument($_) | Out-Null
    $jobs += [PSCustomObject]@{
        Job = $ps.BeginInvoke()
        Runspace = $rs
        PowerShell = $ps
    }    
}

while ($jobs.Job.IsCompleted -contains $false) {
    Start-Sleep -Milliseconds 100
}
Write-Host "Duration: $($sw.ElapsedMilliseconds.ToString('n0'))ms"

$groupNo = 0
$jobs | ForEach-Object {
    $res = $_.PowerShell.EndInvoke($_.Job)
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    Write-Host ($res -join ',')
    $_.PowerShell.Dispose()
    $_.Runspace.Close()
    $_.Runspace.Dispose()
}

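The code is a bit more involved, but it runs faster and finishes in 3.9 s. This example only starts four parallel workers; the more workers you run, the more pronounced the performance gap between Start-Job and Runspace becomes.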

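What makes Runspaces even more valuable, though, is that we can pass objects in as arguments and share them with the ScriptBlock running inside the Runspace. This lets each thread pull pending work from a shared queue, which is the classic producer-consumer pattern. The problem that a separately created PowerShell object cannot use Write-Host to print progress to the terminal can be solved the same way, by adding a queue of progress message strings.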

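Sharing objects between Runspaces means thinking about thread safety. Letting multiple threads update a collection at the same time is an easy way to run into all sorts of bizarre problems. PowerShell has no lock statement like C#, but a Mutex makes thread-safe updates straightforward. Here I chose an even simpler route and borrowed .NET's ConcurrentQueue<object>.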
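For reference, the Mutex route might look like the sketch below. It is not part of the programs in this post: the shared hashtable, its `Total` key, and the worker script are made up for illustration. Four Runspaces each increment a shared counter 500 times, and the Mutex ensures that only one of them performs the read-modify-write at any moment.

```powershell
# Shared state: a plain hashtable plus a Mutex guarding every update.
$shared = @{ Total = 0 }
$mutex  = [System.Threading.Mutex]::new()

$worker = {
    param ($shared, $mutex)
    foreach ($i in 1..500) {
        [void]$mutex.WaitOne()          # enter the critical section
        try {
            $shared.Total = $shared.Total + 1
        }
        finally {
            $mutex.ReleaseMutex()       # always release, even on error
        }
    }
}

# Start four workers; each PowerShell object gets its own default Runspace.
$jobs = foreach ($n in 1..4) {
    $ps = [System.Management.Automation.PowerShell]::Create()
    [void]$ps.AddScript($worker).AddArgument($shared).AddArgument($mutex)
    [PSCustomObject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

# EndInvoke() blocks until the corresponding worker has finished.
foreach ($job in $jobs) {
    [void]$job.PowerShell.EndInvoke($job.Handle)
    $job.PowerShell.Dispose()
}
$mutex.Dispose()

Write-Host "Final total: $($shared.Total)"
```

With the Mutex in place the total should come out to 4 x 500 = 2000. The concurrent collections remove the need for this bookkeeping when all you need is a queue.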

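When calling the Runspace ScriptBlock, I pass the ConcurrentQueue objects by reference: wrap each argument in [ref] when passing it in, and access the content through the .Value property inside the script block. (ConcurrentQueue is a reference type, so passing the object itself should share the same instance as well; [ref] is simply the form used in this post.)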
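Here is a small sketch of the [ref] and .Value mechanics, first with an ordinary function and then across a Runspace. The function `Add-One` and the message strings are hypothetical. The last step passes the queue without [ref], on the assumption that a reference type handed to AddArgument() is shared rather than copied.

```powershell
# [ref] with a plain function: the callee updates the caller's variable via .Value.
function Add-One {
    param ([ref]$number)
    $number.Value = $number.Value + 1
}
$count = 1
Add-One -number ([ref]$count)
Write-Host "count = $count"

# The same idea across a Runspace, using a shared ConcurrentQueue.
$queue = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
$ps = [System.Management.Automation.PowerShell]::Create()

# Passed with [ref]: the script block reaches the queue through .Value.
[void]$ps.AddScript({
    param ([ref]$q)
    $q.Value.Enqueue('via ref')
}).AddArgument([ref]$queue)
[void]$ps.Invoke()

# Passed directly: the script block receives the same queue instance.
$ps.Commands.Clear()
[void]$ps.AddScript({
    param ($q)
    $q.Enqueue('direct')
}).AddArgument($queue)
[void]$ps.Invoke()
$ps.Dispose()

Write-Host "Queue contents: $($queue.ToArray() -join ', ')"
```

Both messages land in the parent's queue, so either style works for sharing a collection. [ref] becomes necessary when the script block has to replace the caller's variable itself, as Add-One does.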

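With that background, let's rework the program. The idea is to set up a to-do queue holding 32 work items and create four Runspaces that run concurrently. Each Runspace picks up work from the to-do queue on its own, and either returns its results with Write-Output or stores them in a results queue. To watch progress in real time, my solution is an extra queue that collects progress message strings, which are displayed while waiting. The complete program: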

$ErrorActionPreference = 'Stop'
Add-Type -AssemblyName System.Collections.Concurrent
[PSCustomObject[]]$items = @(0..31) | ForEach-Object { [PSCustomObject]@{ Id = $_; Result = $null } }
$todoItems = [System.Collections.Concurrent.ConcurrentQueue[object]]::new($items)
$progressMsgs = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
$results = [System.Collections.Concurrent.ConcurrentQueue[object]]::new()
$scriptBlock = {
    param ([ref]$todo, [ref]$results, [ref]$progressMsgs)
    while (-not $todo.Value.IsEmpty) {
        $item = $null
        if (-not $todo.Value.TryDequeue([ref]$item)) {
            continue
        }
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        Write-Output "$guid"
        # Besides returning the result with Write-Output (received through EndInvoke()),
        # also demonstrate collecting results in a queue
        $item.Result = $guid
        $results.Value.Enqueue($item)
        # Report progress
        $progressMsgs.Value.Enqueue("Generated: $guid")
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
    }
}
(Measure-Command {
    $jobs = @()
    (1..4) | ForEach-Object {
        $rs = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
        $rs.Open()    
        $ps = [System.Management.Automation.PowerShell]::Create()
        $ps.Runspace = $rs

        $ps.AddScript($scriptBlock).AddArgument([ref]$todoItems).AddArgument([ref]$results).AddArgument([ref]$progressMsgs) | Out-Null
        $jobs += [PSCustomObject]@{
            Job = $ps.BeginInvoke()
            Runspace = $rs
            PowerShell = $ps
        }    
    }
    while ($jobs.Job.IsCompleted -contains $false) {
        while (-not $progressMsgs.IsEmpty) {
            $msg = $null
            if ($progressMsgs.TryDequeue([ref]$msg)) 
            {
                Write-Host "$([DateTime]::Now.ToString('HH:mm:ss.fff')) $msg"
            }
        }
        Start-Sleep -Milliseconds 100
    }
}).TotalMilliseconds
$groupNo = 0
$jobs | ForEach-Object {
    $groupNo++
    Write-Host "Group $groupNo" -ForegroundColor Yellow
    $res = $_.PowerShell.EndInvoke($_.Job)
    Write-Host ($res -join ',')
    $_.PowerShell.Dispose()
    $_.Runspace.Close()
    $_.Runspace.Dispose()
}
Write-Host "Results Queue: " -ForegroundColor Yellow
($results.ToArray() | ForEach-Object {
    $_.Result
}) -join ','

And with that, we have PowerShell parallel processing in which four consumers work through the to-do items.

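Besides fixing the number of parallel Runspaces up front, PowerShell also provides RunspacePool, which works much like a ThreadPool and makes it easy to reap the benefits of multithreading.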
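A RunspacePool version might look like the sketch below. It is a simplified illustration rather than a rewrite of the program above: the pool size of 1 to 4, the 200 ms sleep, and the "Item N done" message are all assumptions. Eight work items are queued against a pool that runs at most four of them at a time.

```powershell
# Create a pool that keeps between 1 and 4 Runspaces alive.
$pool = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspacePool(1, 4)
$pool.Open()

# Queue eight work items; the pool decides which Runspace runs each one.
$jobs = foreach ($id in 0..7) {
    $ps = [System.Management.Automation.PowerShell]::Create()
    $ps.RunspacePool = $pool
    [void]$ps.AddScript({
        param ($id)
        Start-Sleep -Milliseconds 200
        "Item $id done"
    }).AddArgument($id)
    [PSCustomObject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

# Collect the output in submission order, then clean up.
$output = foreach ($job in $jobs) {
    $job.PowerShell.EndInvoke($job.Handle)
    $job.PowerShell.Dispose()
}
$pool.Close()
$pool.Dispose()

$output | ForEach-Object { Write-Host $_ }
```

Compared with the hand-rolled version, there is no Runspace to create, open, and dispose per worker; each PowerShell object is assigned to the pool through its RunspacePool property instead of its Runspace property.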

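Finally, if you have upgraded to PowerShell 7+, the new ForEach-Object -Parallel feature makes things ten times simpler. Keep your existing ForEach-Object code, add -Parallel to turn on parallel processing and -ThrottleLimit to cap the number of threads, and the loop is upgraded to parallel execution on the spot. Write-Host inside it displays normally, so there is no plumbing to wire up yourself. Simple enough, right?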

# https://devblogs.microsoft.com/powershell/powershell-foreach-object-parallel-feature/
$ErrorActionPreference = 'Stop'
[PSCustomObject[]]$todoItems = @(0..31) | ForEach-Object { [PSCustomObject]@{ Id = $_; Result = $null } }
(Measure-Command {
    $results = $todoItems | Foreach-Object -ThrottleLimit 8 -Parallel {
        $guid = (New-Guid).ToString().Substring(0, 4).ToUpper()
        $_.Result = $guid
        Write-Output $_
        Write-Host "$([DateTime]::Now.ToString('HH:mm:ss.fff')) Generated: $guid"
        Start-Sleep -Milliseconds (450 + (Get-Random -Minimum 1 -Maximum 50))
    }
}).TotalMilliseconds
Write-Host "Results Queue: " -ForegroundColor Yellow
($results | ForEach-Object {
    $_.Result
}) -join ','

That is a brief introduction to parallel processing in PowerShell 5.1 and PowerShell 7+. I hope you find it useful.


