/**
 * $(SCRIPT inhibitQuickIndex = 1;)
 * $(DIVC quickindex,
 * $(BOOKTABLE,
 * $(TR $(TH Category) $(TH Symbols))
 * $(TR $(TD Tid) $(TD
 *     $(MYREF locate)
 *     $(MYREF ownerTid)
 *     $(MYREF register)
 *     $(MYREF spawn)
 *     $(MYREF spawnLinked)
 *     $(MYREF thisTid)
 *     $(MYREF Tid)
 *     $(MYREF TidMissingException)
 *     $(MYREF unregister)
 * ))
 * $(TR $(TD Message passing) $(TD
 *     $(MYREF prioritySend)
 *     $(MYREF receive)
 *     $(MYREF receiveOnly)
 *     $(MYREF receiveTimeout)
 *     $(MYREF send)
 *     $(MYREF setMaxMailboxSize)
 * ))
 * $(TR $(TD Message-related types) $(TD
 *     $(MYREF LinkTerminated)
 *     $(MYREF MailboxFull)
 *     $(MYREF MessageMismatch)
 *     $(MYREF OnCrowding)
 *     $(MYREF OwnerTerminated)
 *     $(MYREF PriorityMessageException)
 * ))
 * $(TR $(TD Scheduler) $(TD
 *     $(MYREF FiberScheduler)
 *     $(MYREF Generator)
 *     $(MYREF Scheduler)
 *     $(MYREF scheduler)
 *     $(MYREF ThreadInfo)
 *     $(MYREF ThreadScheduler)
 *     $(MYREF yield)
 * ))
 * $(TR $(TD Misc) $(TD
 *     $(MYREF initOnce)
 * ))
 * ))
 *
 * This is a low-level messaging API upon which more structured or restrictive
 * APIs may be built.  The general idea is that every messageable entity is
 * represented by a common handle type called a Tid, which allows messages to
 * be sent to logical threads that are executing in both the current process
 * and in external processes using the same interface.  This is an important
 * aspect of scalability because it allows the components of a program to be
 * spread across available resources with few to no changes to the actual
 * implementation.
 *
 * A logical thread is an execution context that has its own stack and which
 * runs asynchronously to other logical threads.  These may be preemptively
 * scheduled kernel threads, fibers (cooperative user-space threads), or some
 * other concept with similar behavior.
 *
 * The type of concurrency used when logical threads are created is determined
 * by the Scheduler selected at initialization time.  The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors:   Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source:    $(PHOBOSSRC std/concurrency.d)
 */
/*          Copyright Sean Kelly 2009 - 2014.
 * Distributed under the Boost Software License, Version 1.0.
 *    (See accompanying file LICENSE_1_0.txt or copy at
 *          http://www.boost.org/LICENSE_1_0.txt)
 */
module std.concurrency;

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import std.range.primitives;
import std.range.interfaces : InputRange;
import std.traits;

///
@system unittest
{
    __gshared string received;
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;
        // Receive a message from the owner thread.
        receive((int i){
            received = text("Received the number ", i);

            // Send a message back to the owner thread
            // indicating success.
            send(ownerTid, true);
        });
    }

    // Start spawnedFunc in a new thread.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Send the number 42 to this new thread.
    send(childTid, 42);

    // Receive the result code.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}

private
{
    bool hasLocalAliasing(Types...)()
    {
        import std.typecons : Rebindable;

        // Works around "statement is not reachable"
        bool doesIt = false;
        static foreach (T; Types)
        {
            static if (is(T == Tid))
            { /* Allowed */ }
            else static if (is(T : Rebindable!R, R))
                doesIt |= hasLocalAliasing!R;
            else static if (is(T == struct))
                doesIt |= hasLocalAliasing!(typeof(T.tupleof));
            else
                doesIt |= std.traits.hasUnsharedAliasing!(T);
        }
        return doesIt;
    }

    @safe unittest
    {
        static struct Container { Tid t; }
        static assert(!hasLocalAliasing!(Tid, Container, int));
    }

    // https://issues.dlang.org/show_bug.cgi?id=20097
    @safe unittest
    {
        import std.datetime.systime : SysTime;
        static struct Container { SysTime time; }
        static assert(!hasLocalAliasing!(SysTime, Container));
    }

    enum MsgType
    {
        standard,
        priority,
        linkDead,
    }

    struct Message
    {
        MsgType type;
        Variant data;

        this(T...)(MsgType t, T vals) if (T.length > 0)
        {
            static if (T.length == 1)
            {
                type = t;
                data = vals[0];
            }
            else
            {
                import std.typecons : Tuple;

                type = t;
                data = Tuple!(T)(vals);
            }
        }

        @property auto convertsTo(T...)()
        {
            static if (T.length == 1)
            {
                return is(T[0] == Variant) || data.convertsTo!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.convertsTo!(Tuple!(T));
            }
        }

        @property auto get(T...)()
        {
            static if (T.length == 1)
            {
                static if (is(T[0] == Variant))
                    return data;
                else
                    return data.get!(T);
            }
            else
            {
                import std.typecons : Tuple;
                return data.get!(Tuple!(T));
            }
        }

        auto map(Op)(Op op)
        {
            alias Args = Parameters!(Op);

            static if (Args.length == 1)
            {
                static if (is(Args[0] == Variant))
                    return op(data);
                else
                    return op(data.get!(Args));
            }
            else
            {
                import std.typecons : Tuple;
                return op(data.get!(Tuple!(Args)).expand);
            }
        }
    }

    void checkops(T...)(T ops)
    {
        import std.format : format;

        foreach (i, t1; T)
        {
            static assert(isFunctionPointer!t1 || isDelegate!t1,
                    format!"T %d is not a function pointer or delegates"(i));
            alias a1 = Parameters!(t1);
            alias r1 = ReturnType!(t1);

            static if (i < T.length - 1 && is(r1 == void))
            {
                static assert(a1.length != 1 || !is(a1[0] == Variant),
                              "function with arguments " ~ a1.stringof ~
                              " occludes successive function");

                foreach (t2; T[i + 1 .. $])
                {
                    alias a2 = Parameters!(t2);

                    static assert(!is(a1 == a2),
                        "function with arguments " ~ a1.stringof ~ " occludes successive function");
                }
            }
        }
    }

    @property ref ThreadInfo thisInfo() nothrow
    {
        if (scheduler is null)
            return ThreadInfo.thisInfo;
        return scheduler.thisInfo;
    }
}

