aboutsummaryrefslogtreecommitdiff
path: root/libphobos/src/std/concurrency.d
diff options
context:
space:
mode:
Diffstat (limited to 'libphobos/src/std/concurrency.d')
-rw-r--r--libphobos/src/std/concurrency.d695
1 files changed, 495 insertions, 200 deletions
diff --git a/libphobos/src/std/concurrency.d b/libphobos/src/std/concurrency.d
index cf77911..d101ce4 100644
--- a/libphobos/src/std/concurrency.d
+++ b/libphobos/src/std/concurrency.d
@@ -1,4 +1,49 @@
/**
+ * $(SCRIPT inhibitQuickIndex = 1;)
+ * $(DIVC quickindex,
+ * $(BOOKTABLE,
+ * $(TR $(TH Category) $(TH Symbols))
+ * $(TR $(TD Tid) $(TD
+ * $(MYREF locate)
+ * $(MYREF ownerTid)
+ * $(MYREF register)
+ * $(MYREF spawn)
+ * $(MYREF spawnLinked)
+ * $(MYREF thisTid)
+ * $(MYREF Tid)
+ * $(MYREF TidMissingException)
+ * $(MYREF unregister)
+ * ))
+ * $(TR $(TD Message passing) $(TD
+ * $(MYREF prioritySend)
+ * $(MYREF receive)
+ * $(MYREF receiveOnly)
+ * $(MYREF receiveTimeout)
+ * $(MYREF send)
+ * $(MYREF setMaxMailboxSize)
+ * ))
+ * $(TR $(TD Message-related types) $(TD
+ * $(MYREF LinkTerminated)
+ * $(MYREF MailboxFull)
+ * $(MYREF MessageMismatch)
+ * $(MYREF OnCrowding)
+ * $(MYREF OwnerTerminated)
+ * $(MYREF PriorityMessageException)
+ * ))
+ * $(TR $(TD Scheduler) $(TD
+ * $(MYREF FiberScheduler)
+ * $(MYREF Generator)
+ * $(MYREF Scheduler)
+ * $(MYREF scheduler)
+ * $(MYREF ThreadInfo)
+ * $(MYREF ThreadScheduler)
+ * $(MYREF yield)
+ * ))
+ * $(TR $(TD Misc) $(TD
+ * $(MYREF initOnce)
+ * ))
+ * ))
+ *
* This is a low-level messaging API upon which more structured or restrictive
* APIs may be built. The general idea is that every messageable entity is
* represented by a common handle type called a Tid, which allows messages to
@@ -22,7 +67,7 @@
* Copyright: Copyright Sean Kelly 2009 - 2014.
* License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
* Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
- * Source: $(PHOBOSSRC std/_concurrency.d)
+ * Source: $(PHOBOSSRC std/concurrency.d)
*/
/* Copyright Sean Kelly 2009 - 2014.
* Distributed under the Boost Software License, Version 1.0.
@@ -72,13 +117,38 @@ import std.traits;
private
{
- template hasLocalAliasing(T...)
+ bool hasLocalAliasing(Types...)()
{
- static if (!T.length)
- enum hasLocalAliasing = false;
- else
- enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) ||
- std.concurrency.hasLocalAliasing!(T[1 .. $]);
+ import std.typecons : Rebindable;
+
+ // Works around "statement is not reachable"
+ bool doesIt = false;
+ static foreach (T; Types)
+ {
+ static if (is(T == Tid))
+ { /* Allowed */ }
+ else static if (is(T : Rebindable!R, R))
+ doesIt |= hasLocalAliasing!R;
+ else static if (is(T == struct))
+ doesIt |= hasLocalAliasing!(typeof(T.tupleof));
+ else
+ doesIt |= std.traits.hasUnsharedAliasing!(T);
+ }
+ return doesIt;
+ }
+
+ @safe unittest
+ {
+ static struct Container { Tid t; }
+ static assert(!hasLocalAliasing!(Tid, Container, int));
+ }
+
+ // https://issues.dlang.org/show_bug.cgi?id=20097
+ @safe unittest
+ {
+ import std.datetime.systime : SysTime;
+ static struct Container { SysTime time; }
+ static assert(!hasLocalAliasing!(SysTime, Container));
}
enum MsgType
@@ -159,9 +229,12 @@ private
void checkops(T...)(T ops)
{
+ import std.format : format;
+
foreach (i, t1; T)
{
- static assert(isFunctionPointer!t1 || isDelegate!t1);
+ static assert(isFunctionPointer!t1 || isDelegate!t1,
+ format!"T %d is not a function pointer or delegates"(i));
alias a1 = Parameters!(t1);
alias r1 = ReturnType!(t1);
@@ -173,7 +246,6 @@ private
foreach (t2; T[i + 1 .. $])
{
- static assert(isFunctionPointer!t2 || isDelegate!t2);
alias a2 = Parameters!(t2);
static assert(!is(a1 == a2),
@@ -199,7 +271,7 @@ static ~this()
// Exceptions
/**
- * Thrown on calls to $(D receiveOnly) if a message other than the type
+ * Thrown on calls to `receiveOnly` if a message other than the type
* the receiving thread expected is sent.
*/
class MessageMismatch : Exception
@@ -212,7 +284,7 @@ class MessageMismatch : Exception
}
/**
- * Thrown on calls to $(D receive) if the thread that spawned the receiving
+ * Thrown on calls to `receive` if the thread that spawned the receiving
* thread has terminated and no more messages exist.
*/
class OwnerTerminated : Exception
@@ -264,7 +336,7 @@ class PriorityMessageException : Exception
/**
* Thrown on mailbox crowding if the mailbox is configured with
- * $(D OnCrowding.throwException).
+ * `OnCrowding.throwException`.
*/
class MailboxFull : Exception
{
@@ -279,7 +351,7 @@ class MailboxFull : Exception
}
/**
- * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
+ * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't
* find an owner thread.
*/
class TidMissingException : Exception
@@ -315,17 +387,17 @@ public:
* that a Tid executed in the future will have the same toString() output
* as another Tid that has already terminated.
*/
- void toString(scope void delegate(const(char)[]) sink)
+ void toString(W)(ref W w) const
{
- import std.format : formattedWrite;
- formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
+ import std.format.write : formattedWrite;
+ auto p = () @trusted { return cast(void*) mbox; }();
+ formattedWrite(w, "Tid(%x)", p);
}
}
-@system unittest
+@safe unittest
{
- // text!Tid is @system
import std.conv : text;
Tid tid;
assert(text(tid) == "Tid(0)");
@@ -335,6 +407,15 @@ public:
assert(text(tid2) == text(tid3));
}
+// https://issues.dlang.org/show_bug.cgi?id=21512
+@system unittest
+{
+ import std.format : format;
+
+ const(Tid) b = spawn(() {});
+ assert(format!"%s"(b)[0 .. 4] == "Tid(");
+}
+
/**
* Returns: The $(LREF Tid) of the caller's thread.
*/
@@ -355,7 +436,7 @@ public:
/**
* Return the Tid of the thread which spawned the caller's thread.
*
- * Throws: A $(D TidMissingException) exception if
+ * Throws: A `TidMissingException` exception if
* there is no owner thread.
*/
@property Tid ownerTid()
@@ -412,10 +493,10 @@ private template isSpawnable(F, T...)
* Starts fn(args) in a new logical thread.
*
* Executes the supplied function in a new logical thread represented by
- * $(D Tid). The calling thread is designated as the owner of the new thread.
- * When the owner thread terminates an $(D OwnerTerminated) message will be
- * sent to the new thread, causing an $(D OwnerTerminated) exception to be
- * thrown on $(D receive()).
+ * `Tid`. The calling thread is designated as the owner of the new thread.
+ * When the owner thread terminates an `OwnerTerminated` message will be
+ * sent to the new thread, causing an `OwnerTerminated` exception to be
+ * thrown on `receive()`.
*
* Params:
* fn = The function to execute.
@@ -425,46 +506,69 @@ private template isSpawnable(F, T...)
* A Tid representing the new logical thread.
*
* Notes:
- * $(D args) must not have unshared aliasing. In other words, all arguments
- * to $(D fn) must either be $(D shared) or $(D immutable) or have no
+ * `args` must not have unshared aliasing. In other words, all arguments
+ * to `fn` must either be `shared` or `immutable` or have no
* pointer indirection. This is necessary for enforcing isolation among
* threads.
*
- * Example:
- * ---
- * import std.stdio, std.concurrency;
- *
- * void f1(string str)
- * {
- * writeln(str);
- * }
- *
- * void f2(char[] str)
- * {
- * writeln(str);
- * }
- *
- * void main()
- * {
- * auto str = "Hello, world";
- *
- * // Works: string is immutable.
- * auto tid1 = spawn(&f1, str);
- *
- * // Fails: char[] has mutable aliasing.
- * auto tid2 = spawn(&f2, str.dup);
- *
- * // New thread with anonymous function
- * spawn({ writeln("This is so great!"); });
- * }
- * ---
- */
-Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
+ * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
+ * `fn` must be either `shared` or `immutable`. */
+Tid spawn(F, T...)(F fn, T args)
+if (isSpawnable!(F, T))
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
return _spawn(false, fn, args);
}
+///
+@system unittest
+{
+ static void f(string msg)
+ {
+ assert(msg == "Hello World");
+ }
+
+ auto tid = spawn(&f, "Hello World");
+}
+
+/// Fails: char[] has mutable aliasing.
+@system unittest
+{
+ string msg = "Hello, World!";
+
+ static void f1(string msg) {}
+ static assert(!__traits(compiles, spawn(&f1, msg.dup)));
+ static assert( __traits(compiles, spawn(&f1, msg.idup)));
+
+ static void f2(char[] msg) {}
+ static assert(!__traits(compiles, spawn(&f2, msg.dup)));
+ static assert(!__traits(compiles, spawn(&f2, msg.idup)));
+}
+
+/// New thread with anonymous function
+@system unittest
+{
+ spawn({
+ ownerTid.send("This is so great!");
+ });
+ assert(receiveOnly!string == "This is so great!");
+}
+
+@system unittest
+{
+ import core.thread : thread_joinAll;
+
+ __gshared string receivedMessage;
+ static void f1(string msg)
+ {
+ receivedMessage = msg;
+ }
+
+ auto tid1 = spawn(&f1, "Hello World");
+ thread_joinAll;
+ assert(receivedMessage == "Hello World");
+}
+
/**
* Starts fn(args) in a logical thread and will receive a LinkTerminated
* message when the operation terminates.
@@ -484,7 +588,8 @@ Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
* Returns:
* A Tid representing the new thread.
*/
-Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
+Tid spawnLinked(F, T...)(F fn, T args)
+if (isSpawnable!(F, T))
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
return _spawn(true, fn, args);
@@ -493,7 +598,8 @@ Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
/*
*
*/
-private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
+private Tid _spawn(F, T...)(bool linked, F fn, T args)
+if (isSpawnable!(F, T))
{
// TODO: MessageList and &exec should be shared.
auto spawnTid = Tid(new MessageBox);
@@ -568,9 +674,10 @@ private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
* Places the values as a message at the back of tid's message queue.
*
* Sends the supplied value to the thread represented by tid. As with
- * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
+ * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
*/
void send(T...)(Tid tid, T vals)
+in (tid.mbox !is null)
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
_send(tid, vals);
@@ -579,11 +686,12 @@ void send(T...)(Tid tid, T vals)
/**
* Places the values as a message on the front of tid's message queue.
*
- * Send a message to $(D tid) but place it at the front of $(D tid)'s message
+ * Send a message to `tid` but place it at the front of `tid`'s message
* queue instead of at the back. This function is typically used for
* out-of-band communication, to signal exceptional conditions, etc.
*/
void prioritySend(T...)(Tid tid, T vals)
+in (tid.mbox !is null)
{
static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
_send(MsgType.priority, tid, vals);
@@ -593,6 +701,7 @@ void prioritySend(T...)(Tid tid, T vals)
* ditto
*/
private void _send(T...)(Tid tid, T vals)
+in (tid.mbox !is null)
{
_send(MsgType.standard, tid, vals);
}
@@ -602,6 +711,7 @@ private void _send(T...)(Tid tid, T vals)
* both Tid.send() and .send().
*/
private void _send(T...)(MsgType type, Tid tid, T vals)
+in (tid.mbox !is null)
{
auto msg = Message(type, vals);
tid.mbox.put(msg);
@@ -615,32 +725,16 @@ private void _send(T...)(MsgType type, Tid tid, T vals)
* a message against a set of delegates and executing the first match found.
*
* If a delegate that accepts a $(REF Variant, std,variant) is included as
- * the last argument to $(D receive), it will match any message that was not
+ * the last argument to `receive`, it will match any message that was not
* matched by an earlier delegate. If more than one argument is sent,
- * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
+ * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
* sent.
*
- * Example:
- * ---
- * import std.stdio;
- * import std.variant;
- * import std.concurrency;
- *
- * void spawnedFunction()
- * {
- * receive(
- * (int i) { writeln("Received an int."); },
- * (float f) { writeln("Received a float."); },
- * (Variant v) { writeln("Received some other type."); }
- * );
- * }
+ * Params:
+ * ops = Variadic list of function pointers and delegates. Entries
+ * in this list must not occlude later entries.
*
- * void main()
- * {
- * auto tid = spawn(&spawnedFunction);
- * send(tid, 42);
- * }
- * ---
+ * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
*/
void receive(T...)( T ops )
in
@@ -649,13 +743,45 @@ in
"Cannot receive a message until a thread was spawned "
~ "or thisTid was passed to a running thread.");
}
-body
+do
{
checkops( ops );
thisInfo.ident.mbox.get( ops );
}
+///
+@system unittest
+{
+ import std.variant : Variant;
+
+ auto process = ()
+ {
+ receive(
+ (int i) { ownerTid.send(1); },
+ (double f) { ownerTid.send(2); },
+ (Variant v) { ownerTid.send(3); }
+ );
+ };
+
+ {
+ auto tid = spawn(process);
+ send(tid, 42);
+ assert(receiveOnly!int == 1);
+ }
+
+ {
+ auto tid = spawn(process);
+ send(tid, 3.14);
+ assert(receiveOnly!int == 2);
+ }
+
+ {
+ auto tid = spawn(process);
+ send(tid, "something else");
+ assert(receiveOnly!int == 3);
+ }
+}
@safe unittest
{
@@ -677,7 +803,7 @@ body
}
// Make sure receive() works with free functions as well.
-version (unittest)
+version (StdUnittest)
{
private void receiveFunction(int x) {}
}
@@ -705,31 +831,17 @@ private template receiveOnlyRet(T...)
}
/**
- * Receives only messages with arguments of types $(D T).
+ * Receives only messages with arguments of the specified types.
*
- * Throws: $(D MessageMismatch) if a message of types other than $(D T)
- * is received.
+ * Params:
+ * T = Variadic list of types to be received.
*
- * Returns: The received message. If $(D T.length) is greater than one,
+ * Returns: The received message. If `T` has more than one entry,
* the message will be packed into a $(REF Tuple, std,typecons).
*
- * Example:
- * ---
- * import std.concurrency;
- *
- * void spawnedFunc()
- * {
- * auto msg = receiveOnly!(int, string)();
- * assert(msg[0] == 42);
- * assert(msg[1] == "42");
- * }
- *
- * void main()
- * {
- * auto tid = spawn(&spawnedFunc);
- * send(tid, 42, "42");
- * }
- * ---
+ * Throws: $(LREF MessageMismatch) if a message of types other than `T`
+ * is received,
+ * $(LREF OwnerTerminated) when the sending thread was terminated.
*/
receiveOnlyRet!(T) receiveOnly(T...)()
in
@@ -737,16 +849,27 @@ in
assert(thisInfo.ident.mbox !is null,
"Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
-body
+do
{
import std.format : format;
+ import std.meta : allSatisfy;
import std.typecons : Tuple;
Tuple!(T) ret;
thisInfo.ident.mbox.get((T val) {
static if (T.length)
- ret.field = val;
+ {
+ static if (allSatisfy!(isAssignable, T))
+ {
+ ret.field = val;
+ }
+ else
+ {
+ import core.lifetime : emplace;
+ emplace(&ret, val);
+ }
+ }
},
(LinkTerminated e) { throw e; },
(OwnerTerminated e) { throw e; },
@@ -765,6 +888,42 @@ body
return ret;
}
+///
+@system unittest
+{
+ auto tid = spawn(
+ {
+ assert(receiveOnly!int == 42);
+ });
+ send(tid, 42);
+}
+
+///
+@system unittest
+{
+ auto tid = spawn(
+ {
+ assert(receiveOnly!string == "text");
+ });
+ send(tid, "text");
+}
+
+///
+@system unittest
+{
+ struct Record { string name; int age; }
+
+ auto tid = spawn(
+ {
+ auto msg = receiveOnly!(double, Record);
+ assert(msg[0] == 0.5);
+ assert(msg[1].name == "Alice");
+ assert(msg[1].age == 31);
+ });
+
+ send(tid, 0.5, Record("Alice", 31));
+}
+
@system unittest
{
static void t1(Tid mainTid)
@@ -786,14 +945,37 @@ body
assert(result == "Unexpected message type: expected 'string', got 'int'");
}
+// https://issues.dlang.org/show_bug.cgi?id=21663
+@safe unittest
+{
+ alias test = receiveOnly!(string, bool, bool);
+}
+
/**
- * Tries to receive but will give up if no matches arrive within duration.
- * Won't wait at all if provided $(REF Duration, core,time) is negative.
+ * Receives a message from another thread and gives up if no match
+ * arrives within a specified duration.
+ *
+ * Receive a message from another thread, or block until `duration` exceeds,
+ * if no messages of the specified types are available. This function works
+ * by pattern matching a message against a set of delegates and executing
+ * the first match found.
+ *
+ * If a delegate that accepts a $(REF Variant, std,variant) is included as
+ * the last argument, it will match any message that was not
+ * matched by an earlier delegate. If more than one argument is sent,
+ * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
+ * sent.
+ *
+ * Params:
+ * duration = Duration, how long to wait. If `duration` is negative,
+ * won't wait at all.
+ * ops = Variadic list of function pointers and delegates. Entries
+ * in this list must not occlude later entries.
*
- * Same as $(D receive) except that rather than wait forever for a message,
- * it waits until either it receives a message or the given
- * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
- * message and $(D false) if it timed out waiting for one.
+ * Returns: `true` if it received a message and `false` if it timed out waiting
+ * for one.
+ *
+ * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
*/
bool receiveTimeout(T...)(Duration duration, T ops)
in
@@ -801,7 +983,7 @@ in
assert(thisInfo.ident.mbox !is null,
"Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
-body
+do
{
checkops(ops);
@@ -873,6 +1055,7 @@ private
* mailbox.
*/
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
+in (tid.mbox !is null)
{
final switch (doThis)
{
@@ -899,6 +1082,7 @@ void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
* mailbox.
*/
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
+in (tid.mbox !is null)
{
tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
}
@@ -916,18 +1100,17 @@ private @property Mutex registryLock()
return impl;
}
-private void unregisterMe()
+private void unregisterMe(ref ThreadInfo me)
{
- auto me = thisInfo.ident;
- if (thisInfo.ident != Tid.init)
+ if (me.ident != Tid.init)
{
synchronized (registryLock)
{
- if (auto allNames = me in namesByTid)
+ if (auto allNames = me.ident in namesByTid)
{
foreach (name; *allNames)
tidByName.remove(name);
- namesByTid.remove(me);
+ namesByTid.remove(me.ident);
}
}
}
@@ -949,6 +1132,7 @@ private void unregisterMe()
* defunct thread.
*/
bool register(string name, Tid tid)
+in (tid.mbox !is null)
{
synchronized (registryLock)
{
@@ -1050,7 +1234,18 @@ struct ThreadInfo
_send(MsgType.linkDead, tid, ident);
if (owner != Tid.init)
_send(MsgType.linkDead, owner, ident);
- unregisterMe(); // clean up registry entries
+ unregisterMe(this); // clean up registry entries
+ }
+
+ // https://issues.dlang.org/show_bug.cgi?id=20160
+ @system unittest
+ {
+ register("main_thread", thisTid());
+
+ ThreadInfo t;
+ t.cleanup();
+
+ assert(locate("main_thread") == thisTid());
}
}
@@ -1270,13 +1465,45 @@ class FiberScheduler : Scheduler
/**
* Returns a Condition analog that yields when wait or notify is called.
+ *
+ * Bug:
+ * For the default implementation, `notifyAll`will behave like `notify`.
+ *
+ * Params:
+ * m = A `Mutex` to use for locking if the condition needs to be waited on
+ * or notified from multiple `Thread`s.
+ * If `null`, no `Mutex` will be used and it is assumed that the
+ * `Condition` is only waited on/notified from one `Thread`.
*/
Condition newCondition(Mutex m) nothrow
{
return new FiberCondition(m);
}
-private:
+protected:
+ /**
+ * Creates a new Fiber which calls the given delegate.
+ *
+ * Params:
+ * op = The delegate the fiber should call
+ */
+ void create(void delegate() op) nothrow
+ {
+ void wrap()
+ {
+ scope (exit)
+ {
+ thisInfo.cleanup();
+ }
+ op();
+ }
+
+ m_fibers ~= new InfoFiber(&wrap);
+ }
+
+ /**
+ * Fiber which embeds a ThreadInfo
+ */
static class InfoFiber : Fiber
{
ThreadInfo info;
@@ -1285,8 +1512,14 @@ private:
{
super(op);
}
+
+ this(void delegate() op, size_t sz) nothrow
+ {
+ super(op, sz);
+ }
}
+private:
class FiberCondition : Condition
{
this(Mutex m) nothrow
@@ -1313,7 +1546,7 @@ private:
!notified && !period.isNegative;
period = limit - MonoTime.currTime)
{
- yield();
+ this.outer.yield();
}
return notified;
}
@@ -1333,9 +1566,11 @@ private:
private:
void switchContext() nothrow
{
- mutex_nothrow.unlock_nothrow();
- scope (exit) mutex_nothrow.lock_nothrow();
- yield();
+ if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
+ scope (exit)
+ if (mutex_nothrow)
+ mutex_nothrow.lock_nothrow();
+ this.outer.yield();
}
private bool notified;
@@ -1365,20 +1600,6 @@ private:
}
}
- void create(void delegate() op) nothrow
- {
- void wrap()
- {
- scope (exit)
- {
- thisInfo.cleanup();
- }
- op();
- }
-
- m_fibers ~= new InfoFiber(&wrap);
- }
-
private:
Fiber[] m_fibers;
size_t m_pos;
@@ -1464,35 +1685,6 @@ private interface IsGenerator {}
/**
* A Generator is a Fiber that periodically returns values of type T to the
* caller via yield. This is represented as an InputRange.
- *
- * Example:
- * ---
- * import std.concurrency;
- * import std.stdio;
- *
- *
- * void main()
- * {
- * auto tid = spawn(
- * {
- * while (true)
- * {
- * writeln(receiveOnly!int());
- * }
- * });
- *
- * auto r = new Generator!int(
- * {
- * foreach (i; 1 .. 10)
- * yield(i);
- * });
- *
- * foreach (e; r)
- * {
- * tid.send(e);
- * }
- * }
- * ---
*/
class Generator(T) :
Fiber, IsGenerator, InputRange!T
@@ -1533,6 +1725,27 @@ class Generator(T) :
}
/**
+ * Initializes a generator object which is associated with a static
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * fn = The fiber function.
+ * sz = The stack size for this fiber.
+ * guardPageSize = size of the guard page to trap fiber's stack
+ * overflows. Refer to $(REF Fiber, core,thread)'s
+ * documentation for more details.
+ *
+ * In:
+ * fn must not be null.
+ */
+ this(void function() fn, size_t sz, size_t guardPageSize)
+ {
+ super(fn, sz, guardPageSize);
+ call();
+ }
+
+ /**
* Initializes a generator object which is associated with a dynamic
* D function. The function will be called once to prepare the range
* for iteration.
@@ -1568,6 +1781,27 @@ class Generator(T) :
}
/**
+ * Initializes a generator object which is associated with a dynamic
+ * D function. The function will be called once to prepare the range
+ * for iteration.
+ *
+ * Params:
+ * dg = The fiber function.
+ * sz = The stack size for this fiber.
+ * guardPageSize = size of the guard page to trap fiber's stack
+ * overflows. Refer to $(REF Fiber, core,thread)'s
+ * documentation for more details.
+ *
+ * In:
+ * dg must not be null.
+ */
+ this(void delegate() dg, size_t sz, size_t guardPageSize)
+ {
+ super(dg, sz, guardPageSize);
+ call();
+ }
+
+ /**
* Returns true if the generator is empty.
*/
final bool empty() @property
@@ -1634,6 +1868,28 @@ private:
T* m_value;
}
+///
+@system unittest
+{
+ auto tid = spawn({
+ int i;
+ while (i < 9)
+ i = receiveOnly!int;
+
+ ownerTid.send(i * 2);
+ });
+
+ auto r = new Generator!int({
+ foreach (i; 1 .. 10)
+ yield(i);
+ });
+
+ foreach (e; r)
+ tid.send(e);
+
+ assert(receiveOnly!int == 18);
+}
+
/**
* Yields a value of type T to the caller of the currently executing
* generator.
@@ -1865,7 +2121,7 @@ private
{
import std.meta : AliasSeq;
- static assert(T.length);
+ static assert(T.length, "T must not be empty");
static if (isImplicitlyConvertible!(T[0], Duration))
{
@@ -1906,7 +2162,8 @@ private
bool onLinkDeadMsg(ref Message msg)
{
- assert(msg.convertsTo!(Tid));
+ assert(msg.convertsTo!(Tid),
+ "Message could be converted to Tid");
auto tid = msg.get!(Tid);
if (bool* pDepends = tid in thisInfo.links)
@@ -2083,7 +2340,8 @@ private
{
static void onLinkDeadMsg(ref Message msg)
{
- assert(msg.convertsTo!(Tid));
+ assert(msg.convertsTo!(Tid),
+ "Message could be converted to Tid");
auto tid = msg.get!(Tid);
thisInfo.links.remove(tid);
@@ -2228,7 +2486,7 @@ private
{
import std.exception : enforce;
- assert(m_count);
+ assert(m_count, "Can not remove from empty Range");
Node* n = r.m_prev;
enforce(n && n.next, "attempting to remove invalid list node");
@@ -2295,7 +2553,7 @@ private
}
if (n)
{
- import std.conv : emplace;
+ import core.lifetime : emplace;
emplace!Node(n, v);
}
else
@@ -2337,12 +2595,11 @@ private
}
}
-version (unittest)
+@system unittest
{
- import std.stdio;
import std.typecons : tuple, Tuple;
- void testfn(Tid tid)
+ static void testfn(Tid tid)
{
receive((float val) { assert(0); }, (int val, int val2) {
assert(val == 42 && val2 == 86);
@@ -2357,7 +2614,7 @@ version (unittest)
prioritySend(tid, "done");
}
- void runTest(Tid tid)
+ static void runTest(Tid tid)
{
send(tid, 42, 86);
send(tid, tuple(42, 86));
@@ -2366,7 +2623,7 @@ version (unittest)
receive((string val) { assert(val == "done"); });
}
- void simpleTest()
+ static void simpleTest()
{
auto tid = spawn(&testfn, thisTid);
runTest(tid);
@@ -2377,28 +2634,22 @@ version (unittest)
runTest(tid);
}
- @system unittest
- {
- simpleTest();
- }
+ simpleTest();
- @system unittest
- {
- scheduler = new ThreadScheduler;
- simpleTest();
- scheduler = null;
- }
+ scheduler = new ThreadScheduler;
+ simpleTest();
+ scheduler = null;
}
-private @property Mutex initOnceLock()
+private @property shared(Mutex) initOnceLock()
{
- __gshared Mutex lock;
- if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock))
+ static shared Mutex lock;
+ if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
return mtx;
- auto mtx = new Mutex;
- if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx))
+ auto mtx = new shared Mutex;
+ if (cas(&lock, cast(shared) null, mtx))
return mtx;
- return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
+ return atomicLoad!(MemoryOrder.acq)(lock);
}
/**
@@ -2429,7 +2680,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init)
{
static MySingleton instance()
{
- static __gshared MySingleton inst;
+ __gshared MySingleton inst;
return initOnce!inst(new MySingleton);
}
}
@@ -2443,14 +2694,14 @@ auto ref initOnce(alias var)(lazy typeof(var) init)
{
static MySingleton instance()
{
- static __gshared MySingleton inst;
+ __gshared MySingleton inst;
return initOnce!inst(new MySingleton);
}
private:
this() { val = ++cnt; }
size_t val;
- static __gshared size_t cnt;
+ __gshared size_t cnt;
}
foreach (_; 0 .. 10)
@@ -2476,7 +2727,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init)
* Returns:
* A reference to the initialized variable
*/
-auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
+auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
// check that var is global, can't take address of a TLS variable
static assert(is(typeof({ __gshared p = &var; })),
@@ -2488,7 +2739,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
synchronized (mutex)
{
- if (!atomicLoad!(MemoryOrder.acq)(flag))
+ if (!atomicLoad!(MemoryOrder.raw)(flag))
{
var = init;
atomicStore!(MemoryOrder.rel)(flag, true);
@@ -2498,14 +2749,20 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
return var;
}
+/// ditto
+auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
+{
+ return initOnce!var(init, cast(shared) mutex);
+}
+
/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
import core.sync.mutex : Mutex;
static shared bool varA, varB;
- __gshared Mutex m;
- m = new Mutex;
+ static shared Mutex m;
+ m = new shared Mutex;
spawn({
// use a different mutex for varB to avoid a dead-lock
@@ -2529,3 +2786,41 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
static assert(!__traits(compiles, initOnce!c(true))); // TLS
static assert(!__traits(compiles, initOnce!d(true))); // local variable
}
+
+// test ability to send shared arrays
+@system unittest
+{
+ static shared int[] x = new shared(int)[1];
+ auto tid = spawn({
+ auto arr = receiveOnly!(shared(int)[]);
+ arr[0] = 5;
+ ownerTid.send(true);
+ });
+ tid.send(x);
+ receiveOnly!(bool);
+ assert(x[0] == 5);
+}
+
+// https://issues.dlang.org/show_bug.cgi?id=13930
+@system unittest
+{
+ immutable aa = ["0":0];
+ thisTid.send(aa);
+ receiveOnly!(immutable int[string]); // compile error
+}
+
+// https://issues.dlang.org/show_bug.cgi?id=19345
+@system unittest
+{
+ static struct Aggregate { const int a; const int[5] b; }
+ static void t1(Tid mainTid)
+ {
+ const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
+ mainTid.send(sendMe);
+ }
+
+ spawn(&t1, thisTid);
+ auto result1 = receiveOnly!(const Aggregate)();
+ immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
+ assert(result1 == expected);
+}