aboutsummaryrefslogtreecommitdiff
path: root/libphobos/src/std/concurrency.d
diff options
context:
space:
mode:
Diffstat (limited to 'libphobos/src/std/concurrency.d')
-rw-r--r--libphobos/src/std/concurrency.d2531
1 files changed, 2531 insertions, 0 deletions
diff --git a/libphobos/src/std/concurrency.d b/libphobos/src/std/concurrency.d
new file mode 100644
index 0000000..cf77911
--- /dev/null
+++ b/libphobos/src/std/concurrency.d
@@ -0,0 +1,2531 @@
+/**
+ * This is a low-level messaging API upon which more structured or restrictive
+ * APIs may be built. The general idea is that every messageable entity is
+ * represented by a common handle type called a Tid, which allows messages to
+ * be sent to logical threads that are executing in both the current process
+ * and in external processes using the same interface. This is an important
+ * aspect of scalability because it allows the components of a program to be
+ * spread across available resources with few to no changes to the actual
+ * implementation.
+ *
+ * A logical thread is an execution context that has its own stack and which
+ * runs asynchronously to other logical threads. These may be preemptively
+ * scheduled kernel threads, fibers (cooperative user-space threads), or some
+ * other concept with similar behavior.
+ *
+ * The type of concurrency used when logical threads are created is determined
+ * by the Scheduler selected at initialization time. The default behavior is
+ * currently to create a new kernel thread per call to spawn, but other
+ * schedulers are available that multiplex fibers across the main thread or
+ * use some combination of the two approaches.
+ *
+ * Copyright: Copyright Sean Kelly 2009 - 2014.
+ * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
+ * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
+ * Source: $(PHOBOSSRC std/_concurrency.d)
+ */
+/* Copyright Sean Kelly 2009 - 2014.
+ * Distributed under the Boost Software License, Version 1.0.
+ * (See accompanying file LICENSE_1_0.txt or copy at
+ * http://www.boost.org/LICENSE_1_0.txt)
+ */
+module std.concurrency;
+
+public import std.variant;
+
+import core.atomic;
+import core.sync.condition;
+import core.sync.mutex;
+import core.thread;
+import std.range.primitives;
+import std.range.interfaces : InputRange;
+import std.traits;
+
+///
+@system unittest
+{
+ __gshared string received;
+ static void spawnedFunc(Tid ownerTid)
+ {
+ import std.conv : text;
+ // Receive a message from the owner thread.
+ receive((int i){
+ received = text("Received the number ", i);
+
+ // Send a message back to the owner thread
+ // indicating success.
+ send(ownerTid, true);
+ });
+ }
+
+ // Start spawnedFunc in a new thread.
+ auto childTid = spawn(&spawnedFunc, thisTid);
+
+ // Send the number 42 to this new thread.
+ send(childTid, 42);
+
+ // Receive the result code.
+ auto wasSuccessful = receiveOnly!(bool);
+ assert(wasSuccessful);
+ assert(received == "Received the number 42");
+}
+
+private
+{
+ template hasLocalAliasing(T...)
+ {
+ static if (!T.length)
+ enum hasLocalAliasing = false;
+ else
+ enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) ||
+ std.concurrency.hasLocalAliasing!(T[1 .. $]);
+ }
+
+ enum MsgType
+ {
+ standard,
+ priority,
+ linkDead,
+ }
+
+ struct Message
+ {
+ MsgType type;
+ Variant data;
+
+ this(T...)(MsgType t, T vals) if (T.length > 0)
+ {
+ static if (T.length == 1)
+ {
+ type = t;
+ data = vals[0];
+ }
+ else
+ {
+ import std.typecons : Tuple;
+
+ type = t;
+ data = Tuple!(T)(vals);
+ }
+ }
+
+ @property auto convertsTo(T...)()
+ {
+ static if (T.length == 1)
+ {
+ return is(T[0] == Variant) || data.convertsTo!(T);
+ }
+ else
+ {
+ import std.typecons : Tuple;
+ return data.convertsTo!(Tuple!(T));
+ }
+ }
+
+ @property auto get(T...)()
+ {
+ static if (T.length == 1)
+ {
+ static if (is(T[0] == Variant))
+ return data;
+ else
+ return data.get!(T);
+ }
+ else
+ {
+ import std.typecons : Tuple;
+ return data.get!(Tuple!(T));
+ }
+ }
+
+ auto map(Op)(Op op)
+ {
+ alias Args = Parameters!(Op);
+
+ static if (Args.length == 1)
+ {
+ static if (is(Args[0] == Variant))
+ return op(data);
+ else
+ return op(data.get!(Args));
+ }
+ else
+ {
+ import std.typecons : Tuple;
+ return op(data.get!(Tuple!(Args)).expand);
+ }
+ }
+ }
+
+ void checkops(T...)(T ops)
+ {
+ foreach (i, t1; T)
+ {
+ static assert(isFunctionPointer!t1 || isDelegate!t1);
+ alias a1 = Parameters!(t1);
+ alias r1 = ReturnType!(t1);
+
+ static if (i < T.length - 1 && is(r1 == void))
+ {
+ static assert(a1.length != 1 || !is(a1[0] == Variant),
+ "function with arguments " ~ a1.stringof ~
+ " occludes successive function");
+
+ foreach (t2; T[i + 1 .. $])
+ {
+ static assert(isFunctionPointer!t2 || isDelegate!t2);
+ alias a2 = Parameters!(t2);
+
+ static assert(!is(a1 == a2),
+ "function with arguments " ~ a1.stringof ~ " occludes successive function");
+ }
+ }
+ }
+ }
+
+ @property ref ThreadInfo thisInfo() nothrow
+ {
+ if (scheduler is null)
+ return ThreadInfo.thisInfo;
+ return scheduler.thisInfo;
+ }
+}
+
+static ~this()
+{
+ thisInfo.cleanup();
+}
+
+// Exceptions
+
+/**
+ * Thrown on calls to $(D receiveOnly) if a message other than the type
+ * the receiving thread expected is sent.
+ */
+class MessageMismatch : Exception
+{
+ ///
+ this(string msg = "Unexpected message type") @safe pure nothrow @nogc
+ {
+ super(msg);
+ }
+}
+
+/**
+ * Thrown on calls to $(D receive) if the thread that spawned the receiving
+ * thread has terminated and no more messages exist.
+ */
+class OwnerTerminated : Exception
+{
+ ///
+ this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
+ {
+ super(msg);
+ tid = t;
+ }
+
+ Tid tid;
+}
+
+/**
+ * Thrown if a linked thread has terminated.
+ */
+class LinkTerminated : Exception
+{
+ ///
+ this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
+ {
+ super(msg);
+ tid = t;
+ }
+
+ Tid tid;
+}
+
+/**
+ * Thrown if a message was sent to a thread via
+ * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
+ * for a message of this type.
+ */
+class PriorityMessageException : Exception
+{
+ ///
+ this(Variant vals)
+ {
+ super("Priority message");
+ message = vals;
+ }
+
+ /**
+ * The message that was sent.
+ */
+ Variant message;
+}
+
+/**
+ * Thrown on mailbox crowding if the mailbox is configured with
+ * $(D OnCrowding.throwException).
+ */
+class MailboxFull : Exception
+{
+ ///
+ this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
+ {
+ super(msg);
+ tid = t;
+ }
+
+ Tid tid;
+}
+
+/**
+ * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
+ * find an owner thread.
+ */
+class TidMissingException : Exception
+{
+ import std.exception : basicExceptionCtors;
+ ///
+ mixin basicExceptionCtors;
+}
+
+
+// Thread ID
+
+
+/**
+ * An opaque type used to represent a logical thread.
+ */
+struct Tid
+{
+private:
+ this(MessageBox m) @safe pure nothrow @nogc
+ {
+ mbox = m;
+ }
+
+ MessageBox mbox;
+
+public:
+
+ /**
+ * Generate a convenient string for identifying this Tid. This is only
+ * useful to see if Tid's that are currently executing are the same or
+ * different, e.g. for logging and debugging. It is potentially possible
+ * that a Tid executed in the future will have the same toString() output
+ * as another Tid that has already terminated.
+ */
+ void toString(scope void delegate(const(char)[]) sink)
+ {
+ import std.format : formattedWrite;
+ formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
+ }
+
+}
+
+@system unittest
+{
+ // text!Tid is @system
+ import std.conv : text;
+ Tid tid;
+ assert(text(tid) == "Tid(0)");
+ auto tid2 = thisTid;
+ assert(text(tid2) != "Tid(0)");
+ auto tid3 = tid2;
+ assert(text(tid2) == text(tid3));
+}
+
+/**
+ * Returns: The $(LREF Tid) of the caller's thread.
+ */
+@property Tid thisTid() @safe
+{
+ // TODO: remove when concurrency is safe
+ static auto trus() @trusted
+ {
+ if (thisInfo.ident != Tid.init)
+ return thisInfo.ident;
+ thisInfo.ident = Tid(new MessageBox);
+ return thisInfo.ident;
+ }
+
+ return trus();
+}
+
+/**
+ * Return the Tid of the thread which spawned the caller's thread.
+ *
+ * Throws: A $(D TidMissingException) exception if
+ * there is no owner thread.
+ */
+@property Tid ownerTid()
+{
+ import std.exception : enforce;
+
+ enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
+ return thisInfo.owner;
+}
+
+@system unittest
+{
+ import std.exception : assertThrown;
+
+ static void fun()
+ {
+ string res = receiveOnly!string();
+ assert(res == "Main calling");
+ ownerTid.send("Child responding");
+ }
+
+ assertThrown!TidMissingException(ownerTid);
+ auto child = spawn(&fun);
+ child.send("Main calling");
+ string res = receiveOnly!string();
+ assert(res == "Child responding");
+}
+
+// Thread Creation
+
+private template isSpawnable(F, T...)
+{
+ template isParamsImplicitlyConvertible(F1, F2, int i = 0)
+ {
+ alias param1 = Parameters!F1;
+ alias param2 = Parameters!F2;
+ static if (param1.length != param2.length)
+ enum isParamsImplicitlyConvertible = false;
+ else static if (param1.length == i)
+ enum isParamsImplicitlyConvertible = true;
+ else static if (isImplicitlyConvertible!(param2[i], param1[i]))
+ enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
+ F2, i + 1);
+ else
+ enum isParamsImplicitlyConvertible = false;
+ }
+
+ enum isSpawnable = isCallable!F && is(ReturnType!F == void)
+ && isParamsImplicitlyConvertible!(F, void function(T))
+ && (isFunctionPointer!F || !hasUnsharedAliasing!F);
+}
+
+/**
+ * Starts fn(args) in a new logical thread.
+ *
+ * Executes the supplied function in a new logical thread represented by
+ * $(D Tid). The calling thread is designated as the owner of the new thread.
+ * When the owner thread terminates an $(D OwnerTerminated) message will be
+ * sent to the new thread, causing an $(D OwnerTerminated) exception to be
+ * thrown on $(D receive()).
+ *
+ * Params:
+ * fn = The function to execute.
+ * args = Arguments to the function.
+ *
+ * Returns:
+ * A Tid representing the new logical thread.
+ *
+ * Notes:
+ * $(D args) must not have unshared aliasing. In other words, all arguments
+ * to $(D fn) must either be $(D shared) or $(D immutable) or have no
+ * pointer indirection. This is necessary for enforcing isolation among
+ * threads.
+ *
+ * Example:
+ * ---
+ * import std.stdio, std.concurrency;
+ *
+ * void f1(string str)
+ * {
+ * writeln(str);
+ * }
+ *
+ * void f2(char[] str)
+ * {
+ * writeln(str);
+ * }
+ *
+ * void main()
+ * {
+ * auto str = "Hello, world";
+ *
+ * // Works: string is immutable.
+ * auto tid1 = spawn(&f1, str);
+ *
+ * // Fails: char[] has mutable aliasing.
+ * auto tid2 = spawn(&f2, str.dup);
+ *
+ * // New thread with anonymous function
+ * spawn({ writeln("This is so great!"); });
+ * }
+ * ---
+ */
+Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
+{
+ static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
+ return _spawn(false, fn, args);
+}
+
+/**
+ * Starts fn(args) in a logical thread and will receive a LinkTerminated
+ * message when the operation terminates.
+ *
+ * Executes the supplied function in a new logical thread represented by
+ * Tid. This new thread is linked to the calling thread so that if either
+ * it or the calling thread terminates a LinkTerminated message will be sent
+ * to the other, causing a LinkTerminated exception to be thrown on receive().
+ * The owner relationship from spawn() is preserved as well, so if the link
+ * between threads is broken, owner termination will still result in an
+ * OwnerTerminated exception to be thrown on receive().
+ *
+ * Params:
+ * fn = The function to execute.
+ * args = Arguments to the function.
+ *
+ * Returns:
+ * A Tid representing the new thread.
+ */
+Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
+{
+ static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
+ return _spawn(true, fn, args);
+}
+
+/*
+ *
+ */
+private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
+{
+ // TODO: MessageList and &exec should be shared.
+ auto spawnTid = Tid(new MessageBox);
+ auto ownerTid = thisTid;
+
+ void exec()
+ {
+ thisInfo.ident = spawnTid;
+ thisInfo.owner = ownerTid;
+ fn(args);
+ }
+
+ // TODO: MessageList and &exec should be shared.
+ if (scheduler !is null)
+ scheduler.spawn(&exec);
+ else
+ {
+ auto t = new Thread(&exec);
+ t.start();
+ }
+ thisInfo.links[spawnTid] = linked;
+ return spawnTid;
+}
+
+@system unittest
+{
+ void function() fn1;
+ void function(int) fn2;
+ static assert(__traits(compiles, spawn(fn1)));
+ static assert(__traits(compiles, spawn(fn2, 2)));
+ static assert(!__traits(compiles, spawn(fn1, 1)));
+ static assert(!__traits(compiles, spawn(fn2)));
+
+ void delegate(int) shared dg1;
+ shared(void delegate(int)) dg2;
+ shared(void delegate(long) shared) dg3;
+ shared(void delegate(real, int, long) shared) dg4;
+ void delegate(int) immutable dg5;
+ void delegate(int) dg6;
+ static assert(__traits(compiles, spawn(dg1, 1)));
+ static assert(__traits(compiles, spawn(dg2, 2)));
+ static assert(__traits(compiles, spawn(dg3, 3)));
+ static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
+ static assert(__traits(compiles, spawn(dg5, 5)));
+ static assert(!__traits(compiles, spawn(dg6, 6)));
+
+ auto callable1 = new class{ void opCall(int) shared {} };
+ auto callable2 = cast(shared) new class{ void opCall(int) shared {} };
+ auto callable3 = new class{ void opCall(int) immutable {} };
+ auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} };
+ auto callable5 = new class{ void opCall(int) {} };
+ auto callable6 = cast(shared) new class{ void opCall(int) immutable {} };
+ auto callable7 = cast(immutable) new class{ void opCall(int) shared {} };
+ auto callable8 = cast(shared) new class{ void opCall(int) const shared {} };
+ auto callable9 = cast(const shared) new class{ void opCall(int) shared {} };
+ auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
+ auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
+ static assert(!__traits(compiles, spawn(callable1, 1)));
+ static assert( __traits(compiles, spawn(callable2, 2)));
+ static assert(!__traits(compiles, spawn(callable3, 3)));
+ static assert( __traits(compiles, spawn(callable4, 4)));
+ static assert(!__traits(compiles, spawn(callable5, 5)));
+ static assert(!__traits(compiles, spawn(callable6, 6)));
+ static assert(!__traits(compiles, spawn(callable7, 7)));
+ static assert( __traits(compiles, spawn(callable8, 8)));
+ static assert(!__traits(compiles, spawn(callable9, 9)));
+ static assert( __traits(compiles, spawn(callable10, 10)));
+ static assert( __traits(compiles, spawn(callable11, 11)));
+}
+
+/**
+ * Places the values as a message at the back of tid's message queue.
+ *
+ * Sends the supplied value to the thread represented by tid. As with
+ * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
+ */
+void send(T...)(Tid tid, T vals)
+{
+ static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
+ _send(tid, vals);
+}
+
+/**
+ * Places the values as a message on the front of tid's message queue.
+ *
+ * Send a message to $(D tid) but place it at the front of $(D tid)'s message
+ * queue instead of at the back. This function is typically used for
+ * out-of-band communication, to signal exceptional conditions, etc.
+ */
+void prioritySend(T...)(Tid tid, T vals)
+{
+ static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
+ _send(MsgType.priority, tid, vals);
+}
+
+/*
+ * ditto
+ */
+private void _send(T...)(Tid tid, T vals)
+{
+ _send(MsgType.standard, tid, vals);
+}
+
+/*
+ * Implementation of send. This allows parameter checking to be different for
+ * both Tid.send() and .send().
+ */
+private void _send(T...)(MsgType type, Tid tid, T vals)
+{
+ auto msg = Message(type, vals);
+ tid.mbox.put(msg);
+}
+
+/**
+ * Receives a message from another thread.
+ *
+ * Receive a message from another thread, or block if no messages of the
+ * specified types are available. This function works by pattern matching
+ * a message against a set of delegates and executing the first match found.
+ *
+ * If a delegate that accepts a $(REF Variant, std,variant) is included as
+ * the last argument to $(D receive), it will match any message that was not
+ * matched by an earlier delegate. If more than one argument is sent,
+ * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
+ * sent.
+ *
+ * Example:
+ * ---
+ * import std.stdio;
+ * import std.variant;
+ * import std.concurrency;
+ *
+ * void spawnedFunction()
+ * {
+ * receive(
+ * (int i) { writeln("Received an int."); },
+ * (float f) { writeln("Received a float."); },
+ * (Variant v) { writeln("Received some other type."); }
+ * );
+ * }
+ *
+ * void main()
+ * {
+ * auto tid = spawn(&spawnedFunction);
+ * send(tid, 42);
+ * }
+ * ---
+ */
+void receive(T...)( T ops )
+in
+{
+ assert(thisInfo.ident.mbox !is null,
+ "Cannot receive a message until a thread was spawned "
+ ~ "or thisTid was passed to a running thread.");
+}
+body
+{
+ checkops( ops );
+
+ thisInfo.ident.mbox.get( ops );
+}
+
+
+@safe unittest
+{
+ static assert( __traits( compiles,
+ {
+ receive( (Variant x) {} );
+ receive( (int x) {}, (Variant x) {} );
+ } ) );
+
+ static assert( !__traits( compiles,
+ {
+ receive( (Variant x) {}, (int x) {} );
+ } ) );
+
+ static assert( !__traits( compiles,
+ {
+ receive( (int x) {}, (int x) {} );
+ } ) );
+}
+
+// Make sure receive() works with free functions as well.
+version (unittest)
+{
+ private void receiveFunction(int x) {}
+}
+@safe unittest
+{
+ static assert( __traits( compiles,
+ {
+ receive( &receiveFunction );
+ receive( &receiveFunction, (Variant x) {} );
+ } ) );
+}
+
+
+private template receiveOnlyRet(T...)
+{
+ static if ( T.length == 1 )
+ {
+ alias receiveOnlyRet = T[0];
+ }
+ else
+ {
+ import std.typecons : Tuple;
+ alias receiveOnlyRet = Tuple!(T);
+ }
+}
+
+/**
+ * Receives only messages with arguments of types $(D T).
+ *
+ * Throws: $(D MessageMismatch) if a message of types other than $(D T)
+ * is received.
+ *
+ * Returns: The received message. If $(D T.length) is greater than one,
+ * the message will be packed into a $(REF Tuple, std,typecons).
+ *
+ * Example:
+ * ---
+ * import std.concurrency;
+ *
+ * void spawnedFunc()
+ * {
+ * auto msg = receiveOnly!(int, string)();
+ * assert(msg[0] == 42);
+ * assert(msg[1] == "42");
+ * }
+ *
+ * void main()
+ * {
+ * auto tid = spawn(&spawnedFunc);
+ * send(tid, 42, "42");
+ * }
+ * ---
+ */
+receiveOnlyRet!(T) receiveOnly(T...)()
+in
+{
+ assert(thisInfo.ident.mbox !is null,
+ "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
+}
+body
+{
+ import std.format : format;
+ import std.typecons : Tuple;
+
+ Tuple!(T) ret;
+
+ thisInfo.ident.mbox.get((T val) {
+ static if (T.length)
+ ret.field = val;
+ },
+ (LinkTerminated e) { throw e; },
+ (OwnerTerminated e) { throw e; },
+ (Variant val) {
+ static if (T.length > 1)
+ string exp = T.stringof;
+ else
+ string exp = T[0].stringof;
+
+ throw new MessageMismatch(
+ format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
+ });
+ static if (T.length == 1)
+ return ret[0];
+ else
+ return ret;
+}
+
+@system unittest
+{
+ static void t1(Tid mainTid)
+ {
+ try
+ {
+ receiveOnly!string();
+ mainTid.send("");
+ }
+ catch (Throwable th)
+ {
+ mainTid.send(th.msg);
+ }
+ }
+
+ auto tid = spawn(&t1, thisTid);
+ tid.send(1);
+ string result = receiveOnly!string();
+ assert(result == "Unexpected message type: expected 'string', got 'int'");
+}
+
+/**
+ * Tries to receive but will give up if no matches arrive within duration.
+ * Won't wait at all if provided $(REF Duration, core,time) is negative.
+ *
+ * Same as $(D receive) except that rather than wait forever for a message,
+ * it waits until either it receives a message or the given
+ * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
+ * message and $(D false) if it timed out waiting for one.
+ */
+bool receiveTimeout(T...)(Duration duration, T ops)
+in
+{
+ assert(thisInfo.ident.mbox !is null,
+ "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
+}
+body
+{
+ checkops(ops);
+
+ return thisInfo.ident.mbox.get(duration, ops);
+}
+
+@safe unittest
+{
+ static assert(__traits(compiles, {
+ receiveTimeout(msecs(0), (Variant x) {});
+ receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
+ }));
+
+ static assert(!__traits(compiles, {
+ receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
+ }));
+
+ static assert(!__traits(compiles, {
+ receiveTimeout(msecs(0), (int x) {}, (int x) {});
+ }));
+
+ static assert(__traits(compiles, {
+ receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
+ }));
+}
+
+// MessageBox Limits
+
+/**
+ * These behaviors may be specified when a mailbox is full.
+ */
+enum OnCrowding
+{
+ block, /// Wait until room is available.
+ throwException, /// Throw a MailboxFull exception.
+ ignore /// Abort the send and return.
+}
+
+private
+{
+ bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
+ {
+ return true;
+ }
+
+ bool onCrowdingThrow(Tid tid) @safe pure
+ {
+ throw new MailboxFull(tid);
+ }
+
+ bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
+ {
+ return false;
+ }
+}
+
+/**
+ * Sets a maximum mailbox size.
+ *
+ * Sets a limit on the maximum number of user messages allowed in the mailbox.
+ * If this limit is reached, the caller attempting to add a new message will
+ * execute the behavior specified by doThis. If messages is zero, the mailbox
+ * is unbounded.
+ *
+ * Params:
+ * tid = The Tid of the thread for which this limit should be set.
+ * messages = The maximum number of messages or zero if no limit.
+ * doThis = The behavior executed when a message is sent to a full
+ * mailbox.
+ */
+void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
+{
+ final switch (doThis)
+ {
+ case OnCrowding.block:
+ return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
+ case OnCrowding.throwException:
+ return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
+ case OnCrowding.ignore:
+ return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
+ }
+}
+
+/**
+ * Sets a maximum mailbox size.
+ *
+ * Sets a limit on the maximum number of user messages allowed in the mailbox.
+ * If this limit is reached, the caller attempting to add a new message will
+ * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
+ *
+ * Params:
+ * tid = The Tid of the thread for which this limit should be set.
+ * messages = The maximum number of messages or zero if no limit.
+ * onCrowdingDoThis = The routine called when a message is sent to a full
+ * mailbox.
+ */
+void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
+{
+ tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
+}
+
+private
+{
+ __gshared Tid[string] tidByName;
+ __gshared string[][Tid] namesByTid;
+}
+
+private @property Mutex registryLock()
+{
+ __gshared Mutex impl;
+ initOnce!impl(new Mutex);
+ return impl;
+}
+
+private void unregisterMe()
+{
+ auto me = thisInfo.ident;
+ if (thisInfo.ident != Tid.init)
+ {
+ synchronized (registryLock)
+ {
+ if (auto allNames = me in namesByTid)
+ {
+ foreach (name; *allNames)
+ tidByName.remove(name);
+ namesByTid.remove(me);
+ }
+ }
+ }
+}
+
+/**
+ * Associates name with tid.
+ *
+ * Associates name with tid in a process-local map. When the thread
+ * represented by tid terminates, any names associated with it will be
+ * automatically unregistered.
+ *
+ * Params:
+ * name = The name to associate with tid.
+ * tid = The tid register by name.
+ *
+ * Returns:
+ * true if the name is available and tid is not known to represent a
+ * defunct thread.
+ */
+bool register(string name, Tid tid)
+{
+ synchronized (registryLock)
+ {
+ if (name in tidByName)
+ return false;
+ if (tid.mbox.isClosed)
+ return false;
+ namesByTid[tid] ~= name;
+ tidByName[name] = tid;
+ return true;
+ }
+}
+
+/**
+ * Removes the registered name associated with a tid.
+ *
+ * Params:
+ * name = The name to unregister.
+ *
+ * Returns:
+ * true if the name is registered, false if not.
+ */
+bool unregister(string name)
+{
+ import std.algorithm.mutation : remove, SwapStrategy;
+ import std.algorithm.searching : countUntil;
+
+ synchronized (registryLock)
+ {
+ if (auto tid = name in tidByName)
+ {
+ auto allNames = *tid in namesByTid;
+ auto pos = countUntil(*allNames, name);
+ remove!(SwapStrategy.unstable)(*allNames, pos);
+ tidByName.remove(name);
+ return true;
+ }
+ return false;
+ }
+}
+
+/**
+ * Gets the Tid associated with name.
+ *
+ * Params:
+ * name = The name to locate within the registry.
+ *
+ * Returns:
+ * The associated Tid or Tid.init if name is not registered.
+ */
+Tid locate(string name)
+{
+ synchronized (registryLock)
+ {
+ if (auto tid = name in tidByName)
+ return *tid;
+ return Tid.init;
+ }
+}
+
+/**
+ * Encapsulates all implementation-level data needed for scheduling.
+ *
+ * When defining a Scheduler, an instance of this struct must be associated
+ * with each logical thread. It contains all implementation-level information
+ * needed by the internal API.
+ */
+struct ThreadInfo
+{
+ Tid ident;
+ bool[Tid] links;
+ Tid owner;
+
+ /**
+ * Gets a thread-local instance of ThreadInfo.
+ *
+ * Gets a thread-local instance of ThreadInfo, which should be used as the
+ * default instance when info is requested for a thread not created by the
+ * Scheduler.
+ */
+ static @property ref thisInfo() nothrow
+ {
+ static ThreadInfo val;
+ return val;
+ }
+
+ /**
+ * Cleans up this ThreadInfo.
+ *
+ * This must be called when a scheduled thread terminates. It tears down
+ * the messaging system for the thread and notifies interested parties of
+ * the thread's termination.
+ */
+ void cleanup()
+ {
+ if (ident.mbox !is null)
+ ident.mbox.close();
+ foreach (tid; links.keys)
+ _send(MsgType.linkDead, tid, ident);
+ if (owner != Tid.init)
+ _send(MsgType.linkDead, owner, ident);
+ unregisterMe(); // clean up registry entries
+ }
+}
+
+/**
+ * A Scheduler controls how threading is performed by spawn.
+ *
+ * Implementing a Scheduler allows the concurrency mechanism used by this
+ * module to be customized according to different needs. By default, a call
+ * to spawn will create a new kernel thread that executes the supplied routine
+ * and terminates when finished. But it is possible to create Schedulers that
+ * reuse threads, that multiplex Fibers (coroutines) across a single thread,
+ * or any number of other approaches. By making the choice of Scheduler a
+ * user-level option, std.concurrency may be used for far more types of
+ * application than if this behavior were predefined.
+ *
+ * Example:
+ * ---
+ * import std.concurrency;
+ * import std.stdio;
+ *
+ * void main()
+ * {
+ * scheduler = new FiberScheduler;
+ * scheduler.start(
+ * {
+ * writeln("the rest of main goes here");
+ * });
+ * }
+ * ---
+ *
+ * Some schedulers have a dispatching loop that must run if they are to work
+ * properly, so for the sake of consistency, when using a scheduler, start()
+ * must be called within main(). This yields control to the scheduler and
+ * will ensure that any spawned threads are executed in an expected manner.
+ */
+interface Scheduler
+{
+ /**
+ * Spawns the supplied op and starts the Scheduler.
+ *
+ * This is intended to be called at the start of the program to yield all
+ * scheduling to the active Scheduler instance. This is necessary for
+ * schedulers that explicitly dispatch threads rather than simply relying
+ * on the operating system to do so, and so start should always be called
+ * within main() to begin normal program execution.
+ *
+ * Params:
+ * op = A wrapper for whatever the main thread would have done in the
+ * absence of a custom scheduler. It will be automatically executed
+ * via a call to spawn by the Scheduler.
+ */
+ void start(void delegate() op);
+
+ /**
+ * Assigns a logical thread to execute the supplied op.
+ *
+ * This routine is called by spawn. It is expected to instantiate a new
+ * logical thread and run the supplied operation. This thread must call
+ * thisInfo.cleanup() when the thread terminates if the scheduled thread
+ * is not a kernel thread--all kernel threads will have their ThreadInfo
+ * cleaned up automatically by a thread-local destructor.
+ *
+ * Params:
+ * op = The function to execute. This may be the actual function passed
+ * by the user to spawn itself, or may be a wrapper function.
+ */
+ void spawn(void delegate() op);
+
+ /**
+ * Yields execution to another logical thread.
+ *
+ * This routine is called at various points within concurrency-aware APIs
+ * to provide a scheduler a chance to yield execution when using some sort
+ * of cooperative multithreading model. If this is not appropriate, such
+ * as when each logical thread is backed by a dedicated kernel thread,
+ * this routine may be a no-op.
+ */
+ void yield() nothrow;
+
+ /**
+ * Returns an appropriate ThreadInfo instance.
+ *
+ * Returns an instance of ThreadInfo specific to the logical thread that
+ * is calling this routine or, if the calling thread was not create by
+ * this scheduler, returns ThreadInfo.thisInfo instead.
+ */
+ @property ref ThreadInfo thisInfo() nothrow;
+
+ /**
+ * Creates a Condition variable analog for signaling.
+ *
+ * Creates a new Condition variable analog which is used to check for and
+ * to signal the addition of messages to a thread's message queue. Like
+ * yield, some schedulers may need to define custom behavior so that calls
+ * to Condition.wait() yield to another thread when no new messages are
+ * available instead of blocking.
+ *
+ * Params:
+ * m = The Mutex that will be associated with this condition. It will be
+ * locked prior to any operation on the condition, and so in some
+ * cases a Scheduler may need to hold this reference and unlock the
+ * mutex before yielding execution to another logical thread.
+ */
+ Condition newCondition(Mutex m) nothrow;
+}
+
+/**
+ * An example Scheduler using kernel threads.
+ *
+ * This is an example Scheduler that mirrors the default scheduling behavior
+ * of creating one kernel thread per call to spawn. It is fully functional
+ * and may be instantiated and used, but is not a necessary part of the
+ * default functioning of this module.
+ */
+class ThreadScheduler : Scheduler
+{
+ /**
+ * This simply runs op directly, since no real scheduling is needed by
+ * this approach.
+ */
+ void start(void delegate() op)
+ {
+ op();
+ }
+
+ /**
+ * Creates a new kernel thread and assigns it to run the supplied op.
+ */
+ void spawn(void delegate() op)
+ {
+ auto t = new Thread(op);
+ t.start();
+ }
+
+ /**
+ * This scheduler does no explicit multiplexing, so this is a no-op.
+ */
+ void yield() nothrow
+ {
+ // no explicit yield needed
+ }
+
+ /**
+ * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
+ * ThreadInfo, which is the correct behavior for this scheduler.
+ */
+ @property ref ThreadInfo thisInfo() nothrow
+ {
+ return ThreadInfo.thisInfo;
+ }
+
+ /**
+ * Creates a new Condition variable. No custom behavior is needed here.
+ */
+ Condition newCondition(Mutex m) nothrow
+ {
+ return new Condition(m);
+ }
+}
+
+/**
+ * An example Scheduler using Fibers.
+ *
+ * This is an example scheduler that creates a new Fiber per call to spawn
+ * and multiplexes the execution of all fibers within the main thread.
+ */
+class FiberScheduler : Scheduler
+{
+ /**
+ * This creates a new Fiber for the supplied op and then starts the
+ * dispatcher.
+ */
+ void start(void delegate() op)
+ {
+ create(op);
+ dispatch();
+ }
+
+ /**
+ * This created a new Fiber for the supplied op and adds it to the
+ * dispatch list.
+ */
+ void spawn(void delegate() op) nothrow
+ {
+ create(op);
+ yield();
+ }
+
+ /**
+ * If the caller is a scheduled Fiber, this yields execution to another
+ * scheduled Fiber.
+ */
+ void yield() nothrow
+ {
+ // NOTE: It's possible that we should test whether the calling Fiber
+ // is an InfoFiber before yielding, but I think it's reasonable
+ // that any (non-Generator) fiber should yield here.
+ if (Fiber.getThis())
+ Fiber.yield();
+ }
+
+ /**
+ * Returns an appropriate ThreadInfo instance.
+ *
+ * Returns a ThreadInfo instance specific to the calling Fiber if the
+ * Fiber was created by this dispatcher, otherwise it returns
+ * ThreadInfo.thisInfo.
+ */
+ @property ref ThreadInfo thisInfo() nothrow
+ {
+ auto f = cast(InfoFiber) Fiber.getThis();
+
+ if (f !is null)
+ return f.info;
+ return ThreadInfo.thisInfo;
+ }
+
+ /**
+ * Returns a Condition analog that yields when wait or notify is called.
+ */
+ Condition newCondition(Mutex m) nothrow
+ {
+ return new FiberCondition(m);
+ }
+
+private:
+ static class InfoFiber : Fiber
+ {
+ ThreadInfo info;
+
+ this(void delegate() op) nothrow
+ {
+ super(op);
+ }
+ }
+
+ class FiberCondition : Condition
+ {
+ this(Mutex m) nothrow
+ {
+ super(m);
+ notified = false;
+ }
+
+ override void wait() nothrow
+ {
+ scope (exit) notified = false;
+
+ while (!notified)
+ switchContext();
+ }
+
+ override bool wait(Duration period) nothrow
+ {
+ import core.time : MonoTime;
+
+ scope (exit) notified = false;
+
+ for (auto limit = MonoTime.currTime + period;
+ !notified && !period.isNegative;
+ period = limit - MonoTime.currTime)
+ {
+ yield();
+ }
+ return notified;
+ }
+
+ override void notify() nothrow
+ {
+ notified = true;
+ switchContext();
+ }
+
+ override void notifyAll() nothrow
+ {
+ notified = true;
+ switchContext();
+ }
+
+ private:
+ void switchContext() nothrow
+ {
+ mutex_nothrow.unlock_nothrow();
+ scope (exit) mutex_nothrow.lock_nothrow();
+ yield();
+ }
+
+ private bool notified;
+ }
+
+private:
+ void dispatch()
+ {
+ import std.algorithm.mutation : remove;
+
+ while (m_fibers.length > 0)
+ {
+ auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
+ if (t !is null && !(cast(OwnerTerminated) t))
+ {
+ throw t;
+ }
+ if (m_fibers[m_pos].state == Fiber.State.TERM)
+ {
+ if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
+ m_pos = 0;
+ }
+ else if (m_pos++ >= m_fibers.length - 1)
+ {
+ m_pos = 0;
+ }
+ }
+ }
+
+ void create(void delegate() op) nothrow
+ {
+ void wrap()
+ {
+ scope (exit)
+ {
+ thisInfo.cleanup();
+ }
+ op();
+ }
+
+ m_fibers ~= new InfoFiber(&wrap);
+ }
+
+private:
+ Fiber[] m_fibers;
+ size_t m_pos;
+}
+
+@system unittest
+{
+ static void receive(Condition cond, ref size_t received)
+ {
+ while (true)
+ {
+ synchronized (cond.mutex)
+ {
+ cond.wait();
+ ++received;
+ }
+ }
+ }
+
+ static void send(Condition cond, ref size_t sent)
+ {
+ while (true)
+ {
+ synchronized (cond.mutex)
+ {
+ ++sent;
+ cond.notify();
+ }
+ }
+ }
+
+ auto fs = new FiberScheduler;
+ auto mtx = new Mutex;
+ auto cond = fs.newCondition(mtx);
+
+ size_t received, sent;
+ auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
+ waiter.call();
+ assert(received == 0);
+ notifier.call();
+ assert(sent == 1);
+ assert(received == 0);
+ waiter.call();
+ assert(received == 1);
+ waiter.call();
+ assert(received == 1);
+}
+
+/**
+ * Sets the Scheduler behavior within the program.
+ *
+ * This variable sets the Scheduler behavior within this program. Typically,
+ * when setting a Scheduler, scheduler.start() should be called in main. This
+ * routine will not return until program execution is complete.
+ */
+__gshared Scheduler scheduler;
+
+// Generator
+
+/**
+ * If the caller is a Fiber and is not a Generator, this function will call
+ * scheduler.yield() or Fiber.yield(), as appropriate.
+ */
+void yield() nothrow
+{
+ auto fiber = Fiber.getThis();
+ if (!(cast(IsGenerator) fiber))
+ {
+ if (scheduler is null)
+ {
+ if (fiber)
+ return Fiber.yield();
+ }
+ else
+ scheduler.yield();
+ }
+}
+
+/// Used to determine whether a Generator is running.
+private interface IsGenerator {}
+
+
+/**
+ * A Generator is a Fiber that periodically returns values of type T to the
+ * caller via yield. This is represented as an InputRange.
+ *
+ * Example:
+ * ---
+ * import std.concurrency;
+ * import std.stdio;
+ *
+ *
+ * void main()
+ * {
+ * auto tid = spawn(
+ * {
+ * while (true)
+ * {
+ * writeln(receiveOnly!int());
+ * }
+ * });
+ *
+ * auto r = new Generator!int(
+ * {
+ * foreach (i; 1 .. 10)
+ * yield(i);
+ * });
+ *
+ * foreach (e; r)
+ * {
+ * tid.send(e);
+ * }
+ * }
+ * ---
+ */
+class Generator(T) :
+ Fiber, IsGenerator, InputRange!T
+{
+ /**
+ * Initializes a generator object which is associated with a static
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * fn = The fiber function.
+ *
+ * In:
+ * fn must not be null.
+ */
+ this(void function() fn)
+ {
+ super(fn);
+ call();
+ }
+
+ /**
+ * Initializes a generator object which is associated with a static
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * fn = The fiber function.
+ * sz = The stack size for this fiber.
+ *
+ * In:
+ * fn must not be null.
+ */
+ this(void function() fn, size_t sz)
+ {
+ super(fn, sz);
+ call();
+ }
+
+ /**
+ * Initializes a generator object which is associated with a dynamic
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * dg = The fiber function.
+ *
+ * In:
+ * dg must not be null.
+ */
+ this(void delegate() dg)
+ {
+ super(dg);
+ call();
+ }
+
+ /**
+ * Initializes a generator object which is associated with a dynamic
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * dg = The fiber function.
+ * sz = The stack size for this fiber.
+ *
+ * In:
+ * dg must not be null.
+ */
+ this(void delegate() dg, size_t sz)
+ {
+ super(dg, sz);
+ call();
+ }
+
+ /**
+ * Returns true if the generator is empty.
+ */
+ final bool empty() @property
+ {
+ return m_value is null || state == State.TERM;
+ }
+
+ /**
+ * Obtains the next value from the underlying function.
+ */
+ final void popFront()
+ {
+ call();
+ }
+
+ /**
+ * Returns the most recently generated value by shallow copy.
+ */
+ final T front() @property
+ {
+ return *m_value;
+ }
+
+ /**
+ * Returns the most recently generated value without executing a
+ * copy contructor. Will not compile for element types defining a
+ * postblit, because Generator does not return by reference.
+ */
+ final T moveFront()
+ {
+ static if (!hasElaborateCopyConstructor!T)
+ {
+ return front;
+ }
+ else
+ {
+ static assert(0,
+ "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
+ }
+ }
+
+ final int opApply(scope int delegate(T) loopBody)
+ {
+ int broken;
+ for (; !empty; popFront())
+ {
+ broken = loopBody(front);
+ if (broken) break;
+ }
+ return broken;
+ }
+
+ final int opApply(scope int delegate(size_t, T) loopBody)
+ {
+ int broken;
+ for (size_t i; !empty; ++i, popFront())
+ {
+ broken = loopBody(i, front);
+ if (broken) break;
+ }
+ return broken;
+ }
+private:
+ T* m_value;
+}
+
+/**
+ * Yields a value of type T to the caller of the currently executing
+ * generator.
+ *
+ * Params:
+ * value = The value to yield.
+ */
+void yield(T)(ref T value)
+{
+ Generator!T cur = cast(Generator!T) Fiber.getThis();
+ if (cur !is null && cur.state == Fiber.State.EXEC)
+ {
+ cur.m_value = &value;
+ return Fiber.yield();
+ }
+ throw new Exception("yield(T) called with no active generator for the supplied type");
+}
+
+/// ditto
+void yield(T)(T value)
+{
+ yield(value);
+}
+
+@system unittest
+{
+ import core.exception;
+ import std.exception;
+
+ static void testScheduler(Scheduler s)
+ {
+ scheduler = s;
+ scheduler.start({
+ auto tid = spawn({
+ int i;
+
+ try
+ {
+ for (i = 1; i < 10; i++)
+ {
+ assertNotThrown!AssertError(assert(receiveOnly!int() == i));
+ }
+ }
+ catch (OwnerTerminated e)
+ {
+
+ }
+
+ // i will advance 1 past the last value expected
+ assert(i == 4);
+ });
+
+ auto r = new Generator!int({
+ assertThrown!Exception(yield(2.0));
+ yield(); // ensure this is a no-op
+ yield(1);
+ yield(); // also once something has been yielded
+ yield(2);
+ yield(3);
+ });
+
+ foreach (e; r)
+ {
+ tid.send(e);
+ }
+ });
+ scheduler = null;
+ }
+
+ testScheduler(new ThreadScheduler);
+ testScheduler(new FiberScheduler);
+}
+///
+@system unittest
+{
+ import std.range;
+
+ InputRange!int myIota = iota(10).inputRangeObject;
+
+ myIota.popFront();
+ myIota.popFront();
+ assert(myIota.moveFront == 2);
+ assert(myIota.front == 2);
+ myIota.popFront();
+ assert(myIota.front == 3);
+
+ //can be assigned to std.range.interfaces.InputRange directly
+ myIota = new Generator!int(
+ {
+ foreach (i; 0 .. 10) yield(i);
+ });
+
+ myIota.popFront();
+ myIota.popFront();
+ assert(myIota.moveFront == 2);
+ assert(myIota.front == 2);
+ myIota.popFront();
+ assert(myIota.front == 3);
+
+ size_t[2] counter = [0, 0];
+ foreach (i, unused; myIota) counter[] += [1, i];
+
+ assert(myIota.empty);
+ assert(counter == [7, 21]);
+}
+
+private
+{
+ /*
+ * A MessageBox is a message queue for one thread. Other threads may send
+ * messages to this owner by calling put(), and the owner receives them by
+ * calling get(). The put() call is therefore effectively shared and the
+ * get() call is effectively local. setMaxMsgs may be used by any thread
+ * to limit the size of the message queue.
+ */
+ class MessageBox
+ {
+ this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
+ {
+ m_lock = new Mutex;
+ m_closed = false;
+
+ if (scheduler is null)
+ {
+ m_putMsg = new Condition(m_lock);
+ m_notFull = new Condition(m_lock);
+ }
+ else
+ {
+ m_putMsg = scheduler.newCondition(m_lock);
+ m_notFull = scheduler.newCondition(m_lock);
+ }
+ }
+
+ ///
+ final @property bool isClosed() @safe @nogc pure
+ {
+ synchronized (m_lock)
+ {
+ return m_closed;
+ }
+ }
+
+ /*
+ * Sets a limit on the maximum number of user messages allowed in the
+ * mailbox. If this limit is reached, the caller attempting to add
+ * a new message will execute call. If num is zero, there is no limit
+ * on the message queue.
+ *
+ * Params:
+ * num = The maximum size of the queue or zero if the queue is
+ * unbounded.
+ * call = The routine to call when the queue is full.
+ */
+ final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
+ {
+ synchronized (m_lock)
+ {
+ m_maxMsgs = num;
+ m_onMaxMsgs = call;
+ }
+ }
+
+ /*
+ * If maxMsgs is not set, the message is added to the queue and the
+ * owner is notified. If the queue is full, the message will still be
+ * accepted if it is a control message, otherwise onCrowdingDoThis is
+ * called. If the routine returns true, this call will block until
+ * the owner has made space available in the queue. If it returns
+ * false, this call will abort.
+ *
+ * Params:
+ * msg = The message to put in the queue.
+ *
+ * Throws:
+ * An exception if the queue is full and onCrowdingDoThis throws.
+ */
+ final void put(ref Message msg)
+ {
+ synchronized (m_lock)
+ {
+ // TODO: Generate an error here if m_closed is true, or maybe
+ // put a message in the caller's queue?
+ if (!m_closed)
+ {
+ while (true)
+ {
+ if (isPriorityMsg(msg))
+ {
+ m_sharedPty.put(msg);
+ m_putMsg.notify();
+ return;
+ }
+ if (!mboxFull() || isControlMsg(msg))
+ {
+ m_sharedBox.put(msg);
+ m_putMsg.notify();
+ return;
+ }
+ if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
+ {
+ return;
+ }
+ m_putQueue++;
+ m_notFull.wait();
+ m_putQueue--;
+ }
+ }
+ }
+ }
+
+ /*
+ * Matches ops against each message in turn until a match is found.
+ *
+ * Params:
+ * ops = The operations to match. Each may return a bool to indicate
+ * whether a message with a matching type is truly a match.
+ *
+ * Returns:
+ * true if a message was retrieved and false if not (such as if a
+ * timeout occurred).
+ *
+ * Throws:
+ * LinkTerminated if a linked thread terminated, or OwnerTerminated
+ * if the owner thread terminates and no existing messages match the
+ * supplied ops.
+ */
+ bool get(T...)(scope T vals)
+ {
+ import std.meta : AliasSeq;
+
+ static assert(T.length);
+
+ static if (isImplicitlyConvertible!(T[0], Duration))
+ {
+ alias Ops = AliasSeq!(T[1 .. $]);
+ alias ops = vals[1 .. $];
+ enum timedWait = true;
+ Duration period = vals[0];
+ }
+ else
+ {
+ alias Ops = AliasSeq!(T);
+ alias ops = vals[0 .. $];
+ enum timedWait = false;
+ }
+
+ bool onStandardMsg(ref Message msg)
+ {
+ foreach (i, t; Ops)
+ {
+ alias Args = Parameters!(t);
+ auto op = ops[i];
+
+ if (msg.convertsTo!(Args))
+ {
+ static if (is(ReturnType!(t) == bool))
+ {
+ return msg.map(op);
+ }
+ else
+ {
+ msg.map(op);
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ bool onLinkDeadMsg(ref Message msg)
+ {
+ assert(msg.convertsTo!(Tid));
+ auto tid = msg.get!(Tid);
+
+ if (bool* pDepends = tid in thisInfo.links)
+ {
+ auto depends = *pDepends;
+ thisInfo.links.remove(tid);
+ // Give the owner relationship precedence.
+ if (depends && tid != thisInfo.owner)
+ {
+ auto e = new LinkTerminated(tid);
+ auto m = Message(MsgType.standard, e);
+ if (onStandardMsg(m))
+ return true;
+ throw e;
+ }
+ }
+ if (tid == thisInfo.owner)
+ {
+ thisInfo.owner = Tid.init;
+ auto e = new OwnerTerminated(tid);
+ auto m = Message(MsgType.standard, e);
+ if (onStandardMsg(m))
+ return true;
+ throw e;
+ }
+ return false;
+ }
+
+ bool onControlMsg(ref Message msg)
+ {
+ switch (msg.type)
+ {
+ case MsgType.linkDead:
+ return onLinkDeadMsg(msg);
+ default:
+ return false;
+ }
+ }
+
+ bool scan(ref ListT list)
+ {
+ for (auto range = list[]; !range.empty;)
+ {
+ // Only the message handler will throw, so if this occurs
+ // we can be certain that the message was handled.
+ scope (failure)
+ list.removeAt(range);
+
+ if (isControlMsg(range.front))
+ {
+ if (onControlMsg(range.front))
+ {
+ // Although the linkDead message is a control message,
+ // it can be handled by the user. Since the linkDead
+ // message throws if not handled, if we get here then
+ // it has been handled and we can return from receive.
+ // This is a weird special case that will have to be
+ // handled in a more general way if more are added.
+ if (!isLinkDeadMsg(range.front))
+ {
+ list.removeAt(range);
+ continue;
+ }
+ list.removeAt(range);
+ return true;
+ }
+ range.popFront();
+ continue;
+ }
+ else
+ {
+ if (onStandardMsg(range.front))
+ {
+ list.removeAt(range);
+ return true;
+ }
+ range.popFront();
+ continue;
+ }
+ }
+ return false;
+ }
+
+ bool pty(ref ListT list)
+ {
+ if (!list.empty)
+ {
+ auto range = list[];
+
+ if (onStandardMsg(range.front))
+ {
+ list.removeAt(range);
+ return true;
+ }
+ if (range.front.convertsTo!(Throwable))
+ throw range.front.get!(Throwable);
+ else if (range.front.convertsTo!(shared(Throwable)))
+ throw range.front.get!(shared(Throwable));
+ else
+ throw new PriorityMessageException(range.front.data);
+ }
+ return false;
+ }
+
+ static if (timedWait)
+ {
+ import core.time : MonoTime;
+ auto limit = MonoTime.currTime + period;
+ }
+
+ while (true)
+ {
+ ListT arrived;
+
+ if (pty(m_localPty) || scan(m_localBox))
+ {
+ return true;
+ }
+ yield();
+ synchronized (m_lock)
+ {
+ updateMsgCount();
+ while (m_sharedPty.empty && m_sharedBox.empty)
+ {
+ // NOTE: We're notifying all waiters here instead of just
+ // a few because the onCrowding behavior may have
+ // changed and we don't want to block sender threads
+ // unnecessarily if the new behavior is not to block.
+ // This will admittedly result in spurious wakeups
+ // in other situations, but what can you do?
+ if (m_putQueue && !mboxFull())
+ m_notFull.notifyAll();
+ static if (timedWait)
+ {
+ if (period <= Duration.zero || !m_putMsg.wait(period))
+ return false;
+ }
+ else
+ {
+ m_putMsg.wait();
+ }
+ }
+ m_localPty.put(m_sharedPty);
+ arrived.put(m_sharedBox);
+ }
+ if (m_localPty.empty)
+ {
+ scope (exit) m_localBox.put(arrived);
+ if (scan(arrived))
+ {
+ return true;
+ }
+ else
+ {
+ static if (timedWait)
+ {
+ period = limit - MonoTime.currTime;
+ }
+ continue;
+ }
+ }
+ m_localBox.put(arrived);
+ pty(m_localPty);
+ return true;
+ }
+ }
+
+ /*
+ * Called on thread termination. This routine processes any remaining
+ * control messages, clears out message queues, and sets a flag to
+ * reject any future messages.
+ */
+ final void close()
+ {
+ static void onLinkDeadMsg(ref Message msg)
+ {
+ assert(msg.convertsTo!(Tid));
+ auto tid = msg.get!(Tid);
+
+ thisInfo.links.remove(tid);
+ if (tid == thisInfo.owner)
+ thisInfo.owner = Tid.init;
+ }
+
+ static void sweep(ref ListT list)
+ {
+ for (auto range = list[]; !range.empty; range.popFront())
+ {
+ if (range.front.type == MsgType.linkDead)
+ onLinkDeadMsg(range.front);
+ }
+ }
+
+ ListT arrived;
+
+ sweep(m_localBox);
+ synchronized (m_lock)
+ {
+ arrived.put(m_sharedBox);
+ m_closed = true;
+ }
+ m_localBox.clear();
+ sweep(arrived);
+ }
+
+ private:
+ // Routines involving local data only, no lock needed.
+
+ bool mboxFull() @safe @nogc pure nothrow
+ {
+ return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
+ }
+
+ void updateMsgCount() @safe @nogc pure nothrow
+ {
+ m_localMsgs = m_localBox.length;
+ }
+
+ bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
+ {
+ return msg.type != MsgType.standard && msg.type != MsgType.priority;
+ }
+
+ bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
+ {
+ return msg.type == MsgType.priority;
+ }
+
+ bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
+ {
+ return msg.type == MsgType.linkDead;
+ }
+
+ alias OnMaxFn = bool function(Tid);
+ alias ListT = List!(Message);
+
+ ListT m_localBox;
+ ListT m_localPty;
+
+ Mutex m_lock;
+ Condition m_putMsg;
+ Condition m_notFull;
+ size_t m_putQueue;
+ ListT m_sharedBox;
+ ListT m_sharedPty;
+ OnMaxFn m_onMaxMsgs;
+ size_t m_localMsgs;
+ size_t m_maxMsgs;
+ bool m_closed;
+ }
+
+ /*
+ *
+ */
+ struct List(T)
+ {
+ struct Range
+ {
+ import std.exception : enforce;
+
+ @property bool empty() const
+ {
+ return !m_prev.next;
+ }
+
+ @property ref T front()
+ {
+ enforce(m_prev.next, "invalid list node");
+ return m_prev.next.val;
+ }
+
+ @property void front(T val)
+ {
+ enforce(m_prev.next, "invalid list node");
+ m_prev.next.val = val;
+ }
+
+ void popFront()
+ {
+ enforce(m_prev.next, "invalid list node");
+ m_prev = m_prev.next;
+ }
+
+ private this(Node* p)
+ {
+ m_prev = p;
+ }
+
+ private Node* m_prev;
+ }
+
+ void put(T val)
+ {
+ put(newNode(val));
+ }
+
+ void put(ref List!(T) rhs)
+ {
+ if (!rhs.empty)
+ {
+ put(rhs.m_first);
+ while (m_last.next !is null)
+ {
+ m_last = m_last.next;
+ m_count++;
+ }
+ rhs.m_first = null;
+ rhs.m_last = null;
+ rhs.m_count = 0;
+ }
+ }
+
+ Range opSlice()
+ {
+ return Range(cast(Node*)&m_first);
+ }
+
+ void removeAt(Range r)
+ {
+ import std.exception : enforce;
+
+ assert(m_count);
+ Node* n = r.m_prev;
+ enforce(n && n.next, "attempting to remove invalid list node");
+
+ if (m_last is m_first)
+ m_last = null;
+ else if (m_last is n.next)
+ m_last = n; // nocoverage
+ Node* to_free = n.next;
+ n.next = n.next.next;
+ freeNode(to_free);
+ m_count--;
+ }
+
+ @property size_t length()
+ {
+ return m_count;
+ }
+
+ void clear()
+ {
+ m_first = m_last = null;
+ m_count = 0;
+ }
+
+ @property bool empty()
+ {
+ return m_first is null;
+ }
+
+ private:
+ struct Node
+ {
+ Node* next;
+ T val;
+
+ this(T v)
+ {
+ val = v;
+ }
+ }
+
+ static shared struct SpinLock
+ {
+ void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
+ void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
+ bool locked;
+ }
+
+ static shared SpinLock sm_lock;
+ static shared Node* sm_head;
+
+ Node* newNode(T v)
+ {
+ Node* n;
+ {
+ sm_lock.lock();
+ scope (exit) sm_lock.unlock();
+
+ if (sm_head)
+ {
+ n = cast(Node*) sm_head;
+ sm_head = sm_head.next;
+ }
+ }
+ if (n)
+ {
+ import std.conv : emplace;
+ emplace!Node(n, v);
+ }
+ else
+ {
+ n = new Node(v);
+ }
+ return n;
+ }
+
+ void freeNode(Node* n)
+ {
+ // destroy val to free any owned GC memory
+ destroy(n.val);
+
+ sm_lock.lock();
+ scope (exit) sm_lock.unlock();
+
+ auto sn = cast(shared(Node)*) n;
+ sn.next = sm_head;
+ sm_head = sn;
+ }
+
+ void put(Node* n)
+ {
+ m_count++;
+ if (!empty)
+ {
+ m_last.next = n;
+ m_last = n;
+ return;
+ }
+ m_first = n;
+ m_last = n;
+ }
+
+ Node* m_first;
+ Node* m_last;
+ size_t m_count;
+ }
+}
+
+version (unittest)
+{
+ import std.stdio;
+ import std.typecons : tuple, Tuple;
+
+ void testfn(Tid tid)
+ {
+ receive((float val) { assert(0); }, (int val, int val2) {
+ assert(val == 42 && val2 == 86);
+ });
+ receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
+ receive((Variant val) { });
+ receive((string val) {
+ if ("the quick brown fox" != val)
+ return false;
+ return true;
+ }, (string val) { assert(false); });
+ prioritySend(tid, "done");
+ }
+
+ void runTest(Tid tid)
+ {
+ send(tid, 42, 86);
+ send(tid, tuple(42, 86));
+ send(tid, "hello", "there");
+ send(tid, "the quick brown fox");
+ receive((string val) { assert(val == "done"); });
+ }
+
+ void simpleTest()
+ {
+ auto tid = spawn(&testfn, thisTid);
+ runTest(tid);
+
+ // Run the test again with a limited mailbox size.
+ tid = spawn(&testfn, thisTid);
+ setMaxMailboxSize(tid, 2, OnCrowding.block);
+ runTest(tid);
+ }
+
+ @system unittest
+ {
+ simpleTest();
+ }
+
+ @system unittest
+ {
+ scheduler = new ThreadScheduler;
+ simpleTest();
+ scheduler = null;
+ }
+}
+
+private @property Mutex initOnceLock()
+{
+ __gshared Mutex lock;
+ if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock))
+ return mtx;
+ auto mtx = new Mutex;
+ if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx))
+ return mtx;
+ return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
+}
+
+/**
+ * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
+ * thread-safe manner.
+ *
+ * The implementation guarantees that all threads simultaneously calling
+ * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
+ * fully initialized. All side-effects of $(D_PARAM init) are globally visible
+ * afterwards.
+ *
+ * Params:
+ * var = The variable to initialize
+ * init = The lazy initializer value
+ *
+ * Returns:
+ * A reference to the initialized variable
+ */
+auto ref initOnce(alias var)(lazy typeof(var) init)
+{
+ return initOnce!var(init, initOnceLock);
+}
+
+/// A typical use-case is to perform lazy but thread-safe initialization.
+@system unittest
+{
+ static class MySingleton
+ {
+ static MySingleton instance()
+ {
+ static __gshared MySingleton inst;
+ return initOnce!inst(new MySingleton);
+ }
+ }
+
+ assert(MySingleton.instance !is null);
+}
+
+@system unittest
+{
+ static class MySingleton
+ {
+ static MySingleton instance()
+ {
+ static __gshared MySingleton inst;
+ return initOnce!inst(new MySingleton);
+ }
+
+ private:
+ this() { val = ++cnt; }
+ size_t val;
+ static __gshared size_t cnt;
+ }
+
+ foreach (_; 0 .. 10)
+ spawn({ ownerTid.send(MySingleton.instance.val); });
+ foreach (_; 0 .. 10)
+ assert(receiveOnly!size_t == MySingleton.instance.val);
+ assert(MySingleton.cnt == 1);
+}
+
+/**
+ * Same as above, but takes a separate mutex instead of sharing one among
+ * all initOnce instances.
+ *
+ * This should be used to avoid dead-locks when the $(D_PARAM init)
+ * expression waits for the result of another thread that might also
+ * call initOnce. Use with care.
+ *
+ * Params:
+ * var = The variable to initialize
+ * init = The lazy initializer value
+ * mutex = A mutex to prevent race conditions
+ *
+ * Returns:
+ * A reference to the initialized variable
+ */
+auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
+{
+ // check that var is global, can't take address of a TLS variable
+ static assert(is(typeof({ __gshared p = &var; })),
+ "var must be 'static shared' or '__gshared'.");
+ import core.atomic : atomicLoad, MemoryOrder, atomicStore;
+
+ static shared bool flag;
+ if (!atomicLoad!(MemoryOrder.acq)(flag))
+ {
+ synchronized (mutex)
+ {
+ if (!atomicLoad!(MemoryOrder.acq)(flag))
+ {
+ var = init;
+ atomicStore!(MemoryOrder.rel)(flag, true);
+ }
+ }
+ }
+ return var;
+}
+
+/// Use a separate mutex when init blocks on another thread that might also call initOnce.
+@system unittest
+{
+ import core.sync.mutex : Mutex;
+
+ static shared bool varA, varB;
+ __gshared Mutex m;
+ m = new Mutex;
+
+ spawn({
+ // use a different mutex for varB to avoid a dead-lock
+ initOnce!varB(true, m);
+ ownerTid.send(true);
+ });
+ // init depends on the result of the spawned thread
+ initOnce!varA(receiveOnly!bool);
+ assert(varA == true);
+ assert(varB == true);
+}
+
+@system unittest
+{
+ static shared bool a;
+ __gshared bool b;
+ static bool c;
+ bool d;
+ initOnce!a(true);
+ initOnce!b(true);
+ static assert(!__traits(compiles, initOnce!c(true))); // TLS
+ static assert(!__traits(compiles, initOnce!d(true))); // local variable
+}