| /** |
| * This is a low-level messaging API upon which more structured or restrictive |
| * APIs may be built. The general idea is that every messageable entity is |
| * represented by a common handle type called a Tid, which allows messages to |
| * be sent to logical threads that are executing in both the current process |
| * and in external processes using the same interface. This is an important |
| * aspect of scalability because it allows the components of a program to be |
| * spread across available resources with few to no changes to the actual |
| * implementation. |
| * |
| * A logical thread is an execution context that has its own stack and which |
| * runs asynchronously to other logical threads. These may be preemptively |
| * scheduled kernel threads, fibers (cooperative user-space threads), or some |
| * other concept with similar behavior. |
| * |
| * The type of concurrency used when logical threads are created is determined |
| * by the Scheduler selected at initialization time. The default behavior is |
| * currently to create a new kernel thread per call to spawn, but other |
| * schedulers are available that multiplex fibers across the main thread or |
| * use some combination of the two approaches. |
| * |
| * Copyright: Copyright Sean Kelly 2009 - 2014. |
| * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>. |
| * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak |
| * Source: $(PHOBOSSRC std/_concurrency.d) |
| */ |
| /* Copyright Sean Kelly 2009 - 2014. |
| * Distributed under the Boost Software License, Version 1.0. |
| * (See accompanying file LICENSE_1_0.txt or copy at |
| * http://www.boost.org/LICENSE_1_0.txt) |
| */ |
| module std.concurrency; |
| |
| public import std.variant; |
| |
| import core.atomic; |
| import core.sync.condition; |
| import core.sync.mutex; |
| import core.thread; |
| import std.range.primitives; |
| import std.range.interfaces : InputRange; |
| import std.traits; |
| |
| /// |
| @system unittest |
| { |
| __gshared string received; |
| static void spawnedFunc(Tid ownerTid) |
| { |
| import std.conv : text; |
| // Receive a message from the owner thread. |
| receive((int i){ |
| received = text("Received the number ", i); |
| |
| // Send a message back to the owner thread |
| // indicating success. |
| send(ownerTid, true); |
| }); |
| } |
| |
| // Start spawnedFunc in a new thread. |
| auto childTid = spawn(&spawnedFunc, thisTid); |
| |
| // Send the number 42 to this new thread. |
| send(childTid, 42); |
| |
| // Receive the result code. |
| auto wasSuccessful = receiveOnly!(bool); |
| assert(wasSuccessful); |
| assert(received == "Received the number 42"); |
| } |
| |
| private |
| { |
| template hasLocalAliasing(T...) |
| { |
| static if (!T.length) |
| enum hasLocalAliasing = false; |
| else |
| enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) || |
| std.concurrency.hasLocalAliasing!(T[1 .. $]); |
| } |
| |
| enum MsgType |
| { |
| standard, |
| priority, |
| linkDead, |
| } |
| |
| struct Message |
| { |
| MsgType type; |
| Variant data; |
| |
| this(T...)(MsgType t, T vals) if (T.length > 0) |
| { |
| static if (T.length == 1) |
| { |
| type = t; |
| data = vals[0]; |
| } |
| else |
| { |
| import std.typecons : Tuple; |
| |
| type = t; |
| data = Tuple!(T)(vals); |
| } |
| } |
| |
| @property auto convertsTo(T...)() |
| { |
| static if (T.length == 1) |
| { |
| return is(T[0] == Variant) || data.convertsTo!(T); |
| } |
| else |
| { |
| import std.typecons : Tuple; |
| return data.convertsTo!(Tuple!(T)); |
| } |
| } |
| |
| @property auto get(T...)() |
| { |
| static if (T.length == 1) |
| { |
| static if (is(T[0] == Variant)) |
| return data; |
| else |
| return data.get!(T); |
| } |
| else |
| { |
| import std.typecons : Tuple; |
| return data.get!(Tuple!(T)); |
| } |
| } |
| |
| auto map(Op)(Op op) |
| { |
| alias Args = Parameters!(Op); |
| |
| static if (Args.length == 1) |
| { |
| static if (is(Args[0] == Variant)) |
| return op(data); |
| else |
| return op(data.get!(Args)); |
| } |
| else |
| { |
| import std.typecons : Tuple; |
| return op(data.get!(Tuple!(Args)).expand); |
| } |
| } |
| } |
| |
| void checkops(T...)(T ops) |
| { |
| foreach (i, t1; T) |
| { |
| static assert(isFunctionPointer!t1 || isDelegate!t1); |
| alias a1 = Parameters!(t1); |
| alias r1 = ReturnType!(t1); |
| |
| static if (i < T.length - 1 && is(r1 == void)) |
| { |
| static assert(a1.length != 1 || !is(a1[0] == Variant), |
| "function with arguments " ~ a1.stringof ~ |
| " occludes successive function"); |
| |
| foreach (t2; T[i + 1 .. $]) |
| { |
| static assert(isFunctionPointer!t2 || isDelegate!t2); |
| alias a2 = Parameters!(t2); |
| |
| static assert(!is(a1 == a2), |
| "function with arguments " ~ a1.stringof ~ " occludes successive function"); |
| } |
| } |
| } |
| } |
| |
| @property ref ThreadInfo thisInfo() nothrow |
| { |
| if (scheduler is null) |
| return ThreadInfo.thisInfo; |
| return scheduler.thisInfo; |
| } |
| } |
| |
| static ~this() |
| { |
| thisInfo.cleanup(); |
| } |
| |
| // Exceptions |
| |
| /** |
| * Thrown on calls to $(D receiveOnly) if a message other than the type |
| * the receiving thread expected is sent. |
| */ |
| class MessageMismatch : Exception |
| { |
| /// |
| this(string msg = "Unexpected message type") @safe pure nothrow @nogc |
| { |
| super(msg); |
| } |
| } |
| |
| /** |
| * Thrown on calls to $(D receive) if the thread that spawned the receiving |
| * thread has terminated and no more messages exist. |
| */ |
| class OwnerTerminated : Exception |
| { |
| /// |
| this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc |
| { |
| super(msg); |
| tid = t; |
| } |
| |
| Tid tid; |
| } |
| |
| /** |
| * Thrown if a linked thread has terminated. |
| */ |
| class LinkTerminated : Exception |
| { |
| /// |
| this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc |
| { |
| super(msg); |
| tid = t; |
| } |
| |
| Tid tid; |
| } |
| |
| /** |
| * Thrown if a message was sent to a thread via |
| * $(REF prioritySend, std,concurrency) and the receiver does not have a handler |
| * for a message of this type. |
| */ |
| class PriorityMessageException : Exception |
| { |
| /// |
| this(Variant vals) |
| { |
| super("Priority message"); |
| message = vals; |
| } |
| |
| /** |
| * The message that was sent. |
| */ |
| Variant message; |
| } |
| |
| /** |
| * Thrown on mailbox crowding if the mailbox is configured with |
| * $(D OnCrowding.throwException). |
| */ |
| class MailboxFull : Exception |
| { |
| /// |
| this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc |
| { |
| super(msg); |
| tid = t; |
| } |
| |
| Tid tid; |
| } |
| |
| /** |
| * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't |
| * find an owner thread. |
| */ |
| class TidMissingException : Exception |
| { |
| import std.exception : basicExceptionCtors; |
| /// |
| mixin basicExceptionCtors; |
| } |
| |
| |
| // Thread ID |
| |
| |
| /** |
| * An opaque type used to represent a logical thread. |
| */ |
| struct Tid |
| { |
| private: |
| this(MessageBox m) @safe pure nothrow @nogc |
| { |
| mbox = m; |
| } |
| |
| MessageBox mbox; |
| |
| public: |
| |
| /** |
| * Generate a convenient string for identifying this Tid. This is only |
| * useful to see if Tid's that are currently executing are the same or |
| * different, e.g. for logging and debugging. It is potentially possible |
| * that a Tid executed in the future will have the same toString() output |
| * as another Tid that has already terminated. |
| */ |
| void toString(scope void delegate(const(char)[]) sink) |
| { |
| import std.format : formattedWrite; |
| formattedWrite(sink, "Tid(%x)", cast(void*) mbox); |
| } |
| |
| } |
| |
| @system unittest |
| { |
| // text!Tid is @system |
| import std.conv : text; |
| Tid tid; |
| assert(text(tid) == "Tid(0)"); |
| auto tid2 = thisTid; |
| assert(text(tid2) != "Tid(0)"); |
| auto tid3 = tid2; |
| assert(text(tid2) == text(tid3)); |
| } |
| |
| /** |
| * Returns: The $(LREF Tid) of the caller's thread. |
| */ |
| @property Tid thisTid() @safe |
| { |
| // TODO: remove when concurrency is safe |
| static auto trus() @trusted |
| { |
| if (thisInfo.ident != Tid.init) |
| return thisInfo.ident; |
| thisInfo.ident = Tid(new MessageBox); |
| return thisInfo.ident; |
| } |
| |
| return trus(); |
| } |
| |
| /** |
| * Return the Tid of the thread which spawned the caller's thread. |
| * |
| * Throws: A $(D TidMissingException) exception if |
| * there is no owner thread. |
| */ |
| @property Tid ownerTid() |
| { |
| import std.exception : enforce; |
| |
| enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread."); |
| return thisInfo.owner; |
| } |
| |
| @system unittest |
| { |
| import std.exception : assertThrown; |
| |
| static void fun() |
| { |
| string res = receiveOnly!string(); |
| assert(res == "Main calling"); |
| ownerTid.send("Child responding"); |
| } |
| |
| assertThrown!TidMissingException(ownerTid); |
| auto child = spawn(&fun); |
| child.send("Main calling"); |
| string res = receiveOnly!string(); |
| assert(res == "Child responding"); |
| } |
| |
| // Thread Creation |
| |
| private template isSpawnable(F, T...) |
| { |
| template isParamsImplicitlyConvertible(F1, F2, int i = 0) |
| { |
| alias param1 = Parameters!F1; |
| alias param2 = Parameters!F2; |
| static if (param1.length != param2.length) |
| enum isParamsImplicitlyConvertible = false; |
| else static if (param1.length == i) |
| enum isParamsImplicitlyConvertible = true; |
| else static if (isImplicitlyConvertible!(param2[i], param1[i])) |
| enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, |
| F2, i + 1); |
| else |
| enum isParamsImplicitlyConvertible = false; |
| } |
| |
| enum isSpawnable = isCallable!F && is(ReturnType!F == void) |
| && isParamsImplicitlyConvertible!(F, void function(T)) |
| && (isFunctionPointer!F || !hasUnsharedAliasing!F); |
| } |
| |
| /** |
| * Starts fn(args) in a new logical thread. |
| * |
| * Executes the supplied function in a new logical thread represented by |
| * $(D Tid). The calling thread is designated as the owner of the new thread. |
| * When the owner thread terminates an $(D OwnerTerminated) message will be |
| * sent to the new thread, causing an $(D OwnerTerminated) exception to be |
| * thrown on $(D receive()). |
| * |
| * Params: |
| * fn = The function to execute. |
| * args = Arguments to the function. |
| * |
| * Returns: |
| * A Tid representing the new logical thread. |
| * |
| * Notes: |
| * $(D args) must not have unshared aliasing. In other words, all arguments |
| * to $(D fn) must either be $(D shared) or $(D immutable) or have no |
| * pointer indirection. This is necessary for enforcing isolation among |
| * threads. |
| * |
| * Example: |
| * --- |
| * import std.stdio, std.concurrency; |
| * |
| * void f1(string str) |
| * { |
| * writeln(str); |
| * } |
| * |
| * void f2(char[] str) |
| * { |
| * writeln(str); |
| * } |
| * |
| * void main() |
| * { |
| * auto str = "Hello, world"; |
| * |
| * // Works: string is immutable. |
| * auto tid1 = spawn(&f1, str); |
| * |
| * // Fails: char[] has mutable aliasing. |
| * auto tid2 = spawn(&f2, str.dup); |
| * |
| * // New thread with anonymous function |
| * spawn({ writeln("This is so great!"); }); |
| * } |
| * --- |
| */ |
| Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T)) |
| { |
| static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); |
| return _spawn(false, fn, args); |
| } |
| |
| /** |
| * Starts fn(args) in a logical thread and will receive a LinkTerminated |
| * message when the operation terminates. |
| * |
| * Executes the supplied function in a new logical thread represented by |
| * Tid. This new thread is linked to the calling thread so that if either |
| * it or the calling thread terminates a LinkTerminated message will be sent |
| * to the other, causing a LinkTerminated exception to be thrown on receive(). |
| * The owner relationship from spawn() is preserved as well, so if the link |
| * between threads is broken, owner termination will still result in an |
| * OwnerTerminated exception to be thrown on receive(). |
| * |
| * Params: |
| * fn = The function to execute. |
| * args = Arguments to the function. |
| * |
| * Returns: |
| * A Tid representing the new thread. |
| */ |
| Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T)) |
| { |
| static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); |
| return _spawn(true, fn, args); |
| } |
| |
| /* |
| * |
| */ |
| private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T)) |
| { |
| // TODO: MessageList and &exec should be shared. |
| auto spawnTid = Tid(new MessageBox); |
| auto ownerTid = thisTid; |
| |
| void exec() |
| { |
| thisInfo.ident = spawnTid; |
| thisInfo.owner = ownerTid; |
| fn(args); |
| } |
| |
| // TODO: MessageList and &exec should be shared. |
| if (scheduler !is null) |
| scheduler.spawn(&exec); |
| else |
| { |
| auto t = new Thread(&exec); |
| t.start(); |
| } |
| thisInfo.links[spawnTid] = linked; |
| return spawnTid; |
| } |
| |
| @system unittest |
| { |
| void function() fn1; |
| void function(int) fn2; |
| static assert(__traits(compiles, spawn(fn1))); |
| static assert(__traits(compiles, spawn(fn2, 2))); |
| static assert(!__traits(compiles, spawn(fn1, 1))); |
| static assert(!__traits(compiles, spawn(fn2))); |
| |
| void delegate(int) shared dg1; |
| shared(void delegate(int)) dg2; |
| shared(void delegate(long) shared) dg3; |
| shared(void delegate(real, int, long) shared) dg4; |
| void delegate(int) immutable dg5; |
| void delegate(int) dg6; |
| static assert(__traits(compiles, spawn(dg1, 1))); |
| static assert(__traits(compiles, spawn(dg2, 2))); |
| static assert(__traits(compiles, spawn(dg3, 3))); |
| static assert(__traits(compiles, spawn(dg4, 4, 4, 4))); |
| static assert(__traits(compiles, spawn(dg5, 5))); |
| static assert(!__traits(compiles, spawn(dg6, 6))); |
| |
| auto callable1 = new class{ void opCall(int) shared {} }; |
| auto callable2 = cast(shared) new class{ void opCall(int) shared {} }; |
| auto callable3 = new class{ void opCall(int) immutable {} }; |
| auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} }; |
| auto callable5 = new class{ void opCall(int) {} }; |
| auto callable6 = cast(shared) new class{ void opCall(int) immutable {} }; |
| auto callable7 = cast(immutable) new class{ void opCall(int) shared {} }; |
| auto callable8 = cast(shared) new class{ void opCall(int) const shared {} }; |
| auto callable9 = cast(const shared) new class{ void opCall(int) shared {} }; |
| auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} }; |
| auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} }; |
| static assert(!__traits(compiles, spawn(callable1, 1))); |
| static assert( __traits(compiles, spawn(callable2, 2))); |
| static assert(!__traits(compiles, spawn(callable3, 3))); |
| static assert( __traits(compiles, spawn(callable4, 4))); |
| static assert(!__traits(compiles, spawn(callable5, 5))); |
| static assert(!__traits(compiles, spawn(callable6, 6))); |
| static assert(!__traits(compiles, spawn(callable7, 7))); |
| static assert( __traits(compiles, spawn(callable8, 8))); |
| static assert(!__traits(compiles, spawn(callable9, 9))); |
| static assert( __traits(compiles, spawn(callable10, 10))); |
| static assert( __traits(compiles, spawn(callable11, 11))); |
| } |
| |
| /** |
| * Places the values as a message at the back of tid's message queue. |
| * |
| * Sends the supplied value to the thread represented by tid. As with |
| * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing. |
| */ |
| void send(T...)(Tid tid, T vals) |
| { |
| static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); |
| _send(tid, vals); |
| } |
| |
| /** |
| * Places the values as a message on the front of tid's message queue. |
| * |
| * Send a message to $(D tid) but place it at the front of $(D tid)'s message |
| * queue instead of at the back. This function is typically used for |
| * out-of-band communication, to signal exceptional conditions, etc. |
| */ |
| void prioritySend(T...)(Tid tid, T vals) |
| { |
| static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed."); |
| _send(MsgType.priority, tid, vals); |
| } |
| |
| /* |
| * ditto |
| */ |
| private void _send(T...)(Tid tid, T vals) |
| { |
| _send(MsgType.standard, tid, vals); |
| } |
| |
| /* |
| * Implementation of send. This allows parameter checking to be different for |
| * both Tid.send() and .send(). |
| */ |
| private void _send(T...)(MsgType type, Tid tid, T vals) |
| { |
| auto msg = Message(type, vals); |
| tid.mbox.put(msg); |
| } |
| |
| /** |
| * Receives a message from another thread. |
| * |
| * Receive a message from another thread, or block if no messages of the |
| * specified types are available. This function works by pattern matching |
| * a message against a set of delegates and executing the first match found. |
| * |
| * If a delegate that accepts a $(REF Variant, std,variant) is included as |
| * the last argument to $(D receive), it will match any message that was not |
| * matched by an earlier delegate. If more than one argument is sent, |
| * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values |
| * sent. |
| * |
| * Example: |
| * --- |
| * import std.stdio; |
| * import std.variant; |
| * import std.concurrency; |
| * |
| * void spawnedFunction() |
| * { |
| * receive( |
| * (int i) { writeln("Received an int."); }, |
| * (float f) { writeln("Received a float."); }, |
| * (Variant v) { writeln("Received some other type."); } |
| * ); |
| * } |
| * |
| * void main() |
| * { |
| * auto tid = spawn(&spawnedFunction); |
| * send(tid, 42); |
| * } |
| * --- |
| */ |
| void receive(T...)( T ops ) |
| in |
| { |
| assert(thisInfo.ident.mbox !is null, |
| "Cannot receive a message until a thread was spawned " |
| ~ "or thisTid was passed to a running thread."); |
| } |
| body |
| { |
| checkops( ops ); |
| |
| thisInfo.ident.mbox.get( ops ); |
| } |
| |
| |
| @safe unittest |
| { |
| static assert( __traits( compiles, |
| { |
| receive( (Variant x) {} ); |
| receive( (int x) {}, (Variant x) {} ); |
| } ) ); |
| |
| static assert( !__traits( compiles, |
| { |
| receive( (Variant x) {}, (int x) {} ); |
| } ) ); |
| |
| static assert( !__traits( compiles, |
| { |
| receive( (int x) {}, (int x) {} ); |
| } ) ); |
| } |
| |
| // Make sure receive() works with free functions as well. |
| version (unittest) |
| { |
| private void receiveFunction(int x) {} |
| } |
| @safe unittest |
| { |
| static assert( __traits( compiles, |
| { |
| receive( &receiveFunction ); |
| receive( &receiveFunction, (Variant x) {} ); |
| } ) ); |
| } |
| |
| |
| private template receiveOnlyRet(T...) |
| { |
| static if ( T.length == 1 ) |
| { |
| alias receiveOnlyRet = T[0]; |
| } |
| else |
| { |
| import std.typecons : Tuple; |
| alias receiveOnlyRet = Tuple!(T); |
| } |
| } |
| |
| /** |
| * Receives only messages with arguments of types $(D T). |
| * |
| * Throws: $(D MessageMismatch) if a message of types other than $(D T) |
| * is received. |
| * |
| * Returns: The received message. If $(D T.length) is greater than one, |
| * the message will be packed into a $(REF Tuple, std,typecons). |
| * |
| * Example: |
| * --- |
| * import std.concurrency; |
| * |
| * void spawnedFunc() |
| * { |
| * auto msg = receiveOnly!(int, string)(); |
| * assert(msg[0] == 42); |
| * assert(msg[1] == "42"); |
| * } |
| * |
| * void main() |
| * { |
| * auto tid = spawn(&spawnedFunc); |
| * send(tid, 42, "42"); |
| * } |
| * --- |
| */ |
| receiveOnlyRet!(T) receiveOnly(T...)() |
| in |
| { |
| assert(thisInfo.ident.mbox !is null, |
| "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); |
| } |
| body |
| { |
| import std.format : format; |
| import std.typecons : Tuple; |
| |
| Tuple!(T) ret; |
| |
| thisInfo.ident.mbox.get((T val) { |
| static if (T.length) |
| ret.field = val; |
| }, |
| (LinkTerminated e) { throw e; }, |
| (OwnerTerminated e) { throw e; }, |
| (Variant val) { |
| static if (T.length > 1) |
| string exp = T.stringof; |
| else |
| string exp = T[0].stringof; |
| |
| throw new MessageMismatch( |
| format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString())); |
| }); |
| static if (T.length == 1) |
| return ret[0]; |
| else |
| return ret; |
| } |
| |
| @system unittest |
| { |
| static void t1(Tid mainTid) |
| { |
| try |
| { |
| receiveOnly!string(); |
| mainTid.send(""); |
| } |
| catch (Throwable th) |
| { |
| mainTid.send(th.msg); |
| } |
| } |
| |
| auto tid = spawn(&t1, thisTid); |
| tid.send(1); |
| string result = receiveOnly!string(); |
| assert(result == "Unexpected message type: expected 'string', got 'int'"); |
| } |
| |
| /** |
| * Tries to receive but will give up if no matches arrive within duration. |
| * Won't wait at all if provided $(REF Duration, core,time) is negative. |
| * |
| * Same as $(D receive) except that rather than wait forever for a message, |
| * it waits until either it receives a message or the given |
| * $(REF Duration, core,time) has passed. It returns $(D true) if it received a |
| * message and $(D false) if it timed out waiting for one. |
| */ |
| bool receiveTimeout(T...)(Duration duration, T ops) |
| in |
| { |
| assert(thisInfo.ident.mbox !is null, |
| "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread."); |
| } |
| body |
| { |
| checkops(ops); |
| |
| return thisInfo.ident.mbox.get(duration, ops); |
| } |
| |
| @safe unittest |
| { |
| static assert(__traits(compiles, { |
| receiveTimeout(msecs(0), (Variant x) {}); |
| receiveTimeout(msecs(0), (int x) {}, (Variant x) {}); |
| })); |
| |
| static assert(!__traits(compiles, { |
| receiveTimeout(msecs(0), (Variant x) {}, (int x) {}); |
| })); |
| |
| static assert(!__traits(compiles, { |
| receiveTimeout(msecs(0), (int x) {}, (int x) {}); |
| })); |
| |
| static assert(__traits(compiles, { |
| receiveTimeout(msecs(10), (int x) {}, (Variant x) {}); |
| })); |
| } |
| |
| // MessageBox Limits |
| |
| /** |
| * These behaviors may be specified when a mailbox is full. |
| */ |
| enum OnCrowding |
| { |
| block, /// Wait until room is available. |
| throwException, /// Throw a MailboxFull exception. |
| ignore /// Abort the send and return. |
| } |
| |
| private |
| { |
| bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc |
| { |
| return true; |
| } |
| |
| bool onCrowdingThrow(Tid tid) @safe pure |
| { |
| throw new MailboxFull(tid); |
| } |
| |
| bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc |
| { |
| return false; |
| } |
| } |
| |
| /** |
| * Sets a maximum mailbox size. |
| * |
| * Sets a limit on the maximum number of user messages allowed in the mailbox. |
| * If this limit is reached, the caller attempting to add a new message will |
| * execute the behavior specified by doThis. If messages is zero, the mailbox |
| * is unbounded. |
| * |
| * Params: |
| * tid = The Tid of the thread for which this limit should be set. |
| * messages = The maximum number of messages or zero if no limit. |
| * doThis = The behavior executed when a message is sent to a full |
| * mailbox. |
| */ |
| void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure |
| { |
| final switch (doThis) |
| { |
| case OnCrowding.block: |
| return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock); |
| case OnCrowding.throwException: |
| return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow); |
| case OnCrowding.ignore: |
| return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore); |
| } |
| } |
| |
| /** |
| * Sets a maximum mailbox size. |
| * |
| * Sets a limit on the maximum number of user messages allowed in the mailbox. |
| * If this limit is reached, the caller attempting to add a new message will |
| * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded. |
| * |
| * Params: |
| * tid = The Tid of the thread for which this limit should be set. |
| * messages = The maximum number of messages or zero if no limit. |
| * onCrowdingDoThis = The routine called when a message is sent to a full |
| * mailbox. |
| */ |
| void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis) |
| { |
| tid.mbox.setMaxMsgs(messages, onCrowdingDoThis); |
| } |
| |
| private |
| { |
| __gshared Tid[string] tidByName; |
| __gshared string[][Tid] namesByTid; |
| } |
| |
| private @property Mutex registryLock() |
| { |
| __gshared Mutex impl; |
| initOnce!impl(new Mutex); |
| return impl; |
| } |
| |
| private void unregisterMe() |
| { |
| auto me = thisInfo.ident; |
| if (thisInfo.ident != Tid.init) |
| { |
| synchronized (registryLock) |
| { |
| if (auto allNames = me in namesByTid) |
| { |
| foreach (name; *allNames) |
| tidByName.remove(name); |
| namesByTid.remove(me); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Associates name with tid. |
| * |
| * Associates name with tid in a process-local map. When the thread |
| * represented by tid terminates, any names associated with it will be |
| * automatically unregistered. |
| * |
| * Params: |
| * name = The name to associate with tid. |
| * tid = The tid register by name. |
| * |
| * Returns: |
| * true if the name is available and tid is not known to represent a |
| * defunct thread. |
| */ |
| bool register(string name, Tid tid) |
| { |
| synchronized (registryLock) |
| { |
| if (name in tidByName) |
| return false; |
| if (tid.mbox.isClosed) |
| return false; |
| namesByTid[tid] ~= name; |
| tidByName[name] = tid; |
| return true; |
| } |
| } |
| |
| /** |
| * Removes the registered name associated with a tid. |
| * |
| * Params: |
| * name = The name to unregister. |
| * |
| * Returns: |
| * true if the name is registered, false if not. |
| */ |
| bool unregister(string name) |
| { |
| import std.algorithm.mutation : remove, SwapStrategy; |
| import std.algorithm.searching : countUntil; |
| |
| synchronized (registryLock) |
| { |
| if (auto tid = name in tidByName) |
| { |
| auto allNames = *tid in namesByTid; |
| auto pos = countUntil(*allNames, name); |
| remove!(SwapStrategy.unstable)(*allNames, pos); |
| tidByName.remove(name); |
| return true; |
| } |
| return false; |
| } |
| } |
| |
| /** |
| * Gets the Tid associated with name. |
| * |
| * Params: |
| * name = The name to locate within the registry. |
| * |
| * Returns: |
| * The associated Tid or Tid.init if name is not registered. |
| */ |
| Tid locate(string name) |
| { |
| synchronized (registryLock) |
| { |
| if (auto tid = name in tidByName) |
| return *tid; |
| return Tid.init; |
| } |
| } |
| |
| /** |
| * Encapsulates all implementation-level data needed for scheduling. |
| * |
| * When defining a Scheduler, an instance of this struct must be associated |
| * with each logical thread. It contains all implementation-level information |
| * needed by the internal API. |
| */ |
| struct ThreadInfo |
| { |
| Tid ident; |
| bool[Tid] links; |
| Tid owner; |
| |
| /** |
| * Gets a thread-local instance of ThreadInfo. |
| * |
| * Gets a thread-local instance of ThreadInfo, which should be used as the |
| * default instance when info is requested for a thread not created by the |
| * Scheduler. |
| */ |
| static @property ref thisInfo() nothrow |
| { |
| static ThreadInfo val; |
| return val; |
| } |
| |
| /** |
| * Cleans up this ThreadInfo. |
| * |
| * This must be called when a scheduled thread terminates. It tears down |
| * the messaging system for the thread and notifies interested parties of |
| * the thread's termination. |
| */ |
| void cleanup() |
| { |
| if (ident.mbox !is null) |
| ident.mbox.close(); |
| foreach (tid; links.keys) |
| _send(MsgType.linkDead, tid, ident); |
| if (owner != Tid.init) |
| _send(MsgType.linkDead, owner, ident); |
| unregisterMe(); // clean up registry entries |
| } |
| } |
| |
| /** |
| * A Scheduler controls how threading is performed by spawn. |
| * |
| * Implementing a Scheduler allows the concurrency mechanism used by this |
| * module to be customized according to different needs. By default, a call |
| * to spawn will create a new kernel thread that executes the supplied routine |
| * and terminates when finished. But it is possible to create Schedulers that |
| * reuse threads, that multiplex Fibers (coroutines) across a single thread, |
| * or any number of other approaches. By making the choice of Scheduler a |
| * user-level option, std.concurrency may be used for far more types of |
| * application than if this behavior were predefined. |
| * |
| * Example: |
| * --- |
| * import std.concurrency; |
| * import std.stdio; |
| * |
| * void main() |
| * { |
| * scheduler = new FiberScheduler; |
| * scheduler.start( |
| * { |
| * writeln("the rest of main goes here"); |
| * }); |
| * } |
| * --- |
| * |
| * Some schedulers have a dispatching loop that must run if they are to work |
| * properly, so for the sake of consistency, when using a scheduler, start() |
| * must be called within main(). This yields control to the scheduler and |
| * will ensure that any spawned threads are executed in an expected manner. |
| */ |
| interface Scheduler |
| { |
| /** |
| * Spawns the supplied op and starts the Scheduler. |
| * |
| * This is intended to be called at the start of the program to yield all |
| * scheduling to the active Scheduler instance. This is necessary for |
| * schedulers that explicitly dispatch threads rather than simply relying |
| * on the operating system to do so, and so start should always be called |
| * within main() to begin normal program execution. |
| * |
| * Params: |
| * op = A wrapper for whatever the main thread would have done in the |
| * absence of a custom scheduler. It will be automatically executed |
| * via a call to spawn by the Scheduler. |
| */ |
| void start(void delegate() op); |
| |
| /** |
| * Assigns a logical thread to execute the supplied op. |
| * |
| * This routine is called by spawn. It is expected to instantiate a new |
| * logical thread and run the supplied operation. This thread must call |
| * thisInfo.cleanup() when the thread terminates if the scheduled thread |
| * is not a kernel thread--all kernel threads will have their ThreadInfo |
| * cleaned up automatically by a thread-local destructor. |
| * |
| * Params: |
| * op = The function to execute. This may be the actual function passed |
| * by the user to spawn itself, or may be a wrapper function. |
| */ |
| void spawn(void delegate() op); |
| |
| /** |
| * Yields execution to another logical thread. |
| * |
| * This routine is called at various points within concurrency-aware APIs |
| * to provide a scheduler a chance to yield execution when using some sort |
| * of cooperative multithreading model. If this is not appropriate, such |
| * as when each logical thread is backed by a dedicated kernel thread, |
| * this routine may be a no-op. |
| */ |
| void yield() nothrow; |
| |
| /** |
| * Returns an appropriate ThreadInfo instance. |
| * |
| * Returns an instance of ThreadInfo specific to the logical thread that |
| * is calling this routine or, if the calling thread was not create by |
| * this scheduler, returns ThreadInfo.thisInfo instead. |
| */ |
| @property ref ThreadInfo thisInfo() nothrow; |
| |
| /** |
| * Creates a Condition variable analog for signaling. |
| * |
| * Creates a new Condition variable analog which is used to check for and |
| * to signal the addition of messages to a thread's message queue. Like |
| * yield, some schedulers may need to define custom behavior so that calls |
| * to Condition.wait() yield to another thread when no new messages are |
| * available instead of blocking. |
| * |
| * Params: |
| * m = The Mutex that will be associated with this condition. It will be |
| * locked prior to any operation on the condition, and so in some |
| * cases a Scheduler may need to hold this reference and unlock the |
| * mutex before yielding execution to another logical thread. |
| */ |
| Condition newCondition(Mutex m) nothrow; |
| } |
| |
| /** |
| * An example Scheduler using kernel threads. |
| * |
| * This is an example Scheduler that mirrors the default scheduling behavior |
| * of creating one kernel thread per call to spawn. It is fully functional |
| * and may be instantiated and used, but is not a necessary part of the |
| * default functioning of this module. |
| */ |
| class ThreadScheduler : Scheduler |
| { |
| /** |
| * This simply runs op directly, since no real scheduling is needed by |
| * this approach. |
| */ |
| void start(void delegate() op) |
| { |
| op(); |
| } |
| |
| /** |
| * Creates a new kernel thread and assigns it to run the supplied op. |
| */ |
| void spawn(void delegate() op) |
| { |
| auto t = new Thread(op); |
| t.start(); |
| } |
| |
| /** |
| * This scheduler does no explicit multiplexing, so this is a no-op. |
| */ |
| void yield() nothrow |
| { |
| // no explicit yield needed |
| } |
| |
| /** |
| * Returns ThreadInfo.thisInfo, since it is a thread-local instance of |
| * ThreadInfo, which is the correct behavior for this scheduler. |
| */ |
| @property ref ThreadInfo thisInfo() nothrow |
| { |
| return ThreadInfo.thisInfo; |
| } |
| |
| /** |
| * Creates a new Condition variable. No custom behavior is needed here. |
| */ |
| Condition newCondition(Mutex m) nothrow |
| { |
| return new Condition(m); |
| } |
| } |
| |
| /** |
| * An example Scheduler using Fibers. |
| * |
| * This is an example scheduler that creates a new Fiber per call to spawn |
| * and multiplexes the execution of all fibers within the main thread. |
| */ |
| class FiberScheduler : Scheduler |
| { |
| /** |
| * This creates a new Fiber for the supplied op and then starts the |
| * dispatcher. |
| */ |
| void start(void delegate() op) |
| { |
| create(op); |
| dispatch(); |
| } |
| |
| /** |
| * This created a new Fiber for the supplied op and adds it to the |
| * dispatch list. |
| */ |
| void spawn(void delegate() op) nothrow |
| { |
| create(op); |
| yield(); |
| } |
| |
| /** |
| * If the caller is a scheduled Fiber, this yields execution to another |
| * scheduled Fiber. |
| */ |
| void yield() nothrow |
| { |
| // NOTE: It's possible that we should test whether the calling Fiber |
| // is an InfoFiber before yielding, but I think it's reasonable |
| // that any (non-Generator) fiber should yield here. |
| if (Fiber.getThis()) |
| Fiber.yield(); |
| } |
| |
| /** |
| * Returns an appropriate ThreadInfo instance. |
| * |
| * Returns a ThreadInfo instance specific to the calling Fiber if the |
| * Fiber was created by this dispatcher, otherwise it returns |
| * ThreadInfo.thisInfo. |
| */ |
| @property ref ThreadInfo thisInfo() nothrow |
| { |
| auto f = cast(InfoFiber) Fiber.getThis(); |
| |
| if (f !is null) |
| return f.info; |
| return ThreadInfo.thisInfo; |
| } |
| |
| /** |
| * Returns a Condition analog that yields when wait or notify is called. |
| */ |
| Condition newCondition(Mutex m) nothrow |
| { |
| return new FiberCondition(m); |
| } |
| |
| private: |
| static class InfoFiber : Fiber |
| { |
| ThreadInfo info; |
| |
| this(void delegate() op) nothrow |
| { |
| super(op); |
| } |
| } |
| |
| class FiberCondition : Condition |
| { |
| this(Mutex m) nothrow |
| { |
| super(m); |
| notified = false; |
| } |
| |
| override void wait() nothrow |
| { |
| scope (exit) notified = false; |
| |
| while (!notified) |
| switchContext(); |
| } |
| |
| override bool wait(Duration period) nothrow |
| { |
| import core.time : MonoTime; |
| |
| scope (exit) notified = false; |
| |
| for (auto limit = MonoTime.currTime + period; |
| !notified && !period.isNegative; |
| period = limit - MonoTime.currTime) |
| { |
| yield(); |
| } |
| return notified; |
| } |
| |
| override void notify() nothrow |
| { |
| notified = true; |
| switchContext(); |
| } |
| |
| override void notifyAll() nothrow |
| { |
| notified = true; |
| switchContext(); |
| } |
| |
| private: |
| void switchContext() nothrow |
| { |
| mutex_nothrow.unlock_nothrow(); |
| scope (exit) mutex_nothrow.lock_nothrow(); |
| yield(); |
| } |
| |
| private bool notified; |
| } |
| |
| private: |
| void dispatch() |
| { |
| import std.algorithm.mutation : remove; |
| |
| while (m_fibers.length > 0) |
| { |
| auto t = m_fibers[m_pos].call(Fiber.Rethrow.no); |
| if (t !is null && !(cast(OwnerTerminated) t)) |
| { |
| throw t; |
| } |
| if (m_fibers[m_pos].state == Fiber.State.TERM) |
| { |
| if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length) |
| m_pos = 0; |
| } |
| else if (m_pos++ >= m_fibers.length - 1) |
| { |
| m_pos = 0; |
| } |
| } |
| } |
| |
| void create(void delegate() op) nothrow |
| { |
| void wrap() |
| { |
| scope (exit) |
| { |
| thisInfo.cleanup(); |
| } |
| op(); |
| } |
| |
| m_fibers ~= new InfoFiber(&wrap); |
| } |
| |
| private: |
| Fiber[] m_fibers; |
| size_t m_pos; |
| } |
| |
| @system unittest |
| { |
| static void receive(Condition cond, ref size_t received) |
| { |
| while (true) |
| { |
| synchronized (cond.mutex) |
| { |
| cond.wait(); |
| ++received; |
| } |
| } |
| } |
| |
| static void send(Condition cond, ref size_t sent) |
| { |
| while (true) |
| { |
| synchronized (cond.mutex) |
| { |
| ++sent; |
| cond.notify(); |
| } |
| } |
| } |
| |
| auto fs = new FiberScheduler; |
| auto mtx = new Mutex; |
| auto cond = fs.newCondition(mtx); |
| |
| size_t received, sent; |
| auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); }); |
| waiter.call(); |
| assert(received == 0); |
| notifier.call(); |
| assert(sent == 1); |
| assert(received == 0); |
| waiter.call(); |
| assert(received == 1); |
| waiter.call(); |
| assert(received == 1); |
| } |
| |
| /** |
| * Sets the Scheduler behavior within the program. |
| * |
| * This variable sets the Scheduler behavior within this program. Typically, |
| * when setting a Scheduler, scheduler.start() should be called in main. This |
| * routine will not return until program execution is complete. |
| */ |
| __gshared Scheduler scheduler; |
| |
| // Generator |
| |
| /** |
| * If the caller is a Fiber and is not a Generator, this function will call |
| * scheduler.yield() or Fiber.yield(), as appropriate. |
| */ |
| void yield() nothrow |
| { |
| auto fiber = Fiber.getThis(); |
| if (!(cast(IsGenerator) fiber)) |
| { |
| if (scheduler is null) |
| { |
| if (fiber) |
| return Fiber.yield(); |
| } |
| else |
| scheduler.yield(); |
| } |
| } |
| |
| /// Used to determine whether a Generator is running. |
| private interface IsGenerator {} |
| |
| |
| /** |
| * A Generator is a Fiber that periodically returns values of type T to the |
| * caller via yield. This is represented as an InputRange. |
| * |
| * Example: |
| * --- |
| * import std.concurrency; |
| * import std.stdio; |
| * |
| * |
| * void main() |
| * { |
| * auto tid = spawn( |
| * { |
| * while (true) |
| * { |
| * writeln(receiveOnly!int()); |
| * } |
| * }); |
| * |
| * auto r = new Generator!int( |
| * { |
| * foreach (i; 1 .. 10) |
| * yield(i); |
| * }); |
| * |
| * foreach (e; r) |
| * { |
| * tid.send(e); |
| * } |
| * } |
| * --- |
| */ |
| class Generator(T) : |
| Fiber, IsGenerator, InputRange!T |
| { |
| /** |
| * Initializes a generator object which is associated with a static |
| * D function. The function will be called once to prepare the range |
| * for iteration. |
| * |
| * Params: |
| * fn = The fiber function. |
| * |
| * In: |
| * fn must not be null. |
| */ |
| this(void function() fn) |
| { |
| super(fn); |
| call(); |
| } |
| |
| /** |
| * Initializes a generator object which is associated with a static |
| * D function. The function will be called once to prepare the range |
| * for iteration. |
| * |
| * Params: |
| * fn = The fiber function. |
| * sz = The stack size for this fiber. |
| * |
| * In: |
| * fn must not be null. |
| */ |
| this(void function() fn, size_t sz) |
| { |
| super(fn, sz); |
| call(); |
| } |
| |
| /** |
| * Initializes a generator object which is associated with a dynamic |
| * D function. The function will be called once to prepare the range |
| * for iteration. |
| * |
| * Params: |
| * dg = The fiber function. |
| * |
| * In: |
| * dg must not be null. |
| */ |
| this(void delegate() dg) |
| { |
| super(dg); |
| call(); |
| } |
| |
| /** |
| * Initializes a generator object which is associated with a dynamic |
| * D function. The function will be called once to prepare the range |
| * for iteration. |
| * |
| * Params: |
| * dg = The fiber function. |
| * sz = The stack size for this fiber. |
| * |
| * In: |
| * dg must not be null. |
| */ |
| this(void delegate() dg, size_t sz) |
| { |
| super(dg, sz); |
| call(); |
| } |
| |
| /** |
| * Returns true if the generator is empty. |
| */ |
| final bool empty() @property |
| { |
| return m_value is null || state == State.TERM; |
| } |
| |
| /** |
| * Obtains the next value from the underlying function. |
| */ |
| final void popFront() |
| { |
| call(); |
| } |
| |
| /** |
| * Returns the most recently generated value by shallow copy. |
| */ |
| final T front() @property |
| { |
| return *m_value; |
| } |
| |
| /** |
| * Returns the most recently generated value without executing a |
| * copy contructor. Will not compile for element types defining a |
| * postblit, because Generator does not return by reference. |
| */ |
| final T moveFront() |
| { |
| static if (!hasElaborateCopyConstructor!T) |
| { |
| return front; |
| } |
| else |
| { |
| static assert(0, |
| "Fiber front is always rvalue and thus cannot be moved since it defines a postblit."); |
| } |
| } |
| |
| final int opApply(scope int delegate(T) loopBody) |
| { |
| int broken; |
| for (; !empty; popFront()) |
| { |
| broken = loopBody(front); |
| if (broken) break; |
| } |
| return broken; |
| } |
| |
| final int opApply(scope int delegate(size_t, T) loopBody) |
| { |
| int broken; |
| for (size_t i; !empty; ++i, popFront()) |
| { |
| broken = loopBody(i, front); |
| if (broken) break; |
| } |
| return broken; |
| } |
| private: |
| T* m_value; |
| } |
| |
| /** |
| * Yields a value of type T to the caller of the currently executing |
| * generator. |
| * |
| * Params: |
| * value = The value to yield. |
| */ |
| void yield(T)(ref T value) |
| { |
| Generator!T cur = cast(Generator!T) Fiber.getThis(); |
| if (cur !is null && cur.state == Fiber.State.EXEC) |
| { |
| cur.m_value = &value; |
| return Fiber.yield(); |
| } |
| throw new Exception("yield(T) called with no active generator for the supplied type"); |
| } |
| |
| /// ditto |
| void yield(T)(T value) |
| { |
| yield(value); |
| } |
| |
| @system unittest |
| { |
| import core.exception; |
| import std.exception; |
| |
| static void testScheduler(Scheduler s) |
| { |
| scheduler = s; |
| scheduler.start({ |
| auto tid = spawn({ |
| int i; |
| |
| try |
| { |
| for (i = 1; i < 10; i++) |
| { |
| assertNotThrown!AssertError(assert(receiveOnly!int() == i)); |
| } |
| } |
| catch (OwnerTerminated e) |
| { |
| |
| } |
| |
| // i will advance 1 past the last value expected |
| assert(i == 4); |
| }); |
| |
| auto r = new Generator!int({ |
| assertThrown!Exception(yield(2.0)); |
| yield(); // ensure this is a no-op |
| yield(1); |
| yield(); // also once something has been yielded |
| yield(2); |
| yield(3); |
| }); |
| |
| foreach (e; r) |
| { |
| tid.send(e); |
| } |
| }); |
| scheduler = null; |
| } |
| |
| testScheduler(new ThreadScheduler); |
| testScheduler(new FiberScheduler); |
| } |
| /// |
| @system unittest |
| { |
| import std.range; |
| |
| InputRange!int myIota = iota(10).inputRangeObject; |
| |
| myIota.popFront(); |
| myIota.popFront(); |
| assert(myIota.moveFront == 2); |
| assert(myIota.front == 2); |
| myIota.popFront(); |
| assert(myIota.front == 3); |
| |
| //can be assigned to std.range.interfaces.InputRange directly |
| myIota = new Generator!int( |
| { |
| foreach (i; 0 .. 10) yield(i); |
| }); |
| |
| myIota.popFront(); |
| myIota.popFront(); |
| assert(myIota.moveFront == 2); |
| assert(myIota.front == 2); |
| myIota.popFront(); |
| assert(myIota.front == 3); |
| |
| size_t[2] counter = [0, 0]; |
| foreach (i, unused; myIota) counter[] += [1, i]; |
| |
| assert(myIota.empty); |
| assert(counter == [7, 21]); |
| } |
| |
| private |
| { |
| /* |
| * A MessageBox is a message queue for one thread. Other threads may send |
| * messages to this owner by calling put(), and the owner receives them by |
| * calling get(). The put() call is therefore effectively shared and the |
| * get() call is effectively local. setMaxMsgs may be used by any thread |
| * to limit the size of the message queue. |
| */ |
| class MessageBox |
| { |
| this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */ |
| { |
| m_lock = new Mutex; |
| m_closed = false; |
| |
| if (scheduler is null) |
| { |
| m_putMsg = new Condition(m_lock); |
| m_notFull = new Condition(m_lock); |
| } |
| else |
| { |
| m_putMsg = scheduler.newCondition(m_lock); |
| m_notFull = scheduler.newCondition(m_lock); |
| } |
| } |
| |
| /// |
| final @property bool isClosed() @safe @nogc pure |
| { |
| synchronized (m_lock) |
| { |
| return m_closed; |
| } |
| } |
| |
| /* |
| * Sets a limit on the maximum number of user messages allowed in the |
| * mailbox. If this limit is reached, the caller attempting to add |
| * a new message will execute call. If num is zero, there is no limit |
| * on the message queue. |
| * |
| * Params: |
| * num = The maximum size of the queue or zero if the queue is |
| * unbounded. |
| * call = The routine to call when the queue is full. |
| */ |
| final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure |
| { |
| synchronized (m_lock) |
| { |
| m_maxMsgs = num; |
| m_onMaxMsgs = call; |
| } |
| } |
| |
| /* |
| * If maxMsgs is not set, the message is added to the queue and the |
| * owner is notified. If the queue is full, the message will still be |
| * accepted if it is a control message, otherwise onCrowdingDoThis is |
| * called. If the routine returns true, this call will block until |
| * the owner has made space available in the queue. If it returns |
| * false, this call will abort. |
| * |
| * Params: |
| * msg = The message to put in the queue. |
| * |
| * Throws: |
| * An exception if the queue is full and onCrowdingDoThis throws. |
| */ |
| final void put(ref Message msg) |
| { |
| synchronized (m_lock) |
| { |
| // TODO: Generate an error here if m_closed is true, or maybe |
| // put a message in the caller's queue? |
| if (!m_closed) |
| { |
| while (true) |
| { |
| if (isPriorityMsg(msg)) |
| { |
| m_sharedPty.put(msg); |
| m_putMsg.notify(); |
| return; |
| } |
| if (!mboxFull() || isControlMsg(msg)) |
| { |
| m_sharedBox.put(msg); |
| m_putMsg.notify(); |
| return; |
| } |
| if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid)) |
| { |
| return; |
| } |
| m_putQueue++; |
| m_notFull.wait(); |
| m_putQueue--; |
| } |
| } |
| } |
| } |
| |
| /* |
| * Matches ops against each message in turn until a match is found. |
| * |
| * Params: |
| * ops = The operations to match. Each may return a bool to indicate |
| * whether a message with a matching type is truly a match. |
| * |
| * Returns: |
| * true if a message was retrieved and false if not (such as if a |
| * timeout occurred). |
| * |
| * Throws: |
| * LinkTerminated if a linked thread terminated, or OwnerTerminated |
| * if the owner thread terminates and no existing messages match the |
| * supplied ops. |
| */ |
| bool get(T...)(scope T vals) |
| { |
| import std.meta : AliasSeq; |
| |
| static assert(T.length); |
| |
| static if (isImplicitlyConvertible!(T[0], Duration)) |
| { |
| alias Ops = AliasSeq!(T[1 .. $]); |
| alias ops = vals[1 .. $]; |
| enum timedWait = true; |
| Duration period = vals[0]; |
| } |
| else |
| { |
| alias Ops = AliasSeq!(T); |
| alias ops = vals[0 .. $]; |
| enum timedWait = false; |
| } |
| |
| bool onStandardMsg(ref Message msg) |
| { |
| foreach (i, t; Ops) |
| { |
| alias Args = Parameters!(t); |
| auto op = ops[i]; |
| |
| if (msg.convertsTo!(Args)) |
| { |
| static if (is(ReturnType!(t) == bool)) |
| { |
| return msg.map(op); |
| } |
| else |
| { |
| msg.map(op); |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| bool onLinkDeadMsg(ref Message msg) |
| { |
| assert(msg.convertsTo!(Tid)); |
| auto tid = msg.get!(Tid); |
| |
| if (bool* pDepends = tid in thisInfo.links) |
| { |
| auto depends = *pDepends; |
| thisInfo.links.remove(tid); |
| // Give the owner relationship precedence. |
| if (depends && tid != thisInfo.owner) |
| { |
| auto e = new LinkTerminated(tid); |
| auto m = Message(MsgType.standard, e); |
| if (onStandardMsg(m)) |
| return true; |
| throw e; |
| } |
| } |
| if (tid == thisInfo.owner) |
| { |
| thisInfo.owner = Tid.init; |
| auto e = new OwnerTerminated(tid); |
| auto m = Message(MsgType.standard, e); |
| if (onStandardMsg(m)) |
| return true; |
| throw e; |
| } |
| return false; |
| } |
| |
| bool onControlMsg(ref Message msg) |
| { |
| switch (msg.type) |
| { |
| case MsgType.linkDead: |
| return onLinkDeadMsg(msg); |
| default: |
| return false; |
| } |
| } |
| |
| bool scan(ref ListT list) |
| { |
| for (auto range = list[]; !range.empty;) |
| { |
| // Only the message handler will throw, so if this occurs |
| // we can be certain that the message was handled. |
| scope (failure) |
| list.removeAt(range); |
| |
| if (isControlMsg(range.front)) |
| { |
| if (onControlMsg(range.front)) |
| { |
| // Although the linkDead message is a control message, |
| // it can be handled by the user. Since the linkDead |
| // message throws if not handled, if we get here then |
| // it has been handled and we can return from receive. |
| // This is a weird special case that will have to be |
| // handled in a more general way if more are added. |
| if (!isLinkDeadMsg(range.front)) |
| { |
| list.removeAt(range); |
| continue; |
| } |
| list.removeAt(range); |
| return true; |
| } |
| range.popFront(); |
| continue; |
| } |
| else |
| { |
| if (onStandardMsg(range.front)) |
| { |
| list.removeAt(range); |
| return true; |
| } |
| range.popFront(); |
| continue; |
| } |
| } |
| return false; |
| } |
| |
| bool pty(ref ListT list) |
| { |
| if (!list.empty) |
| { |
| auto range = list[]; |
| |
| if (onStandardMsg(range.front)) |
| { |
| list.removeAt(range); |
| return true; |
| } |
| if (range.front.convertsTo!(Throwable)) |
| throw range.front.get!(Throwable); |
| else if (range.front.convertsTo!(shared(Throwable))) |
| throw range.front.get!(shared(Throwable)); |
| else |
| throw new PriorityMessageException(range.front.data); |
| } |
| return false; |
| } |
| |
| static if (timedWait) |
| { |
| import core.time : MonoTime; |
| auto limit = MonoTime.currTime + period; |
| } |
| |
| while (true) |
| { |
| ListT arrived; |
| |
| if (pty(m_localPty) || scan(m_localBox)) |
| { |
| return true; |
| } |
| yield(); |
| synchronized (m_lock) |
| { |
| updateMsgCount(); |
| while (m_sharedPty.empty && m_sharedBox.empty) |
| { |
| // NOTE: We're notifying all waiters here instead of just |
| // a few because the onCrowding behavior may have |
| // changed and we don't want to block sender threads |
| // unnecessarily if the new behavior is not to block. |
| // This will admittedly result in spurious wakeups |
| // in other situations, but what can you do? |
| if (m_putQueue && !mboxFull()) |
| m_notFull.notifyAll(); |
| static if (timedWait) |
| { |
| if (period <= Duration.zero || !m_putMsg.wait(period)) |
| return false; |
| } |
| else |
| { |
| m_putMsg.wait(); |
| } |
| } |
| m_localPty.put(m_sharedPty); |
| arrived.put(m_sharedBox); |
| } |
| if (m_localPty.empty) |
| { |
| scope (exit) m_localBox.put(arrived); |
| if (scan(arrived)) |
| { |
| return true; |
| } |
| else |
| { |
| static if (timedWait) |
| { |
| period = limit - MonoTime.currTime; |
| } |
| continue; |
| } |
| } |
| m_localBox.put(arrived); |
| pty(m_localPty); |
| return true; |
| } |
| } |
| |
| /* |
| * Called on thread termination. This routine processes any remaining |
| * control messages, clears out message queues, and sets a flag to |
| * reject any future messages. |
| */ |
| final void close() |
| { |
| static void onLinkDeadMsg(ref Message msg) |
| { |
| assert(msg.convertsTo!(Tid)); |
| auto tid = msg.get!(Tid); |
| |
| thisInfo.links.remove(tid); |
| if (tid == thisInfo.owner) |
| thisInfo.owner = Tid.init; |
| } |
| |
| static void sweep(ref ListT list) |
| { |
| for (auto range = list[]; !range.empty; range.popFront()) |
| { |
| if (range.front.type == MsgType.linkDead) |
| onLinkDeadMsg(range.front); |
| } |
| } |
| |
| ListT arrived; |
| |
| sweep(m_localBox); |
| synchronized (m_lock) |
| { |
| arrived.put(m_sharedBox); |
| m_closed = true; |
| } |
| m_localBox.clear(); |
| sweep(arrived); |
| } |
| |
| private: |
| // Routines involving local data only, no lock needed. |
| |
| bool mboxFull() @safe @nogc pure nothrow |
| { |
| return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length; |
| } |
| |
| void updateMsgCount() @safe @nogc pure nothrow |
| { |
| m_localMsgs = m_localBox.length; |
| } |
| |
| bool isControlMsg(ref Message msg) @safe @nogc pure nothrow |
| { |
| return msg.type != MsgType.standard && msg.type != MsgType.priority; |
| } |
| |
| bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow |
| { |
| return msg.type == MsgType.priority; |
| } |
| |
| bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow |
| { |
| return msg.type == MsgType.linkDead; |
| } |
| |
| alias OnMaxFn = bool function(Tid); |
| alias ListT = List!(Message); |
| |
| ListT m_localBox; |
| ListT m_localPty; |
| |
| Mutex m_lock; |
| Condition m_putMsg; |
| Condition m_notFull; |
| size_t m_putQueue; |
| ListT m_sharedBox; |
| ListT m_sharedPty; |
| OnMaxFn m_onMaxMsgs; |
| size_t m_localMsgs; |
| size_t m_maxMsgs; |
| bool m_closed; |
| } |
| |
| /* |
| * |
| */ |
| struct List(T) |
| { |
| struct Range |
| { |
| import std.exception : enforce; |
| |
| @property bool empty() const |
| { |
| return !m_prev.next; |
| } |
| |
| @property ref T front() |
| { |
| enforce(m_prev.next, "invalid list node"); |
| return m_prev.next.val; |
| } |
| |
| @property void front(T val) |
| { |
| enforce(m_prev.next, "invalid list node"); |
| m_prev.next.val = val; |
| } |
| |
| void popFront() |
| { |
| enforce(m_prev.next, "invalid list node"); |
| m_prev = m_prev.next; |
| } |
| |
| private this(Node* p) |
| { |
| m_prev = p; |
| } |
| |
| private Node* m_prev; |
| } |
| |
| void put(T val) |
| { |
| put(newNode(val)); |
| } |
| |
| void put(ref List!(T) rhs) |
| { |
| if (!rhs.empty) |
| { |
| put(rhs.m_first); |
| while (m_last.next !is null) |
| { |
| m_last = m_last.next; |
| m_count++; |
| } |
| rhs.m_first = null; |
| rhs.m_last = null; |
| rhs.m_count = 0; |
| } |
| } |
| |
| Range opSlice() |
| { |
| return Range(cast(Node*)&m_first); |
| } |
| |
| void removeAt(Range r) |
| { |
| import std.exception : enforce; |
| |
| assert(m_count); |
| Node* n = r.m_prev; |
| enforce(n && n.next, "attempting to remove invalid list node"); |
| |
| if (m_last is m_first) |
| m_last = null; |
| else if (m_last is n.next) |
| m_last = n; // nocoverage |
| Node* to_free = n.next; |
| n.next = n.next.next; |
| freeNode(to_free); |
| m_count--; |
| } |
| |
| @property size_t length() |
| { |
| return m_count; |
| } |
| |
| void clear() |
| { |
| m_first = m_last = null; |
| m_count = 0; |
| } |
| |
| @property bool empty() |
| { |
| return m_first is null; |
| } |
| |
| private: |
| struct Node |
| { |
| Node* next; |
| T val; |
| |
| this(T v) |
| { |
| val = v; |
| } |
| } |
| |
| static shared struct SpinLock |
| { |
| void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } } |
| void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); } |
| bool locked; |
| } |
| |
| static shared SpinLock sm_lock; |
| static shared Node* sm_head; |
| |
| Node* newNode(T v) |
| { |
| Node* n; |
| { |
| sm_lock.lock(); |
| scope (exit) sm_lock.unlock(); |
| |
| if (sm_head) |
| { |
| n = cast(Node*) sm_head; |
| sm_head = sm_head.next; |
| } |
| } |
| if (n) |
| { |
| import std.conv : emplace; |
| emplace!Node(n, v); |
| } |
| else |
| { |
| n = new Node(v); |
| } |
| return n; |
| } |
| |
| void freeNode(Node* n) |
| { |
| // destroy val to free any owned GC memory |
| destroy(n.val); |
| |
| sm_lock.lock(); |
| scope (exit) sm_lock.unlock(); |
| |
| auto sn = cast(shared(Node)*) n; |
| sn.next = sm_head; |
| sm_head = sn; |
| } |
| |
| void put(Node* n) |
| { |
| m_count++; |
| if (!empty) |
| { |
| m_last.next = n; |
| m_last = n; |
| return; |
| } |
| m_first = n; |
| m_last = n; |
| } |
| |
| Node* m_first; |
| Node* m_last; |
| size_t m_count; |
| } |
| } |
| |
| version (unittest) |
| { |
| import std.stdio; |
| import std.typecons : tuple, Tuple; |
| |
| void testfn(Tid tid) |
| { |
| receive((float val) { assert(0); }, (int val, int val2) { |
| assert(val == 42 && val2 == 86); |
| }); |
| receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); }); |
| receive((Variant val) { }); |
| receive((string val) { |
| if ("the quick brown fox" != val) |
| return false; |
| return true; |
| }, (string val) { assert(false); }); |
| prioritySend(tid, "done"); |
| } |
| |
| void runTest(Tid tid) |
| { |
| send(tid, 42, 86); |
| send(tid, tuple(42, 86)); |
| send(tid, "hello", "there"); |
| send(tid, "the quick brown fox"); |
| receive((string val) { assert(val == "done"); }); |
| } |
| |
| void simpleTest() |
| { |
| auto tid = spawn(&testfn, thisTid); |
| runTest(tid); |
| |
| // Run the test again with a limited mailbox size. |
| tid = spawn(&testfn, thisTid); |
| setMaxMailboxSize(tid, 2, OnCrowding.block); |
| runTest(tid); |
| } |
| |
| @system unittest |
| { |
| simpleTest(); |
| } |
| |
| @system unittest |
| { |
| scheduler = new ThreadScheduler; |
| simpleTest(); |
| scheduler = null; |
| } |
| } |
| |
| private @property Mutex initOnceLock() |
| { |
| __gshared Mutex lock; |
| if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock)) |
| return mtx; |
| auto mtx = new Mutex; |
| if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx)) |
| return mtx; |
| return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock); |
| } |
| |
| /** |
| * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a |
| * thread-safe manner. |
| * |
| * The implementation guarantees that all threads simultaneously calling |
| * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is |
| * fully initialized. All side-effects of $(D_PARAM init) are globally visible |
| * afterwards. |
| * |
| * Params: |
| * var = The variable to initialize |
| * init = The lazy initializer value |
| * |
| * Returns: |
| * A reference to the initialized variable |
| */ |
| auto ref initOnce(alias var)(lazy typeof(var) init) |
| { |
| return initOnce!var(init, initOnceLock); |
| } |
| |
| /// A typical use-case is to perform lazy but thread-safe initialization. |
| @system unittest |
| { |
| static class MySingleton |
| { |
| static MySingleton instance() |
| { |
| static __gshared MySingleton inst; |
| return initOnce!inst(new MySingleton); |
| } |
| } |
| |
| assert(MySingleton.instance !is null); |
| } |
| |
| @system unittest |
| { |
| static class MySingleton |
| { |
| static MySingleton instance() |
| { |
| static __gshared MySingleton inst; |
| return initOnce!inst(new MySingleton); |
| } |
| |
| private: |
| this() { val = ++cnt; } |
| size_t val; |
| static __gshared size_t cnt; |
| } |
| |
| foreach (_; 0 .. 10) |
| spawn({ ownerTid.send(MySingleton.instance.val); }); |
| foreach (_; 0 .. 10) |
| assert(receiveOnly!size_t == MySingleton.instance.val); |
| assert(MySingleton.cnt == 1); |
| } |
| |
| /** |
| * Same as above, but takes a separate mutex instead of sharing one among |
| * all initOnce instances. |
| * |
| * This should be used to avoid dead-locks when the $(D_PARAM init) |
| * expression waits for the result of another thread that might also |
| * call initOnce. Use with care. |
| * |
| * Params: |
| * var = The variable to initialize |
| * init = The lazy initializer value |
| * mutex = A mutex to prevent race conditions |
| * |
| * Returns: |
| * A reference to the initialized variable |
| */ |
| auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex) |
| { |
| // check that var is global, can't take address of a TLS variable |
| static assert(is(typeof({ __gshared p = &var; })), |
| "var must be 'static shared' or '__gshared'."); |
| import core.atomic : atomicLoad, MemoryOrder, atomicStore; |
| |
| static shared bool flag; |
| if (!atomicLoad!(MemoryOrder.acq)(flag)) |
| { |
| synchronized (mutex) |
| { |
| if (!atomicLoad!(MemoryOrder.acq)(flag)) |
| { |
| var = init; |
| atomicStore!(MemoryOrder.rel)(flag, true); |
| } |
| } |
| } |
| return var; |
| } |
| |
| /// Use a separate mutex when init blocks on another thread that might also call initOnce. |
| @system unittest |
| { |
| import core.sync.mutex : Mutex; |
| |
| static shared bool varA, varB; |
| __gshared Mutex m; |
| m = new Mutex; |
| |
| spawn({ |
| // use a different mutex for varB to avoid a dead-lock |
| initOnce!varB(true, m); |
| ownerTid.send(true); |
| }); |
| // init depends on the result of the spawned thread |
| initOnce!varA(receiveOnly!bool); |
| assert(varA == true); |
| assert(varB == true); |
| } |
| |
| @system unittest |
| { |
| static shared bool a; |
| __gshared bool b; |
| static bool c; |
| bool d; |
| initOnce!a(true); |
| initOnce!b(true); |
| static assert(!__traits(compiles, initOnce!c(true))); // TLS |
| static assert(!__traits(compiles, initOnce!d(true))); // local variable |
| } |