static ~this()
{
    thisInfo.cleanup();
}

// Exceptions

/**
 * Thrown on calls to `receiveOnly` if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}

/**
 * Thrown on calls to `receive` if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    Tid tid;
}

/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    Tid tid;
}

/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
 * for a message of this type.
 */
class PriorityMessageException : Exception
{
    ///
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }

    /**
     * The message that was sent.
     */
    Variant message;
}

/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * `OnCrowding.throwException`.
 */
class MailboxFull : Exception
{
    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    Tid tid;
}

/**
 * Thrown when a Tid is missing, e.g. when `ownerTid` doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    ///
    mixin basicExceptionCtors;
}


// Thread ID


/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

    MessageBox mbox;

public:

    /**
     * Generate a convenient string for identifying this Tid.  This is only
     * useful to see if Tid's that are currently executing are the same or
     * different, e.g. for logging and debugging.  It is potentially possible
     * that a Tid executed in the future will have the same toString() output
     * as another Tid that has already terminated.
     */
    void toString(W)(ref W w) const
    {
        import std.format.write : formattedWrite;
        auto p = () @trusted { return cast(void*) mbox; }();
        formattedWrite(w, "Tid(%x)", p);
    }

}

@safe unittest
{
    import std.conv : text;
    Tid tid;
    assert(text(tid) == "Tid(0)");
    auto tid2 = thisTid;
    assert(text(tid2) != "Tid(0)");
    auto tid3 = tid2;
    assert(text(tid2) == text(tid3));
}

// https://issues.dlang.org/show_bug.cgi?id=21512
@system unittest
{
    import std.format : format;

    const(Tid) b = spawn(() {});
    assert(format!"%s"(b)[0 .. 4] == "Tid(");
}

/**
 * Returns: The $(LREF Tid) of the caller's thread.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto trus() @trusted
    {
        if (thisInfo.ident != Tid.init)
            return thisInfo.ident;
        thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return trus();
}

/**
 * Return the Tid of the thread which spawned the caller's thread.
 *
 * Throws: A `TidMissingException` exception if
 * there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
    return thisInfo.owner;
}

@system unittest
{
    import std.exception : assertThrown;

    static void fun()
    {
        string res = receiveOnly!string();
        assert(res == "Main calling");
        ownerTid.send("Child responding");
    }

    assertThrown!TidMissingException(ownerTid);
    auto child = spawn(&fun);
    child.send("Main calling");
    string res = receiveOnly!string();
    assert(res == "Child responding");
}

// Thread Creation

private template isSpawnable(F, T...)
{
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias param1 = Parameters!F1;
        alias param2 = Parameters!F2;
        static if (param1.length != param2.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (param1.length == i)
            enum isParamsImplicitlyConvertible = true;
        else static if (isImplicitlyConvertible!(param2[i], param1[i]))
            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
                    F2, i + 1);
        else
            enum isParamsImplicitlyConvertible = false;
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F : void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}

/**
 * Starts fn(args) in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * `Tid`.  The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an `OwnerTerminated` message will be
 * sent to the new thread, causing an `OwnerTerminated` exception to be
 * thrown on `receive()`.
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new logical thread.
 *
 * Notes:
 *  `args` must not have unshared aliasing.  In other words, all arguments
 *  to `fn` must either be `shared` or `immutable` or have no
 *  pointer indirection.  This is necessary for enforcing isolation among
 *  threads.
 *
 * Similarly, if `fn` is a delegate, it must not have unshared aliases, meaning
 * `fn` must be either `shared` or `immutable`. */
Tid spawn(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    return _spawn(false, fn, args);
}

///
@system unittest
{
    static void f(string msg)
    {
        assert(msg == "Hello World");
    }

    auto tid = spawn(&f, "Hello World");
}

/// Fails: char[] has mutable aliasing.
@system unittest
{
    string msg = "Hello, World!";

    static void f1(string msg) {}
    static assert(!__traits(compiles, spawn(&f1, msg.dup)));
    static assert( __traits(compiles, spawn(&f1, msg.idup)));

    static void f2(char[] msg) {}
    static assert(!__traits(compiles, spawn(&f2, msg.dup)));
    static assert(!__traits(compiles, spawn(&f2, msg.idup)));
}

/// New thread with anonymous function
@system unittest
{
    spawn({
        ownerTid.send("This is so great!");
    });
    assert(receiveOnly!string == "This is so great!");
}

