Diffstat (limited to 'libphobos/src/std/parallelism.d')
-rw-r--r--  libphobos/src/std/parallelism.d  741
1 file changed, 452 insertions, 289 deletions
diff --git a/libphobos/src/std/parallelism.d b/libphobos/src/std/parallelism.d
index 61d5cea..664330a 100644
--- a/libphobos/src/std/parallelism.d
+++ b/libphobos/src/std/parallelism.d
@@ -1,39 +1,39 @@
/**
-$(D std._parallelism) implements high-level primitives for SMP _parallelism.
+`std.parallelism` implements high-level primitives for SMP parallelism.
These include parallel foreach, parallel reduce, parallel eager map, pipelining
-and future/promise _parallelism. $(D std._parallelism) is recommended when the
+and future/promise parallelism. `std.parallelism` is recommended when the
same operation is to be executed in parallel on different data, or when a
function is to be executed in a background thread and its result returned to a
well-defined main thread. For communication between arbitrary threads, see
-$(D std.concurrency).
+`std.concurrency`.
-$(D std._parallelism) is based on the concept of a $(D Task). A $(D Task) is an
+`std.parallelism` is based on the concept of a `Task`. A `Task` is an
object that represents the fundamental unit of work in this library and may be
-executed in parallel with any other $(D Task). Using $(D Task)
+executed in parallel with any other `Task`. Using `Task`
directly allows programming with a future/promise paradigm. All other
-supported _parallelism paradigms (parallel foreach, map, reduce, pipelining)
-represent an additional level of abstraction over $(D Task). They
-automatically create one or more $(D Task) objects, or closely related types
+supported parallelism paradigms (parallel foreach, map, reduce, pipelining)
+represent an additional level of abstraction over `Task`. They
+automatically create one or more `Task` objects, or closely related types
that are conceptually identical but not part of the public API.
-After creation, a $(D Task) may be executed in a new thread, or submitted
-to a $(D TaskPool) for execution. A $(D TaskPool) encapsulates a task queue
+After creation, a `Task` may be executed in a new thread, or submitted
+to a `TaskPool` for execution. A `TaskPool` encapsulates a task queue
and its worker threads. Its purpose is to efficiently map a large
-number of $(D Task)s onto a smaller number of threads. A task queue is a
-FIFO queue of $(D Task) objects that have been submitted to the
-$(D TaskPool) and are awaiting execution. A worker thread is a thread that
-is associated with exactly one task queue. It executes the $(D Task) at the
+number of `Task`s onto a smaller number of threads. A task queue is a
+FIFO queue of `Task` objects that have been submitted to the
+`TaskPool` and are awaiting execution. A worker thread is a thread that
+is associated with exactly one task queue. It executes the `Task` at the
front of its queue when the queue has work available, or sleeps when
no work is available. Each task queue is associated with zero or
-more worker threads. If the result of a $(D Task) is needed before execution
-by a worker thread has begun, the $(D Task) can be removed from the task queue
+more worker threads. If the result of a `Task` is needed before execution
+by a worker thread has begun, the `Task` can be removed from the task queue
and executed immediately in the thread where the result is needed.
-Warning: Unless marked as $(D @trusted) or $(D @safe), artifacts in
+Warning: Unless marked as `@trusted` or `@safe`, artifacts in
this module allow implicit data sharing between threads and cannot
guarantee that client code is free from low level data races.
-Source: $(PHOBOSSRC std/_parallelism.d)
+Source: $(PHOBOSSRC std/parallelism.d)
Author: David Simcha
Copyright: Copyright (c) 2009-2011, David Simcha.
License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0)
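For orientation, a minimal sketch of the workflow described above (the `sumSquares` helper and its argument are illustrative, not part of the module):
---
import std.parallelism : task, taskPool;

long sumSquares(long n)
{
    long total = 0;
    foreach (i; 0 .. n)
        total += i * i;
    return total;
}

void main()
{
    // Create a Task on the GC heap and submit it to the global pool's queue.
    auto t = task!sumSquares(10_000L);
    taskPool.put(t);

    // Do other work here, then block until the result is available.
    immutable long result = t.yieldForce;
    assert(result == sumSquares(10_000L));
}
---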
@@ -53,7 +53,7 @@ else version (WatchOS)
@system unittest
{
import std.algorithm.iteration : map;
- import std.math : approxEqual;
+ import std.math.operations : isClose;
import std.parallelism : taskPool;
import std.range : iota;
@@ -82,7 +82,7 @@ else version (WatchOS)
immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);
- assert(pi.approxEqual(3.1415926));
+ assert(pi.isClose(3.14159, 1e-5));
}
import core.atomic;
@@ -389,32 +389,32 @@ private struct AbstractTask
}
/**
-$(D Task) represents the fundamental unit of work. A $(D Task) may be
-executed in parallel with any other $(D Task). Using this struct directly
-allows future/promise _parallelism. In this paradigm, a function (or delegate
+`Task` represents the fundamental unit of work. A `Task` may be
+executed in parallel with any other `Task`. Using this struct directly
+allows future/promise parallelism. In this paradigm, a function (or delegate
or other callable) is executed in a thread other than the one it was called
from. The calling thread does not block while the function is being executed.
-A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to
-ensure that the $(D Task) has finished executing and to obtain the return
-value, if any. These functions and $(D done) also act as full memory barriers,
-meaning that any memory writes made in the thread that executed the $(D Task)
+A call to `workForce`, `yieldForce`, or `spinForce` is used to
+ensure that the `Task` has finished executing and to obtain the return
+value, if any. These functions and `done` also act as full memory barriers,
+meaning that any memory writes made in the thread that executed the `Task`
are guaranteed to be visible in the calling thread after one of these functions
returns.
The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can
-be used to create an instance of this struct. See $(D task) for usage examples.
+be used to create an instance of this struct. See `task` for usage examples.
-Function results are returned from $(D yieldForce), $(D spinForce) and
-$(D workForce) by ref. If $(D fun) returns by ref, the reference will point
-to the returned reference of $(D fun). Otherwise it will point to a
+Function results are returned from `yieldForce`, `spinForce` and
+`workForce` by ref. If `fun` returns by ref, the reference will point
+to the returned reference of `fun`. Otherwise it will point to a
field in this struct.
Copying of this struct is disabled, since it would provide no useful semantics.
If you want to pass this struct around, you should do so by reference or
pointer.
-Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the
- call site, only to $(D args) in this struct.
+Bugs: Changes to `ref` and `out` arguments are not propagated to the
+ call site, only to `args` in this struct.
*/
struct Task(alias fun, Args...)
{
@@ -435,7 +435,7 @@ struct Task(alias fun, Args...)
{
fun(myCastedTask._args);
}
- else static if (is(typeof(addressOf(fun(myCastedTask._args)))))
+ else static if (is(typeof(&(fun(myCastedTask._args)))))
{
myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
}
@@ -451,8 +451,8 @@ struct Task(alias fun, Args...)
Args _args;
/**
- The arguments the function was called with. Changes to $(D out) and
- $(D ref) arguments will be visible here.
+ The arguments the function was called with. Changes to `out` and
+ `ref` arguments will be visible here.
*/
static if (__traits(isSame, fun, run))
{
@@ -472,7 +472,7 @@ struct Task(alias fun, Args...)
static if (isFunctionPointer!(_args[0]))
{
private enum bool isPure =
- functionAttributes!(Args[0]) & FunctionAttribute.pure_;
+ (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
}
else
{
@@ -493,8 +493,8 @@ struct Task(alias fun, Args...)
/**
- The return type of the function called by this $(D Task). This can be
- $(D void).
+ The return type of the function called by this `Task`. This can be
+ `void`.
*/
alias ReturnType = typeof(fun(_args));
@@ -536,7 +536,8 @@ struct Task(alias fun, Args...)
}
}
- // Work around DMD bug 6588, allow immutable elements.
+ // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
+ // allow immutable elements.
static if (allSatisfy!(isAssignable, Args))
{
typeof(this) opAssign(typeof(this) rhs)
@@ -550,20 +551,17 @@ struct Task(alias fun, Args...)
}
else
{
- @disable typeof(this) opAssign(typeof(this) rhs)
- {
- assert(0);
- }
+ @disable typeof(this) opAssign(typeof(this) rhs);
}
/**
- If the $(D Task) isn't started yet, execute it in the current thread.
+ If the `Task` isn't started yet, execute it in the current thread.
If it's done, return its return value, if any. If it's in progress,
busy spin until it's done, then return the return value. If it threw
an exception, rethrow that exception.
This function should be used when you expect the result of the
- $(D Task) to be available on a timescale shorter than that of an OS
+ `Task` to be available on a timescale shorter than that of an OS
context switch.
*/
@property ref ReturnType spinForce() @trusted
@@ -586,7 +584,7 @@ struct Task(alias fun, Args...)
}
/**
- If the $(D Task) isn't started yet, execute it in the current thread.
+ If the `Task` isn't started yet, execute it in the current thread.
If it's done, return its return value, if any. If it's in progress,
wait on a condition variable. If it threw an exception, rethrow that
exception.
@@ -631,13 +629,13 @@ struct Task(alias fun, Args...)
}
/**
- If this $(D Task) was not started yet, execute it in the current
+ If this `Task` was not started yet, execute it in the current
thread. If it is finished, return its result. If it is in progress,
- execute any other $(D Task) from the $(D TaskPool) instance that
- this $(D Task) was submitted to until this one
+ execute any other `Task` from the `TaskPool` instance that
+ this `Task` was submitted to until this one
is finished. If it threw an exception, rethrow that exception.
- If no other tasks are available or this $(D Task) was executed using
- $(D executeInNewThread), wait on a condition variable.
+ If no other tasks are available or this `Task` was executed using
+ `executeInNewThread`, wait on a condition variable.
*/
@property ref ReturnType workForce() @trusted
{
@@ -705,10 +703,10 @@ struct Task(alias fun, Args...)
}
/**
- Returns $(D true) if the $(D Task) is finished executing.
+ Returns `true` if the `Task` is finished executing.
Throws: Rethrows any exception thrown during the execution of the
- $(D Task).
+ `Task`.
*/
@property bool done() @trusted
{
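A small sketch of polling with `done` (the sleeping lambda and the durations are purely illustrative):
---
import core.thread : Thread;
import core.time : msecs;
import std.parallelism : task, taskPool;

void main()
{
    auto t = task!((int n) {
        Thread.sleep(10.msecs);   // pretend this is real work
        return n + 1;
    })(41);
    taskPool.put(t);

    // Poll without blocking on the result; done also rethrows any
    // exception the Task may have thrown.
    while (!t.done)
        Thread.sleep(1.msecs);

    assert(t.yieldForce == 42);
}
---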
@@ -717,11 +715,11 @@ struct Task(alias fun, Args...)
}
/**
- Create a new thread for executing this $(D Task), execute it in the
+ Create a new thread for executing this `Task`, execute it in the
newly created thread, then terminate the thread. This can be used for
future/promise parallelism. An explicit priority may be given
- to the $(D Task). If one is provided, its value is forwarded to
- $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for
+ to the `Task`. If one is provided, its value is forwarded to
+ `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
usage example.
*/
void executeInNewThread() @trusted
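A minimal sketch of the `executeInNewThread` path (the `fib` helper is illustrative):
---
import std.parallelism : task;

ulong fib(uint n)
{
    return n < 2 ? n : fib(n - 1) + fib(n - 2);
}

void main()
{
    // Run fib(30) in its own thread rather than in a TaskPool.
    auto t = task!fib(30u);
    t.executeInNewThread();

    // ... unrelated work in the calling thread ...

    // Blocks on a condition variable until the new thread finishes.
    assert(t.yieldForce == fib(30));
}
---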
@@ -748,8 +746,8 @@ struct Task(alias fun, Args...)
//@disable this(this) {}
}
-// Calls $(D fpOrDelegate) with $(D args). This is an
-// adapter that makes $(D Task) work with delegates, function pointers and
+// Calls `fpOrDelegate` with `args`. This is an
+// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
@@ -757,12 +755,12 @@ ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
}
/**
-Creates a $(D Task) on the GC heap that calls an alias. This may be executed
-via $(D Task.executeInNewThread) or by submitting to a
+Creates a `Task` on the GC heap that calls an alias. This may be executed
+via `Task.executeInNewThread` or by submitting to a
$(REF TaskPool, std,parallelism). A globally accessible instance of
-$(D TaskPool) is provided by $(REF taskPool, std,parallelism).
+`TaskPool` is provided by $(REF taskPool, std,parallelism).
-Returns: A pointer to the $(D Task).
+Returns: A pointer to the `Task`.
Example:
---
@@ -828,7 +826,7 @@ auto task(alias fun, Args...)(Args args)
}
/**
-Creates a $(D Task) on the GC heap that calls a function pointer, delegate, or
+Creates a `Task` on the GC heap that calls a function pointer, delegate, or
class/struct with overloaded opCall.
Example:
@@ -842,7 +840,7 @@ void main()
{
// Create and execute a Task for reading
// foo.txt.
- auto file1Task = task(&read, "foo.txt");
+ auto file1Task = task(&read!string, "foo.txt", size_t.max);
file1Task.executeInNewThread();
// Read bar.txt in parallel.
@@ -855,7 +853,7 @@ void main()
Notes: This function takes a non-scope delegate, meaning it can be
used with closures. If you can't allocate a closure due to objects
- on the stack that have scoped destruction, see $(D scopedTask), which
+ on the stack that have scoped destruction, see `scopedTask`, which
takes a scope delegate.
*/
auto task(F, Args...)(F delegateOrFp, Args args)
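A minimal sketch of the delegate overload with a closure over a local array (names are illustrative):
---
import std.parallelism : task;

void main()
{
    int[] data = [1, 2, 3, 4];

    // A non-scope delegate, so a closure over `data` can be allocated.
    auto sumTask = task({
        int s = 0;
        foreach (x; data)
            s += x;
        return s;
    });
    sumTask.executeInNewThread();
    assert(sumTask.yieldForce == 10);
}
---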
@@ -865,24 +863,24 @@ if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
}
/**
-Version of $(D task) usable from $(D @safe) code. Usage mechanics are
+Version of `task` usable from `@safe` code. Usage mechanics are
identical to the non-@safe case, but safety introduces some restrictions:
-1. $(D fun) must be @safe or @trusted.
+1. `fun` must be @safe or @trusted.
-2. $(D F) must not have any unshared aliasing as defined by
+2. `F` must not have any unshared aliasing as defined by
$(REF hasUnsharedAliasing, std,traits). This means it
may not be an unshared delegate or a non-shared class or struct
- with overloaded $(D opCall). This also precludes accepting template
+ with overloaded `opCall`. This also precludes accepting template
alias parameters.
-3. $(D Args) must not have unshared aliasing.
+3. `Args` must not have unshared aliasing.
-4. $(D fun) must not return by reference.
+4. `fun` must not return by reference.
-5. The return type must not have unshared aliasing unless $(D fun) is
- $(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead
- of using a $(D TaskPool).
+5. The return type must not have unshared aliasing unless `fun` is
+ `pure` or the `Task` is executed via `executeInNewThread` instead
+ of using a `TaskPool`.
*/
@trusted auto task(F, Args...)(F fun, Args args)
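A sketch of the `@safe` overload using a `@safe` function pointer (the `twice` helper is illustrative):
---
import std.parallelism : task;

int twice(int x) @safe
{
    return 2 * x;
}

void main() @safe
{
    // A @safe function pointer, value arguments and a value return type
    // satisfy the restrictions listed above, so this compiles as @safe.
    auto t = task(&twice, 21);
    t.executeInNewThread();
    assert(t.yieldForce == 42);
}
---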
@@ -892,25 +890,25 @@ if (is(typeof(fun(args))) && isSafeTask!F)
}
/**
-These functions allow the creation of $(D Task) objects on the stack rather
-than the GC heap. The lifetime of a $(D Task) created by $(D scopedTask)
+These functions allow the creation of `Task` objects on the stack rather
+than the GC heap. The lifetime of a `Task` created by `scopedTask`
cannot exceed the lifetime of the scope it was created in.
-$(D scopedTask) might be preferred over $(D task):
+`scopedTask` might be preferred over `task`:
-1. When a $(D Task) that calls a delegate is being created and a closure
+1. When a `Task` that calls a delegate is being created and a closure
cannot be allocated due to objects on the stack that have scoped
- destruction. The delegate overload of $(D scopedTask) takes a $(D scope)
+ destruction. The delegate overload of `scopedTask` takes a `scope`
delegate.
2. As a micro-optimization, to avoid the heap allocation associated with
- $(D task) or with the creation of a closure.
+ `task` or with the creation of a closure.
-Usage is otherwise identical to $(D task).
+Usage is otherwise identical to `task`.
-Notes: $(D Task) objects created using $(D scopedTask) will automatically
-call $(D Task.yieldForce) in their destructor if necessary to ensure
-the $(D Task) is complete before the stack frame they reside on is destroyed.
+Notes: `Task` objects created using `scopedTask` will automatically
+call `Task.yieldForce` in their destructor if necessary to ensure
+the `Task` is complete before the stack frame they reside on is destroyed.
*/
auto scopedTask(alias fun, Args...)(Args args)
{
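A minimal sketch of `scopedTask` with a scope delegate (variable names are illustrative):
---
import std.parallelism : scopedTask, taskPool;

void main()
{
    int[] data = [1, 2, 3, 4, 5];

    // Stack-allocated Task; its destructor calls yieldForce if needed,
    // so it cannot outlive this stack frame.
    auto t = scopedTask({
        int total = 0;
        foreach (x; data)
            total += x;
        return total;
    });
    taskPool.put(t);

    assert(t.yieldForce == 15);
}
---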
@@ -1053,22 +1051,28 @@ shared static ~this()
/**
This class encapsulates a task queue and a set of worker threads. Its purpose
-is to efficiently map a large number of $(D Task)s onto a smaller number of
-threads. A task queue is a FIFO queue of $(D Task) objects that have been
-submitted to the $(D TaskPool) and are awaiting execution. A worker thread is a
-thread that executes the $(D Task) at the front of the queue when one is
+is to efficiently map a large number of `Task`s onto a smaller number of
+threads. A task queue is a FIFO queue of `Task` objects that have been
+submitted to the `TaskPool` and are awaiting execution. A worker thread is a
+thread that executes the `Task` at the front of the queue when one is
available and sleeps when the queue is empty.
This class should usually be used via the global instantiation
available via the $(REF taskPool, std,parallelism) property.
-Occasionally it is useful to explicitly instantiate a $(D TaskPool):
+Occasionally it is useful to explicitly instantiate a `TaskPool`:
-1. When you want $(D TaskPool) instances with multiple priorities, for example
+1. When you want `TaskPool` instances with multiple priorities, for example
a low priority pool and a high priority pool.
2. When the threads in the global task pool are waiting on a synchronization
primitive (for example a mutex), and you want to parallelize the code that
needs to run before these threads can be resumed.
+
+Note: The worker threads in this pool will not stop until
+ `stop` or `finish` is called, even if the main thread
+ has finished already. This may lead to programs that
+ never end. If you do not want this behaviour, you can set `isDaemon`
+ to true.
*/
final class TaskPool
{
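A sketch of explicit instantiation and shutdown (the pool size of 2 is arbitrary):
---
import std.parallelism : task, TaskPool;

void main()
{
    // A dedicated pool with two worker threads.
    auto pool = new TaskPool(2);

    auto t = task!((int x) => x * x)(7);
    pool.put(t);
    assert(t.yieldForce == 49);

    // Workers of an explicitly created pool are non-daemon by default,
    // so shut them down (blocking) to let the program terminate.
    pool.finish(true);
}
---
Setting `isDaemon` to true on the pool, as noted above, is the alternative when an explicit shutdown is not desired.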
@@ -1091,7 +1095,7 @@ private:
Mutex waiterMutex; // For waiterCondition
// The instanceStartIndex of the next instance that will be created.
- __gshared static size_t nextInstanceIndex = 1;
+ __gshared size_t nextInstanceIndex = 1;
// The index of the current thread.
static size_t threadIndex;
@@ -1214,7 +1218,7 @@ private:
assert(returned.prev is null);
}
}
- body
+ do
{
if (isSingleTask) return null;
@@ -1258,7 +1262,7 @@ private:
assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
}
}
- body
+ do
{
// Not using enforce() to save on function call overhead since this
// is a performance critical function.
@@ -1452,7 +1456,7 @@ private:
// Disabled until writing code to support
// running thread with specified priority
- // See https://d.puremagic.com/issues/show_bug.cgi?id=8960
+ // See https://issues.dlang.org/show_bug.cgi?id=8960
/*if (priority != int.max)
{
@@ -1478,11 +1482,11 @@ public:
}
/**
- Default constructor that initializes a $(D TaskPool) with
- $(D totalCPUs) - 1 worker threads. The minus 1 is included because the
+ Default constructor that initializes a `TaskPool` with
+ `totalCPUs` - 1 worker threads. The minus 1 is included because the
main thread will also be available to do work.
- Note: On single-core machines, the primitives provided by $(D TaskPool)
+ Note: On single-core machines, the primitives provided by `TaskPool`
operate transparently in single-threaded mode.
*/
this() @trusted
@@ -1522,17 +1526,17 @@ public:
/**
Implements a parallel foreach loop over a range. This works by implicitly
- creating and submitting one $(D Task) to the $(D TaskPool) for each worker
- thread. A work unit is a set of consecutive elements of $(D range) to
+ creating and submitting one `Task` to the `TaskPool` for each worker
+ thread. A work unit is a set of consecutive elements of `range` to
be processed by a worker thread between communication with any other
thread. The number of elements processed per work unit is controlled by the
- $(D workUnitSize) parameter. Smaller work units provide better load
+ `workUnitSize` parameter. Smaller work units provide better load
balancing, but larger work units avoid the overhead of communicating
with other threads frequently to fetch the next work unit. Large work
units also avoid false sharing in cases where the range is being modified.
The less time a single iteration of the loop takes, the larger
- $(D workUnitSize) should be. For very expensive loop bodies,
- $(D workUnitSize) should be 1. An overload that chooses a default work
+ `workUnitSize` should be. For very expensive loop bodies,
+ `workUnitSize` should be 1. An overload that chooses a default work
unit size is also available.
Example:
@@ -1566,18 +1570,18 @@ public:
Notes:
The memory usage of this implementation is guaranteed to be constant
- in $(D range.length).
+ in `range.length`.
Breaking from a parallel foreach loop via a break, labeled break,
labeled continue, return or goto statement throws a
- $(D ParallelForeachError).
+ `ParallelForeachError`.
In the case of non-random access ranges, parallel foreach buffers lazily
- to an array of size $(D workUnitSize) before executing the parallel portion
+ to an array of size `workUnitSize` before executing the parallel portion
of the loop. The exception is that, if a parallel foreach is executed
- over a range returned by $(D asyncBuf) or $(D map), the copying is elided
- and the buffers are simply swapped. In this case $(D workUnitSize) is
- ignored and the work unit size is set to the buffer size of $(D range).
+ over a range returned by `asyncBuf` or `map`, the copying is elided
+ and the buffers are simply swapped. In this case `workUnitSize` is
+ ignored and the work unit size is set to the buffer size of `range`.
A memory barrier is guaranteed to be executed on exit from the loop,
so that results produced by all threads are visible in the calling thread.
@@ -1585,10 +1589,10 @@ public:
$(B Exception Handling):
When at least one exception is thrown from inside a parallel foreach loop,
- the submission of additional $(D Task) objects is terminated as soon as
+ the submission of additional `Task` objects is terminated as soon as
possible, in a non-deterministic manner. All executing or
enqueued work units are allowed to complete. Then, all exceptions that
- were thrown by any work unit are chained using $(D Throwable.next) and
+ were thrown by any work unit are chained using `Throwable.next` and
rethrown. The order of the exception chaining is non-deterministic.
*/
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
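A minimal sketch of a parallel foreach with an explicit work unit size (the sizes are illustrative):
---
import std.math : sqrt;
import std.parallelism : taskPool;

void main()
{
    auto values = new double[10_000];
    foreach (i, ref v; values)
        v = i;

    // 100 consecutive elements per work unit; the loop body below runs
    // on the pool's worker threads plus the current thread.
    foreach (ref v; taskPool.parallel(values, 100))
        v = sqrt(v);
}
---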
@@ -1623,9 +1627,9 @@ public:
{
/**
Eager parallel map. The eagerness of this function means it has less
- overhead than the lazily evaluated $(D TaskPool.map) and should be
+ overhead than the lazily evaluated `TaskPool.map` and should be
preferred where the memory requirements of eagerness are acceptable.
- $(D functions) are the functions to be evaluated, passed as template
+ `functions` are the functions to be evaluated, passed as template
alias parameters in a style similar to
$(REF map, std,algorithm,iteration).
The first argument must be a random access range. For performance
@@ -1633,7 +1637,7 @@ public:
initialized. Elements will be overwritten without calling a destructor
nor doing an assignment. As such, the range must not contain meaningful
data$(DDOC_COMMENT not a section): either un-initialized objects, or
- objects in their $(D .init) state.
+ objects in their `.init` state.
---
auto numbers = iota(100_000_000.0);
@@ -1648,7 +1652,7 @@ public:
---
Immediately after the range argument, an optional work unit size argument
- may be provided. Work units as used by $(D amap) are identical to those
+ may be provided. Work units as used by `amap` are identical to those
defined for parallel foreach. If no work unit size is provided, the
default work unit size is used.
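A minimal sketch of `amap`, mirroring the squaring test further down in this file; the work unit size and buffer are illustrative:
---
import std.parallelism : taskPool;

void main()
{
    // Eagerly squares every element in parallel and returns a new array.
    auto squares = taskPool.amap!"a * a"([1, 2, 3, 4, 5]);
    assert(squares == [1, 4, 9, 16, 25]);

    // Alternatively, an explicit work unit size and a pre-allocated
    // output buffer may be passed, as described above.
    auto buf = new int[5];
    taskPool.amap!"a * a"([1, 2, 3, 4, 5], 2, buf);
    assert(buf == [1, 4, 9, 16, 25]);
}
---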
@@ -1693,21 +1697,21 @@ public:
To parallelize the copying of a range with expensive to evaluate elements
to an array, pass an identity function (a function that just returns
- whatever argument is provided to it) to $(D amap).
+ whatever argument is provided to it) to `amap`.
$(B Exception Handling):
When at least one exception is thrown from inside the map functions,
- the submission of additional $(D Task) objects is terminated as soon as
+ the submission of additional `Task` objects is terminated as soon as
possible, in a non-deterministic manner. All currently executing or
enqueued work units are allowed to complete. Then, all exceptions that
- were thrown from any work unit are chained using $(D Throwable.next) and
+ were thrown from any work unit are chained using `Throwable.next` and
rethrown. The order of the exception chaining is non-deterministic.
*/
auto amap(Args...)(Args args)
if (isRandomAccessRange!(Args[0]))
{
- import std.conv : emplaceRef;
+ import core.internal.lifetime : emplaceRef;
alias fun = adjoin!(staticMap!(unaryFun, functions));
@@ -1829,8 +1833,8 @@ public:
{
/**
A semi-lazy parallel map that can be used for pipelining. The map
- functions are evaluated for the first $(D bufSize) elements and stored in a
- buffer and made available to $(D popFront). Meanwhile, in the
+ functions are evaluated for the first `bufSize` elements and stored in a
+ buffer and made available to `popFront`. Meanwhile, in the
background a second buffer of the same size is filled. When the first
buffer is exhausted, it is swapped with the second buffer and filled while
the values from what was originally the second buffer are read. This
@@ -1838,36 +1842,37 @@ public:
the need for atomic operations or synchronization for each write, and
enables the mapping function to be evaluated efficiently in parallel.
- $(D map) has more overhead than the simpler procedure used by $(D amap)
+ `map` has more overhead than the simpler procedure used by `amap`
but avoids the need to keep all results in memory simultaneously and works
with non-random access ranges.
Params:
- source = The input range to be mapped. If $(D source) is not random
- access it will be lazily buffered to an array of size $(D bufSize) before
+ source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
+ to be mapped. If `source` is not random
+ access it will be lazily buffered to an array of size `bufSize` before
the map function is evaluated. (For an exception to this rule, see Notes.)
bufSize = The size of the buffer to store the evaluated elements.
workUnitSize = The number of elements to evaluate in a single
- $(D Task). Must be less than or equal to $(D bufSize), and
- should be a fraction of $(D bufSize) such that all worker threads can be
+ `Task`. Must be less than or equal to `bufSize`, and
+ should be a fraction of `bufSize` such that all worker threads can be
used. If the default of size_t.max is used, workUnitSize will be set to
the pool-wide default.
Returns: An input range representing the results of the map. This range
- has a length iff $(D source) has a length.
+ has a length iff `source` has a length.
Notes:
- If a range returned by $(D map) or $(D asyncBuf) is used as an input to
- $(D map), then as an optimization the copying from the output buffer
+ If a range returned by `map` or `asyncBuf` is used as an input to
+ `map`, then as an optimization the copying from the output buffer
of the first range to the input buffer of the second range is elided, even
- though the ranges returned by $(D map) and $(D asyncBuf) are non-random
- access ranges. This means that the $(D bufSize) parameter passed to the
- current call to $(D map) will be ignored and the size of the buffer
- will be the buffer size of $(D source).
+ though the ranges returned by `map` and `asyncBuf` are non-random
+ access ranges. This means that the `bufSize` parameter passed to the
+ current call to `map` will be ignored and the size of the buffer
+ will be the buffer size of `source`.
Example:
---
@@ -1890,11 +1895,11 @@ public:
$(B Exception Handling):
- Any exceptions thrown while iterating over $(D source)
- or computing the map function are re-thrown on a call to $(D popFront) or,
+ Any exceptions thrown while iterating over `source`
+ or computing the map function are re-thrown on a call to `popFront` or,
if thrown during construction, are simply allowed to propagate to the
caller. In the case of exceptions thrown while computing the map function,
- the exceptions are chained as in $(D TaskPool.amap).
+ the exceptions are chained as in `TaskPool.amap`.
*/
auto
map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
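A minimal sketch of the pipelining use of `map` (the buffer and work unit sizes are illustrative):
---
import std.algorithm.iteration : filter;
import std.parallelism : taskPool;
import std.range : iota;

void main()
{
    // A non-random-access source forces the lazily buffered code path.
    auto source = iota(0, 10_000).filter!(x => x % 2 == 0);

    // Evaluate x * x in the background, 64 elements per buffer,
    // 16 elements per work unit.
    auto squared = taskPool.map!(x => x * x)(source, 64, 16);

    long total = 0;
    foreach (value; squared)
        total += value;
    assert(total > 0);
}
---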
@@ -2092,7 +2097,8 @@ public:
{
assert(nextBufTask.prev is null);
assert(nextBufTask.next is null);
- } body
+ }
+ do
{
// Hack to reuse the task object.
@@ -2171,13 +2177,13 @@ public:
}
/**
- Given a $(D source) range that is expensive to iterate over, returns an
- input range that asynchronously buffers the contents of
- $(D source) into a buffer of $(D bufSize) elements in a worker thread,
+ Given a `source` range that is expensive to iterate over, returns an
+ $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that
+ asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread,
while making previously buffered elements from a second buffer, also of size
- $(D bufSize), available via the range interface of the returned
- object. The returned range has a length iff $(D hasLength!S).
- $(D asyncBuf) is useful, for example, when performing expensive operations
+ `bufSize`, available via the range interface of the returned
+ object. The returned range has a length iff `hasLength!S`.
+ `asyncBuf` is useful, for example, when performing expensive operations
on the elements of ranges that represent data on a disk or network.
Example:
@@ -2209,8 +2215,8 @@ public:
$(B Exception Handling):
- Any exceptions thrown while iterating over $(D source) are re-thrown on a
- call to $(D popFront) or, if thrown during construction, simply
+ Any exceptions thrown while iterating over `source` are re-thrown on a
+ call to `popFront` or, if thrown during construction, simply
allowed to propagate to the caller.
*/
auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S)
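A minimal sketch of overlapping file I/O with processing (the file name and buffer size are illustrative):
---
import std.parallelism : taskPool;
import std.stdio : File, writeln;

void main()
{
    // A worker thread fills one buffer of 100 lines while the loop below
    // consumes the previously filled buffer.
    auto lines = File("input.txt").byLineCopy;
    size_t count;
    foreach (line; taskPool.asyncBuf(lines, 100))
        ++count;
    writeln(count, " lines");
}
---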
@@ -2278,7 +2284,8 @@ public:
{
assert(nextBufTask.prev is null);
assert(nextBufTask.next is null);
- } body
+ }
+ do
{
// Hack to reuse the task object.
@@ -2351,30 +2358,30 @@ public:
}
/**
- Given a callable object $(D next) that writes to a user-provided buffer and
- a second callable object $(D empty) that determines whether more data is
- available to write via $(D next), returns an input range that
- asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers
+ Given a callable object `next` that writes to a user-provided buffer and
+ a second callable object `empty` that determines whether more data is
+ available to write via `next`, returns an input range that
+ asynchronously calls `next` with a set of size `nBuffers` of buffers
and makes the results available in the order they were obtained via the
input range interface of the returned object. Similarly to the
- input range overload of $(D asyncBuf), the first half of the buffers
+ input range overload of `asyncBuf`, the first half of the buffers
are made available via the range interface while the second half are
filled and vice-versa.
Params:
next = A callable object that takes a single argument that must be an array
- with mutable elements. When called, $(D next) writes data to
+ with mutable elements. When called, `next` writes data to
the array provided by the caller.
empty = A callable object that takes no arguments and returns a type
- implicitly convertible to $(D bool). This is used to signify
- that no more data is available to be obtained by calling $(D next).
+ implicitly convertible to `bool`. This is used to signify
+ that no more data is available to be obtained by calling `next`.
- initialBufSize = The initial size of each buffer. If $(D next) takes its
+ initialBufSize = The initial size of each buffer. If `next` takes its
array by reference, it may resize the buffers.
- nBuffers = The number of buffers to cycle through when calling $(D next).
+ nBuffers = The number of buffers to cycle through when calling `next`.
Example:
---
@@ -2403,8 +2410,8 @@ public:
$(B Exception Handling):
- Any exceptions thrown while iterating over $(D range) are re-thrown on a
- call to $(D popFront).
+ Any exceptions thrown while iterating over `range` are re-thrown on a
+ call to `popFront`.
Warning:
@@ -2428,23 +2435,26 @@ public:
{
/**
Parallel reduce on a random access range. Except as otherwise noted,
- usage is similar to $(REF _reduce, std,algorithm,iteration). This
- function works by splitting the range to be reduced into work units,
- which are slices to be reduced in parallel. Once the results from all
- work units are computed, a final serial reduction is performed on these
- results to compute the final answer. Therefore, care must be taken to
- choose the seed value appropriately.
-
- Because the reduction is being performed in parallel, $(D functions)
+ usage is similar to $(REF _reduce, std,algorithm,iteration). There is
+ also $(LREF fold) which does the same thing with a different parameter
+ order.
+
+ This function works by splitting the range to be reduced into work
+ units, which are slices to be reduced in parallel. Once the results
+ from all work units are computed, a final serial reduction is performed
+ on these results to compute the final answer. Therefore, care must be
+ taken to choose the seed value appropriately.
+
+ Because the reduction is being performed in parallel, `functions`
must be associative. For notational simplicity, let # be an
- infix operator representing $(D functions). Then, (a # b) # c must equal
+ infix operator representing `functions`. Then, (a # b) # c must equal
a # (b # c). Floating point addition is not associative
even though addition in exact arithmetic is. Summing floating
point numbers using this function may give different results than summing
serially. However, for many practical purposes floating point addition
can be treated as associative.
- Note that, since $(D functions) are assumed to be associative,
+ Note that, since `functions` are assumed to be associative,
additional optimizations are made to the serial portion of the reduction
algorithm. These take advantage of the instruction level parallelism of
modern CPUs, in addition to the thread-level parallelism that the rest
@@ -2485,7 +2495,7 @@ public:
An explicit work unit size may be specified as the last argument.
Specifying too small a work unit size will effectively serialize the
reduction, as the final reduction of the result of each work unit will
- dominate computation time. If $(D TaskPool.size) for this instance
+ dominate computation time. If `TaskPool.size` for this instance
is zero, this parameter is ignored and one work unit is used.
---
// Use a work unit size of 100.
@@ -2496,7 +2506,7 @@ public:
---
Parallel reduce supports multiple functions, like
- $(D std.algorithm.reduce).
+ `std.algorithm.reduce`.
---
// Find both the min and max of nums.
auto minMax = taskPool.reduce!(min, max)(nums);
@@ -2507,13 +2517,19 @@ public:
$(B Exception Handling):
After this function is finished executing, any exceptions thrown
- are chained together via $(D Throwable.next) and rethrown. The chaining
+ are chained together via `Throwable.next` and rethrown. The chaining
order is non-deterministic.
+
+ See_Also:
+
+ $(LREF fold) is functionally equivalent to $(LREF _reduce) except the
+ range parameter comes first and there is no need to use
+ $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds.
*/
auto reduce(Args...)(Args args)
{
import core.exception : OutOfMemoryError;
- import std.conv : emplaceRef;
+ import core.internal.lifetime : emplaceRef;
import std.exception : enforce;
alias fun = reduceAdjoin!functions;
@@ -2685,8 +2701,8 @@ public:
alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
RTask[] tasks;
- // Can't use alloca() due to Bug 3753. Use a fixed buffer
- // backed by malloc().
+ // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
+ // Use a fixed buffer backed by malloc().
enum maxStack = 2_048;
byte[maxStack] buf = void;
immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
@@ -2787,7 +2803,7 @@ public:
// done or in progress. Force all of them.
E result = seed;
- Throwable firstException, lastException;
+ Throwable firstException;
foreach (ref task; tasks)
{
@@ -2797,7 +2813,10 @@ public:
}
catch (Throwable e)
{
- addToChain(e, firstException, lastException);
+ /* Chain e to front because order doesn't matter and because
+ * e is not likely to be a chain itself (so fewer traversals)
+ */
+ firstException = Throwable.chainTogether(e, firstException);
continue;
}
@@ -2810,10 +2829,132 @@ public:
}
}
+ ///
+ template fold(functions...)
+ {
+ /** Implements the homonym function (also known as `accumulate`, `compress`,
+ `inject`, or `foldl`) present in various programming languages of
+ functional flavor.
+
+ `fold` is functionally equivalent to $(LREF reduce) except the range
+ parameter comes first and there is no need to use $(REF_ALTTEXT
+ `tuple`,tuple,std,typecons) for multiple seeds.
+
+ There may be one or more callable entities (`functions` argument) to
+ apply.
+
+ Params:
+ args = Just the range to _fold over; or the range and one seed
+ per function; or the range, one seed per function, and
+ the work unit size
+
+ Returns:
+ The accumulated result as a single value for single function and
+ as a tuple of values for multiple functions
+
+ See_Also:
+ Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce).
+
+ Example:
+ ---
+ static int adder(int a, int b)
+ {
+ return a + b;
+ }
+ static int multiplier(int a, int b)
+ {
+ return a * b;
+ }
+
+ // Just the range
+ auto x = taskPool.fold!adder([1, 2, 3, 4]);
+ assert(x == 10);
+
+ // The range and the seeds (0 and 1 below; also note multiple
+ // functions in this example)
+ auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1);
+ assert(y[0] == 10);
+ assert(y[1] == 24);
+
+ // The range, the seed (0), and the work unit size (20)
+ auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20);
+ assert(z == 10);
+ ---
+ */
+ auto fold(Args...)(Args args)
+ {
+ static assert(isInputRange!(Args[0]), "First argument must be an InputRange");
+
+ alias range = args[0];
+
+ static if (Args.length == 1)
+ {
+ // Just the range
+ return reduce!functions(range);
+ }
+ else static if (Args.length == 1 + functions.length ||
+ Args.length == 1 + functions.length + 1)
+ {
+ static if (functions.length == 1)
+ {
+ alias seeds = args[1];
+ }
+ else
+ {
+ auto seeds()
+ {
+ import std.typecons : tuple;
+ return tuple(args[1 .. functions.length+1]);
+ }
+ }
+
+ static if (Args.length == 1 + functions.length)
+ {
+ // The range and the seeds
+ return reduce!functions(seeds, range);
+ }
+ else static if (Args.length == 1 + functions.length + 1)
+ {
+ // The range, the seeds, and the work unit size
+ static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type");
+ return reduce!functions(seeds, range, args[$-1]);
+ }
+ }
+ else
+ {
+ import std.conv : text;
+ static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, "
+ ~ functions.length.text ~ " optional seed(s), and an optional work unit size.");
+ }
+ }
+ }
+
+ // This test is not included in the documentation because even though these
+ // examples are for the inner fold() template, with their current location,
+ // they would appear under the outer one. (We can't move this inside the
+ // outer fold() template because then dmd runs out of memory possibly due to
+ // recursive template instantiation, which is surprisingly not caught.)
+ @system unittest
+ {
+ // Just the range
+ auto x = taskPool.fold!"a + b"([1, 2, 3, 4]);
+ assert(x == 10);
+
+ // The range and the seeds (0 and 1 below; also note multiple
+ // functions in this example)
+ auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1);
+ assert(y[0] == 10);
+ assert(y[1] == 24);
+
+ // The range, the seed (0), and the work unit size (20)
+ auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20);
+ assert(z == 10);
+ }
+
/**
- Gets the index of the current thread relative to this $(D TaskPool). Any
+ Gets the index of the current thread relative to this `TaskPool`. Any
thread not in this pool will receive an index of 0. The worker threads in
- this pool receive unique indices of 1 through $(D this.size).
+ this pool receive unique indices of 1 through `this.size`.
This function is useful for maintaining worker-local resources.
@@ -2860,22 +3001,22 @@ public:
/**
Struct for creating worker-local storage. Worker-local storage is
thread-local storage that exists only for worker threads in a given
- $(D TaskPool) plus a single thread outside the pool. It is allocated on the
+ `TaskPool` plus a single thread outside the pool. It is allocated on the
garbage collected heap in a way that avoids _false sharing, and doesn't
necessarily have global scope within any thread. It can be accessed from
- any worker thread in the $(D TaskPool) that created it, and one thread
- outside this $(D TaskPool). All threads outside the pool that created a
+ any worker thread in the `TaskPool` that created it, and one thread
+ outside this `TaskPool`. All threads outside the pool that created a
given instance of worker-local storage share a single slot.
Since the underlying data for this struct is heap-allocated, this struct
has reference semantics when passed between functions.
- The main uses cases for $(D WorkerLocalStorageStorage) are:
+ The main use cases for `WorkerLocalStorage` are:
1. Performing parallel reductions with an imperative, as opposed to
- functional, programming style. In this case, it's useful to treat
- $(D WorkerLocalStorageStorage) as local to each thread for only the parallel
- portion of an algorithm.
+ functional, programming style. In this case, it's useful to treat
+ `WorkerLocalStorage` as local to each thread for only the parallel
+ portion of an algorithm.
2. Recycling temporary buffers across iterations of a parallel foreach loop.
@@ -2971,13 +3112,13 @@ public:
public:
/**
Get the current thread's instance. Returns by ref.
- Note that calling $(D get) from any thread
- outside the $(D TaskPool) that created this instance will return the
+ Note that calling `get` from any thread
+ outside the `TaskPool` that created this instance will return the
same reference, so an instance of worker-local storage should only be
accessed from one thread outside the pool that created it. If this
rule is violated, undefined behavior will result.
- If assertions are enabled and $(D toRange) has been called, then this
+ If assertions are enabled and `toRange` has been called, then this
WorkerLocalStorage instance is no longer worker-local and an assertion
failure will result when calling this method. This is not checked
when assertions are disabled for performance reasons.
@@ -3012,7 +3153,7 @@ public:
of your algorithm.
Calling this function sets a flag indicating that this struct is no
- longer worker-local, and attempting to use the $(D get) method again
+ longer worker-local, and attempting to use the `get` method again
will result in an assertion failure if assertions are enabled.
*/
WorkerLocalStorageRange!T toRange() @property
@@ -3042,9 +3183,9 @@ public:
Do not use this struct in the parallel portion of your algorithm.
The proper way to instantiate this object is to call
- $(D WorkerLocalStorage.toRange). Once instantiated, this object behaves
+ `WorkerLocalStorage.toRange`. Once instantiated, this object behaves
as a finite random-access range with assignable, lvalue elements and
- a length equal to the number of worker threads in the $(D TaskPool) that
+ a length equal to the number of worker threads in the `TaskPool` that
created it plus 1.
*/
static struct WorkerLocalStorageRange(T)
@@ -3128,9 +3269,9 @@ public:
/**
Creates an instance of worker-local storage, initialized with a given
- value. The value is $(D lazy) so that you can, for example, easily
+ value. The value is `lazy` so that you can, for example, easily
create one instance of a class for each worker. For usage example,
- see the $(D WorkerLocalStorage) struct.
+ see the `WorkerLocalStorage` struct.
*/
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
{
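A minimal sketch of an imperative parallel reduction with worker-local accumulators:
---
import std.parallelism : parallel, taskPool;
import std.range : iota;

void main()
{
    // One accumulator per worker thread, plus one for this thread.
    auto partialSums = taskPool.workerLocalStorage(0L);

    foreach (i; parallel(iota(1_000)))
        partialSums.get += i;

    // Combine the per-thread partial results serially.
    long total = 0;
    foreach (s; partialSums.toRange)
        total += s;
    assert(total == 999L * 1_000 / 2);
}
---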
@@ -3151,12 +3292,12 @@ public:
/**
Signals to all worker threads to terminate as soon as they are finished
- with their current $(D Task), or immediately if they are not executing a
- $(D Task). $(D Task)s that were in queue will not be executed unless
- a call to $(D Task.workForce), $(D Task.yieldForce) or $(D Task.spinForce)
+ with their current `Task`, or immediately if they are not executing a
+ `Task`. `Task`s that were in queue will not be executed unless
+ a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce`
causes them to be executed.
- Use only if you have waited on every $(D Task) and therefore know the
+ Use only if you have waited on every `Task` and therefore know the
queue is empty, or if you speculatively executed some tasks and no longer
need the results.
*/
@@ -3173,13 +3314,13 @@ public:
If blocking argument is true, wait for all worker threads to terminate
before returning. This option might be used in applications where
- task results are never consumed-- e.g. when $(D TaskPool) is employed as a
+ task results are never consumed, e.g. when `TaskPool` is employed as a
rudimentary scheduler for tasks which communicate by means other than
return values.
Warning: Calling this function with $(D blocking = true) from a worker
- thread that is a member of the same $(D TaskPool) that
- $(D finish) is being called on will result in a deadlock.
+ thread that is a member of the same `TaskPool` that
+ `finish` is being called on will result in a deadlock.
*/
void finish(bool blocking = false) @trusted
{
@@ -3218,7 +3359,7 @@ public:
}
/**
- Put a $(D Task) object on the back of the task queue. The $(D Task)
+ Put a `Task` object on the back of the task queue. The `Task`
object may be passed by pointer or reference.
Example:
@@ -3234,20 +3375,20 @@ public:
Notes:
- @trusted overloads of this function are called for $(D Task)s if
- $(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s
- return type or the function the $(D Task) executes is $(D pure).
- $(D Task) objects that meet all other requirements specified in the
- $(D @trusted) overloads of $(D task) and $(D scopedTask) may be created
- and executed from $(D @safe) code via $(D Task.executeInNewThread) but
- not via $(D TaskPool).
+ @trusted overloads of this function are called for `Task`s if
+ $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s
+ return type or the function the `Task` executes is `pure`.
+ `Task` objects that meet all other requirements specified in the
+ `@trusted` overloads of `task` and `scopedTask` may be created
+ and executed from `@safe` code via `Task.executeInNewThread` but
+ not via `TaskPool`.
While this function takes the address of variables that may
be on the stack, some overloads are marked as @trusted.
- $(D Task) includes a destructor that waits for the task to complete
+ `Task` includes a destructor that waits for the task to complete
before destroying the stack frame it is allocated on. Therefore,
it is impossible for the stack frame to be destroyed before the task is
- complete and no longer referenced by a $(D TaskPool).
+ complete and no longer referenced by a `TaskPool`.
*/
void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)))
@@ -3286,11 +3427,11 @@ public:
have terminated. A non-daemon thread will prevent a program from
terminating as long as it has not terminated.
- If any $(D TaskPool) with non-daemon threads is active, either $(D stop)
- or $(D finish) must be called on it before the program can terminate.
+ If any `TaskPool` with non-daemon threads is active, either `stop`
+ or `finish` must be called on it before the program can terminate.
- The worker treads in the $(D TaskPool) instance returned by the
- $(D taskPool) property are daemon by default. The worker threads of
+ The worker threads in the `TaskPool` instance returned by the
+ `taskPool` property are daemon by default. The worker threads of
manually instantiated task pools are non-daemon by default.
Note: For a size zero pool, the getter arbitrarily returns true and the
@@ -3316,12 +3457,12 @@ public:
/**
These functions allow getting and setting the OS scheduling priority of
- the worker threads in this $(D TaskPool). They forward to
- $(D core.thread.Thread.priority), so a given priority value here means the
- same thing as an identical priority value in $(D core.thread).
+ the worker threads in this `TaskPool`. They forward to
+ `core.thread.Thread.priority`, so a given priority value here means the
+ same thing as an identical priority value in `core.thread`.
Note: For a size zero pool, the getter arbitrarily returns
- $(D core.thread.Thread.PRIORITY_MIN) and the setter has no effect.
+ `core.thread.Thread.PRIORITY_MIN` and the setter has no effect.
*/
int priority() @property @trusted
{
@@ -3342,11 +3483,33 @@ public:
}
}
+@system unittest
+{
+ import std.algorithm.iteration : sum;
+ import std.range : iota;
+ import std.typecons : tuple;
+
+ enum N = 100;
+ auto r = iota(1, N + 1);
+ const expected = r.sum();
+
+ // Just the range
+ assert(taskPool.fold!"a + b"(r) == expected);
+
+ // Range and seeds
+ assert(taskPool.fold!"a + b"(r, 0) == expected);
+ assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected));
+
+ // Range, seeds, and work unit size
+ assert(taskPool.fold!"a + b"(r, 0, 42) == expected);
+ assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected));
+}
+
/**
-Returns a lazily initialized global instantiation of $(D TaskPool).
+Returns a lazily initialized global instantiation of `TaskPool`.
This function can safely be called concurrently from multiple non-worker
threads. The worker threads in this pool are daemon threads, meaning that it
-is not necessary to call $(D TaskPool.stop) or $(D TaskPool.finish) before
+is not necessary to call `TaskPool.stop` or `TaskPool.finish` before
terminating the main thread.
*/
@property TaskPool taskPool() @trusted
@@ -3363,10 +3526,10 @@ terminating the main thread.
private shared uint _defaultPoolThreads = uint.max;
/**
-These properties get and set the number of worker threads in the $(D TaskPool)
-instance returned by $(D taskPool). The default value is $(D totalCPUs) - 1.
-Calling the setter after the first call to $(D taskPool) does not changes
-number of worker threads in the instance returned by $(D taskPool).
+These properties get and set the number of worker threads in the `TaskPool`
+instance returned by `taskPool`. The default value is `totalCPUs` - 1.
+Calling the setter after the first call to `taskPool` does not change the
+number of worker threads in the instance returned by `taskPool`.
*/
@property uint defaultPoolThreads() @trusted
{
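A minimal sketch; note that the setter must run before `taskPool` is first used to have any effect:
---
import std.parallelism : defaultPoolThreads, taskPool;

void main()
{
    // Request two workers for the lazily created global pool.
    defaultPoolThreads = 2;

    // The first use of taskPool creates the pool with that many threads.
    assert(taskPool.size == 2);
}
---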
@@ -3381,7 +3544,7 @@ number of worker threads in the instance returned by $(D taskPool).
}
/**
-Convenience functions that forwards to $(D taskPool.parallel). The
+Convenience functions that forward to `taskPool.parallel`. The
purpose of these is to make parallel foreach less verbose and more
readable.
@@ -3410,6 +3573,23 @@ ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
return taskPool.parallel(range, workUnitSize);
}
+// `each` should be usable with parallel
+// https://issues.dlang.org/show_bug.cgi?id=17019
+@system unittest
+{
+ import std.algorithm.iteration : each, sum;
+ import std.range : iota;
+
+ // check behavior with parallel
+ auto arr = new int[10];
+ parallel(arr).each!((ref e) => e += 1);
+ assert(arr.sum == 10);
+
+ auto arrIndex = new int[10];
+ parallel(arrIndex).each!((i, ref e) => e += i);
+ assert(arrIndex.sum == 10.iota.sum);
+}
+
// Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
@@ -3441,9 +3621,10 @@ private void submitAndExecute(
// The logical thing to do would be to just use alloca() here, but that
// causes problems on Windows for reasons that I don't understand
// (tentatively a compiler bug) and definitely doesn't work on Posix due
- // to Bug 3753. Therefore, allocate a fixed buffer and fall back to
- // malloc() if someone's using a ridiculous amount of threads. Also,
- // the using a byte array instead of a PTask array as the fixed buffer
+ // to https://issues.dlang.org/show_bug.cgi?id=3753.
+ // Therefore, allocate a fixed buffer and fall back to `malloc()` if
+ // someone's using a ridiculous amount of threads.
+ // Also, using a byte array instead of a PTask array as the fixed buffer
// is to prevent d'tors from being called on uninitialized excess PTask
// instances.
enum nBuf = 64;
@@ -3519,7 +3700,7 @@ private void submitAndExecute(
}
}
- Throwable firstException, lastException;
+ Throwable firstException;
foreach (i, ref task; tasks)
{
@@ -3529,7 +3710,10 @@ private void submitAndExecute(
}
catch (Throwable e)
{
- addToChain(e, firstException, lastException);
+ /* Chain e to front because order doesn't matter and because
+ * e is not likely to be a chain itself (so fewer traversals)
+ */
+ firstException = Throwable.chainTogether(e, firstException);
continue;
}
}
@@ -3829,38 +4013,6 @@ enum string parallelApplyMixinInputRange = q{
return 0;
};
-// Calls e.next until the end of the chain is found.
-private Throwable findLastException(Throwable e) pure nothrow
-{
- if (e is null) return null;
-
- while (e.next)
- {
- e = e.next;
- }
-
- return e;
-}
-
-// Adds e to the exception chain.
-private void addToChain(
- Throwable e,
- ref Throwable firstException,
- ref Throwable lastException
-) pure nothrow
-{
- if (firstException)
- {
- assert(lastException); // nocoverage
- lastException.next = e; // nocoverage
- lastException = findLastException(e); // nocoverage
- }
- else
- {
- firstException = e;
- lastException = findLastException(e);
- }
-}
private struct ParallelForeach(R)
{
@@ -3947,7 +4099,7 @@ private struct RoundRobinBuffer(C1, C2)
{
assert(!empty);
}
- body
+ do
{
scope(success) primed = true;
nextDel(bufs[index]);
@@ -3959,7 +4111,7 @@ private struct RoundRobinBuffer(C1, C2)
{
assert(!empty);
}
- body
+ do
{
if (!primed) prime();
return bufs[index];
@@ -3983,12 +4135,10 @@ private struct RoundRobinBuffer(C1, C2)
}
}
-version (unittest)
+version (StdUnittest)
{
// This was the only way I could get nested maps to work.
- __gshared TaskPool poolInstance;
-
- import std.stdio;
+ private __gshared TaskPool poolInstance;
}
// These test basic functionality but don't stress test for threading bugs.
@@ -4000,9 +4150,12 @@ version (unittest)
import std.array : split;
import std.conv : text;
import std.exception : assertThrown;
- import std.math : approxEqual, sqrt, log, abs;
+ import std.math.operations : isClose;
+ import std.math.algebraic : sqrt, abs;
+ import std.math.exponential : log;
import std.range : indexed, iota, join;
import std.typecons : Tuple, tuple;
+ import std.stdio;
poolInstance = new TaskPool(2);
scope(exit) poolInstance.stop();
@@ -4132,7 +4285,7 @@ version (unittest)
foreach (i, elem; logs)
{
- assert(approxEqual(elem, cast(double) log(i + 1)));
+ assert(isClose(elem, cast(double) log(i + 1)));
}
assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]);
@@ -4206,7 +4359,7 @@ version (unittest)
pool2.finish(true); // blocking
assert(tSlow2.done);
- // Test fix for Bug 8582 by making pool size zero.
+ // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero.
auto pool3 = new TaskPool(0);
auto tSlow3 = task!slowFun();
pool3.put(tSlow3);
@@ -4230,25 +4383,25 @@ version (unittest)
assert(equal(nums, iota(1000)));
assert(equal(
- poolInstance.map!"a * a"(iota(30_000_001), 10_000),
- map!"a * a"(iota(30_000_001))
+ poolInstance.map!"a * a"(iota(3_000_001), 10_000),
+ map!"a * a"(iota(3_000_001))
));
// The filter is to kill random access and test the non-random access
// branch.
assert(equal(
poolInstance.map!"a * a"(
- filter!"a == a"(iota(30_000_001)
+ filter!"a == a"(iota(3_000_001)
), 10_000, 1000),
- map!"a * a"(iota(30_000_001))
+ map!"a * a"(iota(3_000_001))
));
assert(
reduce!"a + b"(0UL,
- poolInstance.map!"a * a"(iota(3_000_001), 10_000)
+ poolInstance.map!"a * a"(iota(300_001), 10_000)
) ==
reduce!"a + b"(0UL,
- map!"a * a"(iota(3_000_001))
+ map!"a * a"(iota(300_001))
)
);
@@ -4311,7 +4464,7 @@ version (unittest)
int ii;
foreach ( elem; (lmchain))
{
- if (!approxEqual(elem, ii))
+ if (!isClose(elem, ii))
{
stderr.writeln(ii, '\t', elem);
}
@@ -4405,8 +4558,12 @@ version (unittest)
// tons of stuff and should not be run every time make unittest is run.
version (parallelismStressTest)
{
- @safe unittest
+ @system unittest
{
+ import std.stdio : stderr, writeln, readln;
+ import std.range : iota;
+ import std.algorithm.iteration : filter, reduce;
+
size_t attempt;
for (; attempt < 10; attempt++)
foreach (poolSize; [0, 4])
@@ -4488,8 +4645,16 @@ version (parallelismStressTest)
// These unittests are intended more for actual testing and not so much
// as examples.
- @safe unittest
- {
+ @system unittest
+ {
+ import std.stdio : stderr;
+ import std.range : iota;
+ import std.algorithm.iteration : filter, reduce;
+ import std.math.algebraic : sqrt;
+ import std.math.operations : isClose;
+ import std.math.traits : isNaN;
+ import std.conv : text;
+
foreach (attempt; 0 .. 10)
foreach (poolSize; [0, 4])
{
@@ -4559,7 +4724,7 @@ version (parallelismStressTest)
foreach (j, elem; row)
{
real shouldBe = sqrt( cast(real) i * j);
- assert(approxEqual(shouldBe, elem));
+ assert(isClose(shouldBe, elem));
sqrtMatrix[i][j] = shouldBe;
}
}
@@ -4586,10 +4751,11 @@ version (parallelismStressTest)
)
);
- assert(approxEqual(sumSqrt, 4.437e8));
+ assert(isClose(sumSqrt, 4.437e8, 1e-2));
stderr.writeln("Done sum of square roots.");
// Test whether tasks work with function pointers.
+ /+ // This part is buggy and needs to be fixed...
auto nanTask = task(&isNaN, 1.0L);
poolInstance.put(nanTask);
assert(nanTask.spinForce == false);
@@ -4616,6 +4782,7 @@ version (parallelismStressTest)
uselessTask.workForce();
}
}
+ +/
// Test the case of non-random access + ref returns.
int[] nums = [1,2,3,4,5];
@@ -4650,9 +4817,9 @@ version (parallelismStressTest)
}
}
-version (unittest)
+@system unittest
{
- struct __S_12733
+ static struct __S_12733
{
invariant() { assert(checksum == 1_234_567_890); }
this(ulong u){n = u;}
@@ -4662,10 +4829,6 @@ version (unittest)
}
static auto __genPair_12733(ulong n) { return __S_12733(n); }
-}
-
-@system unittest
-{
immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ];
auto result = taskPool.amap!__genPair_12733(data);