diff options
Diffstat (limited to 'libphobos/src/std/concurrency.d')
-rw-r--r-- | libphobos/src/std/concurrency.d | 695 |
1 files changed, 495 insertions, 200 deletions
diff --git a/libphobos/src/std/concurrency.d b/libphobos/src/std/concurrency.d index cf77911..d101ce4 100644 --- a/libphobos/src/std/concurrency.d +++ b/libphobos/src/std/concurrency.d @@ -1,4 +1,49 @@ /** + * $(SCRIPT inhibitQuickIndex = 1;) + * $(DIVC quickindex, + * $(BOOKTABLE, + * $(TR $(TH Category) $(TH Symbols)) + * $(TR $(TD Tid) $(TD + * $(MYREF locate) + * $(MYREF ownerTid) + * $(MYREF register) + * $(MYREF spawn) + * $(MYREF spawnLinked) + * $(MYREF thisTid) + * $(MYREF Tid) + * $(MYREF TidMissingException) + * $(MYREF unregister) + * )) + * $(TR $(TD Message passing) $(TD + * $(MYREF prioritySend) + * $(MYREF receive) + * $(MYREF receiveOnly) + * $(MYREF receiveTimeout) + * $(MYREF send) + * $(MYREF setMaxMailboxSize) + * )) + * $(TR $(TD Message-related types) $(TD + * $(MYREF LinkTerminated) + * $(MYREF MailboxFull) + * $(MYREF MessageMismatch) + * $(MYREF OnCrowding) + * $(MYREF OwnerTerminated) + * $(MYREF PriorityMessageException) + * )) + * $(TR $(TD Scheduler) $(TD + * $(MYREF FiberScheduler) + * $(MYREF Generator) + * $(MYREF Scheduler) + * $(MYREF scheduler) + * $(MYREF ThreadInfo) + * $(MYREF ThreadScheduler) + * $(MYREF yield) + * )) + * $(TR $(TD Misc) $(TD + * $(MYREF initOnce) + * )) + * )) + * * This is a low-level messaging API upon which more structured or restrictive * APIs may be built. The general idea is that every messageable entity is * represented by a common handle type called a Tid, which allows messages to @@ -22,7 +67,7 @@ * Copyright: Copyright Sean Kelly 2009 - 2014. * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak - * Source: $(PHOBOSSRC std/_concurrency.d) + * Source: $(PHOBOSSRC std/concurrency.d) */ /* Copyright Sean Kelly 2009 - 2014. * Distributed under the Boost Software License, Version 1.0. @@ -72,13 +117,38 @@ import std.traits; private { - template hasLocalAliasing(T...) + bool hasLocalAliasing(Types...)() { - static if (!T.length) - enum hasLocalAliasing = false; - else - enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) || - std.concurrency.hasLocalAliasing!(T[1 .. $]); + import std.typecons : Rebindable; + + // Works around "statement is not reachable" + bool doesIt = false; + static foreach (T; Types) + { + static if (is(T == Tid)) + { /* Allowed */ } + else static if (is(T : Rebindable!R, R)) + doesIt |= hasLocalAliasing!R; + else static if (is(T == struct)) + doesIt |= hasLocalAliasing!(typeof(T.tupleof)); + else + doesIt |= std.traits.hasUnsharedAliasing!(T); + } + return doesIt; + } + + @safe unittest + { + static struct Container { Tid t; } + static assert(!hasLocalAliasing!(Tid, Container, int)); + } + + // https://issues.dlang.org/show_bug.cgi?id=20097 + @safe unittest + { + import std.datetime.systime : SysTime; + static struct Container { SysTime time; } + static assert(!hasLocalAliasing!(SysTime, Container)); } enum MsgType @@ -159,9 +229,12 @@ private void checkops(T...)(T ops) { + import std.format : format; + foreach (i, t1; T) { - static assert(isFunctionPointer!t1 || isDelegate!t1); + static assert(isFunctionPointer!t1 || isDelegate!t1, + format!"T %d is not a function pointer or delegates"(i)); alias a1 = Parameters!(t1); alias r1 = ReturnType!(t1); @@ -173,7 +246,6 @@ private foreach (t2; T[i + 1 .. $]) { - static assert(isFunctionPointer!t2 || isDelegate!t2); alias a2 = Parameters!(t2); static assert(!is(a1 == a2), @@ -199,7 +271,7 @@ static ~this() // Exceptions /** - * Thrown on calls to $(D receiveOnly) if a message other than the type + * Thrown on calls to `receiveOnly` if a message other than the type * the receiving thread expected is sent. */ class MessageMismatch : Exception @@ -212,7 +284,7 @@ class MessageMismatch : Exception } /** - * Thrown on calls to $(D receive) if the thread that spawned the receiving + * Thrown on calls to `receive` if the thread that spawned the receiving * thread has terminated and no more messages exist. */ class OwnerTerminated : Exception @@ -264,7 +336,7 @@ class PriorityMessageException : Exception /** * Thrown on mailbox crowding if the mailbox is configured with - * $(D OnCrowding.throwException). + * `OnCrowding.throwException`. */ class MailboxFull : Exception { @@ -279,7 +351,7 @@ class MailboxFull : Exception } /** - * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't + * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't * find an owner thread. */ class TidMissingException : Exception @@ -315,17 +387,17 @@ public: * that a Tid executed in the future will have the same toString() output * as another Tid that has already terminated. */ - void toString(scope void delegate(const(char)[]) sink) + void toString(W)(ref W w) const { - import std.format : formattedWrite; - formattedWrite(sink, "Tid(%x)", cast(void*) mbox); + import std.format.write : formattedWrite; + auto p = () @trusted { return cast(void*) mbox; }(); + formattedWrite(w, "Tid(%x)", p); } } -@system unittest +@safe unittest { - // text!Tid is @system import std.conv : text; Tid tid; assert(text(tid) == "Tid(0)"); @@ -335,6 +407,15 @@ public: assert(text(tid2) == text(tid3)); } +// https://issues.dlang.org/show_bug.cgi?id=21512 +@system unittest +{ + import std.format : format; + + const(Tid) b = spawn(() {}); + assert(format!"%s"(b)[0 .. 4] == "Tid("); +} + /** * Returns: The $(LREF Tid) of the caller's thread. */ @@ -355,7 +436,7 @@ public: /** * Return the Tid of the thread which spawned the caller's thread. * - * Throws: A $(D TidMissingException) exception if + * Throws: A `TidMissingException` exception if * there is no owner thread. */ @property Tid ownerTid() @@ -412,10 +493,10 @@ private template isSpawnable(F, T...) * Starts fn(args) in a new logical thread. * * Executes the supplied function in a new logical thread represented by - * $(D Tid). The calling thread is designated as the owner of the new thread. - * When the owner thread terminates an $(D OwnerTerminated) message will be - * sent to the new thread, causing an $(D OwnerTerminated) exception to be - * thrown on $(D receive()). + * `Tid`. The calling thread is designated as the owner of the new thread. + * When the owner thread terminates an `OwnerTerminated` message will be + * sent to the new thread, causing an `OwnerTerminated` exception to be + * thrown on `receive()`. * * Params: * fn = The function to execute. @@ -425,46 +506,69 @@ private template isSpawnable(F, T...) * A Tid representing the new logical thread. * * Notes: - * $(D args) must not have unshared aliasing. In other words, all arguments - * to $(D fn) must either be $(D shared) or $(D immutable) or have no + * `args` must not have unshared aliasing. In other words, all arguments + * to `fn` must either be `shared` or `immutable` or have no * pointer indirection. This is necessary for enforcing isolation among * threads. * - * Example: - * --- - * import std.stdio, std.concurrency; - * - * void f1(string str) - * { - * writeln(str); - * } - * - * void f2(char[] str) - * { - * writeln(str); - * } - * - * void main() - * { - * auto str = "Hello, world"; - * - * // Works: string is immutable. - * auto tid1 = spawn(&f1, str); - * - * // Fails: char[] has mutable aliasing. - * auto tid2 = spawn(&f2, str.dup); - * - * // New thread with anonymous function - * spawn({ writeln("This is so great!"); }); - * } - * --- - */ -Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T)) + * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning + * `fn` must be either `shared` or `immutable`. */ +Tid spawn(F, T...)(F fn, T args) +if (isSpawnable!(F, T)) { static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); return _spawn(false, fn, args); } +/// +@system unittest +{ + static void f(string msg) + { + assert(msg == "Hello World"); + } + + auto tid = spawn(&f, "Hello World"); +} + +/// Fails: char[] has mutable aliasing. +@system unittest +{ + string msg = "Hello, World!"; + + static void f1(string msg) {} + static assert(!__traits(compiles, spawn(&f1, msg.dup))); + static assert( __traits(compiles, spawn(&f1, msg.idup))); + + static void f2(char[] msg) {} + static assert(!__traits(compiles, spawn(&f2, msg.dup))); + static assert(!__traits(compiles, spawn(&f2, msg.idup))); +} + +/// New thread with anonymous function +@system unittest +{ + spawn({ + ownerTid.send("This is so great!"); + }); + assert(receiveOnly!string == "This is so great!"); +} + +@system unittest +{ + import core.thread : thread_joinAll; + + __gshared string receivedMessage; + static void f1(string msg) + { + receivedMessage = msg; + } + + auto tid1 = spawn(&f1, "Hello World"); + thread_joinAll; + assert(receivedMessage == "Hello World"); +} + /** * Starts fn(args) in a logical thread and will receive a LinkTerminated * message when the operation terminates. @@ -484,7 +588,8 @@ Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T)) * Returns: * A Tid representing the new thread. */ -Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T)) +Tid spawnLinked(F, T...)(F fn, T args) +if (isSpawnable!(F, T)) { static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); return _spawn(true, fn, args); @@ -493,7 +598,8 @@ Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T)) /* * */ -private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T)) +private Tid _spawn(F, T...)(bool linked, F fn, T args) +if (isSpawnable!(F, T)) { // TODO: MessageList and &exec should be shared. auto spawnTid = Tid(new MessageBox); @@ -568,9 +674,10 @@ private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T)) * Places the values as a message at the back of tid's message queue. * * Sends the supplied value to the thread represented by tid. As with - * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing. + * $(REF spawn, std,concurrency), `T` must not have unshared aliasing. */ void send(T...)(Tid tid, T vals) +in (tid.mbox !is null) { static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); _send(tid, vals); @@ -579,11 +686,12 @@ void send(T...)(Tid tid, T vals) /** * Places the values as a message on the front of tid's message queue. * - * Send a message to $(D tid) but place it at the front of $(D tid)'s message + * Send a message to `tid` but place it at the front of `tid`'s message * queue instead of at the back. This function is typically used for * out-of-band communication, to signal exceptional conditions, etc. */ void prioritySend(T...)(Tid tid, T vals) +in (tid.mbox !is null) { static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); _send(MsgType.priority, tid, vals); @@ -593,6 +701,7 @@ void prioritySend(T...)(Tid tid, T vals) * ditto */ private void _send(T...)(Tid tid, T vals) +in (tid.mbox !is null) { _send(MsgType.standard, tid, vals); } @@ -602,6 +711,7 @@ private void _send(T...)(Tid tid, T vals) * both Tid.send() and .send(). */ private void _send(T...)(MsgType type, Tid tid, T vals) +in (tid.mbox !is null) { auto msg = Message(type, vals); tid.mbox.put(msg); @@ -615,32 +725,16 @@ private void _send(T...)(MsgType type, Tid tid, T vals) * a message against a set of delegates and executing the first match found. * * If a delegate that accepts a $(REF Variant, std,variant) is included as - * the last argument to $(D receive), it will match any message that was not + * the last argument to `receive`, it will match any message that was not * matched by an earlier delegate. If more than one argument is sent, - * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values + * the `Variant` will contain a $(REF Tuple, std,typecons) of all values * sent. * - * Example: - * --- - * import std.stdio; - * import std.variant; - * import std.concurrency; - * - * void spawnedFunction() - * { - * receive( - * (int i) { writeln("Received an int."); }, - * (float f) { writeln("Received a float."); }, - * (Variant v) { writeln("Received some other type."); } - * ); - * } + * Params: + * ops = Variadic list of function pointers and delegates. Entries + * in this list must not occlude later entries. * - * void main() - * { - * auto tid = spawn(&spawnedFunction); - * send(tid, 42); - * } - * --- + * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. */ void receive(T...)( T ops ) in @@ -649,13 +743,45 @@ in "Cannot receive a message until a thread was spawned " ~ "or thisTid was passed to a running thread."); } -body +do { checkops( ops ); thisInfo.ident.mbox.get( ops ); } +/// +@system unittest +{ + import std.variant : Variant; + + auto process = () + { + receive( + (int i) { ownerTid.send(1); }, + (double f) { ownerTid.send(2); }, + (Variant v) { ownerTid.send(3); } + ); + }; + + { + auto tid = spawn(process); + send(tid, 42); + assert(receiveOnly!int == 1); + } + + { + auto tid = spawn(process); + send(tid, 3.14); + assert(receiveOnly!int == 2); + } + + { + auto tid = spawn(process); + send(tid, "something else"); + assert(receiveOnly!int == 3); + } +} @safe unittest { @@ -677,7 +803,7 @@ body } // Make sure receive() works with free functions as well. -version (unittest) +version (StdUnittest) { private void receiveFunction(int x) {} } @@ -705,31 +831,17 @@ private template receiveOnlyRet(T...) } /** - * Receives only messages with arguments of types $(D T). + * Receives only messages with arguments of the specified types. * - * Throws: $(D MessageMismatch) if a message of types other than $(D T) - * is received. + * Params: + * T = Variadic list of types to be received. * - * Returns: The received message. If $(D T.length) is greater than one, + * Returns: The received message. If `T` has more than one entry, * the message will be packed into a $(REF Tuple, std,typecons). * - * Example: - * --- - * import std.concurrency; - * - * void spawnedFunc() - * { - * auto msg = receiveOnly!(int, string)(); - * assert(msg[0] == 42); - * assert(msg[1] == "42"); - * } - * - * void main() - * { - * auto tid = spawn(&spawnedFunc); - * send(tid, 42, "42"); - * } - * --- + * Throws: $(LREF MessageMismatch) if a message of types other than `T` + * is received, + * $(LREF OwnerTerminated) when the sending thread was terminated. */ receiveOnlyRet!(T) receiveOnly(T...)() in @@ -737,16 +849,27 @@ in assert(thisInfo.ident.mbox !is null, "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); } -body +do { import std.format : format; + import std.meta : allSatisfy; import std.typecons : Tuple; Tuple!(T) ret; thisInfo.ident.mbox.get((T val) { static if (T.length) - ret.field = val; + { + static if (allSatisfy!(isAssignable, T)) + { + ret.field = val; + } + else + { + import core.lifetime : emplace; + emplace(&ret, val); + } + } }, (LinkTerminated e) { throw e; }, (OwnerTerminated e) { throw e; }, @@ -765,6 +888,42 @@ body return ret; } +/// +@system unittest +{ + auto tid = spawn( + { + assert(receiveOnly!int == 42); + }); + send(tid, 42); +} + +/// +@system unittest +{ + auto tid = spawn( + { + assert(receiveOnly!string == "text"); + }); + send(tid, "text"); +} + +/// +@system unittest +{ + struct Record { string name; int age; } + + auto tid = spawn( + { + auto msg = receiveOnly!(double, Record); + assert(msg[0] == 0.5); + assert(msg[1].name == "Alice"); + assert(msg[1].age == 31); + }); + + send(tid, 0.5, Record("Alice", 31)); +} + @system unittest { static void t1(Tid mainTid) @@ -786,14 +945,37 @@ body assert(result == "Unexpected message type: expected 'string', got 'int'"); } +// https://issues.dlang.org/show_bug.cgi?id=21663 +@safe unittest +{ + alias test = receiveOnly!(string, bool, bool); +} + /** - * Tries to receive but will give up if no matches arrive within duration. - * Won't wait at all if provided $(REF Duration, core,time) is negative. + * Receives a message from another thread and gives up if no match + * arrives within a specified duration. + * + * Receive a message from another thread, or block until `duration` exceeds, + * if no messages of the specified types are available. This function works + * by pattern matching a message against a set of delegates and executing + * the first match found. + * + * If a delegate that accepts a $(REF Variant, std,variant) is included as + * the last argument, it will match any message that was not + * matched by an earlier delegate. If more than one argument is sent, + * the `Variant` will contain a $(REF Tuple, std,typecons) of all values + * sent. + * + * Params: + * duration = Duration, how long to wait. If `duration` is negative, + * won't wait at all. + * ops = Variadic list of function pointers and delegates. Entries + * in this list must not occlude later entries. * - * Same as $(D receive) except that rather than wait forever for a message, - * it waits until either it receives a message or the given - * $(REF Duration, core,time) has passed. It returns $(D true) if it received a - * message and $(D false) if it timed out waiting for one. + * Returns: `true` if it received a message and `false` if it timed out waiting + * for one. + * + * Throws: $(LREF OwnerTerminated) when the sending thread was terminated. */ bool receiveTimeout(T...)(Duration duration, T ops) in @@ -801,7 +983,7 @@ in assert(thisInfo.ident.mbox !is null, "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); } -body +do { checkops(ops); @@ -873,6 +1055,7 @@ private * mailbox. */ void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure +in (tid.mbox !is null) { final switch (doThis) { @@ -899,6 +1082,7 @@ void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure * mailbox. */ void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) +in (tid.mbox !is null) { tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); } @@ -916,18 +1100,17 @@ private @property Mutex registryLock() return impl; } -private void unregisterMe() +private void unregisterMe(ref ThreadInfo me) { - auto me = thisInfo.ident; - if (thisInfo.ident != Tid.init) + if (me.ident != Tid.init) { synchronized (registryLock) { - if (auto allNames = me in namesByTid) + if (auto allNames = me.ident in namesByTid) { foreach (name; *allNames) tidByName.remove(name); - namesByTid.remove(me); + namesByTid.remove(me.ident); } } } @@ -949,6 +1132,7 @@ private void unregisterMe() * defunct thread. */ bool register(string name, Tid tid) +in (tid.mbox !is null) { synchronized (registryLock) { @@ -1050,7 +1234,18 @@ struct ThreadInfo _send(MsgType.linkDead, tid, ident); if (owner != Tid.init) _send(MsgType.linkDead, owner, ident); - unregisterMe(); // clean up registry entries + unregisterMe(this); // clean up registry entries + } + + // https://issues.dlang.org/show_bug.cgi?id=20160 + @system unittest + { + register("main_thread", thisTid()); + + ThreadInfo t; + t.cleanup(); + + assert(locate("main_thread") == thisTid()); } } @@ -1270,13 +1465,45 @@ class FiberScheduler : Scheduler /** * Returns a Condition analog that yields when wait or notify is called. + * + * Bug: + * For the default implementation, `notifyAll`will behave like `notify`. + * + * Params: + * m = A `Mutex` to use for locking if the condition needs to be waited on + * or notified from multiple `Thread`s. + * If `null`, no `Mutex` will be used and it is assumed that the + * `Condition` is only waited on/notified from one `Thread`. */ Condition newCondition(Mutex m) nothrow { return new FiberCondition(m); } -private: +protected: + /** + * Creates a new Fiber which calls the given delegate. + * + * Params: + * op = The delegate the fiber should call + */ + void create(void delegate() op) nothrow + { + void wrap() + { + scope (exit) + { + thisInfo.cleanup(); + } + op(); + } + + m_fibers ~= new InfoFiber(&wrap); + } + + /** + * Fiber which embeds a ThreadInfo + */ static class InfoFiber : Fiber { ThreadInfo info; @@ -1285,8 +1512,14 @@ private: { super(op); } + + this(void delegate() op, size_t sz) nothrow + { + super(op, sz); + } } +private: class FiberCondition : Condition { this(Mutex m) nothrow @@ -1313,7 +1546,7 @@ private: !notified && !period.isNegative; period = limit - MonoTime.currTime) { - yield(); + this.outer.yield(); } return notified; } @@ -1333,9 +1566,11 @@ private: private: void switchContext() nothrow { - mutex_nothrow.unlock_nothrow(); - scope (exit) mutex_nothrow.lock_nothrow(); - yield(); + if (mutex_nothrow) mutex_nothrow.unlock_nothrow(); + scope (exit) + if (mutex_nothrow) + mutex_nothrow.lock_nothrow(); + this.outer.yield(); } private bool notified; @@ -1365,20 +1600,6 @@ private: } } - void create(void delegate() op) nothrow - { - void wrap() - { - scope (exit) - { - thisInfo.cleanup(); - } - op(); - } - - m_fibers ~= new InfoFiber(&wrap); - } - private: Fiber[] m_fibers; size_t m_pos; @@ -1464,35 +1685,6 @@ private interface IsGenerator {} /** * A Generator is a Fiber that periodically returns values of type T to the * caller via yield. This is represented as an InputRange. - * - * Example: - * --- - * import std.concurrency; - * import std.stdio; - * - * - * void main() - * { - * auto tid = spawn( - * { - * while (true) - * { - * writeln(receiveOnly!int()); - * } - * }); - * - * auto r = new Generator!int( - * { - * foreach (i; 1 .. 10) - * yield(i); - * }); - * - * foreach (e; r) - * { - * tid.send(e); - * } - * } - * --- */ class Generator(T) : Fiber, IsGenerator, InputRange!T @@ -1533,6 +1725,27 @@ class Generator(T) : } /** + * Initializes a generator object which is associated with a static + * D function. The function will be called once to prepare the range + * for iteration. + * + * Params: + * fn = The fiber function. + * sz = The stack size for this fiber. + * guardPageSize = size of the guard page to trap fiber's stack + * overflows. Refer to $(REF Fiber, core,thread)'s + * documentation for more details. + * + * In: + * fn must not be null. + */ + this(void function() fn, size_t sz, size_t guardPageSize) + { + super(fn, sz, guardPageSize); + call(); + } + + /** * Initializes a generator object which is associated with a dynamic * D function. The function will be called once to prepare the range * for iteration. @@ -1568,6 +1781,27 @@ class Generator(T) : } /** + * Initializes a generator object which is associated with a dynamic + * D function. The function will be called once to prepare the range + * for iteration. + * + * Params: + * dg = The fiber function. + * sz = The stack size for this fiber. + * guardPageSize = size of the guard page to trap fiber's stack + * overflows. Refer to $(REF Fiber, core,thread)'s + * documentation for more details. + * + * In: + * dg must not be null. + */ + this(void delegate() dg, size_t sz, size_t guardPageSize) + { + super(dg, sz, guardPageSize); + call(); + } + + /** * Returns true if the generator is empty. */ final bool empty() @property @@ -1634,6 +1868,28 @@ private: T* m_value; } +/// +@system unittest +{ + auto tid = spawn({ + int i; + while (i < 9) + i = receiveOnly!int; + + ownerTid.send(i * 2); + }); + + auto r = new Generator!int({ + foreach (i; 1 .. 10) + yield(i); + }); + + foreach (e; r) + tid.send(e); + + assert(receiveOnly!int == 18); +} + /** * Yields a value of type T to the caller of the currently executing * generator. @@ -1865,7 +2121,7 @@ private { import std.meta : AliasSeq; - static assert(T.length); + static assert(T.length, "T must not be empty"); static if (isImplicitlyConvertible!(T[0], Duration)) { @@ -1906,7 +2162,8 @@ private bool onLinkDeadMsg(ref Message msg) { - assert(msg.convertsTo!(Tid)); + assert(msg.convertsTo!(Tid), + "Message could be converted to Tid"); auto tid = msg.get!(Tid); if (bool* pDepends = tid in thisInfo.links) @@ -2083,7 +2340,8 @@ private { static void onLinkDeadMsg(ref Message msg) { - assert(msg.convertsTo!(Tid)); + assert(msg.convertsTo!(Tid), + "Message could be converted to Tid"); auto tid = msg.get!(Tid); thisInfo.links.remove(tid); @@ -2228,7 +2486,7 @@ private { import std.exception : enforce; - assert(m_count); + assert(m_count, "Can not remove from empty Range"); Node* n = r.m_prev; enforce(n && n.next, "attempting to remove invalid list node"); @@ -2295,7 +2553,7 @@ private } if (n) { - import std.conv : emplace; + import core.lifetime : emplace; emplace!Node(n, v); } else @@ -2337,12 +2595,11 @@ private } } -version (unittest) +@system unittest { - import std.stdio; import std.typecons : tuple, Tuple; - void testfn(Tid tid) + static void testfn(Tid tid) { receive((float val) { assert(0); }, (int val, int val2) { assert(val == 42 && val2 == 86); @@ -2357,7 +2614,7 @@ version (unittest) prioritySend(tid, "done"); } - void runTest(Tid tid) + static void runTest(Tid tid) { send(tid, 42, 86); send(tid, tuple(42, 86)); @@ -2366,7 +2623,7 @@ version (unittest) receive((string val) { assert(val == "done"); }); } - void simpleTest() + static void simpleTest() { auto tid = spawn(&testfn, thisTid); runTest(tid); @@ -2377,28 +2634,22 @@ version (unittest) runTest(tid); } - @system unittest - { - simpleTest(); - } + simpleTest(); - @system unittest - { - scheduler = new ThreadScheduler; - simpleTest(); - scheduler = null; - } + scheduler = new ThreadScheduler; + simpleTest(); + scheduler = null; } -private @property Mutex initOnceLock() +private @property shared(Mutex) initOnceLock() { - __gshared Mutex lock; - if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock)) + static shared Mutex lock; + if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock)) return mtx; - auto mtx = new Mutex; - if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx)) + auto mtx = new shared Mutex; + if (cas(&lock, cast(shared) null, mtx)) return mtx; - return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock); + return atomicLoad!(MemoryOrder.acq)(lock); } /** @@ -2429,7 +2680,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init) { static MySingleton instance() { - static __gshared MySingleton inst; + __gshared MySingleton inst; return initOnce!inst(new MySingleton); } } @@ -2443,14 +2694,14 @@ auto ref initOnce(alias var)(lazy typeof(var) init) { static MySingleton instance() { - static __gshared MySingleton inst; + __gshared MySingleton inst; return initOnce!inst(new MySingleton); } private: this() { val = ++cnt; } size_t val; - static __gshared size_t cnt; + __gshared size_t cnt; } foreach (_; 0 .. 10) @@ -2476,7 +2727,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init) * Returns: * A reference to the initialized variable */ -auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) +auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex) { // check that var is global, can't take address of a TLS variable static assert(is(typeof({ __gshared p = &var; })), @@ -2488,7 +2739,7 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) { synchronized (mutex) { - if (!atomicLoad!(MemoryOrder.acq)(flag)) + if (!atomicLoad!(MemoryOrder.raw)(flag)) { var = init; atomicStore!(MemoryOrder.rel)(flag, true); @@ -2498,14 +2749,20 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) return var; } +/// ditto +auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) +{ + return initOnce!var(init, cast(shared) mutex); +} + /// Use a separate mutex when init blocks on another thread that might also call initOnce. @system unittest { import core.sync.mutex : Mutex; static shared bool varA, varB; - __gshared Mutex m; - m = new Mutex; + static shared Mutex m; + m = new shared Mutex; spawn({ // use a different mutex for varB to avoid a dead-lock @@ -2529,3 +2786,41 @@ auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) static assert(!__traits(compiles, initOnce!c(true))); // TLS static assert(!__traits(compiles, initOnce!d(true))); // local variable } + +// test ability to send shared arrays +@system unittest +{ + static shared int[] x = new shared(int)[1]; + auto tid = spawn({ + auto arr = receiveOnly!(shared(int)[]); + arr[0] = 5; + ownerTid.send(true); + }); + tid.send(x); + receiveOnly!(bool); + assert(x[0] == 5); +} + +// https://issues.dlang.org/show_bug.cgi?id=13930 +@system unittest +{ + immutable aa = ["0":0]; + thisTid.send(aa); + receiveOnly!(immutable int[string]); // compile error +} + +// https://issues.dlang.org/show_bug.cgi?id=19345 +@system unittest +{ + static struct Aggregate { const int a; const int[5] b; } + static void t1(Tid mainTid) + { + const sendMe = Aggregate(42, [1, 2, 3, 4, 5]); + mainTid.send(sendMe); + } + + spawn(&t1, thisTid); + auto result1 = receiveOnly!(const Aggregate)(); + immutable expected = Aggregate(42, [1, 2, 3, 4, 5]); + assert(result1 == expected); +} |