@system unittest
{
    import core.thread : thread_joinAll;

    __gshared string receivedMessage;
    static void f1(string msg)
    {
        receivedMessage = msg;
    }

    auto tid1 = spawn(&f1, "Hello World");
    thread_joinAll;
    assert(receivedMessage == "Hello World");
}

/**
 * Starts fn(args) in a logical thread and will receive a LinkTerminated
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * Tid.  This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a LinkTerminated message will be sent
 * to the other, causing a LinkTerminated exception to be thrown on receive().
 * The owner relationship from spawn() is preserved as well, so if the link
 * between threads is broken, owner termination will still result in an
 * OwnerTerminated exception to be thrown on receive().
 *
 * Params:
 *  fn   = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
 */
Tid spawnLinked(F, T...)(F fn, T args)
if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    return _spawn(true, fn, args);
}

/*
 *
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args)
if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto spawnTid = Tid(new MessageBox);
    auto ownerTid = thisTid;

    void exec()
    {
        thisInfo.ident = spawnTid;
        thisInfo.owner = ownerTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler !is null)
        scheduler.spawn(&exec);
    else
    {
        auto t = new Thread(&exec);
        t.start();
    }
    thisInfo.links[spawnTid] = linked;
    return spawnTid;
}

@system unittest
{
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    auto callable1  = new class{ void opCall(int) shared {} };
    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3  = new class{ void opCall(int) immutable {} };
    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5  = new class{ void opCall(int) {} };
    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1,  1)));
    static assert( __traits(compiles, spawn(callable2,  2)));
    static assert(!__traits(compiles, spawn(callable3,  3)));
    static assert( __traits(compiles, spawn(callable4,  4)));
    static assert(!__traits(compiles, spawn(callable5,  5)));
    static assert(!__traits(compiles, spawn(callable6,  6)));
    static assert(!__traits(compiles, spawn(callable7,  7)));
    static assert( __traits(compiles, spawn(callable8,  8)));
    static assert(!__traits(compiles, spawn(callable9,  9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}

/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid.  As with
 * $(REF spawn, std,concurrency), `T` must not have unshared aliasing.
 */
void send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(tid, vals);
}

/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to `tid` but place it at the front of `tid`'s message
 * queue instead of at the back.  This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
 */
void prioritySend(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
    _send(MsgType.priority, tid, vals);
}

/*
 * ditto
 */
private void _send(T...)(Tid tid, T vals)
in (tid.mbox !is null)
{
    _send(MsgType.standard, tid, vals);
}

/*
 * Implementation of send.  This allows parameter checking to be different for
 * both Tid.send() and .send().
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
in (tid.mbox !is null)
{
    auto msg = Message(type, vals);
    tid.mbox.put(msg);
}

/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available.  This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to `receive`, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
void receive(T...)( T ops )
in
{
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned "
           ~ "or thisTid was passed to a running thread.");
}
do
{
    checkops( ops );

    thisInfo.ident.mbox.get( ops );
}

///
@system unittest
{
    import std.variant : Variant;

    auto process = ()
    {
        receive(
            (int i) { ownerTid.send(1); },
            (double f) { ownerTid.send(2); },
            (Variant v) { ownerTid.send(3); }
        );
    };

    {
        auto tid = spawn(process);
        send(tid, 42);
        assert(receiveOnly!int == 1);
    }

    {
        auto tid = spawn(process);
        send(tid, 3.14);
        assert(receiveOnly!int == 2);
    }

    {
        auto tid = spawn(process);
        send(tid, "something else");
        assert(receiveOnly!int == 3);
    }
}

@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( (Variant x) {} );
                          receive( (int x) {}, (Variant x) {} );
                      } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (Variant x) {}, (int x) {} );
                       } ) );

    static assert( !__traits( compiles,
                       {
                           receive( (int x) {}, (int x) {} );
                       } ) );
}

// Make sure receive() works with free functions as well.
version (StdUnittest)
{
    private void receiveFunction(int x) {}
}
@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( &receiveFunction );
                          receive( &receiveFunction, (Variant x) {} );
                      } ) );
}


private template receiveOnlyRet(T...)
{
    static if ( T.length == 1 )
    {
        alias receiveOnlyRet = T[0];
    }
    else
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
}

/**
 * Receives only messages with arguments of the specified types.
 *
 * Params:
 *     T = Variadic list of types to be received.
 *
 * Returns: The received message.  If `T` has more than one entry,
 *          the message will be packed into a $(REF Tuple, std,typecons).
 *
 * Throws: $(LREF MessageMismatch) if a message of types other than `T`
 *         is received,
 *         $(LREF OwnerTerminated) when the sending thread was terminated.
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    import std.format : format;
    import std.meta : allSatisfy;
    import std.typecons : Tuple;

    Tuple!(T) ret;

    thisInfo.ident.mbox.get((T val) {
        static if (T.length)
        {
            static if (allSatisfy!(isAssignable, T))
            {
                ret.field = val;
            }
            else
            {
                import core.lifetime : emplace;
                emplace(&ret, val);
            }
        }
    },
    (LinkTerminated e) { throw e; },
    (OwnerTerminated e) { throw e; },
    (Variant val) {
        static if (T.length > 1)
            string exp = T.stringof;
        else
            string exp = T[0].stringof;

        throw new MessageMismatch(
            format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
    });
    static if (T.length == 1)
        return ret[0];
    else
        return ret;
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!int == 42);
    });
    send(tid, 42);
}

///
@system unittest
{
    auto tid = spawn(
    {
        assert(receiveOnly!string == "text");
    });
    send(tid, "text");
}

///
@system unittest
{
    struct Record { string name; int age; }

    auto tid = spawn(
    {
        auto msg = receiveOnly!(double, Record);
        assert(msg[0] == 0.5);
        assert(msg[1].name == "Alice");
        assert(msg[1].age == 31);
    });

    send(tid, 0.5, Record("Alice", 31));
}

@system unittest
{
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto tid = spawn(&t1, thisTid);
    tid.send(1);
    string result = receiveOnly!string();
    assert(result == "Unexpected message type: expected 'string', got 'int'");
}

// https://issues.dlang.org/show_bug.cgi?id=21663
@safe unittest
{
    alias test = receiveOnly!(string, bool, bool);
}

/**
 * Receives a message from another thread and gives up if no match
 * arrives within a specified duration.
 *
 * Receive a message from another thread, or block until `duration` exceeds,
 * if no messages of the specified types are available. This function works
 * by pattern matching a message against a set of delegates and executing
 * the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument, it will match any message that was not
 * matched by an earlier delegate.  If more than one argument is sent,
 * the `Variant` will contain a $(REF Tuple, std,typecons) of all values
 * sent.
 *
 * Params:
 *     duration = Duration, how long to wait. If `duration` is negative,
 *                won't wait at all.
 *     ops = Variadic list of function pointers and delegates. Entries
 *           in this list must not occlude later entries.
 *
 * Returns: `true` if it received a message and `false` if it timed out waiting
 *          for one.
 *
 * Throws: $(LREF OwnerTerminated) when the sending thread was terminated.
 */
