Diffstat (limited to 'libphobos/src/std/parallelism.d')
-rw-r--r-- | libphobos/src/std/parallelism.d | 4636
1 file changed, 4636 insertions, 0 deletions
diff --git a/libphobos/src/std/parallelism.d b/libphobos/src/std/parallelism.d new file mode 100644 index 0000000..df07baf --- /dev/null +++ b/libphobos/src/std/parallelism.d @@ -0,0 +1,4636 @@ +/** +$(D std._parallelism) implements high-level primitives for SMP _parallelism. +These include parallel foreach, parallel reduce, parallel eager map, pipelining +and future/promise _parallelism. $(D std._parallelism) is recommended when the +same operation is to be executed in parallel on different data, or when a +function is to be executed in a background thread and its result returned to a +well-defined main thread. For communication between arbitrary threads, see +$(D std.concurrency). + +$(D std._parallelism) is based on the concept of a $(D Task). A $(D Task) is an +object that represents the fundamental unit of work in this library and may be +executed in parallel with any other $(D Task). Using $(D Task) +directly allows programming with a future/promise paradigm. All other +supported _parallelism paradigms (parallel foreach, map, reduce, pipelining) +represent an additional level of abstraction over $(D Task). They +automatically create one or more $(D Task) objects, or closely related types +that are conceptually identical but not part of the public API. + +After creation, a $(D Task) may be executed in a new thread, or submitted +to a $(D TaskPool) for execution. A $(D TaskPool) encapsulates a task queue +and its worker threads. Its purpose is to efficiently map a large +number of $(D Task)s onto a smaller number of threads. A task queue is a +FIFO queue of $(D Task) objects that have been submitted to the +$(D TaskPool) and are awaiting execution. A worker thread is a thread that +is associated with exactly one task queue. It executes the $(D Task) at the +front of its queue when the queue has work available, or sleeps when +no work is available. Each task queue is associated with zero or +more worker threads. If the result of a $(D Task) is needed before execution +by a worker thread has begun, the $(D Task) can be removed from the task queue +and executed immediately in the thread where the result is needed. + +Warning: Unless marked as $(D @trusted) or $(D @safe), artifacts in + this module allow implicit data sharing between threads and cannot + guarantee that client code is free from low level data races. + +Source: $(PHOBOSSRC std/_parallelism.d) +Author: David Simcha +Copyright: Copyright (c) 2009-2011, David Simcha. +License: $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0) +*/ +module std.parallelism; + +/// +@system unittest +{ + import std.algorithm.iteration : map; + import std.math : approxEqual; + import std.parallelism : taskPool; + import std.range : iota; + + // Parallel reduce can be combined with + // std.algorithm.iteration.map to interesting effect. + // The following example (thanks to Russel Winder) + // calculates pi by quadrature using + // std.algorithm.map and TaskPool.reduce. + // getTerm is evaluated in parallel as needed by + // TaskPool.reduce. 
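+ // + // Each term is delta / (1 + x * x); summing the terms approximates + // the integral of 1 / (1 + x * x) over [0, 1], which equals pi / 4, + // hence the final multiplication by 4.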
+ // + // Timings on an Intel i5-3450 quad core machine + // for n = 1_000_000_000: + // + // TaskPool.reduce: 1.067 s + // std.algorithm.reduce: 4.011 s + + enum n = 1_000_000; + enum delta = 1.0 / n; + + alias getTerm = (int i) + { + immutable x = ( i - 0.5 ) * delta; + return delta / ( 1.0 + x * x ) ; + }; + + immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm); + + assert(pi.approxEqual(3.1415926)); +} + +import core.atomic; +import core.memory; +import core.sync.condition; +import core.thread; + +import std.functional; +import std.meta; +import std.range.primitives; +import std.traits; + +version (OSX) +{ + version = useSysctlbyname; +} +else version (FreeBSD) +{ + version = useSysctlbyname; +} +else version (NetBSD) +{ + version = useSysctlbyname; +} + + +version (Windows) +{ + // BUGS: Only works on Windows 2000 and above. + shared static this() + { + import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; + import std.algorithm.comparison : max; + + SYSTEM_INFO si; + GetSystemInfo(&si); + totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors); + } + +} +else version (linux) +{ + shared static this() + { + import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; + totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); + } +} +else version (Solaris) +{ + shared static this() + { + import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; + totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); + } +} +else version (useSysctlbyname) +{ + extern(C) int sysctlbyname( + const char *, void *, size_t *, void *, size_t + ); + + shared static this() + { + version (OSX) + { + auto nameStr = "machdep.cpu.core_count\0".ptr; + } + else version (FreeBSD) + { + auto nameStr = "hw.ncpu\0".ptr; + } + else version (NetBSD) + { + auto nameStr = "hw.ncpu\0".ptr; + } + + uint ans; + size_t len = uint.sizeof; + sysctlbyname(nameStr, &ans, &len, null, 0); + totalCPUs = ans; + } + +} +else +{ + static assert(0, "Don't know how to get N CPUs on this OS."); +} + +immutable size_t cacheLineSize; +shared static this() +{ + import core.cpuid : datacache; + size_t lineSize = 0; + foreach (cachelevel; datacache) + { + if (cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max) + { + lineSize = cachelevel.lineSize; + } + } + + cacheLineSize = lineSize; +} + + +/* Atomics code. These forward to core.atomic, but are written like this + for two reasons: + + 1. They used to actually contain ASM code and I don't want to have to change + to directly calling core.atomic in a zillion different places. + + 2. core.atomic has some misc. issues that make my use cases difficult + without wrapping it. If I didn't wrap it, casts would be required + basically everywhere. +*/ +private void atomicSetUbyte(T)(ref T stuff, T newVal) +if (__traits(isIntegral, T) && is(T : ubyte)) +{ + //core.atomic.cas(cast(shared) &stuff, stuff, newVal); + atomicStore(*(cast(shared) &stuff), newVal); +} + +private ubyte atomicReadUbyte(T)(ref T val) +if (__traits(isIntegral, T) && is(T : ubyte)) +{ + return atomicLoad(*(cast(shared) &val)); +} + +// This gets rid of the need for a lot of annoying casts in other parts of the +// code, when enums are involved. +private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) +if (__traits(isIntegral, T) && is(T : ubyte)) +{ + return core.atomic.cas(cast(shared) &stuff, testVal, newVal); +} + +/*--------------------- Generic helper functions, etc.------------------------*/ +private template MapType(R, functions...)
+{ + static assert(functions.length); + + ElementType!R e = void; + alias MapType = + typeof(adjoin!(staticMap!(unaryFun, functions))(e)); +} + +private template ReduceType(alias fun, R, E) +{ + alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init)); +} + +private template noUnsharedAliasing(T) +{ + enum bool noUnsharedAliasing = !hasUnsharedAliasing!T; +} + +// This template tests whether a function may be executed in parallel from +// @safe code via Task.executeInNewThread(). There is an additional +// requirement for executing it via a TaskPool. (See isSafeReturn). +private template isSafeTask(F) +{ + enum bool isSafeTask = + (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 && + (functionAttributes!F & FunctionAttribute.ref_) == 0 && + (isFunctionPointer!F || !hasUnsharedAliasing!F) && + allSatisfy!(noUnsharedAliasing, Parameters!F); +} + +@safe unittest +{ + alias F1 = void function() @safe; + alias F2 = void function(); + alias F3 = void function(uint, string) @trusted; + alias F4 = void function(uint, char[]); + + static assert( isSafeTask!F1); + static assert(!isSafeTask!F2); + static assert( isSafeTask!F3); + static assert(!isSafeTask!F4); + + alias F5 = uint[] function(uint, string) pure @trusted; + static assert( isSafeTask!F5); +} + +// This function decides whether Tasks that meet all of the other requirements +// for being executed from @safe code can be executed on a TaskPool. +// When executing via TaskPool, it's theoretically possible +// to return a value that is also pointed to by a worker thread's thread local +// storage. When executing from executeInNewThread(), the thread that executed +// the Task is terminated by the time the return value is visible in the calling +// thread, so this is a non-issue. It's also a non-issue for pure functions +// since they can't read global state. +private template isSafeReturn(T) +{ + static if (!hasUnsharedAliasing!(T.ReturnType)) + { + enum isSafeReturn = true; + } + else static if (T.isPure) + { + enum isSafeReturn = true; + } + else + { + enum isSafeReturn = false; + } +} + +private template randAssignable(R) +{ + enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R; +} + +private enum TaskStatus : ubyte +{ + notStarted, + inProgress, + done +} + +private template AliasReturn(alias fun, T...) +{ + alias AliasReturn = typeof({ T args; return fun(args); }); +} + +// Should be private, but std.algorithm.reduce is used in the zero-thread case +// and won't work w/ private. +template reduceAdjoin(functions...) +{ + static if (functions.length == 1) + { + alias reduceAdjoin = binaryFun!(functions[0]); + } + else + { + T reduceAdjoin(T, U)(T lhs, U rhs) + { + alias funs = staticMap!(binaryFun, functions); + + foreach (i, Unused; typeof(lhs.expand)) + { + lhs.expand[i] = funs[i](lhs.expand[i], rhs); + } + + return lhs; + } + } +} + +private template reduceFinish(functions...) 
+{ + static if (functions.length == 1) + { + alias reduceFinish = binaryFun!(functions[0]); + } + else + { + T reduceFinish(T)(T lhs, T rhs) + { + alias funs = staticMap!(binaryFun, functions); + + foreach (i, Unused; typeof(lhs.expand)) + { + lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]); + } + + return lhs; + } + } +} + +private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2) +{ + enum isRoundRobin = true; +} + +private template isRoundRobin(T) +{ + enum isRoundRobin = false; +} + +@safe unittest +{ + static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate()))); + static assert(!isRoundRobin!(uint)); +} + +// This is the base "class" for all of the other tasks. Using C-style +// polymorphism to allow more direct control over memory allocation, etc. +private struct AbstractTask +{ + AbstractTask* prev; + AbstractTask* next; + + // Pointer to a function that executes this task. + void function(void*) runTask; + + Throwable exception; + ubyte taskStatus = TaskStatus.notStarted; + + bool done() @property + { + if (atomicReadUbyte(taskStatus) == TaskStatus.done) + { + if (exception) + { + throw exception; + } + + return true; + } + + return false; + } + + void job() + { + runTask(&this); + } +} + +/** +$(D Task) represents the fundamental unit of work. A $(D Task) may be +executed in parallel with any other $(D Task). Using this struct directly +allows future/promise _parallelism. In this paradigm, a function (or delegate +or other callable) is executed in a thread other than the one it was called +from. The calling thread does not block while the function is being executed. +A call to $(D workForce), $(D yieldForce), or $(D spinForce) is used to +ensure that the $(D Task) has finished executing and to obtain the return +value, if any. These functions and $(D done) also act as full memory barriers, +meaning that any memory writes made in the thread that executed the $(D Task) +are guaranteed to be visible in the calling thread after one of these functions +returns. + +The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can +be used to create an instance of this struct. See $(D task) for usage examples. + +Function results are returned from $(D yieldForce), $(D spinForce) and +$(D workForce) by ref. If $(D fun) returns by ref, the reference will point +to the returned reference of $(D fun). Otherwise it will point to a +field in this struct. + +Copying of this struct is disabled, since it would provide no useful semantics. +If you want to pass this struct around, you should do so by reference or +pointer. + +Bugs: Changes to $(D ref) and $(D out) arguments are not propagated to the + call site, only to $(D args) in this struct. +*/ +struct Task(alias fun, Args...) +{ + AbstractTask base = {runTask : &impl}; + alias base this; + + private @property AbstractTask* basePtr() + { + return &base; + } + + private static void impl(void* myTask) + { + import std.algorithm.internal : addressOf; + + Task* myCastedTask = cast(typeof(this)*) myTask; + static if (is(ReturnType == void)) + { + fun(myCastedTask._args); + } + else static if (is(typeof(addressOf(fun(myCastedTask._args))))) + { + myCastedTask.returnVal = addressOf(fun(myCastedTask._args)); + } + else + { + myCastedTask.returnVal = fun(myCastedTask._args); + } + } + + private TaskPool pool; + private bool isScoped; // True if created with scopedTask. + + Args _args; + + /** + The arguments the function was called with. 
Changes to $(D out) and + $(D ref) arguments will be visible here. + */ + static if (__traits(isSame, fun, run)) + { + alias args = _args[1..$]; + } + else + { + alias args = _args; + } + + + // The purpose of this code is to decide whether functions whose + // return values have unshared aliasing can be executed via + // TaskPool from @safe code. See isSafeReturn. + static if (__traits(isSame, fun, run)) + { + static if (isFunctionPointer!(_args[0])) + { + private enum bool isPure = + functionAttributes!(Args[0]) & FunctionAttribute.pure_; + } + else + { + // BUG: Should check this for delegates too, but std.traits + // apparently doesn't allow this. isPure is irrelevant + // for delegates, at least for now since shared delegates + // don't work. + private enum bool isPure = false; + } + + } + else + { + // We already know that we can't execute aliases in @safe code, so + // just put a dummy value here. + private enum bool isPure = false; + } + + + /** + The return type of the function called by this $(D Task). This can be + $(D void). + */ + alias ReturnType = typeof(fun(_args)); + + static if (!is(ReturnType == void)) + { + static if (is(typeof(&fun(_args)))) + { + // Ref return. + ReturnType* returnVal; + + ref ReturnType fixRef(ReturnType* val) + { + return *val; + } + + } + else + { + ReturnType returnVal; + + ref ReturnType fixRef(ref ReturnType val) + { + return val; + } + } + } + + private void enforcePool() + { + import std.exception : enforce; + enforce(this.pool !is null, "Job not submitted yet."); + } + + static if (Args.length > 0) + { + private this(Args args) + { + _args = args; + } + } + + // Work around DMD bug 6588, allow immutable elements. + static if (allSatisfy!(isAssignable, Args)) + { + typeof(this) opAssign(typeof(this) rhs) + { + foreach (i, Type; typeof(this.tupleof)) + { + this.tupleof[i] = rhs.tupleof[i]; + } + return this; + } + } + else + { + @disable typeof(this) opAssign(typeof(this) rhs) + { + assert(0); + } + } + + /** + If the $(D Task) isn't started yet, execute it in the current thread. + If it's done, return its return value, if any. If it's in progress, + busy spin until it's done, then return the return value. If it threw + an exception, rethrow that exception. + + This function should be used when you expect the result of the + $(D Task) to be available on a timescale shorter than that of an OS + context switch. + */ + @property ref ReturnType spinForce() @trusted + { + enforcePool(); + + this.pool.tryDeleteExecute(basePtr); + + while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {} + + if (exception) + { + throw exception; + } + + static if (!is(ReturnType == void)) + { + return fixRef(this.returnVal); + } + } + + /** + If the $(D Task) isn't started yet, execute it in the current thread. + If it's done, return its return value, if any. If it's in progress, + wait on a condition variable. If it threw an exception, rethrow that + exception. + + This function should be used for expensive functions, as waiting on a + condition variable introduces latency, but avoids wasted CPU cycles. 
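+ + Example (a minimal sketch; assumes a file $(D foo.txt) exists and + that the result is not needed right away): + --- + import std.file : read; + + auto t = task!read("foo.txt"); + taskPool.put(t); + + // ... do other, unrelated work ... + + auto fileData = t.yieldForce; // sleep until a worker finishes + ---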
+ */ + @property ref ReturnType yieldForce() @trusted + { + enforcePool(); + this.pool.tryDeleteExecute(basePtr); + + if (done) + { + static if (is(ReturnType == void)) + { + return; + } + else + { + return fixRef(this.returnVal); + } + } + + pool.waiterLock(); + scope(exit) pool.waiterUnlock(); + + while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) + { + pool.waitUntilCompletion(); + } + + if (exception) + { + throw exception; // nocoverage + } + + static if (!is(ReturnType == void)) + { + return fixRef(this.returnVal); + } + } + + /** + If this $(D Task) was not started yet, execute it in the current + thread. If it is finished, return its result. If it is in progress, + execute any other $(D Task) from the $(D TaskPool) instance that + this $(D Task) was submitted to until this one + is finished. If it threw an exception, rethrow that exception. + If no other tasks are available or this $(D Task) was executed using + $(D executeInNewThread), wait on a condition variable. + */ + @property ref ReturnType workForce() @trusted + { + enforcePool(); + this.pool.tryDeleteExecute(basePtr); + + while (true) + { + if (done) // done() implicitly checks for exceptions. + { + static if (is(ReturnType == void)) + { + return; + } + else + { + return fixRef(this.returnVal); + } + } + + AbstractTask* job; + { + // Locking explicitly and calling popNoSync() because + // pop() waits on a condition variable if there are no Tasks + // in the queue. + + pool.queueLock(); + scope(exit) pool.queueUnlock(); + job = pool.popNoSync(); + } + + + if (job !is null) + { + + version (verboseUnittest) + { + stderr.writeln("Doing workForce work."); + } + + pool.doJob(job); + + if (done) + { + static if (is(ReturnType == void)) + { + return; + } + else + { + return fixRef(this.returnVal); + } + } + } + else + { + version (verboseUnittest) + { + stderr.writeln("Yield from workForce."); + } + + return yieldForce; + } + } + } + + /** + Returns $(D true) if the $(D Task) is finished executing. + + Throws: Rethrows any exception thrown during the execution of the + $(D Task). + */ + @property bool done() @trusted + { + // Explicitly forwarded for documentation purposes. + return base.done; + } + + /** + Create a new thread for executing this $(D Task), execute it in the + newly created thread, then terminate the thread. This can be used for + future/promise parallelism. An explicit priority may be given + to the $(D Task). If one is provided, its value is forwarded to + $(D core.thread.Thread.priority). See $(REF task, std,parallelism) for + usage example. + */ + void executeInNewThread() @trusted + { + pool = new TaskPool(basePtr); + } + + /// Ditto + void executeInNewThread(int priority) @trusted + { + pool = new TaskPool(basePtr, priority); + } + + @safe ~this() + { + if (isScoped && pool !is null && taskStatus != TaskStatus.done) + { + yieldForce; + } + } + + // When this is uncommented, it somehow gets called on returning from + // scopedTask even though the struct shouldn't be getting copied. + //@disable this(this) {} +} + +// Calls $(D fpOrDelegate) with $(D args). This is an +// adapter that makes $(D Task) work with delegates, function pointers and +// functors instead of just aliases. +ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args) +{ + return fpOrDelegate(args); +} + +/** +Creates a $(D Task) on the GC heap that calls an alias. This may be executed +via $(D Task.executeInNewThread) or by submitting to a +$(REF TaskPool, std,parallelism). 
A globally accessible instance of +$(D TaskPool) is provided by $(REF taskPool, std,parallelism). + +Returns: A pointer to the $(D Task). + +Example: +--- +// Read two files into memory at the same time. +import std.file; + +void main() +{ + // Create and execute a Task for reading + // foo.txt. + auto file1Task = task!read("foo.txt"); + file1Task.executeInNewThread(); + + // Read bar.txt in parallel. + auto file2Data = read("bar.txt"); + + // Get the results of reading foo.txt. + auto file1Data = file1Task.yieldForce; +} +--- + +--- +// Sorts an array using a parallel quick sort algorithm. +// The first partition is done serially. Both recursion +// branches are then executed in parallel. +// +// Timings for sorting an array of 1,000,000 doubles on +// an Athlon 64 X2 dual core machine: +// +// This implementation: 176 milliseconds. +// Equivalent serial implementation: 280 milliseconds +import std.algorithm; // for sort, swap and partition +void parallelSort(T)(T[] data) +{ + // Sort small subarrays serially. + if (data.length < 100) + { + std.algorithm.sort(data); + return; + } + + // Partition the array. + swap(data[$ / 2], data[$ - 1]); + auto pivot = data[$ - 1]; + bool lessThanPivot(T elem) { return elem < pivot; } + + auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]); + swap(data[$ - greaterEqual.length - 1], data[$ - 1]); + + auto less = data[0..$ - greaterEqual.length - 1]; + greaterEqual = data[$ - greaterEqual.length..$]; + + // Execute both recursion branches in parallel. + auto recurseTask = task!parallelSort(greaterEqual); + taskPool.put(recurseTask); + parallelSort(less); + recurseTask.yieldForce; +} +--- +*/ +auto task(alias fun, Args...)(Args args) +{ + return new Task!(fun, Args)(args); +} + +/** +Creates a $(D Task) on the GC heap that calls a function pointer, delegate, or +class/struct with overloaded opCall. + +Example: +--- +// Read two files in at the same time again, +// but this time use a function pointer instead +// of an alias to represent std.file.read. +import std.file; + +void main() +{ + // Create and execute a Task for reading + // foo.txt. + auto file1Task = task(&read, "foo.txt"); + file1Task.executeInNewThread(); + + // Read bar.txt in parallel. + auto file2Data = read("bar.txt"); + + // Get the results of reading foo.txt. + auto file1Data = file1Task.yieldForce; +} +--- + +Notes: This function takes a non-scope delegate, meaning it can be + used with closures. If you can't allocate a closure due to objects + on the stack that have scoped destruction, see $(D scopedTask), which + takes a scope delegate. + */ +auto task(F, Args...)(F delegateOrFp, Args args) +if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) +{ + return new Task!(run, F, Args)(delegateOrFp, args); +} + +/** +Version of $(D task) usable from $(D @safe) code. Usage mechanics are +identical to the non-@safe case, but safety introduces some restrictions: + +1. $(D fun) must be @safe or @trusted. + +2. $(D F) must not have any unshared aliasing as defined by + $(REF hasUnsharedAliasing, std,traits). This means it + may not be an unshared delegate or a non-shared class or struct + with overloaded $(D opCall). This also precludes accepting template + alias parameters. + +3. $(D Args) must not have unshared aliasing. + +4. $(D fun) must not return by reference. + +5. The return type must not have unshared aliasing unless $(D fun) is + $(D pure) or the $(D Task) is executed via $(D executeInNewThread) instead + of using a $(D TaskPool).
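+ +Example (a minimal sketch that satisfies these restrictions; +$(D sumSquares) is a hypothetical helper, not part of this module): +--- +int sumSquares(immutable(int)[] xs) @safe pure +{ + int total = 0; + foreach (x; xs) total += x * x; + return total; +} + +void main() @safe +{ + // OK: @safe pure function pointer; the argument and return type + // have no unshared aliasing. + auto t = task(&sumSquares, [1, 2, 3].idup); + t.executeInNewThread(); + auto result = t.yieldForce; +} +---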
+ +*/ +@trusted auto task(F, Args...)(F fun, Args args) +if (is(typeof(fun(args))) && isSafeTask!F) +{ + return new Task!(run, F, Args)(fun, args); +} + +/** +These functions allow the creation of $(D Task) objects on the stack rather +than the GC heap. The lifetime of a $(D Task) created by $(D scopedTask) +cannot exceed the lifetime of the scope it was created in. + +$(D scopedTask) might be preferred over $(D task): + +1. When a $(D Task) that calls a delegate is being created and a closure + cannot be allocated due to objects on the stack that have scoped + destruction. The delegate overload of $(D scopedTask) takes a $(D scope) + delegate. + +2. As a micro-optimization, to avoid the heap allocation associated with + $(D task) or with the creation of a closure. + +Usage is otherwise identical to $(D task). + +Notes: $(D Task) objects created using $(D scopedTask) will automatically +call $(D Task.yieldForce) in their destructor if necessary to ensure +the $(D Task) is complete before the stack frame they reside on is destroyed. +*/ +auto scopedTask(alias fun, Args...)(Args args) +{ + auto ret = Task!(fun, Args)(args); + ret.isScoped = true; + return ret; +} + +/// Ditto +auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) +if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) +{ + auto ret = Task!(run, F, Args)(delegateOrFp, args); + ret.isScoped = true; + return ret; +} + +/// Ditto +@trusted auto scopedTask(F, Args...)(F fun, Args args) +if (is(typeof(fun(args))) && isSafeTask!F) +{ + auto ret = Task!(run, F, Args)(fun, args); + ret.isScoped = true; + return ret; +} + +/** +The total number of CPU cores available on the current machine, as reported by +the operating system. +*/ +immutable uint totalCPUs; + +/* +This class serves two purposes: + +1. It distinguishes std.parallelism threads from other threads so that + the std.parallelism daemon threads can be terminated. + +2. It adds a reference to the pool that the thread is a member of, + which is also necessary to allow the daemon threads to be properly + terminated. +*/ +private final class ParallelismThread : Thread +{ + this(void delegate() dg) + { + super(dg); + } + + TaskPool pool; +} + +// Kill daemon threads. +shared static ~this() +{ + foreach (ref thread; Thread) + { + auto pthread = cast(ParallelismThread) thread; + if (pthread is null) continue; + auto pool = pthread.pool; + if (!pool.isDaemon) continue; + pool.stop(); + pthread.join(); + } +} + +/** +This class encapsulates a task queue and a set of worker threads. Its purpose +is to efficiently map a large number of $(D Task)s onto a smaller number of +threads. A task queue is a FIFO queue of $(D Task) objects that have been +submitted to the $(D TaskPool) and are awaiting execution. A worker thread is a +thread that executes the $(D Task) at the front of the queue when one is +available and sleeps when the queue is empty. + +This class should usually be used via the global instantiation +available via the $(REF taskPool, std,parallelism) property. +Occasionally it is useful to explicitly instantiate a $(D TaskPool): + +1. When you want $(D TaskPool) instances with multiple priorities, for example + a low priority pool and a high priority pool. + +2. When the threads in the global task pool are waiting on a synchronization + primitive (for example a mutex), and you want to parallelize the code that + needs to run before these threads can be resumed. + */ +final class TaskPool +{ +private: + + // A pool can either be a regular pool or a single-task pool. 
A + // single-task pool is a dummy pool that's fired up for + // Task.executeInNewThread(). + bool isSingleTask; + + ParallelismThread[] pool; + Thread singleTaskThread; + + AbstractTask* head; + AbstractTask* tail; + PoolState status = PoolState.running; + Condition workerCondition; + Condition waiterCondition; + Mutex queueMutex; + Mutex waiterMutex; // For waiterCondition + + // The instanceStartIndex of the next instance that will be created. + __gshared static size_t nextInstanceIndex = 1; + + // The index of the current thread. + static size_t threadIndex; + + // The index of the first thread in this instance. + immutable size_t instanceStartIndex; + + // The index that the next thread to be initialized in this pool will have. + size_t nextThreadIndex; + + enum PoolState : ubyte + { + running, + finishing, + stopNow + } + + void doJob(AbstractTask* job) + { + assert(job.taskStatus == TaskStatus.inProgress); + assert(job.next is null); + assert(job.prev is null); + + scope(exit) + { + if (!isSingleTask) + { + waiterLock(); + scope(exit) waiterUnlock(); + notifyWaiters(); + } + } + + try + { + job.job(); + } + catch (Throwable e) + { + job.exception = e; + } + + atomicSetUbyte(job.taskStatus, TaskStatus.done); + } + + // This function is used for dummy pools created by Task.executeInNewThread(). + void doSingleTask() + { + // No synchronization. Pool is guaranteed to only have one thread, + // and the queue is submitted to before this thread is created. + assert(head); + auto t = head; + t.next = t.prev = head = null; + doJob(t); + } + + // This function performs initialization for each thread that affects + // thread local storage and therefore must be done from within the + // worker thread. It then calls executeWorkLoop(). + void startWorkLoop() + { + // Initialize thread index. + { + queueLock(); + scope(exit) queueUnlock(); + threadIndex = nextThreadIndex; + nextThreadIndex++; + } + + executeWorkLoop(); + } + + // This is the main work loop that worker threads spend their time in + // until they terminate. It's also entered by non-worker threads when + // finish() is called with the blocking variable set to true. + void executeWorkLoop() + { + while (atomicReadUbyte(status) != PoolState.stopNow) + { + AbstractTask* task = pop(); + if (task is null) + { + if (atomicReadUbyte(status) == PoolState.finishing) + { + atomicSetUbyte(status, PoolState.stopNow); + return; + } + } + else + { + doJob(task); + } + } + } + + // Pop a task off the queue. + AbstractTask* pop() + { + queueLock(); + scope(exit) queueUnlock(); + auto ret = popNoSync(); + while (ret is null && status == PoolState.running) + { + wait(); + ret = popNoSync(); + } + return ret; + } + + AbstractTask* popNoSync() + out(returned) + { + /* If task.prev and task.next aren't null, then another thread + * can try to delete this task from the pool after it's + * already been deleted/popped. + */ + if (returned !is null) + { + assert(returned.next is null); + assert(returned.prev is null); + } + } + body + { + if (isSingleTask) return null; + + AbstractTask* returned = head; + if (head !is null) + { + head = head.next; + returned.prev = null; + returned.next = null; + returned.taskStatus = TaskStatus.inProgress; + } + if (head !is null) + { + head.prev = null; + } + + return returned; + } + + // Push a task onto the queue.
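+ // The queue is an intrusive doubly-linked list of AbstractTask nodes + // guarded by queueMutex: abstractPut appends at the tail and wakes + // one sleeping worker via notify().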
+ void abstractPut(AbstractTask* task) + { + queueLock(); + scope(exit) queueUnlock(); + abstractPutNoSync(task); + } + + void abstractPutNoSync(AbstractTask* task) + in + { + assert(task); + } + out + { + import std.conv : text; + + assert(tail.prev !is tail); + assert(tail.next is null, text(tail.prev, '\t', tail.next)); + if (tail.prev !is null) + { + assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next)); + } + } + body + { + // Not using enforce() to save on function call overhead since this + // is a performance critical function. + if (status != PoolState.running) + { + throw new Error( + "Cannot submit a new task to a pool after calling " ~ + "finish() or stop()." + ); + } + + task.next = null; + if (head is null) //Queue is empty. + { + head = task; + tail = task; + tail.prev = null; + } + else + { + assert(tail); + task.prev = tail; + tail.next = task; + tail = task; + } + notify(); + } + + void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t) + { + if (status != PoolState.running) + { + throw new Error( + "Cannot submit a new task to a pool after calling " ~ + "finish() or stop()." + ); + } + + if (head is null) + { + head = h; + tail = t; + } + else + { + h.prev = tail; + tail.next = h; + tail = t; + } + + notifyAll(); + } + + void tryDeleteExecute(AbstractTask* toExecute) + { + if (isSingleTask) return; + + if ( !deleteItem(toExecute) ) + { + return; + } + + try + { + toExecute.job(); + } + catch (Exception e) + { + toExecute.exception = e; + } + + atomicSetUbyte(toExecute.taskStatus, TaskStatus.done); + } + + bool deleteItem(AbstractTask* item) + { + queueLock(); + scope(exit) queueUnlock(); + return deleteItemNoSync(item); + } + + bool deleteItemNoSync(AbstractTask* item) + { + if (item.taskStatus != TaskStatus.notStarted) + { + return false; + } + item.taskStatus = TaskStatus.inProgress; + + if (item is head) + { + // Make sure head gets set properly. + popNoSync(); + return true; + } + if (item is tail) + { + tail = tail.prev; + if (tail !is null) + { + tail.next = null; + } + item.next = null; + item.prev = null; + return true; + } + if (item.next !is null) + { + assert(item.next.prev is item); // Check queue consistency. + item.next.prev = item.prev; + } + if (item.prev !is null) + { + assert(item.prev.next is item); // Check queue consistency. + item.prev.next = item.next; + } + item.next = null; + item.prev = null; + return true; + } + + void queueLock() + { + assert(queueMutex); + if (!isSingleTask) queueMutex.lock(); + } + + void queueUnlock() + { + assert(queueMutex); + if (!isSingleTask) queueMutex.unlock(); + } + + void waiterLock() + { + if (!isSingleTask) waiterMutex.lock(); + } + + void waiterUnlock() + { + if (!isSingleTask) waiterMutex.unlock(); + } + + void wait() + { + if (!isSingleTask) workerCondition.wait(); + } + + void notify() + { + if (!isSingleTask) workerCondition.notify(); + } + + void notifyAll() + { + if (!isSingleTask) workerCondition.notifyAll(); + } + + void waitUntilCompletion() + { + if (isSingleTask) + { + singleTaskThread.join(); + } + else + { + waiterCondition.wait(); + } + } + + void notifyWaiters() + { + if (!isSingleTask) waiterCondition.notifyAll(); + } + + // Private constructor for creating dummy pools that only have one thread, + // only execute one Task, and then terminate. This is used for + // Task.executeInNewThread(). + this(AbstractTask* task, int priority = int.max) + { + assert(task); + + // Dummy value, not used. 
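+ // (instanceStartIndex is immutable, so it must be initialized in the + // constructor even though single-task pools never use thread indices.)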
+ instanceStartIndex = 0; + + this.isSingleTask = true; + task.taskStatus = TaskStatus.inProgress; + this.head = task; + singleTaskThread = new Thread(&doSingleTask); + singleTaskThread.start(); + + // Disabled until writing code to support + // running thread with specified priority + // See https://d.puremagic.com/issues/show_bug.cgi?id=8960 + + /*if (priority != int.max) + { + singleTaskThread.priority = priority; + }*/ + } + +public: + // This is used in parallel_algorithm but is too unstable to document + // as public API. + size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow + { + import std.algorithm.comparison : max; + + if (this.size == 0) + { + return rangeLen; + } + + immutable size_t eightSize = 4 * (this.size + 1); + auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1); + return max(ret, 1); + } + + /** + Default constructor that initializes a $(D TaskPool) with + $(D totalCPUs) - 1 worker threads. The minus 1 is included because the + main thread will also be available to do work. + + Note: On single-core machines, the primitives provided by $(D TaskPool) + operate transparently in single-threaded mode. + */ + this() @trusted + { + this(totalCPUs - 1); + } + + /** + Allows for custom number of worker threads. + */ + this(size_t nWorkers) @trusted + { + synchronized(typeid(TaskPool)) + { + instanceStartIndex = nextInstanceIndex; + + // The first worker thread to be initialized will have this index, + // and will increment it. The second worker to be initialized will + // have this index plus 1. + nextThreadIndex = instanceStartIndex; + nextInstanceIndex += nWorkers; + } + + queueMutex = new Mutex(this); + waiterMutex = new Mutex(); + workerCondition = new Condition(queueMutex); + waiterCondition = new Condition(waiterMutex); + + pool = new ParallelismThread[nWorkers]; + foreach (ref poolThread; pool) + { + poolThread = new ParallelismThread(&startWorkLoop); + poolThread.pool = this; + poolThread.start(); + } + } + + /** + Implements a parallel foreach loop over a range. This works by implicitly + creating and submitting one $(D Task) to the $(D TaskPool) for each worker + thread. A work unit is a set of consecutive elements of $(D range) to + be processed by a worker thread between communication with any other + thread. The number of elements processed per work unit is controlled by the + $(D workUnitSize) parameter. Smaller work units provide better load + balancing, but larger work units avoid the overhead of communicating + with other threads frequently to fetch the next work unit. Large work + units also avoid false sharing in cases where the range is being modified. + The less time a single iteration of the loop takes, the larger + $(D workUnitSize) should be. For very expensive loop bodies, + $(D workUnitSize) should be 1. An overload that chooses a default work + unit size is also available. + + Example: + --- + // Find the logarithm of every number from 1 to + // 10_000_000 in parallel. + auto logs = new double[10_000_000]; + + // Parallel foreach works with or without an index + // variable. It can iterate by ref if range.front + // returns by ref. + + // Iterate over logs using work units of size 100. + foreach (i, ref elem; taskPool.parallel(logs, 100)) + { + elem = log(i + 1.0); + } + + // Same thing, but use the default work unit size.
+ // + // Timings on an Athlon 64 X2 dual core machine: + // + // Parallel foreach: 388 milliseconds + // Regular foreach: 619 milliseconds + foreach (i, ref elem; taskPool.parallel(logs)) + { + elem = log(i + 1.0); + } + --- + + Notes: + + The memory usage of this implementation is guaranteed to be constant + in $(D range.length). + + Breaking from a parallel foreach loop via a break, labeled break, + labeled continue, return or goto statement throws a + $(D ParallelForeachError). + + In the case of non-random access ranges, parallel foreach buffers lazily + to an array of size $(D workUnitSize) before executing the parallel portion + of the loop. The exception is that, if a parallel foreach is executed + over a range returned by $(D asyncBuf) or $(D map), the copying is elided + and the buffers are simply swapped. In this case $(D workUnitSize) is + ignored and the work unit size is set to the buffer size of $(D range). + + A memory barrier is guaranteed to be executed on exit from the loop, + so that results produced by all threads are visible in the calling thread. + + $(B Exception Handling): + + When at least one exception is thrown from inside a parallel foreach loop, + the submission of additional $(D Task) objects is terminated as soon as + possible, in a non-deterministic manner. All executing or + enqueued work units are allowed to complete. Then, all exceptions that + were thrown by any work unit are chained using $(D Throwable.next) and + rethrown. The order of the exception chaining is non-deterministic. + */ + ParallelForeach!R parallel(R)(R range, size_t workUnitSize) + { + import std.exception : enforce; + enforce(workUnitSize > 0, "workUnitSize must be > 0."); + alias RetType = ParallelForeach!R; + return RetType(this, range, workUnitSize); + } + + + /// Ditto + ParallelForeach!R parallel(R)(R range) + { + static if (hasLength!R) + { + // Default work unit size is such that we would use 4x as many + // slots as are in this thread pool. + size_t workUnitSize = defaultWorkUnitSize(range.length); + return parallel(range, workUnitSize); + } + else + { + // Just use a really, really dumb guess if the user is too lazy to + // specify. + return parallel(range, 512); + } + } + + /// + template amap(functions...) + { + /** + Eager parallel map. The eagerness of this function means it has less + overhead than the lazily evaluated $(D TaskPool.map) and should be + preferred where the memory requirements of eagerness are acceptable. + $(D functions) are the functions to be evaluated, passed as template + alias parameters in a style similar to + $(REF map, std,algorithm,iteration). + The first argument must be a random access range. For performance + reasons, amap will assume the range elements have not yet been + initialized. Elements will be overwritten without calling a destructor + or doing an assignment. As such, the range must not contain meaningful + data$(DDOC_COMMENT not a section): either un-initialized objects, or + objects in their $(D .init) state. + + --- + auto numbers = iota(100_000_000.0); + + // Find the square roots of numbers. + // + // Timings on an Athlon 64 X2 dual core machine: + // + // Parallel eager map: 0.802 s + // Equivalent serial implementation: 1.768 s + auto squareRoots = taskPool.amap!sqrt(numbers); + --- + + Immediately after the range argument, an optional work unit size argument + may be provided. Work units as used by $(D amap) are identical to those + defined for parallel foreach.
If no work unit size is provided, the + default work unit size is used. + + --- + // Same thing, but make work unit size 100. + auto squareRoots = taskPool.amap!sqrt(numbers, 100); + --- + + An output range for returning the results may be provided as the last + argument. If one is not provided, an array of the proper type will be + allocated on the garbage collected heap. If one is provided, it must be a + random access range with assignable elements, must have reference + semantics with respect to assignment to its elements, and must have the + same length as the input range. Writing to adjacent elements from + different threads must be safe. + + --- + // Same thing, but explicitly allocate an array + // to return the results in. The element type + // of the array may be either the exact type + // returned by functions or an implicit conversion + // target. + auto squareRoots = new float[numbers.length]; + taskPool.amap!sqrt(numbers, squareRoots); + + // Multiple functions, explicit output range, and + // explicit work unit size. + auto results = new Tuple!(float, real)[numbers.length]; + taskPool.amap!(sqrt, log)(numbers, 100, results); + --- + + Note: + + A memory barrier is guaranteed to be executed after all results are written + but before returning so that results produced by all threads are visible + in the calling thread. + + Tips: + + To perform the mapping operation in place, provide the same range for the + input and output range. + + To parallelize the copying of a range with expensive to evaluate elements + to an array, pass an identity function (a function that just returns + whatever argument is provided to it) to $(D amap). + + $(B Exception Handling): + + When at least one exception is thrown from inside the map functions, + the submission of additional $(D Task) objects is terminated as soon as + possible, in a non-deterministic manner. All currently executing or + enqueued work units are allowed to complete. Then, all exceptions that + were thrown from any work unit are chained using $(D Throwable.next) and + rethrown. The order of the exception chaining is non-deterministic. + */ + auto amap(Args...)(Args args) + if (isRandomAccessRange!(Args[0])) + { + import std.conv : emplaceRef; + + alias fun = adjoin!(staticMap!(unaryFun, functions)); + + alias range = args[0]; + immutable len = range.length; + + static if ( + Args.length > 1 && + randAssignable!(Args[$ - 1]) && + is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1])) + ) + { + import std.conv : text; + import std.exception : enforce; + + alias buf = args[$ - 1]; + alias args2 = args[0..$ - 1]; + alias Args2 = Args[0..$ - 1]; + enforce(buf.length == len, + text("Can't use a user supplied buffer that's the wrong ", + "size. 
(Expected :", len, " Got: ", buf.length)); + } + else static if (randAssignable!(Args[$ - 1]) && Args.length > 1) + { + static assert(0, "Wrong buffer type."); + } + else + { + import std.array : uninitializedArray; + + auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len); + alias args2 = args; + alias Args2 = Args; + } + + if (!len) return buf; + + static if (isIntegral!(Args2[$ - 1])) + { + static assert(args2.length == 2); + auto workUnitSize = cast(size_t) args2[1]; + } + else + { + static assert(args2.length == 1, Args); + auto workUnitSize = defaultWorkUnitSize(range.length); + } + + alias R = typeof(range); + + if (workUnitSize > len) + { + workUnitSize = len; + } + + // Handle as a special case: + if (size == 0) + { + size_t index = 0; + foreach (elem; range) + { + emplaceRef(buf[index++], fun(elem)); + } + return buf; + } + + // Effectively -1: chunkIndex + 1 == 0: + shared size_t workUnitIndex = size_t.max; + shared bool shouldContinue = true; + + void doIt() + { + import std.algorithm.comparison : min; + + scope(failure) + { + // If an exception is thrown, all threads should bail. + atomicStore(shouldContinue, false); + } + + while (atomicLoad(shouldContinue)) + { + immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); + immutable start = workUnitSize * myUnitIndex; + if (start >= len) + { + atomicStore(shouldContinue, false); + break; + } + + immutable end = min(len, start + workUnitSize); + + static if (hasSlicing!R) + { + auto subrange = range[start .. end]; + foreach (i; start .. end) + { + emplaceRef(buf[i], fun(subrange.front)); + subrange.popFront(); + } + } + else + { + foreach (i; start .. end) + { + emplaceRef(buf[i], fun(range[i])); + } + } + } + } + + submitAndExecute(this, &doIt); + return buf; + } + } + + /// + template map(functions...) + { + /** + A semi-lazy parallel map that can be used for pipelining. The map + functions are evaluated for the first $(D bufSize) elements and stored in a + buffer and made available to $(D popFront). Meanwhile, in the + background a second buffer of the same size is filled. When the first + buffer is exhausted, it is swapped with the second buffer and filled while + the values from what was originally the second buffer are read. This + implementation allows for elements to be written to the buffer without + the need for atomic operations or synchronization for each write, and + enables the mapping function to be evaluated efficiently in parallel. + + $(D map) has more overhead than the simpler procedure used by $(D amap) + but avoids the need to keep all results in memory simultaneously and works + with non-random access ranges. + + Params: + + source = The input range to be mapped. If $(D source) is not random + access it will be lazily buffered to an array of size $(D bufSize) before + the map function is evaluated. (For an exception to this rule, see Notes.) + + bufSize = The size of the buffer to store the evaluated elements. + + workUnitSize = The number of elements to evaluate in a single + $(D Task). Must be less than or equal to $(D bufSize), and + should be a fraction of $(D bufSize) such that all worker threads can be + used. If the default of size_t.max is used, workUnitSize will be set to + the pool-wide default. + + Returns: An input range representing the results of the map. This range + has a length iff $(D source) has a length. 
+ + Notes: + + If a range returned by $(D map) or $(D asyncBuf) is used as an input to + $(D map), then as an optimization the copying from the output buffer + of the first range to the input buffer of the second range is elided, even + though the ranges returned by $(D map) and $(D asyncBuf) are non-random + access ranges. This means that the $(D bufSize) parameter passed to the + current call to $(D map) will be ignored and the size of the buffer + will be the buffer size of $(D source). + + Example: + --- + // Pipeline reading a file, converting each line + // to a number, taking the logarithms of the numbers, + // and performing the additions necessary to find + // the sum of the logarithms. + + auto lineRange = File("numberList.txt").byLine(); + auto dupedLines = std.algorithm.map!"a.idup"(lineRange); + auto nums = taskPool.map!(to!double)(dupedLines); + auto logs = taskPool.map!log10(nums); + + double sum = 0; + foreach (elem; logs) + { + sum += elem; + } + --- + + $(B Exception Handling): + + Any exceptions thrown while iterating over $(D source) + or computing the map function are re-thrown on a call to $(D popFront) or, + if thrown during construction, are simply allowed to propagate to the + caller. In the case of exceptions thrown while computing the map function, + the exceptions are chained as in $(D TaskPool.amap). + */ + auto + map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max) + if (isInputRange!S) + { + import std.exception : enforce; + + enforce(workUnitSize == size_t.max || workUnitSize <= bufSize, + "Work unit size must be smaller than buffer size."); + alias fun = adjoin!(staticMap!(unaryFun, functions)); + + static final class Map + { + // This is a class because the task needs to be located on the + // heap and in the non-random access case source needs to be on + // the heap, too. + + private: + enum bufferTrick = is(typeof(source.buf1)) && + is(typeof(source.bufPos)) && + is(typeof(source.doBufSwap())); + + alias E = MapType!(S, functions); + E[] buf1, buf2; + S source; + TaskPool pool; + Task!(run, E[] delegate(E[]), E[]) nextBufTask; + size_t workUnitSize; + size_t bufPos; + bool lastTaskWaited; + + static if (isRandomAccessRange!S) + { + alias FromType = S; + + void popSource() + { + import std.algorithm.comparison : min; + + static if (__traits(compiles, source[0 .. source.length])) + { + source = source[min(buf1.length, source.length)..source.length]; + } + else static if (__traits(compiles, source[0..$])) + { + source = source[min(buf1.length, source.length)..$]; + } + else + { + static assert(0, "S must have slicing for Map." + ~ " " ~ S.stringof ~ " doesn't."); + } + } + } + else static if (bufferTrick) + { + // Make sure we don't have the buffer recycling overload of + // asyncBuf. + static if ( + is(typeof(source.source)) && + isRoundRobin!(typeof(source.source)) + ) + { + static assert(0, "Cannot execute a parallel map on " ~ + "the buffer recycling overload of asyncBuf." + ); + } + + alias FromType = typeof(source.buf1); + FromType from; + + // Just swap our input buffer with source's output buffer. + // No need to copy element by element. 
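+ // This relies on the bufferTrick check above: source must expose + // buf1, bufPos and doBufSwap(), which the ranges returned by map + // and asyncBuf do.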
+ FromType dumpToFrom() + { + import std.algorithm.mutation : swap; + + assert(source.buf1.length <= from.length); + from.length = source.buf1.length; + swap(source.buf1, from); + + // Just in case this source has been popped before + // being sent to map: + from = from[source.bufPos..$]; + + static if (is(typeof(source._length))) + { + source._length -= (from.length - source.bufPos); + } + + source.doBufSwap(); + + return from; + } + } + else + { + alias FromType = ElementType!S[]; + + // The temporary array that data is copied to before being + // mapped. + FromType from; + + FromType dumpToFrom() + { + assert(from !is null); + + size_t i; + for (; !source.empty && i < from.length; source.popFront()) + { + from[i++] = source.front; + } + + from = from[0 .. i]; + return from; + } + } + + static if (hasLength!S) + { + size_t _length; + + public @property size_t length() const @safe pure nothrow + { + return _length; + } + } + + this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool) + { + static if (bufferTrick) + { + bufSize = source.buf1.length; + } + + buf1.length = bufSize; + buf2.length = bufSize; + + static if (!isRandomAccessRange!S) + { + from.length = bufSize; + } + + this.workUnitSize = (workUnitSize == size_t.max) ? + pool.defaultWorkUnitSize(bufSize) : workUnitSize; + this.source = source; + this.pool = pool; + + static if (hasLength!S) + { + _length = source.length; + } + + buf1 = fillBuf(buf1); + submitBuf2(); + } + + // The from parameter is a dummy and ignored in the random access + // case. + E[] fillBuf(E[] buf) + { + import std.algorithm.comparison : min; + + static if (isRandomAccessRange!S) + { + import std.range : take; + auto toMap = take(source, buf.length); + scope(success) popSource(); + } + else + { + auto toMap = dumpToFrom(); + } + + buf = buf[0 .. min(buf.length, toMap.length)]; + + // Handle as a special case: + if (pool.size == 0) + { + size_t index = 0; + foreach (elem; toMap) + { + buf[index++] = fun(elem); + } + return buf; + } + + pool.amap!functions(toMap, workUnitSize, buf); + + return buf; + } + + void submitBuf2() + in + { + assert(nextBufTask.prev is null); + assert(nextBufTask.next is null); + } body + { + // Hack to reuse the task object. + + nextBufTask = typeof(nextBufTask).init; + nextBufTask._args[0] = &fillBuf; + nextBufTask._args[1] = buf2; + pool.put(nextBufTask); + } + + void doBufSwap() + { + if (lastTaskWaited) + { + // Then the source is empty. Signal it here. 
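+ // A zero-length buf1 is what makes empty() return true afterwards.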
+ buf1 = null; + buf2 = null; + + static if (!isRandomAccessRange!S) + { + from = null; + } + + return; + } + + buf2 = buf1; + buf1 = nextBufTask.yieldForce; + bufPos = 0; + + if (source.empty) + { + lastTaskWaited = true; + } + else + { + submitBuf2(); + } + } + + public: + @property auto front() + { + return buf1[bufPos]; + } + + void popFront() + { + static if (hasLength!S) + { + _length--; + } + + bufPos++; + if (bufPos >= buf1.length) + { + doBufSwap(); + } + } + + static if (isInfinite!S) + { + enum bool empty = false; + } + else + { + + bool empty() const @property + { + // popFront() sets this when source is empty + return buf1.length == 0; + } + } + } + return new Map(source, bufSize, workUnitSize, this); + } + } + + /** + Given a $(D source) range that is expensive to iterate over, returns an + input range that asynchronously buffers the contents of + $(D source) into a buffer of $(D bufSize) elements in a worker thread, + while making previously buffered elements from a second buffer, also of size + $(D bufSize), available via the range interface of the returned + object. The returned range has a length iff $(D hasLength!S). + $(D asyncBuf) is useful, for example, when performing expensive operations + on the elements of ranges that represent data on a disk or network. + + Example: + --- + import std.conv, std.stdio; + + void main() + { + // Fetch lines of a file in a background thread + // while processing previously fetched lines, + // dealing with byLine's buffer recycling by + // eagerly duplicating every line. + auto lines = File("foo.txt").byLine(); + auto duped = std.algorithm.map!"a.idup"(lines); + + // Fetch more lines in the background while we + // process the lines already read into memory + // into a matrix of doubles. + double[][] matrix; + auto asyncReader = taskPool.asyncBuf(duped); + + foreach (line; asyncReader) + { + auto ls = line.split("\t"); + matrix ~= to!(double[])(ls); + } + } + --- + + $(B Exception Handling): + + Any exceptions thrown while iterating over $(D source) are re-thrown on a + call to $(D popFront) or, if thrown during construction, simply + allowed to propagate to the caller. + */ + auto asyncBuf(S)(S source, size_t bufSize = 100) if (isInputRange!S) + { + static final class AsyncBuf + { + // This is a class because the task and source both need to be on + // the heap. + + // The element type of S. + alias E = ElementType!S; // Needs to be here b/c of forward ref bugs. + + private: + E[] buf1, buf2; + S source; + TaskPool pool; + Task!(run, E[] delegate(E[]), E[]) nextBufTask; + size_t bufPos; + bool lastTaskWaited; + + static if (hasLength!S) + { + size_t _length; + + // Available if hasLength!S. + public @property size_t length() const @safe pure nothrow + { + return _length; + } + } + + this(S source, size_t bufSize, TaskPool pool) + { + buf1.length = bufSize; + buf2.length = bufSize; + + this.source = source; + this.pool = pool; + + static if (hasLength!S) + { + _length = source.length; + } + + buf1 = fillBuf(buf1); + submitBuf2(); + } + + E[] fillBuf(E[] buf) + { + assert(buf !is null); + + size_t i; + for (; !source.empty && i < buf.length; source.popFront()) + { + buf[i++] = source.front; + } + + buf = buf[0 .. i]; + return buf; + } + + void submitBuf2() + in + { + assert(nextBufTask.prev is null); + assert(nextBufTask.next is null); + } body + { + // Hack to reuse the task object. 
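+ // Resetting the struct to .init clears prev, next and taskStatus, + // so the same Task object can be submitted to the pool again.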
+ + nextBufTask = typeof(nextBufTask).init; + nextBufTask._args[0] = &fillBuf; + nextBufTask._args[1] = buf2; + pool.put(nextBufTask); + } + + void doBufSwap() + { + if (lastTaskWaited) + { + // Then source is empty. Signal it here. + buf1 = null; + buf2 = null; + return; + } + + buf2 = buf1; + buf1 = nextBufTask.yieldForce; + bufPos = 0; + + if (source.empty) + { + lastTaskWaited = true; + } + else + { + submitBuf2(); + } + } + + public: + E front() @property + { + return buf1[bufPos]; + } + + void popFront() + { + static if (hasLength!S) + { + _length--; + } + + bufPos++; + if (bufPos >= buf1.length) + { + doBufSwap(); + } + } + + static if (isInfinite!S) + { + enum bool empty = false; + } + + else + { + /// + bool empty() @property + { + // popFront() sets this when source is empty: + return buf1.length == 0; + } + } + } + return new AsyncBuf(source, bufSize, this); + } + + /** + Given a callable object $(D next) that writes to a user-provided buffer and + a second callable object $(D empty) that determines whether more data is + available to write via $(D next), returns an input range that + asynchronously calls $(D next) with a set of size $(D nBuffers) of buffers + and makes the results available in the order they were obtained via the + input range interface of the returned object. Similarly to the + input range overload of $(D asyncBuf), the first half of the buffers + are made available via the range interface while the second half are + filled and vice-versa. + + Params: + + next = A callable object that takes a single argument that must be an array + with mutable elements. When called, $(D next) writes data to + the array provided by the caller. + + empty = A callable object that takes no arguments and returns a type + implicitly convertible to $(D bool). This is used to signify + that no more data is available to be obtained by calling $(D next). + + initialBufSize = The initial size of each buffer. If $(D next) takes its + array by reference, it may resize the buffers. + + nBuffers = The number of buffers to cycle through when calling $(D next). + + Example: + --- + // Fetch lines of a file in a background + // thread while processing previously fetched + // lines, without duplicating any lines. + auto file = File("foo.txt"); + + void next(ref char[] buf) + { + file.readln(buf); + } + + // Fetch more lines in the background while we + // process the lines already read into memory + // into a matrix of doubles. + double[][] matrix; + auto asyncReader = taskPool.asyncBuf(&next, &file.eof); + + foreach (line; asyncReader) + { + auto ls = line.split("\t"); + matrix ~= to!(double[])(ls); + } + --- + + $(B Exception Handling): + + Any exceptions thrown while iterating over $(D range) are re-thrown on a + call to $(D popFront). + + Warning: + + Using the range returned by this function in a parallel foreach loop + will not work because buffers may be overwritten while the task that + processes them is in queue. This is checked for at compile time + and will result in a static assertion failure. + */ + auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) + if (is(typeof(C2.init()) : bool) && + Parameters!C1.length == 1 && + Parameters!C2.length == 0 && + isArray!(Parameters!C1[0]) + ) { + auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); + return asyncBuf(roundRobin, nBuffers / 2); + } + + /// + template reduce(functions...) + { + /** + Parallel reduce on a random access range. 
Except as otherwise noted,
+        usage is similar to $(REF _reduce, std,algorithm,iteration). This
+        function works by splitting the range to be reduced into work units,
+        which are slices to be reduced in parallel. Once the results from all
+        work units are computed, a final serial reduction is performed on these
+        results to compute the final answer. Therefore, care must be taken to
+        choose the seed value appropriately.
+
+        Because the reduction is being performed in parallel, $(D functions)
+        must be associative. For notational simplicity, let # be an
+        infix operator representing $(D functions). Then, (a # b) # c must equal
+        a # (b # c). Floating point addition is not associative
+        even though addition in exact arithmetic is. Summing floating
+        point numbers using this function may give different results than summing
+        serially. However, for many practical purposes floating point addition
+        can be treated as associative.
+
+        Note that, since $(D functions) are assumed to be associative,
+        additional optimizations are made to the serial portion of the reduction
+        algorithm. These take advantage of the instruction level parallelism of
+        modern CPUs, in addition to the thread-level parallelism that the rest
+        of this module exploits. This can lead to better than linear speedups
+        relative to $(REF _reduce, std,algorithm,iteration), especially for
+        fine-grained benchmarks like dot products.
+
+        An explicit seed may be provided as the first argument. If
+        provided, it is used as the seed for all work units and for the final
+        reduction of results from all work units. Therefore, if it is not the
+        identity value for the operation being performed, results may differ
+        from those generated by $(REF _reduce, std,algorithm,iteration), or may
+        vary depending on how many work units are used. The next argument must
+        be the range to be reduced.
+        ---
+        // Find the sum of squares of a range in parallel, using
+        // an explicit seed.
+        //
+        // Timings on an Athlon 64 X2 dual core machine:
+        //
+        // Parallel reduce:                     72 milliseconds
+        // Using std.algorithm.reduce instead: 181 milliseconds
+        auto nums = iota(10_000_000.0f);
+        auto sumSquares = taskPool.reduce!"a + b"(
+            0.0, std.algorithm.map!"a * a"(nums)
+        );
+        ---
+
+        If no explicit seed is provided, the first element of each work unit
+        is used as a seed. For the final reduction, the result from the first
+        work unit is used as the seed.
+        ---
+        // Find the sum of a range in parallel, using the first
+        // element of each work unit as the seed.
+        auto sum = taskPool.reduce!"a + b"(nums);
+        ---
+
+        An explicit work unit size may be specified as the last argument.
+        Specifying too small a work unit size will effectively serialize the
+        reduction, as the final reduction of the result of each work unit will
+        dominate computation time. If $(D TaskPool.size) for this instance
+        is zero, this parameter is ignored and one work unit is used.
+        ---
+        // Use a work unit size of 100.
+        auto sum2 = taskPool.reduce!"a + b"(nums, 100);
+
+        // Work unit size of 100 and explicit seed.
+        auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
+        ---
+
+        Parallel reduce supports multiple functions, like
+        $(D std.algorithm.reduce).
+        ---
+        // Find both the min and max of nums.
+        auto minMax = taskPool.reduce!(min, max)(nums);
+        assert(minMax[0] == reduce!min(nums));
+        assert(minMax[1] == reduce!max(nums));
+        ---
+
+        $(B Exception Handling):
+
+        After this function is finished executing, any exceptions thrown
+        are chained together via $(D Throwable.next) and rethrown. 
The chaining + order is non-deterministic. + */ + auto reduce(Args...)(Args args) + { + import core.exception : OutOfMemoryError; + import std.conv : emplaceRef; + import std.exception : enforce; + + alias fun = reduceAdjoin!functions; + alias finishFun = reduceFinish!functions; + + static if (isIntegral!(Args[$ - 1])) + { + size_t workUnitSize = cast(size_t) args[$ - 1]; + alias args2 = args[0..$ - 1]; + alias Args2 = Args[0..$ - 1]; + } + else + { + alias args2 = args; + alias Args2 = Args; + } + + auto makeStartValue(Type)(Type e) + { + static if (functions.length == 1) + { + return e; + } + else + { + typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void; + foreach (i, T; seed.Types) + { + emplaceRef(seed.expand[i], e); + } + + return seed; + } + } + + static if (args2.length == 2) + { + static assert(isInputRange!(Args2[1])); + alias range = args2[1]; + alias seed = args2[0]; + enum explicitSeed = true; + + static if (!is(typeof(workUnitSize))) + { + size_t workUnitSize = defaultWorkUnitSize(range.length); + } + } + else + { + static assert(args2.length == 1); + alias range = args2[0]; + + static if (!is(typeof(workUnitSize))) + { + size_t workUnitSize = defaultWorkUnitSize(range.length); + } + + enforce(!range.empty, + "Cannot reduce an empty range with first element as start value."); + + auto seed = makeStartValue(range.front); + enum explicitSeed = false; + range.popFront(); + } + + alias E = typeof(seed); + alias R = typeof(range); + + E reduceOnRange(R range, size_t lowerBound, size_t upperBound) + { + // This is for exploiting instruction level parallelism by + // using multiple accumulator variables within each thread, + // since we're assuming functions are associative anyhow. + + // This is so that loops can be unrolled automatically. + enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5); + enum nILP = ilpTuple.length; + immutable subSize = (upperBound - lowerBound) / nILP; + + if (subSize <= 1) + { + // Handle as a special case. + static if (explicitSeed) + { + E result = seed; + } + else + { + E result = makeStartValue(range[lowerBound]); + lowerBound++; + } + + foreach (i; lowerBound .. upperBound) + { + result = fun(result, range[i]); + } + + return result; + } + + assert(subSize > 1); + E[nILP] results; + size_t[nILP] offsets; + + foreach (i; ilpTuple) + { + offsets[i] = lowerBound + subSize * i; + + static if (explicitSeed) + { + results[i] = seed; + } + else + { + results[i] = makeStartValue(range[offsets[i]]); + offsets[i]++; + } + } + + immutable nLoop = subSize - (!explicitSeed); + foreach (i; 0 .. nLoop) + { + foreach (j; ilpTuple) + { + results[j] = fun(results[j], range[offsets[j]]); + offsets[j]++; + } + } + + // Finish the remainder. + foreach (i; nILP * subSize + lowerBound .. upperBound) + { + results[$ - 1] = fun(results[$ - 1], range[i]); + } + + foreach (i; ilpTuple[1..$]) + { + results[0] = finishFun(results[0], results[i]); + } + + return results[0]; + } + + immutable len = range.length; + if (len == 0) + { + return seed; + } + + if (this.size == 0) + { + return finishFun(seed, reduceOnRange(range, 0, len)); + } + + // Unlike the rest of the functions here, I can't use the Task object + // recycling trick here because this has to work on non-commutative + // operations. After all the tasks are done executing, fun() has to + // be applied on the results of these to get a final result, but + // it can't be evaluated out of order. 
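+            //
+            // For example, the associative but non-commutative operation
+            // "a ~ b" (concatenation) gives the right answer only if each
+            // work unit's result is folded in with finishFun() strictly in
+            // submission order, which is what the final loop below does.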
+
+            if (workUnitSize > len)
+            {
+                workUnitSize = len;
+            }
+
+            immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
+            assert(nWorkUnits * workUnitSize >= len);
+
+            alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
+            RTask[] tasks;
+
+            // Can't use alloca() due to Bug 3753. Use a fixed-size stack
+            // buffer instead, and fall back to malloc() if it is too small.
+            enum maxStack = 2_048;
+            byte[maxStack] buf = void;
+            immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;
+
+            import core.stdc.stdlib : malloc, free;
+            if (nBytesNeeded < maxStack)
+            {
+                tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
+            }
+            else
+            {
+                auto ptr = cast(RTask*) malloc(nBytesNeeded);
+                if (!ptr)
+                {
+                    throw new OutOfMemoryError(
+                        "Out of memory in std.parallelism."
+                    );
+                }
+
+                tasks = ptr[0 .. nWorkUnits];
+            }
+
+            scope(exit)
+            {
+                // Free iff the malloc() path above was taken.
+                if (nBytesNeeded >= maxStack)
+                {
+                    free(tasks.ptr);
+                }
+            }
+
+            foreach (ref t; tasks[])
+                emplaceRef(t, RTask());
+
+            // Hack to take the address of a nested function w/o
+            // making a closure.
+            static auto scopedAddress(D)(scope D del) @system
+            {
+                auto tmp = del;
+                return tmp;
+            }
+
+            size_t curPos = 0;
+            void useTask(ref RTask task)
+            {
+                import std.algorithm.comparison : min;
+
+                task.pool = this;
+                task._args[0] = scopedAddress(&reduceOnRange);
+                task._args[3] = min(len, curPos + workUnitSize); // upper bound.
+                task._args[1] = range; // range
+                task._args[2] = curPos; // lower bound.
+
+                curPos += workUnitSize;
+            }
+
+            foreach (ref task; tasks)
+            {
+                useTask(task);
+            }
+
+            foreach (i; 1 .. tasks.length - 1)
+            {
+                tasks[i].next = tasks[i + 1].basePtr;
+                tasks[i + 1].prev = tasks[i].basePtr;
+            }
+
+            if (tasks.length > 1)
+            {
+                queueLock();
+                scope(exit) queueUnlock();
+
+                abstractPutGroupNoSync(
+                    tasks[1].basePtr,
+                    tasks[$ - 1].basePtr
+                );
+            }
+
+            if (tasks.length > 0)
+            {
+                try
+                {
+                    tasks[0].job();
+                }
+                catch (Throwable e)
+                {
+                    tasks[0].exception = e;
+                }
+                tasks[0].taskStatus = TaskStatus.done;
+
+                // Try to execute each of these in the current thread
+                foreach (ref task; tasks[1..$])
+                {
+                    tryDeleteExecute(task.basePtr);
+                }
+            }
+
+            // Now that we've tried to execute every task, they're all either
+            // done or in progress. Force all of them.
+            E result = seed;
+
+            Throwable firstException, lastException;
+
+            foreach (ref task; tasks)
+            {
+                try
+                {
+                    task.yieldForce;
+                }
+                catch (Throwable e)
+                {
+                    addToChain(e, firstException, lastException);
+                    continue;
+                }
+
+                if (!firstException) result = finishFun(result, task.returnVal);
+            }
+
+            if (firstException) throw firstException;
+
+            return result;
+        }
+    }
+
+    /**
+    Gets the index of the current thread relative to this $(D TaskPool). Any
+    thread not in this pool will receive an index of 0. The worker threads in
+    this pool receive unique indices of 1 through $(D this.size).
+
+    This function is useful for maintaining worker-local resources.
+
+    Example:
+    ---
+    // Execute a loop that computes the greatest common
+    // divisor of every number from 0 through 999 with
+    // 42 in parallel. Write the results out to
+    // a set of files, one for each thread. This allows
+    // results to be written out without any synchronization.
+
+    import std.conv, std.range, std.numeric, std.stdio;
+
+    void main()
+    {
+        auto fileHandles = new File[taskPool.size + 1];
+        scope(exit) {
+            foreach (ref handle; fileHandles)
+            {
+                handle.close();
+            }
+        }
+
+        foreach (i, ref handle; fileHandles)
+        {
+            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
+        }
+
+        foreach (num; parallel(iota(1_000)))
+        {
+            auto outHandle = fileHandles[taskPool.workerIndex];
+            outHandle.writeln(num, '\t', gcd(num, 42));
+        }
+    }
+    ---
+    */
+    size_t workerIndex() @property @safe const nothrow
+    {
+        immutable rawInd = threadIndex;
+        return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ?
+            (rawInd - instanceStartIndex + 1) : 0;
+    }
+
+    /**
+    Struct for creating worker-local storage. Worker-local storage is
+    thread-local storage that exists only for worker threads in a given
+    $(D TaskPool) plus a single thread outside the pool. It is allocated on the
+    garbage collected heap in a way that avoids _false sharing, and doesn't
+    necessarily have global scope within any thread. It can be accessed from
+    any worker thread in the $(D TaskPool) that created it, and one thread
+    outside this $(D TaskPool). All threads outside the pool that created a
+    given instance of worker-local storage share a single slot.
+
+    Since the underlying data for this struct is heap-allocated, this struct
+    has reference semantics when passed between functions.
+
+    The main use cases for $(D WorkerLocalStorage) are:
+
+    1. Performing parallel reductions with an imperative, as opposed to
+    functional, programming style. In this case, it's useful to treat
+    $(D WorkerLocalStorage) as local to each thread for only the parallel
+    portion of an algorithm.
+
+    2. Recycling temporary buffers across iterations of a parallel foreach loop.
+
+    Example:
+    ---
+    // Calculate pi as in our synopsis example, but
+    // use an imperative instead of a functional style.
+    immutable n = 1_000_000_000;
+    immutable delta = 1.0L / n;
+
+    auto sums = taskPool.workerLocalStorage(0.0L);
+    foreach (i; parallel(iota(n)))
+    {
+        immutable x = ( i - 0.5L ) * delta;
+        immutable toAdd = delta / ( 1.0 + x * x );
+        sums.get += toAdd;
+    }
+
+    // Add up the results from each worker thread.
+    real pi = 0;
+    foreach (threadResult; sums.toRange)
+    {
+        pi += 4.0L * threadResult;
+    }
+    ---
+    */
+    static struct WorkerLocalStorage(T)
+    {
+    private:
+        TaskPool pool;
+        size_t size;
+
+        size_t elemSize;
+        bool* stillThreadLocal;
+
+        static size_t roundToLine(size_t num) pure nothrow
+        {
+            if (num % cacheLineSize == 0)
+            {
+                return num;
+            }
+            else
+            {
+                return ((num / cacheLineSize) + 1) * cacheLineSize;
+            }
+        }
+
+        void* data;
+
+        void initialize(TaskPool pool)
+        {
+            this.pool = pool;
+            size = pool.size + 1;
+            stillThreadLocal = new bool;
+            *stillThreadLocal = true;
+
+            // Determines whether the GC should scan the array.
+            auto blkInfo = (typeid(T).flags & 1) ?
+                           cast(GC.BlkAttr) 0 :
+                           GC.BlkAttr.NO_SCAN;
+
+            immutable nElem = pool.size + 1;
+            elemSize = roundToLine(T.sizeof);
+
+            // The + 3 is to pad one full cache line worth of space on either side
+            // of the data structure to make sure false sharing with completely
+            // unrelated heap data is prevented, and to provide enough padding to
+            // make sure that data is cache line-aligned.
+            data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize;
+
+            // Cache line align data ptr.
+            data = cast(void*) roundToLine(cast(size_t) data);
+
+            foreach (i; 0 .. 
nElem) + { + this.opIndex(i) = T.init; + } + } + + ref opIndex(this Qualified)(size_t index) + { + import std.conv : text; + assert(index < size, text(index, '\t', uint.max)); + return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); + } + + void opIndexAssign(T val, size_t index) + { + assert(index < size); + *(cast(T*) (data + elemSize * index)) = val; + } + + public: + /** + Get the current thread's instance. Returns by ref. + Note that calling $(D get) from any thread + outside the $(D TaskPool) that created this instance will return the + same reference, so an instance of worker-local storage should only be + accessed from one thread outside the pool that created it. If this + rule is violated, undefined behavior will result. + + If assertions are enabled and $(D toRange) has been called, then this + WorkerLocalStorage instance is no longer worker-local and an assertion + failure will result when calling this method. This is not checked + when assertions are disabled for performance reasons. + */ + ref get(this Qualified)() @property + { + assert(*stillThreadLocal, + "Cannot call get() on this instance of WorkerLocalStorage " ~ + "because it is no longer worker-local." + ); + return opIndex(pool.workerIndex); + } + + /** + Assign a value to the current thread's instance. This function has + the same caveats as its overload. + */ + void get(T val) @property + { + assert(*stillThreadLocal, + "Cannot call get() on this instance of WorkerLocalStorage " ~ + "because it is no longer worker-local." + ); + + opIndexAssign(val, pool.workerIndex); + } + + /** + Returns a range view of the values for all threads, which can be used + to further process the results of each thread after running the parallel + part of your algorithm. Do not use this method in the parallel portion + of your algorithm. + + Calling this function sets a flag indicating that this struct is no + longer worker-local, and attempting to use the $(D get) method again + will result in an assertion failure if assertions are enabled. + */ + WorkerLocalStorageRange!T toRange() @property + { + if (*stillThreadLocal) + { + *stillThreadLocal = false; + + // Make absolutely sure results are visible to all threads. + // This is probably not necessary since some other + // synchronization primitive will be used to signal that the + // parallel part of the algorithm is done, but the + // performance impact should be negligible, so it's better + // to be safe. + ubyte barrierDummy; + atomicSetUbyte(barrierDummy, 1); + } + + return WorkerLocalStorageRange!T(this); + } + } + + /** + Range primitives for worker-local storage. The purpose of this is to + access results produced by each worker thread from a single thread once you + are no longer using the worker-local storage from multiple threads. + Do not use this struct in the parallel portion of your algorithm. + + The proper way to instantiate this object is to call + $(D WorkerLocalStorage.toRange). Once instantiated, this object behaves + as a finite random-access range with assignable, lvalue elements and + a length equal to the number of worker threads in the $(D TaskPool) that + created it plus 1. 
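+
+    A minimal sketch of typical usage, assuming the parallel phase that
+    filled the storage has already finished:
+    ---
+    auto partialSums = taskPool.workerLocalStorage(0.0L);
+    foreach (i; parallel(iota(1_000)))
+    {
+        partialSums.get += i;
+    }
+
+    // Single-threaded from here on: combine the per-thread results.
+    real total = 0;
+    foreach (threadSum; partialSums.toRange)
+    {
+        total += threadSum;
+    }
+    assert(total == 499_500);
+    ---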
+    */
+    static struct WorkerLocalStorageRange(T)
+    {
+    private:
+        WorkerLocalStorage!T workerLocalStorage;
+
+        size_t _length;
+        size_t beginOffset;
+
+        this(WorkerLocalStorage!T wl)
+        {
+            this.workerLocalStorage = wl;
+            _length = wl.size;
+        }
+
+    public:
+        ref front(this Qualified)() @property
+        {
+            return this[0];
+        }
+
+        ref back(this Qualified)() @property
+        {
+            return this[_length - 1];
+        }
+
+        void popFront()
+        {
+            if (_length > 0)
+            {
+                beginOffset++;
+                _length--;
+            }
+        }
+
+        void popBack()
+        {
+            if (_length > 0)
+            {
+                _length--;
+            }
+        }
+
+        typeof(this) save() @property
+        {
+            return this;
+        }
+
+        ref opIndex(this Qualified)(size_t index)
+        {
+            assert(index < _length);
+            return workerLocalStorage[index + beginOffset];
+        }
+
+        void opIndexAssign(T val, size_t index)
+        {
+            assert(index < _length);
+            // Offset by beginOffset so assignment stays consistent with
+            // opIndex after popFront() has been called.
+            workerLocalStorage[index + beginOffset] = val;
+        }
+
+        typeof(this) opSlice(size_t lower, size_t upper)
+        {
+            assert(upper <= _length);
+            auto newWl = this.workerLocalStorage;
+            newWl.data += lower * newWl.elemSize;
+            newWl.size = upper - lower;
+            return typeof(this)(newWl);
+        }
+
+        bool empty() const @property
+        {
+            return length == 0;
+        }
+
+        size_t length() const @property
+        {
+            return _length;
+        }
+    }
+
+    /**
+    Creates an instance of worker-local storage, initialized with a given
+    value. The value is $(D lazy) so that you can, for example, easily
+    create one instance of a class for each worker. For usage example,
+    see the $(D WorkerLocalStorage) struct.
+    */
+    WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init)
+    {
+        WorkerLocalStorage!T ret;
+        ret.initialize(this);
+        foreach (i; 0 .. size + 1)
+        {
+            ret[i] = initialVal;
+        }
+
+        // Memory barrier to make absolutely sure that what we wrote is
+        // visible to worker threads.
+        ubyte barrierDummy;
+        atomicSetUbyte(barrierDummy, 0);
+
+        return ret;
+    }
+
+    /**
+    Signals to all worker threads to terminate as soon as they are finished
+    with their current $(D Task), or immediately if they are not executing a
+    $(D Task). $(D Task)s that were in queue will not be executed unless
+    a call to $(D Task.workForce), $(D Task.yieldForce) or $(D Task.spinForce)
+    causes them to be executed.
+
+    Use only if you have waited on every $(D Task) and therefore know the
+    queue is empty, or if you speculatively executed some tasks and no longer
+    need the results.
+    */
+    void stop() @trusted
+    {
+        queueLock();
+        scope(exit) queueUnlock();
+        atomicSetUbyte(status, PoolState.stopNow);
+        notifyAll();
+    }
+
+    /**
+    Signals worker threads to terminate when the queue becomes empty.
+
+    If the $(D blocking) argument is true, wait for all worker threads to
+    terminate before returning. This option might be used in applications where
+    task results are never consumed -- e.g. when $(D TaskPool) is employed as a
+    rudimentary scheduler for tasks which communicate by means other than
+    return values.
+
+    Warning: Calling this function with $(D blocking = true) from a worker
+             thread that is a member of the same $(D TaskPool) that
+             $(D finish) is being called on will result in a deadlock.
+    */
+    void finish(bool blocking = false) @trusted
+    {
+        {
+            queueLock();
+            scope(exit) queueUnlock();
+            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
+            notifyAll();
+        }
+        if (blocking)
+        {
+            // Use this thread as a worker until everything is finished.
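+            // The calling thread participates in the work loop alongside
+            // the workers instead of merely blocking, and then joins each
+            // worker below once the queue has been drained.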
+            executeWorkLoop();
+
+            foreach (t; pool)
+            {
+                // Maybe there should be something here to prevent a thread
+                // from calling join() on itself if this function is called
+                // from a worker thread in the same pool, but:
+                //
+                // 1. Using an if statement to skip join() would result in
+                //    finish() returning without all tasks being finished.
+                //
+                // 2. If an exception were thrown, it would bubble up to the
+                //    Task from which finish() was called and likely be
+                //    swallowed.
+                t.join();
+            }
+        }
+    }
+
+    /// Returns the number of worker threads in the pool.
+    @property size_t size() @safe const pure nothrow
+    {
+        return pool.length;
+    }
+
+    /**
+    Put a $(D Task) object on the back of the task queue. The $(D Task)
+    object may be passed by pointer or reference.
+
+    Example:
+    ---
+    import std.file;
+
+    // Create a task.
+    auto t = task!read("foo.txt");
+
+    // Add it to the queue to be executed.
+    taskPool.put(t);
+    ---
+
+    Notes:
+
+    @trusted overloads of this function are called for $(D Task)s if
+    $(REF hasUnsharedAliasing, std,traits) is false for the $(D Task)'s
+    return type or the function the $(D Task) executes is $(D pure).
+    $(D Task) objects that meet all other requirements specified in the
+    $(D @trusted) overloads of $(D task) and $(D scopedTask) may be created
+    and executed from $(D @safe) code via $(D Task.executeInNewThread) but
+    not via $(D TaskPool).
+
+    While this function takes the address of variables that may
+    be on the stack, some overloads are marked as @trusted.
+    $(D Task) includes a destructor that waits for the task to complete
+    before destroying the stack frame it is allocated on. Therefore,
+    it is impossible for the stack frame to be destroyed before the task is
+    complete and no longer referenced by a $(D TaskPool).
+    */
+    void put(alias fun, Args...)(ref Task!(fun, Args) task)
+    if (!isSafeReturn!(typeof(task)))
+    {
+        task.pool = this;
+        abstractPut(task.basePtr);
+    }
+
+    /// Ditto
+    void put(alias fun, Args...)(Task!(fun, Args)* task)
+    if (!isSafeReturn!(typeof(*task)))
+    {
+        import std.exception : enforce;
+        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
+        put(*task);
+    }
+
+    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
+    if (isSafeReturn!(typeof(task)))
+    {
+        task.pool = this;
+        abstractPut(task.basePtr);
+    }
+
+    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
+    if (isSafeReturn!(typeof(*task)))
+    {
+        import std.exception : enforce;
+        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
+        put(*task);
+    }
+
+    /**
+    These properties control whether the worker threads are daemon threads.
+    A daemon thread is automatically terminated when all non-daemon threads
+    have terminated. A non-daemon thread will prevent a program from
+    terminating as long as it has not terminated.
+
+    If any $(D TaskPool) with non-daemon threads is active, either $(D stop)
+    or $(D finish) must be called on it before the program can terminate.
+
+    The worker threads in the $(D TaskPool) instance returned by the
+    $(D taskPool) property are daemon by default. The worker threads of
+    manually instantiated task pools are non-daemon by default.
+
+    Note: For a size zero pool, the getter arbitrarily returns true and the
+          setter has no effect.
+    */
+    bool isDaemon() @property @trusted
+    {
+        queueLock();
+        scope(exit) queueUnlock();
+        return (size == 0) ? 
true : pool[0].isDaemon;
+    }
+
+    /// Ditto
+    void isDaemon(bool newVal) @property @trusted
+    {
+        queueLock();
+        scope(exit) queueUnlock();
+        foreach (thread; pool)
+        {
+            thread.isDaemon = newVal;
+        }
+    }
+
+    /**
+    These functions allow getting and setting the OS scheduling priority of
+    the worker threads in this $(D TaskPool). They forward to
+    $(D core.thread.Thread.priority), so a given priority value here means the
+    same thing as an identical priority value in $(D core.thread).
+
+    Note: For a size zero pool, the getter arbitrarily returns
+          $(D core.thread.Thread.PRIORITY_MIN) and the setter has no effect.
+    */
+    int priority() @property @trusted
+    {
+        return (size == 0) ? core.thread.Thread.PRIORITY_MIN :
+                             pool[0].priority;
+    }
+
+    /// Ditto
+    void priority(int newPriority) @property @trusted
+    {
+        if (size > 0)
+        {
+            foreach (t; pool)
+            {
+                t.priority = newPriority;
+            }
+        }
+    }
+}
+
+/**
+Returns a lazily initialized global instantiation of $(D TaskPool).
+This function can safely be called concurrently from multiple non-worker
+threads. The worker threads in this pool are daemon threads, meaning that it
+is not necessary to call $(D TaskPool.stop) or $(D TaskPool.finish) before
+terminating the main thread.
+*/
+@property TaskPool taskPool() @trusted
+{
+    import std.concurrency : initOnce;
+    __gshared TaskPool pool;
+    return initOnce!pool({
+        auto p = new TaskPool(defaultPoolThreads);
+        p.isDaemon = true;
+        return p;
+    }());
+}
+
+private shared uint _defaultPoolThreads;
+shared static this()
+{
+    atomicStore(_defaultPoolThreads, totalCPUs - 1);
+}
+
+/**
+These properties get and set the number of worker threads in the $(D TaskPool)
+instance returned by $(D taskPool). The default value is $(D totalCPUs) - 1.
+Calling the setter after the first call to $(D taskPool) does not change the
+number of worker threads in the instance returned by $(D taskPool).
+*/
+@property uint defaultPoolThreads() @trusted
+{
+    return atomicLoad(_defaultPoolThreads);
+}
+
+/// Ditto
+@property void defaultPoolThreads(uint newVal) @trusted
+{
+    atomicStore(_defaultPoolThreads, newVal);
+}
+
+/**
+Convenience functions that forward to $(D taskPool.parallel). The
+purpose of these is to make parallel foreach less verbose and more
+readable.
+
+Example:
+---
+// Find the logarithm of every number from
+// 1 to 1_000_000 in parallel, using the
+// default TaskPool instance.
+auto logs = new double[1_000_000];
+
+foreach (i, ref elem; parallel(logs))
+{
+    elem = log(i + 1.0);
+}
+---
+
+*/
+ParallelForeach!R parallel(R)(R range)
+{
+    return taskPool.parallel(range);
+}
+
+/// Ditto
+ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
+{
+    return taskPool.parallel(range, workUnitSize);
+}
+
+// Thrown when a parallel foreach loop is broken from. 
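+//
+// For example, the following loop throws this error at runtime, since there
+// is no well-defined way to stop the other worker threads early:
+//
+//     foreach (i; parallel(iota(10))) { if (i > 5) break; }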
+class ParallelForeachError : Error +{ + this() + { + super("Cannot break from a parallel foreach loop using break, return, " + ~ "labeled break/continue or goto statements."); + } +} + +/*------Structs that implement opApply for parallel foreach.------------------*/ +private template randLen(R) +{ + enum randLen = isRandomAccessRange!R && hasLength!R; +} + +private void submitAndExecute( + TaskPool pool, + scope void delegate() doIt +) +{ + import core.exception : OutOfMemoryError; + immutable nThreads = pool.size + 1; + + alias PTask = typeof(scopedTask(doIt)); + import core.stdc.stdlib : malloc, free; + import core.stdc.string : memcpy; + + // The logical thing to do would be to just use alloca() here, but that + // causes problems on Windows for reasons that I don't understand + // (tentatively a compiler bug) and definitely doesn't work on Posix due + // to Bug 3753. Therefore, allocate a fixed buffer and fall back to + // malloc() if someone's using a ridiculous amount of threads. Also, + // the using a byte array instead of a PTask array as the fixed buffer + // is to prevent d'tors from being called on uninitialized excess PTask + // instances. + enum nBuf = 64; + byte[nBuf * PTask.sizeof] buf = void; + PTask[] tasks; + if (nThreads <= nBuf) + { + tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; + } + else + { + auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); + if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); + tasks = ptr[0 .. nThreads]; + } + + scope(exit) + { + if (nThreads > nBuf) + { + free(tasks.ptr); + } + } + + foreach (ref t; tasks) + { + import core.stdc.string : memcpy; + + // This silly looking code is necessary to prevent d'tors from being + // called on uninitialized objects. + auto temp = scopedTask(doIt); + memcpy(&t, &temp, PTask.sizeof); + + // This has to be done to t after copying, not temp before copying. + // Otherwise, temp's destructor will sit here and wait for the + // task to finish. + t.pool = pool; + } + + foreach (i; 1 .. tasks.length - 1) + { + tasks[i].next = tasks[i + 1].basePtr; + tasks[i + 1].prev = tasks[i].basePtr; + } + + if (tasks.length > 1) + { + pool.queueLock(); + scope(exit) pool.queueUnlock(); + + pool.abstractPutGroupNoSync( + tasks[1].basePtr, + tasks[$ - 1].basePtr + ); + } + + if (tasks.length > 0) + { + try + { + tasks[0].job(); + } + catch (Throwable e) + { + tasks[0].exception = e; // nocoverage + } + tasks[0].taskStatus = TaskStatus.done; + + // Try to execute each of these in the current thread + foreach (ref task; tasks[1..$]) + { + pool.tryDeleteExecute(task.basePtr); + } + } + + Throwable firstException, lastException; + + foreach (i, ref task; tasks) + { + try + { + task.yieldForce; + } + catch (Throwable e) + { + addToChain(e, firstException, lastException); + continue; + } + } + + if (firstException) throw firstException; +} + +void foreachErr() +{ + throw new ParallelForeachError(); +} + +int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) +{ + with(p) + { + int res = 0; + size_t index = 0; + + // The explicit ElementType!R in the foreach loops is necessary for + // correct behavior when iterating over strings. 
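+        // Without it, foreach over a narrow string would iterate over raw
+        // code units (char/wchar) instead of the decoded dchar elements the
+        // range primitives yield, changing both the element type and the
+        // indices passed to the delegate.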
+ static if (hasLvalueElements!R) + { + foreach (ref ElementType!R elem; range) + { + static if (Parameters!dg.length == 2) + { + res = dg(index, elem); + } + else + { + res = dg(elem); + } + if (res) break; + index++; + } + } + else + { + foreach (ElementType!R elem; range) + { + static if (Parameters!dg.length == 2) + { + res = dg(index, elem); + } + else + { + res = dg(elem); + } + if (res) break; + index++; + } + } + if (res) foreachErr; + return res; + } +} + +private enum string parallelApplyMixinRandomAccess = q{ + // Handle empty thread pool as special case. + if (pool.size == 0) + { + return doSizeZeroCase(this, dg); + } + + // Whether iteration is with or without an index variable. + enum withIndex = Parameters!(typeof(dg)).length == 2; + + shared size_t workUnitIndex = size_t.max; // Effectively -1: chunkIndex + 1 == 0 + immutable len = range.length; + if (!len) return 0; + + shared bool shouldContinue = true; + + void doIt() + { + import std.algorithm.comparison : min; + + scope(failure) + { + // If an exception is thrown, all threads should bail. + atomicStore(shouldContinue, false); + } + + while (atomicLoad(shouldContinue)) + { + immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); + immutable start = workUnitSize * myUnitIndex; + if (start >= len) + { + atomicStore(shouldContinue, false); + break; + } + + immutable end = min(len, start + workUnitSize); + + foreach (i; start .. end) + { + static if (withIndex) + { + if (dg(i, range[i])) foreachErr(); + } + else + { + if (dg(range[i])) foreachErr(); + } + } + } + } + + submitAndExecute(pool, &doIt); + + return 0; +}; + +enum string parallelApplyMixinInputRange = q{ + // Handle empty thread pool as special case. + if (pool.size == 0) + { + return doSizeZeroCase(this, dg); + } + + // Whether iteration is with or without an index variable. + enum withIndex = Parameters!(typeof(dg)).length == 2; + + // This protects the range while copying it. + auto rangeMutex = new Mutex(); + + shared bool shouldContinue = true; + + // The total number of elements that have been popped off range. + // This is updated only while protected by rangeMutex; + size_t nPopped = 0; + + static if ( + is(typeof(range.buf1)) && + is(typeof(range.bufPos)) && + is(typeof(range.doBufSwap())) + ) + { + // Make sure we don't have the buffer recycling overload of + // asyncBuf. + static if ( + is(typeof(range.source)) && + isRoundRobin!(typeof(range.source)) + ) + { + static assert(0, "Cannot execute a parallel foreach loop on " ~ + "the buffer recycling overload of asyncBuf."); + } + + enum bool bufferTrick = true; + } + else + { + enum bool bufferTrick = false; + } + + void doIt() + { + scope(failure) + { + // If an exception is thrown, all threads should bail. + atomicStore(shouldContinue, false); + } + + static if (hasLvalueElements!R) + { + alias Temp = ElementType!R*[]; + Temp temp; + + // Returns: The previous value of nPopped. + size_t makeTemp() + { + import std.algorithm.internal : addressOf; + import std.array : uninitializedArray; + + if (temp is null) + { + temp = uninitializedArray!Temp(workUnitSize); + } + + rangeMutex.lock(); + scope(exit) rangeMutex.unlock(); + + size_t i = 0; + for (; i < workUnitSize && !range.empty; range.popFront(), i++) + { + temp[i] = addressOf(range.front); + } + + temp = temp[0 .. i]; + auto ret = nPopped; + nPopped += temp.length; + return ret; + } + + } + else + { + + alias Temp = ElementType!R[]; + Temp temp; + + // Returns: The previous value of nPopped. 
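+            //
+            // makeTemp() pops the next work unit off the shared range under
+            // rangeMutex (up to workUnitSize elements, copied into this
+            // thread's temp buffer) and returns the overall starting index,
+            // which keeps index-aware loop bodies consistent.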
+ static if (!bufferTrick) size_t makeTemp() + { + import std.array : uninitializedArray; + + if (temp is null) + { + temp = uninitializedArray!Temp(workUnitSize); + } + + rangeMutex.lock(); + scope(exit) rangeMutex.unlock(); + + size_t i = 0; + for (; i < workUnitSize && !range.empty; range.popFront(), i++) + { + temp[i] = range.front; + } + + temp = temp[0 .. i]; + auto ret = nPopped; + nPopped += temp.length; + return ret; + } + + static if (bufferTrick) size_t makeTemp() + { + import std.algorithm.mutation : swap; + rangeMutex.lock(); + scope(exit) rangeMutex.unlock(); + + // Elide copying by just swapping buffers. + temp.length = range.buf1.length; + swap(range.buf1, temp); + + // This is necessary in case popFront() has been called on + // range before entering the parallel foreach loop. + temp = temp[range.bufPos..$]; + + static if (is(typeof(range._length))) + { + range._length -= (temp.length - range.bufPos); + } + + range.doBufSwap(); + auto ret = nPopped; + nPopped += temp.length; + return ret; + } + } + + while (atomicLoad(shouldContinue)) + { + auto overallIndex = makeTemp(); + if (temp.empty) + { + atomicStore(shouldContinue, false); + break; + } + + foreach (i; 0 .. temp.length) + { + scope(success) overallIndex++; + + static if (hasLvalueElements!R) + { + static if (withIndex) + { + if (dg(overallIndex, *temp[i])) foreachErr(); + } + else + { + if (dg(*temp[i])) foreachErr(); + } + } + else + { + static if (withIndex) + { + if (dg(overallIndex, temp[i])) foreachErr(); + } + else + { + if (dg(temp[i])) foreachErr(); + } + } + } + } + } + + submitAndExecute(pool, &doIt); + + return 0; +}; + +// Calls e.next until the end of the chain is found. +private Throwable findLastException(Throwable e) pure nothrow +{ + if (e is null) return null; + + while (e.next) + { + e = e.next; + } + + return e; +} + +// Adds e to the exception chain. +private void addToChain( + Throwable e, + ref Throwable firstException, + ref Throwable lastException +) pure nothrow +{ + if (firstException) + { + assert(lastException); // nocoverage + lastException.next = e; // nocoverage + lastException = findLastException(e); // nocoverage + } + else + { + firstException = e; + lastException = findLastException(e); + } +} + +private struct ParallelForeach(R) +{ + TaskPool pool; + R range; + size_t workUnitSize; + alias E = ElementType!R; + + static if (hasLvalueElements!R) + { + alias NoIndexDg = int delegate(ref E); + alias IndexDg = int delegate(size_t, ref E); + } + else + { + alias NoIndexDg = int delegate(E); + alias IndexDg = int delegate(size_t, E); + } + + int opApply(scope NoIndexDg dg) + { + static if (randLen!R) + { + mixin(parallelApplyMixinRandomAccess); + } + else + { + mixin(parallelApplyMixinInputRange); + } + } + + int opApply(scope IndexDg dg) + { + static if (randLen!R) + { + mixin(parallelApplyMixinRandomAccess); + } + else + { + mixin(parallelApplyMixinInputRange); + } + } +} + +/* +This struct buffers the output of a callable that outputs data into a +user-supplied buffer into a set of buffers of some fixed size. It allows these +buffers to be accessed with an input range interface. This is used internally +in the buffer-recycling overload of TaskPool.asyncBuf, which creates an +instance and forwards it to the input range overload of asyncBuf. +*/ +private struct RoundRobinBuffer(C1, C2) +{ + // No need for constraints because they're already checked for in asyncBuf. 
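+    //
+    // How it works: front() lazily fills ("primes") the current buffer by
+    // calling nextDel exactly once per buffer, popFront() advances to the
+    // next buffer modulo nBuffers, and emptyDel() is polled on each advance
+    // to detect the end of the underlying data.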
+ + alias Array = Parameters!(C1.init)[0]; + alias T = typeof(Array.init[0]); + + T[][] bufs; + size_t index; + C1 nextDel; + C2 emptyDel; + bool _empty; + bool primed; + + this( + C1 nextDel, + C2 emptyDel, + size_t initialBufSize, + size_t nBuffers + ) { + this.nextDel = nextDel; + this.emptyDel = emptyDel; + bufs.length = nBuffers; + + foreach (ref buf; bufs) + { + buf.length = initialBufSize; + } + } + + void prime() + in + { + assert(!empty); + } + body + { + scope(success) primed = true; + nextDel(bufs[index]); + } + + + T[] front() @property + in + { + assert(!empty); + } + body + { + if (!primed) prime(); + return bufs[index]; + } + + void popFront() + { + if (empty || emptyDel()) + { + _empty = true; + return; + } + + index = (index + 1) % bufs.length; + primed = false; + } + + bool empty() @property const @safe pure nothrow + { + return _empty; + } +} + +version (unittest) +{ + // This was the only way I could get nested maps to work. + __gshared TaskPool poolInstance; + + import std.stdio; +} + +// These test basic functionality but don't stress test for threading bugs. +// These are the tests that should be run every time Phobos is compiled. +@system unittest +{ + import std.algorithm.comparison : equal, min, max; + import std.algorithm.iteration : filter, map, reduce; + import std.array : split; + import std.conv : text; + import std.exception : assertThrown; + import std.math : approxEqual, sqrt, log; + import std.range : indexed, iota, join; + import std.typecons : Tuple, tuple; + + poolInstance = new TaskPool(2); + scope(exit) poolInstance.stop(); + + // The only way this can be verified is manually. + debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); + + auto oldPriority = poolInstance.priority; + poolInstance.priority = Thread.PRIORITY_MAX; + assert(poolInstance.priority == Thread.PRIORITY_MAX); + + poolInstance.priority = Thread.PRIORITY_MIN; + assert(poolInstance.priority == Thread.PRIORITY_MIN); + + poolInstance.priority = oldPriority; + assert(poolInstance.priority == oldPriority); + + static void refFun(ref uint num) + { + num++; + } + + uint x; + + // Test task(). + auto t = task!refFun(x); + poolInstance.put(t); + t.yieldForce; + assert(t.args[0] == 1); + + auto t2 = task(&refFun, x); + poolInstance.put(t2); + t2.yieldForce; + assert(t2.args[0] == 1); + + // Test scopedTask(). + auto st = scopedTask!refFun(x); + poolInstance.put(st); + st.yieldForce; + assert(st.args[0] == 1); + + auto st2 = scopedTask(&refFun, x); + poolInstance.put(st2); + st2.yieldForce; + assert(st2.args[0] == 1); + + // Test executeInNewThread(). + auto ct = scopedTask!refFun(x); + ct.executeInNewThread(Thread.PRIORITY_MAX); + ct.yieldForce; + assert(ct.args[0] == 1); + + // Test ref return. 
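+    // A Task whose function returns by ref propagates that ref through
+    // spinForce, so the increment below mutates the task's stored copy of
+    // the argument, which is then observable through t3.args[0].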
+ uint toInc = 0; + static ref T makeRef(T)(ref T num) + { + return num; + } + + auto t3 = task!makeRef(toInc); + taskPool.put(t3); + assert(t3.args[0] == 0); + t3.spinForce++; + assert(t3.args[0] == 1); + + static void testSafe() @safe { + static int bump(int num) + { + return num + 1; + } + + auto safePool = new TaskPool(0); + auto t = task(&bump, 1); + taskPool.put(t); + assert(t.yieldForce == 2); + + auto st = scopedTask(&bump, 1); + taskPool.put(st); + assert(st.yieldForce == 2); + safePool.stop(); + } + + auto arr = [1,2,3,4,5]; + auto nums = new uint[5]; + auto nums2 = new uint[5]; + + foreach (i, ref elem; poolInstance.parallel(arr)) + { + elem++; + nums[i] = cast(uint) i + 2; + nums2[i] = elem; + } + + assert(nums == [2,3,4,5,6], text(nums)); + assert(nums2 == nums, text(nums2)); + assert(arr == nums, text(arr)); + + // Test const/immutable arguments. + static int add(int lhs, int rhs) + { + return lhs + rhs; + } + immutable addLhs = 1; + immutable addRhs = 2; + auto addTask = task(&add, addLhs, addRhs); + auto addScopedTask = scopedTask(&add, addLhs, addRhs); + poolInstance.put(addTask); + poolInstance.put(addScopedTask); + assert(addTask.yieldForce == 3); + assert(addScopedTask.yieldForce == 3); + + // Test parallel foreach with non-random access range. + auto range = filter!"a != 666"([0, 1, 2, 3, 4]); + + foreach (i, elem; poolInstance.parallel(range)) + { + nums[i] = cast(uint) i; + } + + assert(nums == [0,1,2,3,4]); + + auto logs = new double[1_000_000]; + foreach (i, ref elem; poolInstance.parallel(logs)) + { + elem = log(i + 1.0); + } + + foreach (i, elem; logs) + { + assert(approxEqual(elem, cast(double) log(i + 1))); + } + + assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]); + assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]); + assert(poolInstance.amap!("a * a", "-a")([1,2,3]) == + [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); + + auto tupleBuf = new Tuple!(int, int)[3]; + poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf); + assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); + poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf); + assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); + + // Test amap with a non-array buffer. + auto toIndex = new int[5]; + auto ind = indexed(toIndex, [3, 1, 4, 0, 2]); + poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind); + assert(equal(ind, [2, 4, 6, 8, 10])); + assert(equal(toIndex, [8, 4, 10, 2, 6])); + poolInstance.amap!"a / 2"(ind, ind); + assert(equal(ind, [1, 2, 3, 4, 5])); + assert(equal(toIndex, [4, 2, 5, 1, 3])); + + auto buf = new int[5]; + poolInstance.amap!"a * a"([1,2,3,4,5], buf); + assert(buf == [1,4,9,16,25]); + poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf); + assert(buf == [1,4,9,16,25]); + + assert(poolInstance.reduce!"a + b"([1]) == 1); + assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10); + assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10); + assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10); + assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4)); + assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == + tuple(10, 24)); + + immutable serialAns = reduce!"a + b"(iota(1000)); + assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); + assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); + + // Test worker-local storage. 
+ auto wl = poolInstance.workerLocalStorage(0); + foreach (i; poolInstance.parallel(iota(1000), 1)) + { + wl.get = wl.get + i; + } + + auto wlRange = wl.toRange; + auto parallelSum = poolInstance.reduce!"a + b"(wlRange); + assert(parallelSum == 499500); + assert(wlRange[0 .. 1][0] == wlRange[0]); + assert(wlRange[1 .. 2][0] == wlRange[1]); + + // Test finish() + { + static void slowFun() { Thread.sleep(dur!"msecs"(1)); } + + auto pool1 = new TaskPool(); + auto tSlow = task!slowFun(); + pool1.put(tSlow); + pool1.finish(); + tSlow.yieldForce; + // Can't assert that pool1.status == PoolState.stopNow because status + // doesn't change until after the "done" flag is set and the waiting + // thread is woken up. + + auto pool2 = new TaskPool(); + auto tSlow2 = task!slowFun(); + pool2.put(tSlow2); + pool2.finish(true); // blocking + assert(tSlow2.done); + + // Test fix for Bug 8582 by making pool size zero. + auto pool3 = new TaskPool(0); + auto tSlow3 = task!slowFun(); + pool3.put(tSlow3); + pool3.finish(true); // blocking + assert(tSlow3.done); + + // This is correct because no thread will terminate unless pool2.status + // and pool3.status have already been set to stopNow. + assert(pool2.status == TaskPool.PoolState.stopNow); + assert(pool3.status == TaskPool.PoolState.stopNow); + } + + // Test default pool stuff. + assert(taskPool.size == totalCPUs - 1); + + nums = new uint[1000]; + foreach (i; parallel(iota(1000))) + { + nums[i] = cast(uint) i; + } + assert(equal(nums, iota(1000))); + + assert(equal( + poolInstance.map!"a * a"(iota(30_000_001), 10_000), + map!"a * a"(iota(30_000_001)) + )); + + // The filter is to kill random access and test the non-random access + // branch. + assert(equal( + poolInstance.map!"a * a"( + filter!"a == a"(iota(30_000_001) + ), 10_000, 1000), + map!"a * a"(iota(30_000_001)) + )); + + assert( + reduce!"a + b"(0UL, + poolInstance.map!"a * a"(iota(3_000_001), 10_000) + ) == + reduce!"a + b"(0UL, + map!"a * a"(iota(3_000_001)) + ) + ); + + assert(equal( + iota(1_000_002), + poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) + )); + + { + import std.conv : to; + import std.file : deleteme; + + string temp_file = deleteme ~ "-tempDelMe.txt"; + auto file = File(temp_file, "wb"); + scope(exit) + { + file.close(); + import std.file; + remove(temp_file); + } + + auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; + foreach (row; written) + { + file.writeln(join(to!(string[])(row), "\t")); + } + + file = File(temp_file); + + void next(ref char[] buf) + { + file.readln(buf); + import std.string : chomp; + buf = chomp(buf); + } + + double[][] read; + auto asyncReader = taskPool.asyncBuf(&next, &file.eof); + + foreach (line; asyncReader) + { + if (line.length == 0) continue; + auto ls = line.split("\t"); + read ~= to!(double[])(ls); + } + + assert(read == written); + file.close(); + } + + // Test Map/AsyncBuf chaining. + + auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100); + auto temp = poolInstance.map!sqrt( + abuf, 100, 5 + ); + auto lmchain = poolInstance.map!"a * a"(temp, 100, 5); + lmchain.popFront(); + + int ii; + foreach ( elem; (lmchain)) + { + if (!approxEqual(elem, ii)) + { + stderr.writeln(ii, '\t', elem); + } + ii++; + } + + // Test buffer trick in parallel foreach. 
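+    // (The "buffer trick" is the bufferTrick code path in
+    // parallelApplyMixinInputRange: for asyncBuf ranges, whole internal
+    // buffers are swapped into a worker's temp array instead of copying
+    // elements out one at a time.)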
+ abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100); + abuf.popFront(); + auto bufTrickTest = new size_t[abuf.length]; + foreach (i, elem; parallel(abuf)) + { + bufTrickTest[i] = i; + } + + assert(equal(iota(1_000_000), bufTrickTest)); + + auto myTask = task!(std.math.abs)(-1); + taskPool.put(myTask); + assert(myTask.spinForce == 1); + + // Test that worker local storage from one pool receives an index of 0 + // when the index is queried w.r.t. another pool. The only way to do this + // is non-deterministically. + foreach (i; parallel(iota(1000), 1)) + { + assert(poolInstance.workerIndex == 0); + } + + foreach (i; poolInstance.parallel(iota(1000), 1)) + { + assert(taskPool.workerIndex == 0); + } + + // Test exception handling. + static void parallelForeachThrow() + { + foreach (elem; parallel(iota(10))) + { + throw new Exception(""); + } + } + + assertThrown!Exception(parallelForeachThrow()); + + static int reduceException(int a, int b) + { + throw new Exception(""); + } + + assertThrown!Exception( + poolInstance.reduce!reduceException(iota(3)) + ); + + static int mapException(int a) + { + throw new Exception(""); + } + + assertThrown!Exception( + poolInstance.amap!mapException(iota(3)) + ); + + static void mapThrow() + { + auto m = poolInstance.map!mapException(iota(3)); + m.popFront(); + } + + assertThrown!Exception(mapThrow()); + + struct ThrowingRange + { + @property int front() + { + return 1; + } + void popFront() + { + throw new Exception(""); + } + enum bool empty = false; + } + + assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init)); +} + +//version = parallelismStressTest; + +// These are more like stress tests than real unit tests. They print out +// tons of stuff and should not be run every time make unittest is run. +version (parallelismStressTest) +{ + @safe unittest + { + size_t attempt; + for (; attempt < 10; attempt++) + foreach (poolSize; [0, 4]) + { + + poolInstance = new TaskPool(poolSize); + + uint[] numbers = new uint[1_000]; + + foreach (i; poolInstance.parallel( iota(0, numbers.length)) ) + { + numbers[i] = cast(uint) i; + } + + // Make sure it works. + foreach (i; 0 .. 
numbers.length) + { + assert(numbers[i] == i); + } + + stderr.writeln("Done creating nums."); + + + auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000)); + foreach (num; poolInstance.parallel(myNumbers)) + { + assert(num % 7 > 0 && num < 1000); + } + stderr.writeln("Done modulus test."); + + uint[] squares = poolInstance.amap!"a * a"(numbers, 100); + assert(squares.length == numbers.length); + foreach (i, number; numbers) + { + assert(squares[i] == number * number); + } + stderr.writeln("Done squares."); + + auto sumFuture = task!( reduce!"a + b" )(numbers); + poolInstance.put(sumFuture); + + ulong sumSquares = 0; + foreach (elem; numbers) + { + sumSquares += elem * elem; + } + + uint mySum = sumFuture.spinForce(); + assert(mySum == 999 * 1000 / 2); + + auto mySumParallel = poolInstance.reduce!"a + b"(numbers); + assert(mySum == mySumParallel); + stderr.writeln("Done sums."); + + auto myTask = task( + { + synchronized writeln("Our lives are parallel...Our lives are parallel."); + }); + poolInstance.put(myTask); + + auto nestedOuter = "abcd"; + auto nestedInner = iota(0, 10, 2); + + foreach (i, letter; poolInstance.parallel(nestedOuter, 1)) + { + foreach (j, number; poolInstance.parallel(nestedInner, 1)) + { + synchronized writeln(i, ": ", letter, " ", j, ": ", number); + } + } + + poolInstance.stop(); + } + + assert(attempt == 10); + writeln("Press enter to go to next round of unittests."); + readln(); + } + + // These unittests are intended more for actual testing and not so much + // as examples. + @safe unittest + { + foreach (attempt; 0 .. 10) + foreach (poolSize; [0, 4]) + { + poolInstance = new TaskPool(poolSize); + + // Test indexing. + stderr.writeln("Creator Raw Index: ", poolInstance.threadIndex); + assert(poolInstance.workerIndex() == 0); + + // Test worker-local storage. + auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1); + foreach (i; poolInstance.parallel(iota(0U, 1_000_000))) + { + workerLocalStorage.get++; + } + assert(reduce!"a + b"(workerLocalStorage.toRange) == + 1_000_000 + poolInstance.size + 1); + + // Make sure work is reasonably balanced among threads. This test is + // non-deterministic and is more of a sanity check than something that + // has an absolute pass/fail. + shared(uint)[void*] nJobsByThread; + foreach (thread; poolInstance.pool) + { + nJobsByThread[cast(void*) thread] = 0; + } + nJobsByThread[ cast(void*) Thread.getThis()] = 0; + + foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 )) + { + atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1); + } + + stderr.writeln("\nCurrent thread is: ", + cast(void*) Thread.getThis()); + stderr.writeln("Workload distribution: "); + foreach (k, v; nJobsByThread) + { + stderr.writeln(k, '\t', v); + } + + // Test whether amap can be nested. 
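+            // That is, amap called from within a function that is itself
+            // being run through amap on the same pool should complete
+            // without deadlocking.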
+ real[][] matrix = new real[][](1000, 1000); + foreach (i; poolInstance.parallel( iota(0, matrix.length) )) + { + foreach (j; poolInstance.parallel( iota(0, matrix[0].length) )) + { + matrix[i][j] = i * j; + } + } + + // Get around weird bugs having to do w/ sqrt being an intrinsic: + static real mySqrt(real num) + { + return sqrt(num); + } + + static real[] parallelSqrt(real[] nums) + { + return poolInstance.amap!mySqrt(nums); + } + + real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix); + + foreach (i, row; sqrtMatrix) + { + foreach (j, elem; row) + { + real shouldBe = sqrt( cast(real) i * j); + assert(approxEqual(shouldBe, elem)); + sqrtMatrix[i][j] = shouldBe; + } + } + + auto saySuccess = task( + { + stderr.writeln( + "Success doing matrix stuff that involves nested pool use."); + }); + poolInstance.put(saySuccess); + saySuccess.workForce(); + + // A more thorough test of amap, reduce: Find the sum of the square roots of + // matrix. + + static real parallelSum(real[] input) + { + return poolInstance.reduce!"a + b"(input); + } + + auto sumSqrt = poolInstance.reduce!"a + b"( + poolInstance.amap!parallelSum( + sqrtMatrix + ) + ); + + assert(approxEqual(sumSqrt, 4.437e8)); + stderr.writeln("Done sum of square roots."); + + // Test whether tasks work with function pointers. + auto nanTask = task(&isNaN, 1.0L); + poolInstance.put(nanTask); + assert(nanTask.spinForce == false); + + if (poolInstance.size > 0) + { + // Test work waiting. + static void uselessFun() + { + foreach (i; 0 .. 1_000_000) {} + } + + auto uselessTasks = new typeof(task(&uselessFun))[1000]; + foreach (ref uselessTask; uselessTasks) + { + uselessTask = task(&uselessFun); + } + foreach (ref uselessTask; uselessTasks) + { + poolInstance.put(uselessTask); + } + foreach (ref uselessTask; uselessTasks) + { + uselessTask.workForce(); + } + } + + // Test the case of non-random access + ref returns. + int[] nums = [1,2,3,4,5]; + static struct RemoveRandom + { + int[] arr; + + ref int front() + { + return arr.front; + } + void popFront() + { + arr.popFront(); + } + bool empty() + { + return arr.empty; + } + } + + auto refRange = RemoveRandom(nums); + foreach (ref elem; poolInstance.parallel(refRange)) + { + elem++; + } + assert(nums == [2,3,4,5,6], text(nums)); + stderr.writeln("Nums: ", nums); + + poolInstance.stop(); + } + } +} + +version (unittest) +{ + struct __S_12733 + { + invariant() { assert(checksum == 1_234_567_890); } + this(ulong u){n = u;} + void opAssign(__S_12733 s){this.n = s.n;} + ulong n; + ulong checksum = 1_234_567_890; + } + + static auto __genPair_12733(ulong n) { return __S_12733(n); } +} + +@system unittest +{ + immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ]; + + auto result = taskPool.amap!__genPair_12733(data); +} + +@safe unittest +{ + import std.range : iota; + + // this test was in std.range, but caused cycles. + assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} })); +} + +@safe unittest +{ + import std.algorithm.iteration : each; + + long[] arr; + static assert(is(typeof({ + arr.parallel.each!"a++"; + }))); +} + +// https://issues.dlang.org/show_bug.cgi?id=17539 +@system unittest +{ + import std.random : rndGen; + // ensure compilation + try foreach (rnd; rndGen.parallel) break; + catch (ParallelForeachError e) {} +} |
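+
+// Illustrative sketch, not part of the original test suite: the basic
+// future/promise workflow with task(), put() and yieldForce.
+@system unittest
+{
+    static int add(int a, int b) { return a + b; }
+
+    // Create a future, queue it on the global pool, then block on the
+    // result (executing it in this thread if no worker has started it).
+    auto t = task(&add, 40, 2);
+    taskPool.put(t);
+    assert(t.yieldForce == 42);
+}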