When managing and maintaining our clients’ Autonomy infrastructures, it’s often beneficial to thread the underlying scripts. At some installations we manage tens of servers and thousands of services, so running maintenance tasks in parallel saves plenty of time.

PowerShell jobs are great for executing quick background tasks, but they can be too resource-intensive and cumbersome to throttle. .NET runspaces and runspace pools, however, are simple enough to implement and, as seen here, have a performance advantage over jobs.

So as part of the RAVNTools PowerShell library available to our Professional Services team, I’ve introduced a simple way to thread scripting using the .NET construct of runspace pools:

 

Function Invoke-RunspacePool {
    <#
        .SYNOPSIS
        Creates a new runspace pool, executing the ThreadBlock in multiple threads.

        .EXAMPLE
        Invoke-RunspacePool $cmd $args
    #>

    [CmdletBinding()]
    Param(
        # Script block to execute in each thread.
        [Parameter(Mandatory=$True,
                   Position=1)]
        [scriptblock]$ThreadBlock,

        # Set of arguments to pass to the thread. $threadId will always be added to this.
        [Parameter(Mandatory=$False,
                   Position=2)]
        [hashtable]$ThreadParams,

        # Maximum number of threads. Default is the number of logical CPUs on the executing machine.
        [Parameter(Mandatory=$False,
                   Position=3)]
        [int]$MaxThreads,

        # Interval, in minutes, after which idle runspaces in the pool are cleaned up.
        [Parameter(Mandatory=$False)]
        [int]$CleanupInterval = 2,

        # Powershell modules to import into the RunspacePool.
        [Parameter(Mandatory=$False)]
        [String[]]$ImportModules,

        # Paths to modules to be imported into the RunspacePool.
        [Parameter(Mandatory=$False)]
        [String[]]$ImportModulesPath
    )

    if (!$MaxThreads) {
        $MaxThreads = ((Get-WmiObject Win32_Processor) `
                           | Measure-Object -Sum -Property NumberOfLogicalProcessors).Sum
    }

    $sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

    if ($ImportModules) {
        $ImportModules | % { $sessionState.ImportPSModule($_) }
    }

    if ($ImportModulesPath) {
        $ImportModulesPath | % { $sessionState.ImportPSModulesFromPath($_) }
    }

    $pool = [RunspaceFactory]::CreateRunspacePool(1, $MaxThreads, $sessionState, $Host)

    $pool.ApartmentState  = "STA" # Runspace threads use a single-threaded apartment
    $pool.CleanupInterval = $CleanupInterval * [timespan]::TicksPerMinute

    $pool.Open()

    $jobs      = New-Object 'Collections.Generic.List[System.IAsyncResult]'
    $pipelines = New-Object 'Collections.Generic.List[System.Management.Automation.PowerShell]'
    $handles   = New-Object 'Collections.Generic.List[System.Threading.WaitHandle]'

    for ($i = 1 ; $i -le $MaxThreads ; $i++) {

        $pipeline = [PowerShell]::Create()
        $pipeline.RunspacePool = $pool
        $pipeline.AddScript($ThreadBlock) | Out-Null

        $params = @{ 'threadId' = $i }

        if ($ThreadParams) {
            $params += $ThreadParams
        }

        $pipeline.AddParameters($params) | Out-Null

        $pipelines.Add($pipeline)

        $job = $pipeline.BeginInvoke()
        $jobs.Add($job)

        $handles.Add($job.AsyncWaitHandle)
    }

    while ($pipelines.Count -gt 0) {

        $h = [System.Threading.WaitHandle]::WaitAny($handles)

        $handle   = $handles.Item($h)
        $job      = $jobs.Item($h)
        $pipeline = $pipelines.Item($h)

        $result = $pipeline.EndInvoke($job)

        ### Process results here
        if ($PSBoundParameters['Verbose'].IsPresent) { Write-Host "" }
        Write-Verbose "Pipeline state: $($pipeline.InvocationStateInfo.State)"
        if ($pipeline.HadErrors) {
            $pipeline.Streams.Error.ReadAll() | % { Write-Error $_ }
        }
        $result | % { Write-Verbose $_ }

        $handles.RemoveAt($h)
        $jobs.RemoveAt($h)
        $pipelines.RemoveAt($h)

        $handle.Dispose()
        $pipeline.Dispose()
    }

    $pool.Close()
}

Unless specified, the maximum number of threads defaults to the number of logical CPUs on the machine where this function is executed. It’s also possible to import shared PowerShell modules into the runspace pool, either by name (-ImportModules) or by path (-ImportModulesPath). We often use the proprietary AutonomyTools and RAVNTools libraries, so they’re set up to import by default for us.

The pool’s ApartmentState is set to STA, so each runspace in the pool runs on a thread in a single-threaded apartment.
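
On hosts where Get-WmiObject is not available (it was removed in PowerShell 6 and later), a similar default can be obtained from the .NET property [Environment]::ProcessorCount. This is a sketch of an alternative, not what the function above does, and $defaultThreads is a name chosen purely for illustration:

```powershell
# Number of logical processors available to the current process (.NET property).
$defaultThreads = [Environment]::ProcessorCount

# Guard against an unexpected value so a pool always gets at least one thread.
if ($defaultThreads -lt 1) { $defaultThreads = 1 }

"Default thread count: $defaultThreads"
```

The value could then be passed explicitly, for example as -MaxThreads $defaultThreads.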

Using this function, threading becomes as easy as:

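# 1. Script block to run on each thread; $threadId is supplied by Invoke-RunspacePool.
$cmd = {
    param($threadId, $var)
    "Thread id : $threadId"
    "Variable : $var"
}

# 2. Extra parameters passed to every thread ($args is an automatic variable, so use another name).
$threadArgs = @{
    var = "Hello World!"
}

# 3. Run $cmd on one thread per logical CPU; -Verbose prints each thread's output.
Invoke-RunspacePool -ThreadBlock $cmd `
                    -ThreadParams $threadArgs `
                    -Verbose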

$threadId is always passed in by the function and is included to simplify debugging. If it’s not required, it can be omitted from the script block’s param() list.

In the example above, Invoke-RunspacePool creates a runspace pool with the maximum number of threads set to the number of logical CPUs on the machine where it executes, and then spawns that number of threads, each executing $cmd. It’s a simple way to parallelize the execution of scripts.
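
As written, Invoke-RunspacePool only writes each thread’s output to the verbose stream. If the output is needed by the caller, the same runspace pool pattern can return it instead. The following is a minimal, self-contained sketch of that idea rather than part of RAVNTools; the names $work, $running and $results, and the fixed pool size of three, are assumptions made for the example:

```powershell
# Create and open a pool with between 1 and 3 runspaces.
$pool = [RunspaceFactory]::CreateRunspacePool(1, 3)
$pool.Open()

$work = { param($threadId) "Thread $threadId finished" }

$running = foreach ($i in 1..3) {
    $ps = [PowerShell]::Create()
    $ps.RunspacePool = $pool
    [void]$ps.AddScript($work).AddParameter('threadId', $i)
    # Keep each pipeline together with its async handle for EndInvoke.
    [PSCustomObject]@{ Pipeline = $ps; Handle = $ps.BeginInvoke() }
}

$results = foreach ($r in $running) {
    # EndInvoke blocks until that pipeline completes, then returns its output.
    $r.Pipeline.EndInvoke($r.Handle)
    $r.Pipeline.Dispose()
}

$pool.Close()
$results | Sort-Object
```

Here the pipelines are collected in the order they were started, which is simpler than waiting on handles but means a slow first thread delays the results of the others.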

The function is most useful when combined with synchronized objects such as work queues, so that the threads share a single list of tasks between them:

$servicesToStart = @(
    [PSCustomObject] @{ Service = 'Service1'; Computer = 'Computer1' }
    [PSCustomObject] @{ Service = 'Service2'; Computer = 'Computer1' }
    [PSCustomObject] @{ Service = 'Service1'; Computer = 'Computer2' }
)

$workQ = [System.Collections.Queue]::Synchronized(
    (New-Object System.Collections.Queue(, $servicesToStart))
)

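$cmd = {
    param($q)

    # Each thread pulls work items until the shared queue is empty.
    while($True) {
        try {
            $svc = $q.Dequeue()
        } catch {
            # Dequeue throws once the queue is empty, which ends this thread's loop.
            break
        }
        Get-Service -Name $svc.Service -ComputerName $svc.Computer `
            | Set-Service -Status Running
    }
}

$threadArgs = @{
    q = $workQ
}

Invoke-RunspacePool $cmd $threadArgs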

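Note that the comma preceding $servicesToStart is not a typo. Without it, PowerShell unrolls the array and passes each element as a separate constructor argument; the unary comma wraps the array so that it arrives as a single argument.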
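
The effect of the comma can be checked in isolation. The sketch below uses a throwaway array ($items is a name made up for the example) and spells out the -ArgumentList parameter, which is what the parenthesized form above binds to. The second form assumes PowerShell 5.0 or later, where the static ::new() constructor syntax is available:

```powershell
$items = @('a', 'b', 'c')

# The unary comma wraps $items in a one-element array, so the whole array
# is passed as a single constructor argument: Queue(ICollection).
$queue = New-Object System.Collections.Queue -ArgumentList (, $items)
$queue.Count    # 3

# Calling the constructor directly does not unroll the array,
# so no comma is needed here.
$queue2 = [System.Collections.Queue]::new($items)
$queue2.Count   # 3
```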

If you’re starting thousands of services across many servers, running this script can significantly reduce the time required. And it doesn’t have to be maintenance tasks: the same approach can be used to thread document conversion, convert PDFs to images using Ghostscript.NET, verify the integrity of two IDOL engines, or even perform large IDOL queries.

 

 

(Icons in image from: www.iconpharm.com)