bool receiveTimeout(T...)(Duration duration, T ops)
in
{
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
do
{
    checkops(ops);

    return thisInfo.ident.mbox.get(duration, ops);
}

@safe unittest
{
    static assert(__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    static assert(__traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}

// MessageBox Limits

/**
 * These behaviors may be specified when a mailbox is full.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a MailboxFull exception.
    ignore /// Abort the send and return.
}

private
{
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute the behavior specified by doThis.  If messages is zero, the mailbox
 * is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  doThis   = The behavior executed when a message is sent to a full
 *             mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
in (tid.mbox !is null)
{
    final switch (doThis)
    {
    case OnCrowding.block:
        return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
    case OnCrowding.throwException:
        return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
    case OnCrowding.ignore:
        return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
 *
 * Params:
 *  tid      = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  onCrowdingDoThis = The routine called when a message is sent to a full
 *                     mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
in (tid.mbox !is null)
{
    tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
}

private
{
    __gshared Tid[string] tidByName;
    __gshared string[][Tid] namesByTid;
}

private @property Mutex registryLock()
{
    __gshared Mutex impl;
    initOnce!impl(new Mutex);
    return impl;
}

private void unregisterMe(ref ThreadInfo me)
{
    if (me.ident != Tid.init)
    {
        synchronized (registryLock)
        {
            if (auto allNames = me.ident in namesByTid)
            {
                foreach (name; *allNames)
                    tidByName.remove(name);
                namesByTid.remove(me.ident);
            }
        }
    }
}

/**
 * Associates name with tid.
 *
 * Associates name with tid in a process-local map.  When the thread
 * represented by tid terminates, any names associated with it will be
 * automatically unregistered.
 *
 * Params:
 *  name = The name to associate with tid.
 *  tid  = The tid register by name.
 *
 * Returns:
 *  true if the name is available and tid is not known to represent a
 *  defunct thread.
 */
bool register(string name, Tid tid)
in (tid.mbox !is null)
{
    synchronized (registryLock)
    {
        if (name in tidByName)
            return false;
        if (tid.mbox.isClosed)
            return false;
        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}

/**
 * Removes the registered name associated with a tid.
 *
 * Params:
 *  name = The name to unregister.
 *
 * Returns:
 *  true if the name is registered, false if not.
 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
        {
            auto allNames = *tid in namesByTid;
            auto pos = countUntil(*allNames, name);
            remove!(SwapStrategy.unstable)(*allNames, pos);
            tidByName.remove(name);
            return true;
        }
        return false;
    }
}

/**
 * Gets the Tid associated with name.
 *
 * Params:
 *  name = The name to locate within the registry.
 *
 * Returns:
 *  The associated Tid or Tid.init if name is not registered.
 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
            return *tid;
        return Tid.init;
    }
}

/**
 * Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a Scheduler, an instance of this struct must be associated
 * with each logical thread.  It contains all implementation-level information
 * needed by the internal API.
 */
struct ThreadInfo
{
    Tid ident;
    bool[Tid] links;
    Tid owner;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * Gets a thread-local instance of ThreadInfo, which should be used as the
     * default instance when info is requested for a thread not created by the
     * Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo val;
        return val;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates.  It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        if (ident.mbox !is null)
            ident.mbox.close();
        foreach (tid; links.keys)
            _send(MsgType.linkDead, tid, ident);
        if (owner != Tid.init)
            _send(MsgType.linkDead, owner, ident);
        unregisterMe(this); // clean up registry entries
    }

    // https://issues.dlang.org/show_bug.cgi?id=20160
    @system unittest
    {
        register("main_thread", thisTid());

        ThreadInfo t;
        t.cleanup();

        assert(locate("main_thread") == thisTid());
    }
}

/**
 * A Scheduler controls how threading is performed by spawn.
 *
 * Implementing a Scheduler allows the concurrency mechanism used by this
 * module to be customized according to different needs.  By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished.  But it is possible to create Schedulers that
 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
 * or any number of other approaches.  By making the choice of Scheduler a
 * user-level option, std.concurrency may be used for far more types of
 * application than if this behavior were predefined.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, start()
 * must be called within main().  This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * This is intended to be called at the start of the program to yield all
     * scheduling to the active Scheduler instance.  This is necessary for
     * schedulers that explicitly dispatch threads rather than simply relying
     * on the operating system to do so, and so start should always be called
     * within main() to begin normal program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler.  It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * This routine is called by spawn.  It is expected to instantiate a new
     * logical thread and run the supplied operation.  This thread must call
     * thisInfo.cleanup() when the thread terminates if the scheduled thread
     * is not a kernel thread--all kernel threads will have their ThreadInfo
     * cleaned up automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute.  This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * This routine is called at various points within concurrency-aware APIs
     * to provide a scheduler a chance to yield execution when using some sort
     * of cooperative multithreading model.  If this is not appropriate, such
     * as when each logical thread is backed by a dedicated kernel thread,
     * this routine may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not create by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue.  Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition.  It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}

/**
 * An example Scheduler using kernel threads.
 *
 * This is an example Scheduler that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn.  It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
 */
class ThreadScheduler : Scheduler
{
    /**
     * This simply runs op directly, since no real scheduling is needed by
     * this approach.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and assigns it to run the supplied op.
     */
    void spawn(void delegate() op)
    {
        auto t = new Thread(op);
        t.start();
    }

    /**
     * This scheduler does no explicit multiplexing, so this is a no-op.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
     * ThreadInfo, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a new Condition variable.  No custom behavior is needed here.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}

/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This created a new Fiber for the supplied op and adds it to the
     * dispatch list.
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns a ThreadInfo instance specific to the calling Fiber if the
     * Fiber was created by this dispatcher, otherwise it returns
     * ThreadInfo.thisInfo.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     *
     * Bug:
     * For the default implementation, `notifyAll`will behave like `notify`.
     *
     * Params:
     *   m = A `Mutex` to use for locking if the condition needs to be waited on
     *       or notified from multiple `Thread`s.
     *       If `null`, no `Mutex` will be used and it is assumed that the
     *       `Condition` is only waited on/notified from one `Thread`.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

protected:
    /**
     * Creates a new Fiber which calls the given delegate.
     *
     * Params:
     *   op = The delegate the fiber should call
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

    /**
     * Fiber which embeds a ThreadInfo
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }

        this(void delegate() op, size_t sz) nothrow
        {
            super(op, sz);
        }
    }

private:
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                this.outer.yield();
            }
            return notified;
        }

        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        void switchContext() nothrow
        {
            if (mutex_nothrow) mutex_nothrow.unlock_nothrow();
            scope (exit)
                if (mutex_nothrow)
                    mutex_nothrow.lock_nothrow();
            this.outer.yield();
        }

        bool notified;
    }

    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                m_pos = 0;
            }
        }
    }

    Fiber[] m_fibers;
    size_t m_pos;
}

@system unittest
{
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    waiter.call();
    assert(received == 0);
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    waiter.call();
    assert(received == 1);
    waiter.call();
    assert(received == 1);
}

/**
 * Sets the Scheduler behavior within the program.
 *
 * This variable sets the Scheduler behavior within this program.  Typically,
 * when setting a Scheduler, scheduler.start() should be called in main.  This
 * routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;

// Generator

/**
 * If the caller is a Fiber and is not a Generator, this function will call
 * scheduler.yield() or Fiber.yield(), as appropriate.
 */
void yield() nothrow
{
    auto fiber = Fiber.getThis();
    if (!(cast(IsGenerator) fiber))
    {
        if (scheduler is null)
        {
            if (fiber)
                return Fiber.yield();
        }
        else
            scheduler.yield();
    }
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}


/**
 * A Generator is a Fiber that periodically returns values of type T to the
 * caller via yield.  This is represented as an InputRange.
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a static
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz, size_t guardPageSize)
    {
        super(fn, sz, guardPageSize);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Initializes a generator object which is associated with a dynamic
     * D function.  The function will be called once to prepare the range
     * for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *  guardPageSize = size of the guard page to trap fiber's stack
     *                  overflows. Refer to $(REF Fiber, core,thread)'s
     *                  documentation for more details.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz, size_t guardPageSize)
    {
        super(dg, sz, guardPageSize);
        call();
    }

    /**
     * Returns true if the generator is empty.
     */
    final bool empty() @property
    {
        return m_value is null || state == State.TERM;
    }

    /**
     * Obtains the next value from the underlying function.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy contructor. Will not compile for element types defining a
     * postblit, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (!hasElaborateCopyConstructor!T)
        {
            return front;
        }
        else
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
    }

    final int opApply(scope int delegate(T) loopBody)
    {
        int broken;
        for (; !empty; popFront())
        {
            broken = loopBody(front);
            if (broken) break;
        }
        return broken;
    }

    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int broken;
        for (size_t i; !empty; ++i, popFront())
        {
            broken = loopBody(i, front);
            if (broken) break;
        }
        return broken;
    }
private:
    T* m_value;
}

///
@system unittest
{
    auto tid = spawn({
        int i;
        while (i < 9)
            i = receiveOnly!int;

        ownerTid.send(i * 2);
    });

    auto r = new Generator!int({
        foreach (i; 1 .. 10)
            yield(i);
    });

    foreach (e; r)
        tid.send(e);

    assert(receiveOnly!int == 18);
}

/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
 */
void yield(T)(ref T value)
{
    Generator!T cur = cast(Generator!T) Fiber.getThis();
    if (cur !is null && cur.state == Fiber.State.EXEC)
    {
        cur.m_value = &value;
        return Fiber.yield();
    }
    throw new Exception("yield(T) called with no active generator for the supplied type");
}

/// ditto
void yield(T)(T value)
{
    yield(value);
}

@system unittest
{
    import core.exception;
    import std.exception;

    auto mainTid = thisTid;
    alias testdg = () {
        auto tid = spawn(
        (Tid mainTid) {
            int i;
            scope (failure) mainTid.send(false);
            try
            {
                for (i = 1; i < 10; i++)
                {
                    if (receiveOnly!int() != i)
                    {
                        mainTid.send(false);
                        break;
                    }
                }
            }
            catch (OwnerTerminated e)
            {
                // i will advance 1 past the last value expected
                mainTid.send(i == 4);
            }
        }, mainTid);
        auto r = new Generator!int(
        {
            assertThrown!Exception(yield(2.0));
            yield(); // ensure this is a no-op
            yield(1);
            yield(); // also once something has been yielded
            yield(2);
            yield(3);
        });

        foreach (e; r)
        {
            tid.send(e);
        }
    };

    scheduler = new ThreadScheduler;
    scheduler.spawn(testdg);
    assert(receiveOnly!bool());

    scheduler = new FiberScheduler;
    scheduler.start(testdg);
    assert(receiveOnly!bool());
    scheduler = null;
}
///
@system unittest
{
    import std.range;

    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}

private
{
    /*
     * A MessageBox is a message queue for one thread.  Other threads may send
     * messages to this owner by calling put(), and the owner receives them by
     * calling get().  The put() call is therefore effectively shared and the
     * get() call is effectively local.  setMaxMsgs may be used by any thread
     * to limit the size of the message queue.
     */
    class MessageBox
    {
        this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
        {
            m_lock = new Mutex;
            m_closed = false;

            if (scheduler is null)
            {
                m_putMsg = new Condition(m_lock);
                m_notFull = new Condition(m_lock);
            }
            else
            {
                m_putMsg = scheduler.newCondition(m_lock);
                m_notFull = scheduler.newCondition(m_lock);
            }
        }

        ///
        final @property bool isClosed() @safe @nogc pure
        {
            synchronized (m_lock)
            {
                return m_closed;
            }
        }

        /*
         * Sets a limit on the maximum number of user messages allowed in the
         * mailbox.  If this limit is reached, the caller attempting to add
         * a new message will execute call.  If num is zero, there is no limit
         * on the message queue.
         *
         * Params:
         *  num  = The maximum size of the queue or zero if the queue is
         *         unbounded.
         *  call = The routine to call when the queue is full.
         */
        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
        {
            synchronized (m_lock)
            {
                m_maxMsgs = num;
                m_onMaxMsgs = call;
            }
        }

        /*
         * If maxMsgs is not set, the message is added to the queue and the
         * owner is notified.  If the queue is full, the message will still be
         * accepted if it is a control message, otherwise onCrowdingDoThis is
         * called.  If the routine returns true, this call will block until
         * the owner has made space available in the queue.  If it returns
         * false, this call will abort.
         *
         * Params:
         *  msg = The message to put in the queue.
         *
         * Throws:
         *  An exception if the queue is full and onCrowdingDoThis throws.
         */
        final void put(ref Message msg)
        {
            synchronized (m_lock)
            {
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (!m_closed)
                {
                    while (true)
                    {
                        if (isPriorityMsg(msg))
                        {
                            m_sharedPty.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        if (!mboxFull() || isControlMsg(msg))
                        {
                            m_sharedBox.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                        {
                            return;
                        }
                        m_putQueue++;
                        m_notFull.wait();
                        m_putQueue--;
                    }
                }
            }
        }

        /*
         * Matches ops against each message in turn until a match is found.
         *
         * Params:
         *  ops = The operations to match.  Each may return a bool to indicate
         *        whether a message with a matching type is truly a match.
         *
         * Returns:
         *  true if a message was retrieved and false if not (such as if a
         *  timeout occurred).
         *
         * Throws:
         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
         * if the owner thread terminates and no existing messages match the
         * supplied ops.
         */
        bool get(T...)(scope T vals)
        {
            import std.meta : AliasSeq;

            static assert(T.length, "T must not be empty");

            static if (isImplicitlyConvertible!(T[0], Duration))
            {
                alias Ops = AliasSeq!(T[1 .. $]);
                alias ops = vals[1 .. $];
                enum timedWait = true;
                Duration period = vals[0];
            }
            else
            {
                alias Ops = AliasSeq!(T);
                alias ops = vals[0 .. $];
                enum timedWait = false;
            }

            bool onStandardMsg(ref Message msg)
            {
                foreach (i, t; Ops)
                {
                    alias Args = Parameters!(t);
                    auto op = ops[i];

                    if (msg.convertsTo!(Args))
                    {
                        alias RT = ReturnType!(t);
                        static if (is(RT == bool))
                        {
                            return msg.map(op);
                        }
                        else
                        {
                            msg.map(op);
                            static if (!is(immutable RT == immutable noreturn))
                                return true;
                        }
                    }
                }
                return false;
            }

            bool onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                if (bool* pDepends = tid in thisInfo.links)
                {
                    auto depends = *pDepends;
                    thisInfo.links.remove(tid);
                    // Give the owner relationship precedence.
                    if (depends && tid != thisInfo.owner)
                    {
                        auto e = new LinkTerminated(tid);
                        auto m = Message(MsgType.standard, e);
                        if (onStandardMsg(m))
                            return true;
                        throw e;
                    }
                }
                if (tid == thisInfo.owner)
                {
                    thisInfo.owner = Tid.init;
                    auto e = new OwnerTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
                return false;
            }

            bool onControlMsg(ref Message msg)
            {
                switch (msg.type)
                {
                case MsgType.linkDead:
                    return onLinkDeadMsg(msg);
                default:
                    return false;
                }
            }

            bool scan(ref ListT list)
            {
                for (auto range = list[]; !range.empty;)
                {
                    // Only the message handler will throw, so if this occurs
                    // we can be certain that the message was handled.
                    scope (failure)
                        list.removeAt(range);

                    if (isControlMsg(range.front))
                    {
                        if (onControlMsg(range.front))
                        {
                            // Although the linkDead message is a control message,
                            // it can be handled by the user.  Since the linkDead
                            // message throws if not handled, if we get here then
                            // it has been handled and we can return from receive.
                            // This is a weird special case that will have to be
                            // handled in a more general way if more are added.
                            if (!isLinkDeadMsg(range.front))
                            {
                                list.removeAt(range);
                                continue;
                            }
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                    else
                    {
                        if (onStandardMsg(range.front))
                        {
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                }
                return false;
            }

            bool pty(ref ListT list)
            {
                if (!list.empty)
                {
                    auto range = list[];

                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    if (range.front.convertsTo!(Throwable))
                        throw range.front.get!(Throwable);
                    else if (range.front.convertsTo!(shared(Throwable)))
                        throw range.front.get!(shared(Throwable));
                    else
                        throw new PriorityMessageException(range.front.data);
                }
                return false;
            }

            static if (timedWait)
            {
                import core.time : MonoTime;
                auto limit = MonoTime.currTime + period;
            }

            while (true)
            {
                ListT arrived;

                if (pty(m_localPty) || scan(m_localBox))
                {
                    return true;
                }
                yield();
                synchronized (m_lock)
                {
                    updateMsgCount();
                    while (m_sharedPty.empty && m_sharedBox.empty)
                    {
                        // NOTE: We're notifying all waiters here instead of just
                        //       a few because the onCrowding behavior may have
                        //       changed and we don't want to block sender threads
                        //       unnecessarily if the new behavior is not to block.
                        //       This will admittedly result in spurious wakeups
                        //       in other situations, but what can you do?
                        if (m_putQueue && !mboxFull())
                            m_notFull.notifyAll();
                        static if (timedWait)
                        {
                            if (period <= Duration.zero || !m_putMsg.wait(period))
                                return false;
                        }
                        else
                        {
                            m_putMsg.wait();
                        }
                    }
                    m_localPty.put(m_sharedPty);
                    arrived.put(m_sharedBox);
                }
                if (m_localPty.empty)
                {
                    scope (exit) m_localBox.put(arrived);
                    if (scan(arrived))
                    {
                        return true;
                    }
                    else
                    {
                        static if (timedWait)
                        {
                            period = limit - MonoTime.currTime;
                        }
                        continue;
                    }
                }
                m_localBox.put(arrived);
                pty(m_localPty);
                return true;
            }
        }

        /*
         * Called on thread termination.  This routine processes any remaining
         * control messages, clears out message queues, and sets a flag to
         * reject any future messages.
         */
        final void close()
        {
            static void onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid),
                        "Message could be converted to Tid");
                auto tid = msg.get!(Tid);

                thisInfo.links.remove(tid);
                if (tid == thisInfo.owner)
                    thisInfo.owner = Tid.init;
            }

            static void sweep(ref ListT list)
            {
                for (auto range = list[]; !range.empty; range.popFront())
                {
                    if (range.front.type == MsgType.linkDead)
                        onLinkDeadMsg(range.front);
                }
            }

            ListT arrived;

            sweep(m_localBox);
            synchronized (m_lock)
            {
                arrived.put(m_sharedBox);
                m_closed = true;
            }
            m_localBox.clear();
            sweep(arrived);
        }

    private:
        // Routines involving local data only, no lock needed.

        bool mboxFull() @safe @nogc pure nothrow
        {
            return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
        }

        void updateMsgCount() @safe @nogc pure nothrow
        {
            m_localMsgs = m_localBox.length;
        }

        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type != MsgType.standard && msg.type != MsgType.priority;
        }

        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.priority;
        }

        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.linkDead;
        }

        alias OnMaxFn = bool function(Tid);
        alias ListT = List!(Message);

        ListT m_localBox;
        ListT m_localPty;

        Mutex m_lock;
        Condition m_putMsg;
        Condition m_notFull;
        size_t m_putQueue;
        ListT m_sharedBox;
        ListT m_sharedPty;
        OnMaxFn m_onMaxMsgs;
        size_t m_localMsgs;
        size_t m_maxMsgs;
        bool m_closed;
    }

    /*
     *
     */
    struct List(T)
    {
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return !m_prev.next;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        void put(T val)
        {
            put(newNode(val));
        }

        void put(ref List!(T) rhs)
        {
            if (!rhs.empty)
            {
                put(rhs.m_first);
                while (m_last.next !is null)
                {
                    m_last = m_last.next;
                    m_count++;
                }
                rhs.m_first = null;
                rhs.m_last = null;
                rhs.m_count = 0;
            }
        }

        Range opSlice()
        {
            return Range(cast(Node*)&m_first);
        }

        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count, "Can not remove from empty Range");
            Node* n = r.m_prev;
            enforce(n && n.next, "attempting to remove invalid list node");

            if (m_last is m_first)
                m_last = null;
            else if (m_last is n.next)
                m_last = n; // nocoverage
            Node* to_free = n.next;
            n.next = n.next.next;
            freeNode(to_free);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        void clear()
        {
            m_first = m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        struct Node
        {
            Node* next;
            T val;

            this(T v)
            {
                val = v;
            }
        }

        static shared struct SpinLock
        {
            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
            bool locked;
        }

        static shared SpinLock sm_lock;
        static shared Node* sm_head;

        Node* newNode(T v)
        {
            Node* n;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    n = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }
            if (n)
            {
                import core.lifetime : emplace;
                emplace!Node(n, v);
            }
            else
            {
                n = new Node(v);
            }
            return n;
        }

        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sn = cast(shared(Node)*) n;
            sn.next = sm_head;
            sm_head = sn;
        }

        void put(Node* n)
        {
            m_count++;
            if (!empty)
            {
                m_last.next = n;
                m_last = n;
                return;
            }
            m_first = n;
            m_last = n;
        }

        Node* m_first;
        Node* m_last;
        size_t m_count;
    }
}

@system unittest
{
    import std.typecons : tuple, Tuple;

    static void testfn(Tid tid)
    {
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) {  });
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    static void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    static void simpleTest()
    {
        auto tid = spawn(&testfn, thisTid);
        runTest(tid);

        // Run the test again with a limited mailbox size.
        tid = spawn(&testfn, thisTid);
        setMaxMailboxSize(tid, 2, OnCrowding.block);
        runTest(tid);
    }

    simpleTest();

    scheduler = new ThreadScheduler;
    simpleTest();
    scheduler = null;
}

private @property shared(Mutex) initOnceLock()
{
    static shared Mutex lock;
    if (auto mtx = atomicLoad!(MemoryOrder.acq)(lock))
        return mtx;
    auto mtx = new shared Mutex;
    if (cas(&lock, cast(shared) null, mtx))
        return mtx;
    return atomicLoad!(MemoryOrder.acq)(lock);
}

/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
 * afterwards.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    return initOnce!var(init, initOnceLock);
}

