This is a low-level messaging API upon which more structured or restrictive APIs may be built. The general idea is that every messageable entity is represented by a common handle type called a Tid, which allows messages to be sent to logical threads that are executing in both the current process and in external processes using the same interface. This is an important aspect of scalability because it allows the components of a program to be spread across available resources with few to no changes to the actual implementation.
A logical thread is an execution context that has its own stack and which runs asynchronously to other logical threads. These may be preemptively scheduled kernel threads, fibers (cooperative user-space threads), or some other concept with similar behavior.
The type of concurrency used when logical threads are created is determined by the Scheduler selected at initialization time. The default behavior is currently to create a new kernel thread per call to spawn, but other schedulers are available that multiplex fibers across the main thread or use some combination of the two approaches.
__gshared string received;

static void spawnedFunc(Tid ownerTid)
{
    import std.conv : text;

    // Receive a message from the owner thread.
    receive((int i) {
        received = text("Received the number ", i);

        // Send a message back to the owner thread
        // indicating success.
        send(ownerTid, true);
    });
}

// Start spawnedFunc in a new thread.
auto childTid = spawn(&spawnedFunc, thisTid);

// Send the number 42 to this new thread.
send(childTid, 42);

// Receive the result code.
auto wasSuccessful = receiveOnly!(bool);
assert(wasSuccessful);
writeln(received); // "Received the number 42"
Thrown on calls to receiveOnly if a message other than the type the receiving thread expected is sent.
Thrown on calls to receive if the thread that spawned the receiving thread has terminated and no more messages exist.
Thrown if a linked thread has terminated.
Thrown if a message was sent to a thread via std.concurrency.prioritySend and the receiver does not have a handler for a message of this type.
The message that was sent.
Thrown on mailbox crowding if the mailbox is configured with OnCrowding.throwException.
Thrown when a Tid is missing, e.g. when ownerTid doesn't find an owner thread.
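As an illustration of the first case above, the following sketch provokes a MessageMismatch by sending a string to a thread that only accepts an int. The helper name childSeesMismatch is hypothetical and exists only for this example; the child reports back to its owner whether the exception was raised.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical helper: reports whether the child observed a MessageMismatch.
bool childSeesMismatch()
{
    auto child = spawn({
        try
        {
            // The child expects an int, but the owner sends a string below.
            receiveOnly!int();
            ownerTid.send(false);
        }
        catch (MessageMismatch e)
        {
            ownerTid.send(true);
        }
    });

    child.send("not an int");
    return receiveOnly!bool();
}

void main()
{
    writeln(childSeesMismatch()); // true
}
```

Catching the exception inside the child keeps the failure local; an uncaught MessageMismatch would simply terminate that logical thread.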
An opaque type used to represent a logical thread.
Generates a convenient string for identifying this Tid. This is only useful to see whether Tids that are currently executing are the same or different, e.g. for logging and debugging. It is possible that a Tid executed in the future will have the same toString() output as another Tid that has already terminated.
Returns the Tid of the caller's thread.
Returns the Tid of the thread which spawned the caller's thread. Throws a TidMissingException exception if there is no owner thread.
Starts fn(args) in a new logical thread.
Executes the supplied function in a new logical thread represented by Tid. The calling thread is designated as the owner of the new thread. When the owner thread terminates, an OwnerTerminated message will be sent to the new thread, causing an OwnerTerminated exception to be thrown on receive().
F fn
| The function to execute. |
T args
| Arguments to the function. |
args must not have unshared aliasing. In other words, all arguments to fn must either be shared or immutable or have no pointer indirection. This is necessary for enforcing isolation among threads.

static void f(string msg)
{
    writeln(msg); // "Hello World"
}

auto tid = spawn(&f, "Hello World");
string msg = "Hello, World!";

static void f1(string msg) {}
static assert(!__traits(compiles, spawn(&f1, msg.dup)));
static assert( __traits(compiles, spawn(&f1, msg.idup)));

static void f2(char[] msg) {}
static assert(!__traits(compiles, spawn(&f2, msg.dup)));
static assert(!__traits(compiles, spawn(&f2, msg.idup)));

spawn({
    ownerTid.send("This is so great!");
});
writeln(receiveOnly!string); // "This is so great!"
Starts fn(args) in a logical thread and will receive a LinkTerminated message when the operation terminates.
Executes the supplied function in a new logical thread represented by Tid. This new thread is linked to the calling thread so that if either it or the calling thread terminates, a LinkTerminated message will be sent to the other, causing a LinkTerminated exception to be thrown on receive(). The owner relationship from spawn() is preserved as well, so if the link between threads is broken, owner termination will still result in an OwnerTerminated exception being thrown on receive().
F fn
| The function to execute. |
T args
| Arguments to the function. |
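A minimal sketch of the link notification, assuming the linked child simply returns at once. The names quitAtOnce and ownerSeesLinkTerminated are hypothetical. The owner blocks in receive on a type that is never sent, so the only way out is the LinkTerminated exception.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical child: returning ends its logical thread immediately.
void quitAtOnce() {}

// Hypothetical helper: true if the owner was notified about the child's exit.
bool ownerSeesLinkTerminated()
{
    auto child = spawnLinked(&quitAtOnce);
    try
    {
        // No int is ever sent; receive blocks until the link notification arrives.
        receive((int unused) {});
        return false;
    }
    catch (LinkTerminated e)
    {
        // The exception carries the Tid of the thread that terminated.
        return e.tid == child;
    }
}

void main()
{
    writeln(ownerSeesLinkTerminated()); // true
}
```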
Places the values as a message at the back of tid's message queue.
Sends the supplied value to the thread represented by tid. As with std.concurrency.spawn, T must not have unshared aliasing.
Places the values as a message on the front of tid's message queue.
Sends a message to tid but places it at the front of tid's message queue instead of at the back. This function is typically used for out-of-band communication, to signal exceptional conditions, etc.
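The difference between the back and the front of the queue can be sketched by having a thread send to its own Tid, which keeps the ordering deterministic. The helper name receiveOrder is hypothetical, and sending to thisTid is a choice made only for this illustration.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical helper: returns the order in which queued ints are received.
int[] receiveOrder()
{
    // All three messages are queued before anything is received.
    send(thisTid, 1);
    send(thisTid, 2);
    prioritySend(thisTid, 99);

    int[] order;
    foreach (unused; 0 .. 3)
        order ~= receiveOnly!int();
    return order;
}

void main()
{
    writeln(receiveOrder()); // [99, 1, 2]
}
```

The priority message is handled before the two ordinary messages even though it was sent last.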
Receives a message from another thread.
Receive a message from another thread, or block if no messages of the specified types are available. This function works by pattern matching a message against a set of delegates and executing the first match found.
If a delegate that accepts a std.variant.Variant is included as the last argument to receive, it will match any message that was not matched by an earlier delegate. If more than one argument is sent, the Variant will contain a std.typecons.Tuple of all values sent.
import std.variant : Variant;

auto process = ()
{
    receive(
        (int i) { ownerTid.send(1); },
        (double f) { ownerTid.send(2); },
        (Variant v) { ownerTid.send(3); }
    );
};

{
    auto tid = spawn(process);
    send(tid, 42);
    writeln(receiveOnly!int); // 1
}

{
    auto tid = spawn(process);
    send(tid, 3.14);
    writeln(receiveOnly!int); // 2
}

{
    auto tid = spawn(process);
    send(tid, "something else");
    writeln(receiveOnly!int); // 3
}
Receives only messages with arguments of types T.
Throws MessageMismatch if a message of types other than T is received.
Returns the received message. If T.length is greater than one, the message will be packed into a std.typecons.Tuple.

auto tid = spawn(
{
    assert(receiveOnly!int == 42);
});
send(tid, 42);
auto tid = spawn(
{
    assert(receiveOnly!string == "text");
});
send(tid, "text");

struct Record { string name; int age; }

auto tid = spawn(
{
    auto msg = receiveOnly!(double, Record);
    assert(msg[0] == 0.5);
    assert(msg[1].name == "Alice");
    assert(msg[1].age == 31);
});
send(tid, 0.5, Record("Alice", 31));
Tries to receive but will give up if no matches arrive within duration. Won't wait at all if the provided core.time.Duration is negative.
Same as receive except that rather than wait forever for a message, it waits until either it receives a message or the given core.time.Duration has passed. It returns true if it received a message and false if it timed out waiting for one.
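Both return values can be seen in a short sketch. The helper name timeoutThenSuccess and the 20 ms duration are arbitrary choices for illustration; the thread sends to its own Tid so that no second thread is needed.

```d
import core.time : msecs;
import std.concurrency;
import std.stdio : writeln;

// Hypothetical helper: records the result of two receiveTimeout calls.
bool[] timeoutThenSuccess()
{
    bool[] results;

    // Nothing has been sent yet, so this gives up after roughly 20 ms.
    results ~= receiveTimeout(20.msecs, (int i) {});

    // Queue a message for this thread, then try again.
    send(thisTid, 7);
    results ~= receiveTimeout(20.msecs, (int i) {});

    return results;
}

void main()
{
    writeln(timeoutThenSuccess()); // [false, true]
}
```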
These behaviors may be specified when a mailbox is full.
Wait until room is available.
Throw a MailboxFull exception.
Abort the send and return.
Sets a maximum mailbox size.
Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute the behavior specified by doThis. If messages is zero, the mailbox is unbounded.
Tid tid
| The Tid of the thread for which this limit should be set. |
size_t messages
| The maximum number of messages or zero if no limit. |
OnCrowding doThis
| The behavior executed when a message is sent to a full mailbox. |
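A minimal sketch of OnCrowding.throwException, assuming a limit of one message on the calling thread's own mailbox. The helper name secondSendThrows is hypothetical. Note that the exception is raised in the sender, not in the receiver.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical helper: true if sending to a full mailbox threw MailboxFull.
bool secondSendThrows()
{
    // Allow a single queued message in this thread's own mailbox.
    setMaxMailboxSize(thisTid, 1, OnCrowding.throwException);
    // Restore an unbounded mailbox when the helper returns.
    scope (exit) setMaxMailboxSize(thisTid, 0, OnCrowding.block);

    send(thisTid, 1); // fills the mailbox

    bool threw = false;
    try
    {
        send(thisTid, 2); // the mailbox is full at this point
    }
    catch (MailboxFull e)
    {
        threw = true;
    }

    receiveOnly!int(); // drain the one message that was accepted
    return threw;
}

void main()
{
    writeln(secondSendThrows()); // true
}
```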
Sets a maximum mailbox size.
Sets a limit on the maximum number of user messages allowed in the mailbox. If this limit is reached, the caller attempting to add a new message will execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
Tid tid
| The Tid of the thread for which this limit should be set. |
size_t messages
| The maximum number of messages or zero if no limit. |
bool function(Tid) onCrowdingDoThis
| The routine called when a message is sent to a full mailbox. |
Associates name with tid.
Associates name with tid in a process-local map. When the thread represented by tid terminates, any names associated with it will be automatically unregistered.
string name
| The name to associate with tid. |
Tid tid
| The Tid to register under name. |
Removes the registered name associated with a tid.
string name
| The name to unregister. |
Gets the Tid associated with name.
string name
| The name to locate within the registry. |
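The three registry functions can be exercised together as sketched below. The name "example.worker" and the helpers worker and registryRoundTrip are hypothetical. The sketch relies on register and unregister returning a bool success flag and on locate returning Tid.init for an unknown name, details that the descriptions above do not spell out.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical worker: waits for one bool so it stays alive while registered.
void worker()
{
    receiveOnly!bool();
}

// Hypothetical helper: registers, looks up, and unregisters a name.
bool registryRoundTrip()
{
    auto tid = spawn(&worker);
    string name = "example.worker"; // arbitrary name for this sketch

    bool registered = register(name, tid);
    bool found = locate(name) == tid;
    bool removed = unregister(name);
    bool gone = locate(name) == Tid.init;

    tid.send(true); // let the worker finish
    return registered && found && removed && gone;
}

void main()
{
    writeln(registryRoundTrip()); // true
}
```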
Encapsulates all implementation-level data needed for scheduling.
When defining a Scheduler, an instance of this struct must be associated with each logical thread. It contains all implementation-level information needed by the internal API.
Gets a thread-local instance of ThreadInfo.
Gets a thread-local instance of ThreadInfo, which should be used as the default instance when info is requested for a thread not created by the Scheduler.
Cleans up this ThreadInfo.
This must be called when a scheduled thread terminates. It tears down the messaging system for the thread and notifies interested parties of the thread's termination.
A Scheduler controls how threading is performed by spawn.
Implementing a Scheduler allows the concurrency mechanism used by this module to be customized according to different needs. By default, a call to spawn will create a new kernel thread that executes the supplied routine and terminates when finished. But it is possible to create Schedulers that reuse threads, that multiplex Fibers (coroutines) across a single thread, or any number of other approaches. By making the choice of Scheduler a user-level option, std.concurrency may be used for far more types of application than if this behavior were predefined.
import std.concurrency;
import std.stdio;

void main()
{
    scheduler = new FiberScheduler;
    scheduler.start(
    {
        writeln("the rest of main goes here");
    });
}

Some schedulers have a dispatching loop that must run if they are to work properly, so for the sake of consistency, when using a scheduler, start() must be called within main(). This yields control to the scheduler and will ensure that any spawned threads are executed in an expected manner.
Spawns the supplied op and starts the Scheduler.
This is intended to be called at the start of the program to yield all scheduling to the active Scheduler instance. This is necessary for schedulers that explicitly dispatch threads rather than simply relying on the operating system to do so, and so start should always be called within main() to begin normal program execution.
void delegate() op
| A wrapper for whatever the main thread would have done in the absence of a custom scheduler. It will be automatically executed via a call to spawn by the Scheduler. |
Assigns a logical thread to execute the supplied op.
This routine is called by spawn. It is expected to instantiate a new logical thread and run the supplied operation. This thread must call thisInfo.cleanup() when the thread terminates if the scheduled thread is not a kernel thread; all kernel threads will have their ThreadInfo cleaned up automatically by a thread-local destructor.
void delegate() op
| The function to execute. This may be the actual function passed by the user to spawn itself, or may be a wrapper function. |
Yields execution to another logical thread.
This routine is called at various points within concurrency-aware APIs to provide a scheduler a chance to yield execution when using some sort of cooperative multithreading model. If this is not appropriate, such as when each logical thread is backed by a dedicated kernel thread, this routine may be a no-op.
Returns an appropriate ThreadInfo instance.
Returns an instance of ThreadInfo specific to the logical thread that is calling this routine or, if the calling thread was not created by this scheduler, returns ThreadInfo.thisInfo instead.
Creates a Condition variable analog for signaling.
Creates a new Condition variable analog which is used to check for and to signal the addition of messages to a thread's message queue. Like yield, some schedulers may need to define custom behavior so that calls to Condition.wait() yield to another thread when no new messages are available instead of blocking.
Mutex m
| The Mutex that will be associated with this condition. It will be locked prior to any operation on the condition, and so in some cases a Scheduler may need to hold this reference and unlock the mutex before yielding execution to another logical thread. |
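The five routines described above can be put together in a small custom scheduler. The sketch below is one possible implementation, not the module's own: CountingScheduler, spawnCount, and countSpawns are hypothetical names, and the scheduler simply creates one kernel thread per spawn while counting how often it was asked to do so.

```d
import core.atomic : atomicLoad, atomicOp;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import std.concurrency;
import std.stdio : writeln;

// Hypothetical scheduler: one kernel thread per spawn, plus a spawn counter.
class CountingScheduler : Scheduler
{
    private shared size_t spawnCount;

    size_t spawned() { return atomicLoad(spawnCount); }

    // No dispatch loop is needed for kernel threads, so just run op.
    void start(void delegate() op) { op(); }

    void spawn(void delegate() op)
    {
        atomicOp!"+="(spawnCount, 1);
        auto t = new Thread(op);
        t.start();
    }

    // Kernel threads are preemptively scheduled, so yielding is a no-op.
    void yield() nothrow {}

    @property ref ThreadInfo thisInfo() nothrow { return ThreadInfo.thisInfo; }

    Condition newCondition(Mutex m) nothrow { return new Condition(m); }
}

// Hypothetical helper: spawns three threads through the custom scheduler.
size_t countSpawns()
{
    auto s = new CountingScheduler;
    scheduler = s;
    scope (exit) scheduler = null;

    s.start({
        foreach (i; 0 .. 3)
            spawn({ ownerTid.send(1); });
        foreach (i; 0 .. 3)
            receiveOnly!int();
    });
    return s.spawned;
}

void main()
{
    writeln(countSpawns()); // 3
}
```

Because every logical thread here is a kernel thread, ThreadInfo cleanup is left to the thread-local destructor and no explicit cleanup() call is needed.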
An example Scheduler using kernel threads.
This is an example Scheduler that mirrors the default scheduling behavior of creating one kernel thread per call to spawn. It is fully functional and may be instantiated and used, but is not a necessary part of the default functioning of this module.
This simply runs op directly, since no real scheduling is needed by this approach.
Creates a new kernel thread and assigns it to run the supplied op.
This scheduler does no explicit multiplexing, so this is a no-op.
Returns ThreadInfo.thisInfo, since it is a thread-local instance of ThreadInfo, which is the correct behavior for this scheduler.
Creates a new Condition variable. No custom behavior is needed here.
An example Scheduler using Fibers.
This is an example scheduler that creates a new Fiber per call to spawn and multiplexes the execution of all fibers within the main thread.
This creates a new Fiber for the supplied op and then starts the dispatcher.
This creates a new Fiber for the supplied op and adds it to the dispatch list.
If the caller is a scheduled Fiber, this yields execution to another scheduled Fiber.
Returns an appropriate ThreadInfo instance.
Returns a ThreadInfo instance specific to the calling Fiber if the Fiber was created by this dispatcher, otherwise it returns ThreadInfo.thisInfo.
Returns a Condition analog that yields when wait or notify is called.
Sets the Scheduler behavior within the program.
This variable sets the Scheduler behavior within this program. Typically, when setting a Scheduler, scheduler.start() should be called in main. This routine will not return until program execution is complete.
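As a sketch of message passing under a non-default scheduler, the following assigns a FiberScheduler and exchanges one message between two fibers. The helper name runWithFibers is hypothetical, and resetting scheduler to null afterwards is a choice made for this example so that later code falls back to the default behavior.

```d
import std.concurrency;
import std.stdio : writeln;

// Hypothetical helper: doubles a number in a child fiber and returns the reply.
int runWithFibers()
{
    int result;

    scheduler = new FiberScheduler;
    scheduler.start({
        // Under FiberScheduler, spawn creates a fiber rather than a kernel thread.
        auto child = spawn({
            ownerTid.send(receiveOnly!int() * 2);
        });
        child.send(21);
        // Blocking here yields to the child fiber until its reply arrives.
        result = receiveOnly!int();
    });
    // start() returns only once all scheduled fibers have finished.
    scheduler = null;

    return result;
}

void main()
{
    writeln(runWithFibers()); // 42
}
```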
If the caller is a Fiber and is not a Generator, this function will call scheduler.yield() or Fiber.yield(), as appropriate.
A Generator is a Fiber that periodically returns values of type T to the caller via yield. This is represented as an InputRange.
auto tid = spawn({
    int i;
    while (i < 9)
        i = receiveOnly!int;

    ownerTid.send(i * 2);
});

auto r = new Generator!int({
    foreach (i; 1 .. 10)
        yield(i);
});

foreach (e; r)
    tid.send(e);

writeln(receiveOnly!int); // 18
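Because a Generator is an input range, it can also be combined with range algorithms. The following sketch, with the hypothetical helper firstFib, wraps an endless Fibonacci loop in a Generator and takes only the first few values; the fiber is simply never resumed once enough values have been consumed.

```d
import std.array : array;
import std.concurrency : Generator, yield;
import std.range : take;
import std.stdio : writeln;

// Hypothetical helper: returns the first n Fibonacci numbers.
int[] firstFib(size_t n)
{
    auto gen = new Generator!int({
        int a = 0, b = 1;
        while (true)
        {
            yield(a); // hand the current value to the consumer
            immutable next = a + b;
            a = b;
            b = next;
        }
    });

    // take limits the infinite generator to n elements.
    return gen.take(n).array;
}

void main()
{
    writeln(firstFib(8)); // [0, 1, 1, 2, 3, 5, 8, 13]
}
```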
Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.
void function() fn
| The fiber function. |
Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.
void function() fn
| The fiber function. |
size_t sz
| The stack size for this fiber. |
Initializes a generator object which is associated with a static D function. The function will be called once to prepare the range for iteration.
void function() fn
| The fiber function. |
size_t sz
| The stack size for this fiber. |
size_t guardPageSize
| The size of the guard page used to trap the fiber's stack overflows. Refer to core.thread.Fiber's documentation for more details. |
Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.
void delegate() dg
| The fiber function. |
Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.
void delegate() dg
| The fiber function. |
size_t sz
| The stack size for this fiber. |
Initializes a generator object which is associated with a dynamic D function. The function will be called once to prepare the range for iteration.
void delegate() dg
| The fiber function. |
size_t sz
| The stack size for this fiber. |
size_t guardPageSize
| The size of the guard page used to trap the fiber's stack overflows. Refer to core.thread.Fiber's documentation for more details. |
Returns true if the generator is empty.
Obtains the next value from the underlying function.
Returns the most recently generated value by shallow copy.
Returns the most recently generated value without executing a copy constructor. Will not compile for element types defining a postblit, because Generator does not return by reference.
Yields a value of type T to the caller of the currently executing generator.
T value
| The value to yield. |
import std.range;

InputRange!int myIota = iota(10).inputRangeObject;

myIota.popFront();
myIota.popFront();
writeln(myIota.moveFront); // 2
writeln(myIota.front); // 2
myIota.popFront();
writeln(myIota.front); // 3

// can be assigned to std.range.interfaces.InputRange directly
myIota = new Generator!int(
{
    foreach (i; 0 .. 10) yield(i);
});

myIota.popFront();
myIota.popFront();
writeln(myIota.moveFront); // 2
writeln(myIota.front); // 2
myIota.popFront();
writeln(myIota.front); // 3

size_t[2] counter = [0, 0];
foreach (i, unused; myIota) counter[] += [1, i];

assert(myIota.empty);
writeln(counter); // [7, 21]
Initializes var with the lazy init value in a thread-safe manner.
The implementation guarantees that all threads simultaneously calling initOnce with the same var argument block until var is fully initialized. All side-effects of init are globally visible afterwards.
var | The variable to initialize |
typeof(var) init
| The lazy initializer value |
static class MySingleton
{
    static MySingleton instance()
    {
        __gshared MySingleton inst;
        return initOnce!inst(new MySingleton);
    }
}

assert(MySingleton.instance !is null);
Same as above, but takes a separate mutex instead of sharing one among all initOnce instances.
This should be used to avoid dead-locks when the init expression waits for the result of another thread that might also call initOnce. Use with care.
var | The variable to initialize |
typeof(var) init
| The lazy initializer value |
Mutex mutex
| A mutex to prevent race conditions |
import core.sync.mutex : Mutex;

static shared bool varA, varB;
static shared Mutex m;
m = new shared Mutex;

spawn({
    // use a different mutex for varB to avoid a dead-lock
    initOnce!varB(true, m);
    ownerTid.send(true);
});
// init depends on the result of the spawned thread
initOnce!varA(receiveOnly!bool);
writeln(varA); // true
writeln(varB); // true
© 1999–2018 The D Language Foundation
Licensed under the Boost License 1.0.
https://dlang.org/phobos/std_concurrency.html