ThreadPools.jl Documentation
Improved background Task management, keeping heavy Julia computations off the primary thread
A simple package that creates a few functions mimicked from Base (bgforeach, bgmap, and @bgthreads) that behave like the originals but generate spawned tasks that stay purely on background threads.
Overview
As of Julia v1.3.1, there is no built-in mechanism for keeping computational threads off of the primary thread. For many use cases, this restriction is not important - except in very specific instances, pure computational activities will run faster using all threads. But in some cases, we may want to keep the primary thread free of blocking tasks. For example, a GUI running on the primary thread will become unresponsive if a computational task lands there. For another, parallel computations with very nonuniform processing times can benefit from sacrificing the primary thread to manage the loads on the remaining ones.
ThreadPools is a simple package that allows background-only Task assignment for cases where this makes sense. (As Julia matures, it is hoped this package is made obsolete.) The standard foreach, map, and @threads functions are mimicked, adding a bg prefix to each to denote background operation: bgforeach, bgmap, @bgthreads. Code that runs with one of those Base functions should run just fine with the bg prepended, now adding multithreading for free in the foreach and map cases, and in all cases keeping the primary thread free of blocking Tasks.
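Note that all of the bg functions require Julia to be started with more than one thread. This is standard Julia setup rather than anything specific to ThreadPools: on Julia 1.3 the thread count is set with the JULIA_NUM_THREADS environment variable before launching Julia, and can be checked with:
julia> Threads.nthreads()
4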
Usage
Each of the simple API functions can be used like the Base versions of the same function, with a bg prepended to the name:
julia> bgforeach([1,2,3]) do x
println("\$(x+1) \$(Threads.threadid())")
end
3 3
4 4
2 2
julia> bgmap([1,2,3]) do x
println("\$x \$(Threads.threadid())")
x^2
end
2 3
3 4
1 2
3-element Array{Int64,1}:
1
4
9
julia> @bgthreads for x in 1:3
println("\$x \$(Threads.threadid())")
end
2 3
3 4
1 2
For an example of a more complex load-management scenario, see examples/stackdemo.jl.
Demonstrations
There are a couple of demonstrations in the examples directory. demo.jl shows how jobs are distributed across threads in both the @threads and @bgthreads cases for various workload distributions. Running these demos is fairly simple (results below on 4 threads):
julia> include("examples/demo.jl")
Main.Demo
julia> Demo.run_with_outliers()
@bgthreads, Active Job Per Thread on 200ms Intervals
0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
0 6 14 25 29 31 31 40 49 52 62 68 73 83 89 0 100 105 109 109 109 109 109 109 132 137 141 147 0 0 0
0 8 15 20 30 33 33 33 50 57 63 66 66 84 90 94 0 104 108 112 116 121 123 127 131 134 134 134 0 0 0
0 9 12 24 24 24 35 38 0 56 61 69 0 82 91 95 98 98 98 113 117 120 120 120 120 135 142 146 0 0 0
@threads, Active Job Per Thread on 200ms Intervals
0 4 6 9 10 12 15 16 20 24 24 24 28 29 31 31 32 33 33 34 37 0 0 0 0 0
0 43 46 50 52 54 56 60 62 65 66 66 68 70 73 0 0 0 0 0 0 0 0 0 0 0
0 79 82 84 87 90 92 94 96 98 98 98 98 100 101 104 106 108 109 109 109 109 109 110 112 0
0 117 119 120 120 120 120 121 124 127 131 133 134 134 134 137 141 143 146 149 0 0 0 0 0 0
Speed increase using all threads (ideal 33.3%): 14.4%
These demos generate numbered jobs with a randomized work distribution that can be varied. The available distributions are normal, uniform, and uniform with 10% outliers at 10x the typical job time. The activity graphs in these demos present time-sliced snapshots of the thread activities, showing which job number was active in each time slice.
The available demos are:
Demo.run_with_uniform()
Demo.run_with_variation()
Demo.run_with_outliers()
There is also a more complex demo at examples/stackdemo.jl. Here, the workload is hierarchical - each job produces a result and possibly more jobs. The primary thread in this case is used purely for managing the job stack.
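The general shape of that pattern, using the ThreadPool API documented below, might look like the following sketch (with a hypothetical process function that returns a (result, new_jobs) tuple; this is illustrative, not the actual examples/stackdemo.jl code):

using ThreadPools

function run_stack(process, initial_jobs)
    pool = ThreadPools.ThreadPool()       # workers stay off the primary thread
    results = []
    foreach(job -> put!(pool, process, job), initial_jobs)
    while ThreadPools.isactive(pool)      # anything still executing or awaiting retrieval?
        result, new_jobs = fetch(take!(pool))   # primary thread retrieves the next completed job
        push!(results, result)
        foreach(job -> put!(pool, process, job), new_jobs)   # push any follow-on jobs
    end
    close(pool)
    results
end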
Simple API
Each function of the simple API tries to mimic an existing function in Base or Base.Threads to keep any code rework to a minimum.
ThreadPools.bgforeach — Method
bgforeach(fn, itrs...) -> Nothing
Mimics the Base.foreach function, but spawns each iteration to a background thread. Falls back to Base.foreach when nthreads() == 1.
Example
julia> bgforeach([1,2,3]) do x
println("$(x+1) $(Threads.threadid())")
end
3 3
4 4
2 2
Note that the execution order across the threads is not guaranteed.
ThreadPools.bgmap — Method
bgmap(fn, itrs...) -> collection
Mimics the Base.map function, but spawns each case to a background thread. Falls back to Base.map when nthreads() == 1.
Note that the collection(s) supplied must be of equal and finite length.
Example
julia> bgmap([1,2,3]) do x
println("$x $(Threads.threadid())")
x^2
end
2 3
3 4
1 2
3-element Array{Int64,1}:
1
4
9
Note that while the thread execution order is not guaranteed, the final result will maintain the proper sequence.
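Because the signature mirrors Base.map, multiple equal-length collections should also be accepted; a brief sketch of that usage (not taken from the package examples):
julia> bgmap(+, [1,2,3], [10,20,30])
3-element Array{Int64,1}:
 11
 22
 33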
ThreadPools.@bgthreads — Macro
@bgthreads
A macro to parallelize a for-loop to run with multiple threads.
@bgthreads mimics the Threads.@threads macro, but keeps the activity off of the primary thread. Will fall back gracefully to Base.foreach behavior when nthreads() == 1.
Example
julia> @bgthreads for x in 1:3
println("$x $(Threads.threadid())")
end
2 3
3 4
1 2
Note that the execution order across the threads is not guaranteed.
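As with Threads.@threads, the usual way to collect results from the loop is to write into a preallocated array indexed by the loop variable, since each index is then written by exactly one task; a minimal sketch:

results = zeros(Int, 10)
@bgthreads for i in 1:10
    results[i] = i^2    # safe: each iteration writes a distinct index
end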
ThreadPool API
The ThreadPool mimics the Channel{Task} API, where put!-ing a Task causes it to be executed, and take! returns the completed Task. The ThreadPool is iterable over the completed Tasks in the same way a Channel would be.
ThreadPools.ThreadPool
Base.put!(pool::ThreadPool, t::Task)
Base.put!(pool::ThreadPool, fn::Function, args...)
Base.take!(pool::ThreadPool, ind::Integer)
Base.close(pool::ThreadPool)
ThreadPools.isactive(pool::ThreadPool)
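Putting the pieces together, a minimal usage sketch (assuming the behavior described above; note that completed Tasks arrive in completion order, not submission order):

using ThreadPools

pool = ThreadPools.ThreadPool()        # background threads only (the default)
for i in 1:8
    put!(pool, x -> x^2, i)            # schedule fn(i) on the next free background thread
end
close(pool)                            # safe once all Tasks are submitted
results = [fetch(t) for t in pool]     # iterate the completed Tasks, like a Channel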
ThreadPools.ThreadPool — Type
ThreadPool(allow_primary=false)
The main ThreadPool object. Its API mimics that of a Channel{Task}, but each submitted task is executed on a different thread. If allow_primary is true, the assigned thread might be the primary, which will interfere with future thread management for the duration of any heavy-computational (blocking) processes. If it is false, all assigned threads will be off of the primary. Each thread will only be allowed one Task at a time, but each thread will backfill with the next queued Task immediately on completion of the previous, without regard to how busy the other threads may be.
Base.put! — Method
Base.put!(pool::ThreadPool, t::Task)
Put the task t into the pool, blocking until the pool has an available thread.
Base.put! — Method
Base.put!(pool::ThreadPool, fn::Function, args...)
Base.put!(fn::Function, pool::ThreadPool, args...)
Creates a task that runs fn(args...) and adds it to the pool, blocking until the pool has an available thread.
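The second signature allows Julia's do-block syntax; a small sketch, assuming a pool constructed as in the examples above:

put!(pool, 4) do x
    x^2    # runs on a background thread; retrieve later with take! and fetch
end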
Base.take! — Method
Base.take!(pool::ThreadPool) -> Task
Takes the next available completed task from the pool, blocking until a task is available.
Base.close — Method
Base.close(pool::ThreadPool)
Shuts down the pool, closing the internal thread handlers. It is safe to issue this command after all Tasks have been submitted, regardless of the Task completion status. If issued while the pool is still active, it will yield until all tasks have been completed.
ThreadPools.isactive — Method
ThreadPools.isactive(pool::ThreadPool)
Returns true if there are queued Tasks anywhere in the pool, either awaiting execution, executing, or waiting to be retrieved.