/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    assert(MySingleton.instance !is null);
}

@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        this() { val = ++cnt; }
        size_t val;
        __gshared size_t cnt;
    }

    foreach (_; 0 .. 10)
        spawn({ ownerTid.send(MySingleton.instance.val); });
    foreach (_; 0 .. 10)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}

/**
 * Same as above, but takes a separate mutex instead of sharing one among
 * all initOnce instances.
 *
 * This should be used to avoid dead-locks when the $(D_PARAM init)
 * expression waits for the result of another thread that might also
 * call initOnce. Use with care.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *   mutex = A mutex to prevent race conditions
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init, shared Mutex mutex)
{
    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");
    import core.atomic : atomicLoad, MemoryOrder, atomicStore;

    static shared bool flag;
    if (!atomicLoad!(MemoryOrder.acq)(flag))
    {
        synchronized (mutex)
        {
            if (!atomicLoad!(MemoryOrder.raw)(flag))
            {
                var = init;
                static if (!is(immutable typeof(var) == immutable noreturn))
                    atomicStore!(MemoryOrder.rel)(flag, true);
            }
        }
    }
    return var;
}

/// ditto
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    return initOnce!var(init, cast(shared) mutex);
}

/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    static shared Mutex m;
    m = new shared Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}

@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}

// test ability to send shared arrays
@system unittest
{
    static shared int[] x = new shared(int)[1];
    auto tid = spawn({
        auto arr = receiveOnly!(shared(int)[]);
        arr[0] = 5;
        ownerTid.send(true);
    });
    tid.send(x);
    receiveOnly!(bool);
    assert(x[0] == 5);
}

// https://issues.dlang.org/show_bug.cgi?id=13930
@system unittest
{
    immutable aa = ["0":0];
    thisTid.send(aa);
    receiveOnly!(immutable int[string]); // compile error
}

// https://issues.dlang.org/show_bug.cgi?id=19345
@system unittest
{
    static struct Aggregate { const int a; const int[5] b; }
    static void t1(Tid mainTid)
    {
        const sendMe = Aggregate(42, [1, 2, 3, 4, 5]);
        mainTid.send(sendMe);
    }

    spawn(&t1, thisTid);
    auto result1 = receiveOnly!(const Aggregate)();
    immutable expected = Aggregate(42, [1, 2, 3, 4, 5]);
    assert(result1 == expected);
}

// Noreturn support
@system unittest
{
    static noreturn foo(int) { throw new Exception(""); }

    if (false) spawn(&foo, 1);
    if (false) spawnLinked(&foo, 1);

    if (false) receive(&foo);
    if (false) receiveTimeout(Duration.init, &foo);

    // Wrapped in __traits(compiles) to skip codegen which crashes dmd's backend
    static assert(__traits(compiles, receiveOnly!noreturn()                 ));
    static assert(__traits(compiles, send(Tid.init, noreturn.init)          ));
    static assert(__traits(compiles, prioritySend(Tid.init, noreturn.init)  ));
    static assert(__traits(compiles, yield(noreturn.init)                   ));

    static assert(__traits(compiles, {
        __gshared noreturn n;
        initOnce!n(noreturn.init);
    }));
}