| /** | 
 | `std.parallelism` implements high-level primitives for SMP parallelism. | 
 | These include parallel foreach, parallel reduce, parallel eager map, pipelining | 
 | and future/promise parallelism.  `std.parallelism` is recommended when the | 
 | same operation is to be executed in parallel on different data, or when a | 
 | function is to be executed in a background thread and its result returned to a | 
 | well-defined main thread.  For communication between arbitrary threads, see | 
 | `std.concurrency`. | 
 |  | 
 | `std.parallelism` is based on the concept of a `Task`.  A `Task` is an | 
 | object that represents the fundamental unit of work in this library and may be | 
 | executed in parallel with any other `Task`.  Using `Task` | 
 | directly allows programming with a future/promise paradigm.  All other | 
 | supported parallelism paradigms (parallel foreach, map, reduce, pipelining) | 
 | represent an additional level of abstraction over `Task`.  They | 
 | automatically create one or more `Task` objects, or closely related types | 
 | that are conceptually identical but not part of the public API. | 
 |  | 
 | After creation, a `Task` may be executed in a new thread, or submitted | 
 | to a `TaskPool` for execution.  A `TaskPool` encapsulates a task queue | 
 | and its worker threads.  Its purpose is to efficiently map a large | 
 | number of `Task`s onto a smaller number of threads.  A task queue is a | 
 | FIFO queue of `Task` objects that have been submitted to the | 
 | `TaskPool` and are awaiting execution.  A worker thread is a thread that | 
 | is associated with exactly one task queue.  It executes the `Task` at the | 
 | front of its queue when the queue has work available, or sleeps when | 
 | no work is available.  Each task queue is associated with zero or | 
 | more worker threads.  If the result of a `Task` is needed before execution | 
 | by a worker thread has begun, the `Task` can be removed from the task queue | 
 | and executed immediately in the thread where the result is needed. | 
 |  | 
 | Warning:  Unless marked as `@trusted` or `@safe`, artifacts in | 
 |           this module allow implicit data sharing between threads and cannot | 
 |           guarantee that client code is free from low level data races. | 
 |  | 
 | Source:    $(PHOBOSSRC std/parallelism.d) | 
Authors:  David Simcha
 | Copyright:  Copyright (c) 2009-2011, David Simcha. | 
 | License:    $(HTTP boost.org/LICENSE_1_0.txt, Boost License 1.0) | 
 | */ | 
 | module std.parallelism; | 
 |  | 
// Fold all Apple platforms into a single "Darwin" version identifier so
// later platform-specific code needs only one version branch.
version (OSX)
    version = Darwin;
else version (iOS)
    version = Darwin;
else version (TVOS)
    version = Darwin;
else version (WatchOS)
    version = Darwin;
 |  | 
 | /// | 
@system unittest
{
    import std.algorithm.iteration : map;
    import std.math.operations : isClose;
    import std.parallelism : taskPool;
    import std.range : iota;

    // Parallel reduce can be combined with
    // std.algorithm.iteration.map to interesting effect.
    // The following example (thanks to Russel Winder)
    // calculates pi by quadrature using
    // std.algorithm.map and TaskPool.reduce.
    // getTerm is evaluated in parallel as needed by
    // TaskPool.reduce.
    //
    // Timings on an Intel i5-3450 quad core machine
    // for n = 1_000_000_000:
    //
    // TaskPool.reduce:       1.067 s
    // std.algorithm.reduce:  4.011 s

    // Note: n here is deliberately much smaller than the
    // 1_000_000_000 used for the timings above, so the unittest
    // itself stays fast.
    enum n = 1_000_000;
    enum delta = 1.0 / n;

    // One rectangle of the quadrature: midpoint evaluation of
    // 1 / (1 + x^2), scaled by the rectangle width.
    alias getTerm = (int i)
    {
        immutable x = ( i - 0.5 ) * delta;
        return delta / ( 1.0 + x * x ) ;
    };

    immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

    assert(pi.isClose(3.14159, 1e-5));
}
 |  | 
 | import core.atomic; | 
 | import core.memory; | 
 | import core.sync.condition; | 
 | import core.thread; | 
 |  | 
 | import std.functional; | 
 | import std.meta; | 
 | import std.range.primitives; | 
 | import std.traits; | 
 |  | 
 | /* | 
 | (For now public undocumented with reserved name.) | 
 |  | 
 | A lazily initialized global constant. The underlying value is a shared global | 
 | statically initialized to `outOfBandValue` which must not be a legit value of | 
 | the constant. Upon the first call the situation is detected and the global is | 
 | initialized by calling `initializer`. The initializer is assumed to be pure | 
 | (even if not marked as such), i.e. return the same value upon repeated calls. | 
 | For that reason, no special precautions are taken so `initializer` may be called | 
 | more than one time leading to benign races on the cached value. | 
 |  | 
 | In the quiescent state the cost of the function is an atomic load from a global. | 
 |  | 
 | Params: | 
 |     T = The type of the pseudo-constant (may be qualified) | 
 |     outOfBandValue = A value that cannot be valid, it is used for initialization | 
 |     initializer = The function performing initialization; must be `nothrow` | 
 |  | 
 | Returns: | 
 |     The lazily initialized value | 
 | */ | 
@property pure
T __lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
    && is(typeof(initializer()) : T)
    && is(typeof(outOfBandValue) : T))
{
    // The real work.  Kept as a nested non-pure function so that pure-ness
    // can be cast onto it below (see the comment preceding this function).
    static T impl() nothrow
    {
        // Thread-local cache
        static Unqual!T tls = outOfBandValue;
        auto local = tls;
        // Shortest path, no atomic operations
        if (local != outOfBandValue) return local;
        // Process-level cache
        static shared Unqual!T result = outOfBandValue;
        // Initialize both process-level cache and tls
        local = atomicLoad(result);
        if (local == outOfBandValue)
        {
            // Benign race: initializer may run in more than one thread at
            // once, but it is assumed to always produce the same value.
            local = initializer();
            atomicStore(result, local);
        }
        tls = local;
        return local;
    }

    import std.traits : SetFunctionAttributes;
    // impl() touches globals, so the compiler won't infer pure; per the
    // contract documented above it behaves purely, so we force the
    // attribute on via a @trusted cast.
    alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
        functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
    auto purified = (() @trusted => cast(Fun) &impl)();
    return purified();
}
 |  | 
// Returns the size of a cache line in bytes, or 0 if it could not be
// determined (see cacheLineSizeImpl).  Lazily computed on first access
// and cached thereafter; size_t.max serves as the "not yet computed"
// sentinel.
alias cacheLineSize =
    __lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);
 |  | 
 | private size_t cacheLineSizeImpl() @nogc nothrow @trusted | 
 | { | 
 |     size_t result = 0; | 
 |     import core.cpuid : datacache; | 
 |     foreach (ref const cachelevel; datacache) | 
 |     { | 
 |         if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max) | 
 |         { | 
 |             result = cachelevel.lineSize; | 
 |         } | 
 |     } | 
 |     return result; | 
 | } | 
 |  | 
@nogc @safe nothrow unittest
{
    // The lazily-cached constant must agree with a direct computation.
    assert(cacheLineSize == cacheLineSizeImpl);
}
 |  | 
 | /* Atomics code.  These forward to core.atomic, but are written like this | 
 |    for two reasons: | 
 |  | 
   1.  They used to actually contain ASM code and I don't want to have to change
 |        to directly calling core.atomic in a zillion different places. | 
 |  | 
 |    2.  core.atomic has some misc. issues that make my use cases difficult | 
 |        without wrapping it.  If I didn't wrap it, casts would be required | 
 |        basically everywhere. | 
 | */ | 
 | private void atomicSetUbyte(T)(ref T stuff, T newVal) | 
 | if (__traits(isIntegral, T) && is(T : ubyte)) | 
 | { | 
 |     //core.atomic.cas(cast(shared) &stuff, stuff, newVal); | 
 |     atomicStore(*(cast(shared) &stuff), newVal); | 
 | } | 
 |  | 
 | private ubyte atomicReadUbyte(T)(ref T val) | 
 | if (__traits(isIntegral, T) && is(T : ubyte)) | 
 | { | 
 |     return atomicLoad(*(cast(shared) &val)); | 
 | } | 
 |  | 
 | // This gets rid of the need for a lot of annoying casts in other parts of the | 
 | // code, when enums are involved. | 
 | private bool atomicCasUbyte(T)(ref T stuff, T testVal, T newVal) | 
 | if (__traits(isIntegral, T) && is(T : ubyte)) | 
 | { | 
 |     return core.atomic.cas(cast(shared) &stuff, testVal, newVal); | 
 | } | 
 |  | 
 | /*--------------------- Generic helper functions, etc.------------------------*/ | 
// The element type produced by mapping `functions` over range R: a single
// value for one function, the adjoin-ed tuple for several.
private template MapType(R, functions...)
{
    static assert(functions.length);

    // Dummy element used only inside typeof(); never read at run time,
    // hence the = void initializer.
    ElementType!R e = void;
    alias MapType =
        typeof(adjoin!(staticMap!(unaryFun, functions))(e));
}
 |  | 
// The accumulator type that results from folding elements of range R into
// a seed of type E with binary function fun.
private template ReduceType(alias fun, R, E)
{
    alias ReduceType = typeof(binaryFun!fun(E.init, ElementType!R.init));
}
 |  | 
// Negation of std.traits.hasUnsharedAliasing, packaged as a template so
// it can be used with allSatisfy (see isSafeTask).
private template noUnsharedAliasing(T)
{
    enum bool noUnsharedAliasing = !hasUnsharedAliasing!T;
}
 |  | 
 | // This template tests whether a function may be executed in parallel from | 
 | // @safe code via Task.executeInNewThread().  There is an additional | 
 | // requirement for executing it via a TaskPool.  (See isSafeReturn). | 
private template isSafeTask(F)
{
    enum bool isSafeTask =
        // The callable itself must be invocable from @safe code.
        (functionAttributes!F & (FunctionAttribute.safe | FunctionAttribute.trusted)) != 0 &&
        // A ref return could expose another thread's data; see isSafeReturn.
        (functionAttributes!F & FunctionAttribute.ref_) == 0 &&
        // A delegate/functor context could smuggle unshared data across
        // threads; plain function pointers carry no context.
        (isFunctionPointer!F || !hasUnsharedAliasing!F) &&
        // Likewise, every parameter must be free of unshared aliasing.
        allSatisfy!(noUnsharedAliasing, Parameters!F);
}
 |  | 
@safe unittest
{
    // Spot-check isSafeTask across attribute/aliasing combinations.
    alias F1 = void function() @safe;
    alias F2 = void function();            // not @safe/@trusted -> rejected
    alias F3 = void function(uint, string) @trusted;
    alias F4 = void function(uint, char[]); // mutable-array param -> rejected

    static assert( isSafeTask!F1);
    static assert(!isSafeTask!F2);
    static assert( isSafeTask!F3);
    static assert(!isSafeTask!F4);

    // Unshared aliasing in the *return* type alone is fine here; that case
    // is gated separately by isSafeReturn.
    alias F5 = uint[] function(uint, string) pure @trusted;
    static assert( isSafeTask!F5);
}
 |  | 
 | // This function decides whether Tasks that meet all of the other requirements | 
 | // for being executed from @safe code can be executed on a TaskPool. | 
 | // When executing via TaskPool, it's theoretically possible | 
 | // to return a value that is also pointed to by a worker thread's thread local | 
 | // storage.  When executing from executeInNewThread(), the thread that executed | 
 | // the Task is terminated by the time the return value is visible in the calling | 
 | // thread, so this is a non-issue.  It's also a non-issue for pure functions | 
 | // since they can't read global state. | 
 | private template isSafeReturn(T) | 
 | { | 
 |     static if (!hasUnsharedAliasing!(T.ReturnType)) | 
 |     { | 
 |         enum isSafeReturn = true; | 
 |     } | 
 |     else static if (T.isPure) | 
 |     { | 
 |         enum isSafeReturn = true; | 
 |     } | 
 |     else | 
 |     { | 
 |         enum isSafeReturn = false; | 
 |     } | 
 | } | 
 |  | 
// True for random-access ranges whose elements can be assigned to —
// the requirement for in-place parallel mutation of a range.
private template randAssignable(R)
{
    enum randAssignable = isRandomAccessRange!R && hasAssignableElements!R;
}
 |  | 
// Lifecycle of an AbstractTask.  Stored as a ubyte so the atomic byte
// helpers above can read/update it.
private enum TaskStatus : ubyte
{
    notStarted,  // Not yet executed by any thread.
    inProgress,  // Currently executing in some thread.
    done         // Finished; exception (if any) is valid to read.
}
 |  | 
// The type of a lambda that calls fun with freshly default-initialized
// arguments of types T...; instantiation fails if such a call is invalid,
// making this usable as a compile-time callability probe.
private template AliasReturn(alias fun, T...)
{
    alias AliasReturn = typeof({ T args; return fun(args); });
}
 |  | 
 | // Should be private, but std.algorithm.reduce is used in the zero-thread case | 
 | // and won't work w/ private. | 
// The per-element accumulation step for reduce.
template reduceAdjoin(functions...)
{
    static if (functions.length == 1)
    {
        // Single reduction function: use it directly.
        alias reduceAdjoin = binaryFun!(functions[0]);
    }
    else
    {
        // Multiple reduction functions: lhs is a tuple holding one running
        // value per function; fold rhs into each slot with its own function.
        T reduceAdjoin(T, U)(T lhs, U rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs);
            }

            return lhs;
        }
    }
}
 |  | 
// Merges two partial reduction results (accumulated independently,
// e.g. by different worker threads) into one.
private template reduceFinish(functions...)
{
    static if (functions.length == 1)
    {
        alias reduceFinish = binaryFun!(functions[0]);
    }
    else
    {
        // Both sides are tuples of per-function accumulators; combine
        // them slot-by-slot.
        T reduceFinish(T)(T lhs, T rhs)
        {
            alias funs = staticMap!(binaryFun, functions);

            foreach (i, Unused; typeof(lhs.expand))
            {
                lhs.expand[i] = funs[i](lhs.expand[i], rhs.expand[i]);
            }

            return lhs;
        }
    }
}
 |  | 
// True iff R is some instantiation of RoundRobinBuffer.
private template isRoundRobin(R : RoundRobinBuffer!(C1, C2), C1, C2)
{
    enum isRoundRobin = true;
}

// Fallback for every other type.
private template isRoundRobin(T)
{
    enum isRoundRobin = false;
}
 |  | 
@safe unittest
{
    // The specialized overload must match instantiations and the fallback
    // must reject unrelated types.
    static assert( isRoundRobin!(RoundRobinBuffer!(void delegate(char[]), bool delegate())));
    static assert(!isRoundRobin!(uint));
}
 |  | 
 | // This is the base "class" for all of the other tasks.  Using C-style | 
 | // polymorphism to allow more direct control over memory allocation, etc. | 
private struct AbstractTask
{
    // Intrusive list pointers — presumably link this task into a
    // TaskPool's FIFO queue; verify against the queue implementation.
    AbstractTask* prev;
    AbstractTask* next;

    // Pointer to a function that executes this task.
    void function(void*) runTask;

    // Set if the task threw; rethrown by done() in the waiting thread.
    Throwable exception;
    // One of TaskStatus, stored as ubyte so the atomic helpers apply.
    ubyte taskStatus = TaskStatus.notStarted;

    // Returns true once execution has finished, rethrowing any exception
    // the task raised; returns false while not started or in progress.
    bool done() @property
    {
        if (atomicReadUbyte(taskStatus) == TaskStatus.done)
        {
            if (exception)
            {
                throw exception;
            }

            return true;
        }

        return false;
    }

    // Runs the task in the current thread by dispatching through runTask.
    void job()
    {
        runTask(&this);
    }
}
 |  | 
 | /** | 
 | `Task` represents the fundamental unit of work.  A `Task` may be | 
 | executed in parallel with any other `Task`.  Using this struct directly | 
 | allows future/promise parallelism.  In this paradigm, a function (or delegate | 
 | or other callable) is executed in a thread other than the one it was called | 
 | from.  The calling thread does not block while the function is being executed. | 
 | A call to `workForce`, `yieldForce`, or `spinForce` is used to | 
 | ensure that the `Task` has finished executing and to obtain the return | 
 | value, if any.  These functions and `done` also act as full memory barriers, | 
 | meaning that any memory writes made in the thread that executed the `Task` | 
 | are guaranteed to be visible in the calling thread after one of these functions | 
 | returns. | 
 |  | 
 | The $(REF task, std,parallelism) and $(REF scopedTask, std,parallelism) functions can | 
 | be used to create an instance of this struct.  See `task` for usage examples. | 
 |  | 
 | Function results are returned from `yieldForce`, `spinForce` and | 
 | `workForce` by ref.  If `fun` returns by ref, the reference will point | 
 | to the returned reference of `fun`.  Otherwise it will point to a | 
 | field in this struct. | 
 |  | 
 | Copying of this struct is disabled, since it would provide no useful semantics. | 
 | If you want to pass this struct around, you should do so by reference or | 
 | pointer. | 
 |  | 
 | Bugs:  Changes to `ref` and `out` arguments are not propagated to the | 
 |        call site, only to `args` in this struct. | 
 | */ | 
struct Task(alias fun, Args...)
{
    // The non-templated base.  C-style polymorphism: a pointer to this
    // member is what actually gets queued and executed by the pool.
    private AbstractTask base = {runTask : &impl};
    private alias base this;

    private @property AbstractTask* basePtr()
    {
        return &base;
    }

    // Trampoline stored in base.runTask.  Recovers the full Task type from
    // the type-erased pointer and invokes fun with the stored arguments.
    private static void impl(void* myTask)
    {
        import std.algorithm.internal : addressOf;

        Task* myCastedTask = cast(typeof(this)*) myTask;
        static if (is(ReturnType == void))
        {
            fun(myCastedTask._args);
        }
        else static if (is(typeof(&(fun(myCastedTask._args)))))
        {
            // Ref return: store the address of the returned lvalue.
            myCastedTask.returnVal = addressOf(fun(myCastedTask._args));
        }
        else
        {
            // Value return: copy the result into this struct.
            myCastedTask.returnVal = fun(myCastedTask._args);
        }
    }

    // The pool this Task was submitted to; null until submitted or started
    // via executeInNewThread (see enforcePool).
    private TaskPool pool;
    private bool isScoped;  // True if created with scopedTask.

    // The stored values fun will be invoked with.
    Args _args;

    /**
    The arguments the function was called with.  Changes to `out` and
    `ref` arguments will be visible here.
    */
    static if (__traits(isSame, fun, run))
    {
        // For the run() adapter, _args[0] is the callable itself, so the
        // user-visible arguments start at index 1.
        alias args = _args[1..$];
    }
    else
    {
        alias args = _args;
    }


    // The purpose of this code is to decide whether functions whose
    // return values have unshared aliasing can be executed via
    // TaskPool from @safe code.  See isSafeReturn.
    static if (__traits(isSame, fun, run))
    {
        static if (isFunctionPointer!(_args[0]))
        {
            private enum bool isPure =
            (functionAttributes!(Args[0]) & FunctionAttribute.pure_) != 0;
        }
        else
        {
            // BUG:  Should check this for delegates too, but std.traits
            //       apparently doesn't allow this.  isPure is irrelevant
            //       for delegates, at least for now since shared delegates
            //       don't work.
            private enum bool isPure = false;
        }

    }
    else
    {
        // We already know that we can't execute aliases in @safe code, so
        // just put a dummy value here.
        private enum bool isPure = false;
    }


    /**
    The return type of the function called by this `Task`.  This can be
    `void`.
    */
    alias ReturnType = typeof(fun(_args));

    static if (!is(ReturnType == void))
    {
        static if (is(typeof(&fun(_args))))
        {
            // Ref return.
            ReturnType* returnVal;

            // fixRef normalizes both storage schemes (pointer here, value
            // below) so the force functions can simply return fixRef(...).
            ref ReturnType fixRef(ReturnType* val)
            {
                return *val;
            }

        }
        else
        {
            ReturnType returnVal;

            ref ReturnType fixRef(ref ReturnType val)
            {
                return val;
            }
        }
    }

    // Throws unless this Task has been associated with a pool; the force
    // functions below all rely on a valid pool.
    private void enforcePool()
    {
        import std.exception : enforce;
        enforce(this.pool !is null, "Job not submitted yet.");
    }

    static if (Args.length > 0)
    {
        private this(Args args)
        {
            _args = args;
        }
    }

    // Work around DMD bug https://issues.dlang.org/show_bug.cgi?id=6588,
    // allow immutable elements.
    static if (allSatisfy!(isAssignable, Args))
    {
        typeof(this) opAssign(typeof(this) rhs)
        {
            foreach (i, Type; typeof(this.tupleof))
            {
                this.tupleof[i] = rhs.tupleof[i];
            }
            return this;
        }
    }
    else
    {
        @disable typeof(this) opAssign(typeof(this) rhs);
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    busy spin until it's done, then return the return value.  If it threw
    an exception, rethrow that exception.

    This function should be used when you expect the result of the
    `Task` to be available on a timescale shorter than that of an OS
    context switch.
     */
    @property ref ReturnType spinForce() @trusted
    {
        enforcePool();

        // If not started yet, remove it from the queue and run it here.
        this.pool.tryDeleteExecute(basePtr);

        // Busy-wait: deliberately no yielding/sleeping (see doc above).
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done) {}

        if (exception)
        {
            throw exception;
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If the `Task` isn't started yet, execute it in the current thread.
    If it's done, return its return value, if any.  If it's in progress,
    wait on a condition variable.  If it threw an exception, rethrow that
    exception.

    This function should be used for expensive functions, as waiting on a
    condition variable introduces latency, but avoids wasted CPU cycles.
     */
    @property ref ReturnType yieldForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        if (done)
        {
            static if (is(ReturnType == void))
            {
                return;
            }
            else
            {
                return fixRef(this.returnVal);
            }
        }

        pool.waiterLock();
        scope(exit) pool.waiterUnlock();

        // Re-check under the lock; the condition variable may wake
        // spuriously or for a different task's completion.
        while (atomicReadUbyte(this.taskStatus) != TaskStatus.done)
        {
            pool.waitUntilCompletion();
        }

        if (exception)
        {
            throw exception; // nocoverage
        }

        static if (!is(ReturnType == void))
        {
            return fixRef(this.returnVal);
        }
    }

    /**
    If this `Task` was not started yet, execute it in the current
    thread.  If it is finished, return its result.  If it is in progress,
    execute any other `Task` from the `TaskPool` instance that
    this `Task` was submitted to until this one
    is finished.  If it threw an exception, rethrow that exception.
    If no other tasks are available or this `Task` was executed using
    `executeInNewThread`, wait on a condition variable.
     */
    @property ref ReturnType workForce() @trusted
    {
        enforcePool();
        this.pool.tryDeleteExecute(basePtr);

        while (true)
        {
            if (done)    // done() implicitly checks for exceptions.
            {
                static if (is(ReturnType == void))
                {
                    return;
                }
                else
                {
                    return fixRef(this.returnVal);
                }
            }

            AbstractTask* job;
            {
                // Locking explicitly and calling popNoSync() because
                // pop() waits on a condition variable if there are no Tasks
                // in the queue.

                pool.queueLock();
                scope(exit) pool.queueUnlock();
                job = pool.popNoSync();
            }


            if (job !is null)
            {

                version (verboseUnittest)
                {
                    stderr.writeln("Doing workForce work.");
                }

                pool.doJob(job);

                if (done)
                {
                    static if (is(ReturnType == void))
                    {
                        return;
                    }
                    else
                    {
                        return fixRef(this.returnVal);
                    }
                }
            }
            else
            {
                version (verboseUnittest)
                {
                    stderr.writeln("Yield from workForce.");
                }

                // No queued work left to steal; fall back to sleeping on
                // the completion condition variable.
                return yieldForce;
            }
        }
    }

    /**
    Returns `true` if the `Task` is finished executing.

    Throws:  Rethrows any exception thrown during the execution of the
             `Task`.
    */
    @property bool done() @trusted
    {
        // Explicitly forwarded for documentation purposes.
        return base.done;
    }

    /**
    Create a new thread for executing this `Task`, execute it in the
    newly created thread, then terminate the thread.  This can be used for
    future/promise parallelism.  An explicit priority may be given
    to the `Task`.  If one is provided, its value is forwarded to
    `core.thread.Thread.priority`. See $(REF task, std,parallelism) for
    usage example.
    */
    void executeInNewThread() @trusted
    {
        pool = new TaskPool(basePtr);
    }

    /// Ditto
    void executeInNewThread(int priority) @trusted
    {
        pool = new TaskPool(basePtr, priority);
    }

    // Scoped tasks must complete before their stack frame dies, because
    // _args and returnVal live inside this struct; block if necessary.
    @safe ~this()
    {
        if (isScoped && pool !is null && taskStatus != TaskStatus.done)
        {
            yieldForce;
        }
    }

    // When this is uncommented, it somehow gets called on returning from
    // scopedTask even though the struct shouldn't be getting copied.
    //@disable this(this) {}
}
 |  | 
// Calls `fpOrDelegate` with `args`.  This is an
// adapter that makes `Task` work with delegates, function pointers and
// functors instead of just aliases.  `args` is taken by ref so that any
// changes the callable makes to ref/out parameters land in the Task's
// stored `_args`.
ReturnType!F run(F, Args...)(F fpOrDelegate, ref Args args)
{
    return fpOrDelegate(args);
}
 |  | 
 | /** | 
 | Creates a `Task` on the GC heap that calls an alias.  This may be executed | 
 | via `Task.executeInNewThread` or by submitting to a | 
 | $(REF TaskPool, std,parallelism).  A globally accessible instance of | 
 | `TaskPool` is provided by $(REF taskPool, std,parallelism). | 
 |  | 
 | Returns:  A pointer to the `Task`. | 
 |  | 
 | Example: | 
 | --- | 
 | // Read two files into memory at the same time. | 
 | import std.file; | 
 |  | 
 | void main() | 
 | { | 
 |     // Create and execute a Task for reading | 
 |     // foo.txt. | 
 |     auto file1Task = task!read("foo.txt"); | 
 |     file1Task.executeInNewThread(); | 
 |  | 
 |     // Read bar.txt in parallel. | 
 |     auto file2Data = read("bar.txt"); | 
 |  | 
 |     // Get the results of reading foo.txt. | 
 |     auto file1Data = file1Task.yieldForce; | 
 | } | 
 | --- | 
 |  | 
 | --- | 
 | // Sorts an array using a parallel quick sort algorithm. | 
 | // The first partition is done serially.  Both recursion | 
 | // branches are then executed in parallel. | 
 | // | 
 | // Timings for sorting an array of 1,000,000 doubles on | 
 | // an Athlon 64 X2 dual core machine: | 
 | // | 
 | // This implementation:               176 milliseconds. | 
 | // Equivalent serial implementation:  280 milliseconds | 
 | void parallelSort(T)(T[] data) | 
 | { | 
 |     // Sort small subarrays serially. | 
 |     if (data.length < 100) | 
 |     { | 
 |          std.algorithm.sort(data); | 
 |          return; | 
 |     } | 
 |  | 
 |     // Partition the array. | 
 |     swap(data[$ / 2], data[$ - 1]); | 
 |     auto pivot = data[$ - 1]; | 
 |     bool lessThanPivot(T elem) { return elem < pivot; } | 
 |  | 
 |     auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]); | 
 |     swap(data[$ - greaterEqual.length - 1], data[$ - 1]); | 
 |  | 
 |     auto less = data[0..$ - greaterEqual.length - 1]; | 
 |     greaterEqual = data[$ - greaterEqual.length..$]; | 
 |  | 
 |     // Execute both recursion branches in parallel. | 
 |     auto recurseTask = task!parallelSort(greaterEqual); | 
 |     taskPool.put(recurseTask); | 
 |     parallelSort(less); | 
 |     recurseTask.yieldForce; | 
 | } | 
 | --- | 
 | */ | 
auto task(alias fun, Args...)(Args args)
{
    // GC-allocated so the Task may safely outlive the caller's stack frame.
    return new Task!(fun, Args)(args);
}
 |  | 
 | /** | 
 | Creates a `Task` on the GC heap that calls a function pointer, delegate, or | 
 | class/struct with overloaded opCall. | 
 |  | 
 | Example: | 
 | --- | 
 | // Read two files in at the same time again, | 
 | // but this time use a function pointer instead | 
 | // of an alias to represent std.file.read. | 
 | import std.file; | 
 |  | 
 | void main() | 
 | { | 
 |     // Create and execute a Task for reading | 
 |     // foo.txt. | 
 |     auto file1Task = task(&read!string, "foo.txt", size_t.max); | 
 |     file1Task.executeInNewThread(); | 
 |  | 
 |     // Read bar.txt in parallel. | 
 |     auto file2Data = read("bar.txt"); | 
 |  | 
 |     // Get the results of reading foo.txt. | 
 |     auto file1Data = file1Task.yieldForce; | 
 | } | 
 | --- | 
 |  | 
 | Notes: This function takes a non-scope delegate, meaning it can be | 
 |        used with closures.  If you can't allocate a closure due to objects | 
 |        on the stack that have scoped destruction, see `scopedTask`, which | 
 |        takes a scope delegate. | 
 |  */ | 
auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F)
{
    // The callable is stored as the first element of _args; the run()
    // adapter invokes it with the remaining arguments.
    return new Task!(run, F, Args)(delegateOrFp, args);
}
 |  | 
 | /** | 
 | Version of `task` usable from `@safe` code.  Usage mechanics are | 
 | identical to the non-@safe case, but safety introduces some restrictions: | 
 |  | 
 | 1.  `fun` must be @safe or @trusted. | 
 |  | 
 | 2.  `F` must not have any unshared aliasing as defined by | 
 |     $(REF hasUnsharedAliasing, std,traits).  This means it | 
 |     may not be an unshared delegate or a non-shared class or struct | 
 |     with overloaded `opCall`.  This also precludes accepting template | 
 |     alias parameters. | 
 |  | 
 | 3.  `Args` must not have unshared aliasing. | 
 |  | 
 | 4.  `fun` must not return by reference. | 
 |  | 
 | 5.  The return type must not have unshared aliasing unless `fun` is | 
 |     `pure` or the `Task` is executed via `executeInNewThread` instead | 
 |     of using a `TaskPool`. | 
 |  | 
 | */ | 
@trusted auto task(F, Args...)(F fun, Args args)
if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F)
{
    // @trusted is justified by the isSafeTask constraint plus the @safe
    // call check in the template constraint (see the restrictions above).
    return new Task!(run, F, Args)(fun, args);
}
 |  | 
@safe unittest
{
    // Oops implicitly converts to int via an @system alias-this, so the
    // @safe task/scopedTask overloads must reject it even though the
    // target function foo is itself @safe.
    static struct Oops {
        int convert() {
            *cast(int*) 0xcafebabe = 0xdeadbeef;
            return 0;
        }
        alias convert this;
    }
    static void foo(int) @safe {}

    static assert(!__traits(compiles, task(&foo, Oops.init)));
    static assert(!__traits(compiles, scopedTask(&foo, Oops.init)));
}
 |  | 
 | /** | 
 | These functions allow the creation of `Task` objects on the stack rather | 
 | than the GC heap.  The lifetime of a `Task` created by `scopedTask` | 
 | cannot exceed the lifetime of the scope it was created in. | 
 |  | 
 | `scopedTask` might be preferred over `task`: | 
 |  | 
 | 1.  When a `Task` that calls a delegate is being created and a closure | 
 |     cannot be allocated due to objects on the stack that have scoped | 
 |     destruction.  The delegate overload of `scopedTask` takes a `scope` | 
 |     delegate. | 
 |  | 
 | 2.  As a micro-optimization, to avoid the heap allocation associated with | 
 |     `task` or with the creation of a closure. | 
 |  | 
 | Usage is otherwise identical to `task`. | 
 |  | 
 | Notes:  `Task` objects created using `scopedTask` will automatically | 
 | call `Task.yieldForce` in their destructor if necessary to ensure | 
 | the `Task` is complete before the stack frame they reside on is destroyed. | 
 | */ | 
 | auto scopedTask(alias fun, Args...)(Args args) | 
 | { | 
 |     auto ret = Task!(fun, Args)(args); | 
 |     ret.isScoped = true; | 
 |     return ret; | 
 | } | 
 |  | 
 | /// Ditto | 
 | auto scopedTask(F, Args...)(scope F delegateOrFp, Args args) | 
 | if (is(typeof(delegateOrFp(args))) && !isSafeTask!F) | 
 | { | 
 |     auto ret = Task!(run, F, Args)(delegateOrFp, args); | 
 |     ret.isScoped = true; | 
 |     return ret; | 
 | } | 
 |  | 
 | /// Ditto | 
 | @trusted auto scopedTask(F, Args...)(F fun, Args args) | 
 | if (__traits(compiles, () @safe => fun(args)) && isSafeTask!F) | 
 | { | 
 |     auto ret = Task!(run, F, Args)(fun, args); | 
 |     ret.isScoped = true; | 
 |     return ret; | 
 | } | 
 |  | 
 | /** | 
 | The total number of CPU cores available on the current machine, as reported by | 
 | the operating system. | 
 | */ | 
// Lazily initialized on first read:  `uint.max` is the sentinel for
// "not yet computed" and `totalCPUsImpl` performs the actual OS query.
alias totalCPUs =
    __lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);
 |  | 
 | uint totalCPUsImpl() @nogc nothrow @trusted | 
 | { | 
 |     version (Windows) | 
 |     { | 
 |         // BUGS:  Only works on Windows 2000 and above. | 
 |         import core.sys.windows.winbase : SYSTEM_INFO, GetSystemInfo; | 
 |         import std.algorithm.comparison : max; | 
 |         SYSTEM_INFO si; | 
 |         GetSystemInfo(&si); | 
 |         return max(1, cast(uint) si.dwNumberOfProcessors); | 
 |     } | 
 |     else version (linux) | 
 |     { | 
 |         import core.stdc.stdlib : calloc; | 
 |         import core.stdc.string : memset; | 
 |         import core.sys.linux.sched : CPU_ALLOC_SIZE, CPU_FREE, CPU_COUNT, CPU_COUNT_S, cpu_set_t, sched_getaffinity; | 
 |         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; | 
 |  | 
 |         int count = 0; | 
 |  | 
 |         /** | 
 |          * According to ruby's source code, CPU_ALLOC() doesn't work as expected. | 
 |          *  see: https://github.com/ruby/ruby/commit/7d9e04de496915dd9e4544ee18b1a0026dc79242 | 
 |          * | 
 |          *  The hardcode number also comes from ruby's source code. | 
 |          *  see: https://github.com/ruby/ruby/commit/0fa75e813ecf0f5f3dd01f89aa76d0e25ab4fcd4 | 
 |          */ | 
 |         for (int n = 64; n <= 16384; n *= 2) | 
 |         { | 
 |             size_t size = CPU_ALLOC_SIZE(count); | 
 |             if (size >= 0x400) | 
 |             { | 
 |                 auto cpuset = cast(cpu_set_t*) calloc(1, size); | 
 |                 if (cpuset is null) break; | 
 |                 if (sched_getaffinity(0, size, cpuset) == 0) | 
 |                 { | 
 |                     count = CPU_COUNT_S(size, cpuset); | 
 |                 } | 
 |                 CPU_FREE(cpuset); | 
 |             } | 
 |             else | 
 |             { | 
 |                 cpu_set_t cpuset; | 
 |                 if (sched_getaffinity(0, cpu_set_t.sizeof, &cpuset) == 0) | 
 |                 { | 
 |                     count = CPU_COUNT(&cpuset); | 
 |                 } | 
 |             } | 
 |  | 
 |             if (count > 0) | 
 |                 return cast(uint) count; | 
 |         } | 
 |  | 
 |         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); | 
 |     } | 
 |     else version (Darwin) | 
 |     { | 
 |         import core.sys.darwin.sys.sysctl : sysctlbyname; | 
 |         uint result; | 
 |         size_t len = result.sizeof; | 
 |         sysctlbyname("hw.physicalcpu", &result, &len, null, 0); | 
 |         return result; | 
 |     } | 
 |     else version (DragonFlyBSD) | 
 |     { | 
 |         import core.sys.dragonflybsd.sys.sysctl : sysctlbyname; | 
 |         uint result; | 
 |         size_t len = result.sizeof; | 
 |         sysctlbyname("hw.ncpu", &result, &len, null, 0); | 
 |         return result; | 
 |     } | 
 |     else version (FreeBSD) | 
 |     { | 
 |         import core.sys.freebsd.sys.sysctl : sysctlbyname; | 
 |         uint result; | 
 |         size_t len = result.sizeof; | 
 |         sysctlbyname("hw.ncpu", &result, &len, null, 0); | 
 |         return result; | 
 |     } | 
 |     else version (NetBSD) | 
 |     { | 
 |         import core.sys.netbsd.sys.sysctl : sysctlbyname; | 
 |         uint result; | 
 |         size_t len = result.sizeof; | 
 |         sysctlbyname("hw.ncpu", &result, &len, null, 0); | 
 |         return result; | 
 |     } | 
 |     else version (Solaris) | 
 |     { | 
 |         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; | 
 |         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); | 
 |     } | 
 |     else version (OpenBSD) | 
 |     { | 
 |         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; | 
 |         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); | 
 |     } | 
 |     else version (Hurd) | 
 |     { | 
 |         import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; | 
 |         return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); | 
 |     } | 
 |     else | 
 |     { | 
 |         static assert(0, "Don't know how to get N CPUs on this OS."); | 
 |     } | 
 | } | 
 |  | 
 | /* | 
 | This class serves two purposes: | 
 |  | 
 | 1.  It distinguishes std.parallelism threads from other threads so that | 
 |     the std.parallelism daemon threads can be terminated. | 
 |  | 
 | 2.  It adds a reference to the pool that the thread is a member of, | 
 |     which is also necessary to allow the daemon threads to be properly | 
 |     terminated. | 
 | */ | 
private final class ParallelismThread : Thread
{
    // `dg` is the worker's entry point (e.g. TaskPool.startWorkLoop).
    this(void delegate() dg)
    {
        super(dg);
    }

    // The pool this worker belongs to.  Set by TaskPool immediately after
    // construction, before start() is called, so the module destructor can
    // find and terminate daemon pools.
    TaskPool pool;
}
 |  | 
 | // Kill daemon threads. | 
 | shared static ~this() | 
 | { | 
 |     foreach (ref thread; Thread) | 
 |     { | 
 |         auto pthread = cast(ParallelismThread) thread; | 
 |         if (pthread is null) continue; | 
 |         auto pool = pthread.pool; | 
 |         if (!pool.isDaemon) continue; | 
 |         pool.stop(); | 
 |         pthread.join(); | 
 |     } | 
 | } | 
 |  | 
 | /** | 
 | This class encapsulates a task queue and a set of worker threads.  Its purpose | 
 | is to efficiently map a large number of `Task`s onto a smaller number of | 
 | threads.  A task queue is a FIFO queue of `Task` objects that have been | 
 | submitted to the `TaskPool` and are awaiting execution.  A worker thread is a | 
 | thread that executes the `Task` at the front of the queue when one is | 
 | available and sleeps when the queue is empty. | 
 |  | 
 | This class should usually be used via the global instantiation | 
 | available via the $(REF taskPool, std,parallelism) property. | 
 | Occasionally it is useful to explicitly instantiate a `TaskPool`: | 
 |  | 
 | 1.  When you want `TaskPool` instances with multiple priorities, for example | 
 |     a low priority pool and a high priority pool. | 
 |  | 
 | 2.  When the threads in the global task pool are waiting on a synchronization | 
 |     primitive (for example a mutex), and you want to parallelize the code that | 
 |     needs to run before these threads can be resumed. | 
 |  | 
 | Note: The worker threads in this pool will not stop until | 
 |       `stop` or `finish` is called, even if the main thread | 
 |       has finished already. This may lead to programs that | 
 |       never end. If you do not want this behaviour, you can set `isDaemon` | 
 |       to true. | 
 |  */ | 
 | final class TaskPool | 
 | { | 
 | private: | 
 |  | 
    // A pool can either be a regular pool or a single-task pool.  A
    // single-task pool is a dummy pool that's fired up for
    // Task.executeInNewThread().
    bool isSingleTask;

    // Worker threads of a regular pool; never populated for a single-task pool.
    ParallelismThread[] pool;
    // The lone thread of a single-task pool; never set for a regular pool.
    Thread singleTaskThread;

    // Intrusive doubly-linked FIFO of tasks submitted but not yet started.
    AbstractTask* head;
    AbstractTask* tail;
    PoolState status = PoolState.running;
    Condition workerCondition;  // Signaled when work is added to the queue.
    Condition waiterCondition;  // Signaled when a task finishes (see doJob).
    Mutex queueMutex;           // Guards head/tail/status/nextThreadIndex.
    Mutex waiterMutex;  // For waiterCondition
    // The instanceStartIndex of the next instance that will be created.
    __gshared size_t nextInstanceIndex = 1;

    // The index of the current thread.
    static size_t threadIndex;

    // The index of the first thread in this instance.
    immutable size_t instanceStartIndex;

    // The index that the next thread to be initialized in this pool will have.
    size_t nextThreadIndex;

    // Pool lifecycle:  `running` accepts and executes tasks; `finishing`
    // drains the queue, then transitions to `stopNow`; `stopNow` makes
    // workers leave their work loop.
    enum PoolState : ubyte
    {
        running,
        finishing,
        stopNow
    }
 |  | 
    // Execute `job` in the current thread, capturing anything it throws into
    // job.exception, then mark it done and wake any threads waiting on
    // completion.
    void doJob(AbstractTask* job)
    {
        // The caller must already have claimed the task:  popped off the
        // queue, unlinked, and marked inProgress.
        assert(job.taskStatus == TaskStatus.inProgress);
        assert(job.next is null);
        assert(job.prev is null);

        scope(exit)
        {
            if (!isSingleTask)
            {
                // Runs after taskStatus is set below, so waiters that wake
                // up observe the task as done.
                waiterLock();
                scope(exit) waiterUnlock();
                notifyWaiters();
            }
        }

        try
        {
            job.job();
        }
        catch (Throwable e)
        {
            // Store rather than propagate, so the thread that forces the
            // task can rethrow it; Errors are captured as well.
            job.exception = e;
        }

        atomicSetUbyte(job.taskStatus, TaskStatus.done);
    }
 |  | 
 |     // This function is used for dummy pools created by Task.executeInNewThread(). | 
 |     void doSingleTask() | 
 |     { | 
 |         // No synchronization.  Pool is guaranteed to only have one thread, | 
 |         // and the queue is submitted to before this thread is created. | 
 |         assert(head); | 
 |         auto t = head; | 
 |         t.next = t.prev = head = null; | 
 |         doJob(t); | 
 |     } | 
 |  | 
 |     // This function performs initialization for each thread that affects | 
 |     // thread local storage and therefore must be done from within the | 
 |     // worker thread.  It then calls executeWorkLoop(). | 
 |     void startWorkLoop() | 
 |     { | 
 |         // Initialize thread index. | 
 |         { | 
 |             queueLock(); | 
 |             scope(exit) queueUnlock(); | 
 |             threadIndex = nextThreadIndex; | 
 |             nextThreadIndex++; | 
 |         } | 
 |  | 
 |         executeWorkLoop(); | 
 |     } | 
 |  | 
 |     // This is the main work loop that worker threads spend their time in | 
 |     // until they terminate.  It's also entered by non-worker threads when | 
 |     // finish() is called with the blocking variable set to true. | 
 |     void executeWorkLoop() | 
 |     { | 
 |         while (atomicReadUbyte(status) != PoolState.stopNow) | 
 |         { | 
 |             AbstractTask* task = pop(); | 
 |             if (task is null) | 
 |             { | 
 |                 if (atomicReadUbyte(status) == PoolState.finishing) | 
 |                 { | 
 |                     atomicSetUbyte(status, PoolState.stopNow); | 
 |                     return; | 
 |                 } | 
 |             } | 
 |             else | 
 |             { | 
 |                 doJob(task); | 
 |             } | 
 |         } | 
 |     } | 
 |  | 
 |     // Pop a task off the queue. | 
 |     AbstractTask* pop() | 
 |     { | 
 |         queueLock(); | 
 |         scope(exit) queueUnlock(); | 
 |         auto ret = popNoSync(); | 
 |         while (ret is null && status == PoolState.running) | 
 |         { | 
 |             wait(); | 
 |             ret = popNoSync(); | 
 |         } | 
 |         return ret; | 
 |     } | 
 |  | 
    // Dequeue the task at the head of the queue, unlinking it and marking it
    // inProgress.  Caller must hold the queue lock.  Returns null when the
    // queue is empty; always null for single-task pools, whose lone task is
    // consumed by doSingleTask() instead.
    AbstractTask* popNoSync()
    out(returned)
    {
        /* If task.prev and task.next aren't null, then another thread
         * can try to delete this task from the pool after it's
         * already been deleted/popped.
         */
        if (returned !is null)
        {
            assert(returned.next is null);
            assert(returned.prev is null);
        }
    }
    do
    {
        if (isSingleTask) return null;

        AbstractTask* returned = head;
        if (head !is null)
        {
            head = head.next;
            returned.prev = null;
            returned.next = null;
            returned.taskStatus = TaskStatus.inProgress;
        }
        // The new head (if any) is now the first node:  clear its back link.
        if (head !is null)
        {
            head.prev = null;
        }

        return returned;
    }
 |  | 
 |     // Push a task onto the queue. | 
    // Thread-safe wrapper:  enqueue `task` while holding the queue lock.
    void abstractPut(AbstractTask* task)
    {
        queueLock();
        scope(exit) queueUnlock();
        abstractPutNoSync(task);
    }
 |  | 
    // Append `task` at the tail of the queue and wake one worker.  Caller
    // must hold the queue lock.  Throws Error if the pool no longer accepts
    // work (finish()/stop() already called).
    void abstractPutNoSync(AbstractTask* task)
    in
    {
        assert(task);
    }
    out
    {
        import std.conv : text;

        // Queue-consistency checks:  tail has no successor and is correctly
        // back-linked to its predecessor.
        assert(tail.prev !is tail);
        assert(tail.next is null, text(tail.prev, '\t', tail.next));
        if (tail.prev !is null)
        {
            assert(tail.prev.next is tail, text(tail.prev, '\t', tail.next));
        }
    }
    do
    {
        // Not using enforce() to save on function call overhead since this
        // is a performance critical function.
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        task.next = null;
        if (head is null)   //Queue is empty.
        {
            head = task;
            tail = task;
            tail.prev = null;
        }
        else
        {
            assert(tail);
            task.prev = tail;
            tail.next = task;
            tail = task;
        }
        // Exactly one new task, so waking a single worker suffices.
        notify();
    }
 |  | 
    // Append a pre-linked chain of tasks (`h` through `t`) to the queue and
    // wake all workers.  Caller must hold the queue lock.
    // NOTE(review): unlike abstractPutNoSync, the empty-queue branch does not
    // clear h.prev -- presumably callers always pass a chain whose head has
    // prev == null; verify against the call sites if this is ever reused.
    void abstractPutGroupNoSync(AbstractTask* h, AbstractTask* t)
    {
        if (status != PoolState.running)
        {
            throw new Error(
                "Cannot submit a new task to a pool after calling " ~
                "finish() or stop()."
            );
        }

        if (head is null)
        {
            head = h;
            tail = t;
        }
        else
        {
            h.prev = tail;
            tail.next = h;
            tail = t;
        }

        // Several tasks were added at once:  every sleeping worker may now
        // have something to do.
        notifyAll();
    }
 |  | 
 |     void tryDeleteExecute(AbstractTask* toExecute) | 
 |     { | 
 |         if (isSingleTask) return; | 
 |  | 
 |         if ( !deleteItem(toExecute) ) | 
 |         { | 
 |             return; | 
 |         } | 
 |  | 
 |         try | 
 |         { | 
 |             toExecute.job(); | 
 |         } | 
 |         catch (Exception e) | 
 |         { | 
 |             toExecute.exception = e; | 
 |         } | 
 |  | 
 |         atomicSetUbyte(toExecute.taskStatus, TaskStatus.done); | 
 |     } | 
 |  | 
    // Thread-safe wrapper around deleteItemNoSync().
    bool deleteItem(AbstractTask* item)
    {
        queueLock();
        scope(exit) queueUnlock();
        return deleteItemNoSync(item);
    }
 |  | 
    // Try to claim `item` by removing it from the queue.  Returns true and
    // marks it inProgress if it had not yet started; returns false if a
    // worker already claimed it.  Caller must hold the queue lock.
    bool deleteItemNoSync(AbstractTask* item)
    {
        if (item.taskStatus != TaskStatus.notStarted)
        {
            return false;
        }
        // Claim the task first so no other thread can take it.
        item.taskStatus = TaskStatus.inProgress;

        if (item is head)
        {
            // Make sure head gets set properly.
            popNoSync();
            return true;
        }
        if (item is tail)
        {
            tail = tail.prev;
            if (tail !is null)
            {
                tail.next = null;
            }
            item.next = null;
            item.prev = null;
            return true;
        }
        // Interior node:  splice it out of the doubly-linked list.
        if (item.next !is null)
        {
            assert(item.next.prev is item);  // Check queue consistency.
            item.next.prev = item.prev;
        }
        if (item.prev !is null)
        {
            assert(item.prev.next is item);  // Check queue consistency.
            item.prev.next = item.next;
        }
        item.next = null;
        item.prev = null;
        return true;
    }
 |  | 
 |     void queueLock() | 
 |     { | 
 |         assert(queueMutex); | 
 |         if (!isSingleTask) queueMutex.lock(); | 
 |     } | 
 |  | 
 |     void queueUnlock() | 
 |     { | 
 |         assert(queueMutex); | 
 |         if (!isSingleTask) queueMutex.unlock(); | 
 |     } | 
 |  | 
    // Acquire the mutex guarding waiterCondition; no-op for single-task
    // pools, which synchronize by joining their thread instead (see
    // waitUntilCompletion).
    void waiterLock()
    {
        if (!isSingleTask) waiterMutex.lock();
    }
 |  | 
    // Release the mutex guarding waiterCondition; no-op for single-task pools.
    void waiterUnlock()
    {
        if (!isSingleTask) waiterMutex.unlock();
    }
 |  | 
    // Block until notify()/notifyAll() signals new work.  The caller must
    // hold the queue lock (workerCondition was constructed on queueMutex).
    void wait()
    {
        if (!isSingleTask) workerCondition.wait();
    }
 |  | 
    // Wake one worker blocked in wait(); called with the queue lock held
    // (see abstractPutNoSync).
    void notify()
    {
        if (!isSingleTask) workerCondition.notify();
    }
 |  | 
    // Wake every worker blocked in wait(); used when several tasks are
    // enqueued at once (see abstractPutGroupNoSync).
    void notifyAll()
    {
        if (!isSingleTask) workerCondition.notifyAll();
    }
 |  | 
    // Block until some task completes.  Single-task pools simply join their
    // one thread; regular pools wait on waiterCondition, so the caller must
    // hold waiterMutex (via waiterLock).
    void waitUntilCompletion()
    {
        if (isSingleTask)
        {
            singleTaskThread.join();
        }
        else
        {
            waiterCondition.wait();
        }
    }
 |  | 
    // Wake every thread blocked in waitUntilCompletion(); called from
    // doJob() after a task finishes.
    void notifyWaiters()
    {
        if (!isSingleTask) waiterCondition.notifyAll();
    }
 |  | 
 |     // Private constructor for creating dummy pools that only have one thread, | 
 |     // only execute one Task, and then terminate.  This is used for | 
 |     // Task.executeInNewThread(). | 
    this(AbstractTask* task, int priority = int.max)
    {
        assert(task);

        // Dummy value, not used.
        instanceStartIndex = 0;

        // Mutexes/conditions are deliberately left null:  the wait/notify
        // helpers all check isSingleTask before touching them.
        this.isSingleTask = true;
        task.taskStatus = TaskStatus.inProgress;
        this.head = task;
        // The queue is filled before the thread starts, so doSingleTask
        // needs no synchronization.
        singleTaskThread = new Thread(&doSingleTask);
        singleTaskThread.start();

        // Disabled until writing code to support
        // running thread with specified priority
        // See https://issues.dlang.org/show_bug.cgi?id=8960

        /*if (priority != int.max)
        {
            singleTaskThread.priority = priority;
        }*/
    }
 |  | 
 | public: | 
 |     // This is used in parallel_algorithm but is too unstable to document | 
 |     // as public API. | 
 |     size_t defaultWorkUnitSize(size_t rangeLen) const @safe pure nothrow | 
 |     { | 
 |         import std.algorithm.comparison : max; | 
 |  | 
 |         if (this.size == 0) | 
 |         { | 
 |             return max(rangeLen, 1); | 
 |         } | 
 |  | 
 |         immutable size_t eightSize = 4 * (this.size + 1); | 
 |         auto ret = (rangeLen / eightSize) + ((rangeLen % eightSize == 0) ? 0 : 1); | 
 |         return max(ret, 1); | 
 |     } | 
 |  | 
 |     /** | 
 |     Default constructor that initializes a `TaskPool` with | 
 |     `totalCPUs` - 1 worker threads.  The minus 1 is included because the | 
 |     main thread will also be available to do work. | 
 |  | 
 |     Note:  On single-core machines, the primitives provided by `TaskPool` | 
 |            operate transparently in single-threaded mode. | 
 |      */ | 
    this() @trusted
    {
        // One worker fewer than cores, because the submitting (main) thread
        // also participates in executing work.
        this(totalCPUs - 1);
    }
 |  | 
 |     /** | 
 |     Allows for custom number of worker threads. | 
 |     */ | 
    this(size_t nWorkers) @trusted
    {
        // Reserve a globally-unique, contiguous block of worker indices
        // [instanceStartIndex, instanceStartIndex + nWorkers) for this pool.
        synchronized(typeid(TaskPool))
        {
            instanceStartIndex = nextInstanceIndex;

            // The first worker thread to be initialized will have this index,
            // and will increment it.  The second worker to be initialized will
            // have this index plus 1.
            nextThreadIndex = instanceStartIndex;
            nextInstanceIndex += nWorkers;
        }

        // workerCondition pairs with queueMutex (workers sleep on the queue);
        // waiterCondition pairs with waiterMutex (completion notifications).
        queueMutex = new Mutex(this);
        waiterMutex = new Mutex();
        workerCondition = new Condition(queueMutex);
        waiterCondition = new Condition(waiterMutex);

        // Spin up the workers; each runs startWorkLoop() until the pool is
        // stopped or finished.
        pool = new ParallelismThread[nWorkers];
        foreach (ref poolThread; pool)
        {
            poolThread = new ParallelismThread(&startWorkLoop);
            poolThread.pool = this;
            poolThread.start();
        }
    }
 |  | 
 |     /** | 
 |     Implements a parallel foreach loop over a range.  This works by implicitly | 
 |     creating and submitting one `Task` to the `TaskPool` for each worker | 
 |     thread.  A work unit is a set of consecutive elements of `range` to | 
 |     be processed by a worker thread between communication with any other | 
 |     thread.  The number of elements processed per work unit is controlled by the | 
 |     `workUnitSize` parameter.  Smaller work units provide better load | 
 |     balancing, but larger work units avoid the overhead of communicating | 
 |     with other threads frequently to fetch the next work unit.  Large work | 
 |     units also avoid false sharing in cases where the range is being modified. | 
 |     The less time a single iteration of the loop takes, the larger | 
 |     `workUnitSize` should be.  For very expensive loop bodies, | 
    `workUnitSize` should be 1.  An overload that chooses a default work
 |     unit size is also available. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     // Find the logarithm of every number from 1 to | 
 |     // 10_000_000 in parallel. | 
 |     auto logs = new double[10_000_000]; | 
 |  | 
 |     // Parallel foreach works with or without an index | 
 |     // variable.  It can iterate by ref if range.front | 
 |     // returns by ref. | 
 |  | 
 |     // Iterate over logs using work units of size 100. | 
 |     foreach (i, ref elem; taskPool.parallel(logs, 100)) | 
 |     { | 
 |         elem = log(i + 1.0); | 
 |     } | 
 |  | 
 |     // Same thing, but use the default work unit size. | 
 |     // | 
 |     // Timings on an Athlon 64 X2 dual core machine: | 
 |     // | 
 |     // Parallel foreach:  388 milliseconds | 
 |     // Regular foreach:   619 milliseconds | 
 |     foreach (i, ref elem; taskPool.parallel(logs)) | 
 |     { | 
 |         elem = log(i + 1.0); | 
 |     } | 
 |     --- | 
 |  | 
 |     Notes: | 
 |  | 
 |     The memory usage of this implementation is guaranteed to be constant | 
 |     in `range.length`. | 
 |  | 
 |     Breaking from a parallel foreach loop via a break, labeled break, | 
 |     labeled continue, return or goto statement throws a | 
 |     `ParallelForeachError`. | 
 |  | 
 |     In the case of non-random access ranges, parallel foreach buffers lazily | 
 |     to an array of size `workUnitSize` before executing the parallel portion | 
 |     of the loop.  The exception is that, if a parallel foreach is executed | 
 |     over a range returned by `asyncBuf` or `map`, the copying is elided | 
 |     and the buffers are simply swapped.  In this case `workUnitSize` is | 
 |     ignored and the work unit size is set to the  buffer size of `range`. | 
 |  | 
 |     A memory barrier is guaranteed to be executed on exit from the loop, | 
 |     so that results produced by all threads are visible in the calling thread. | 
 |  | 
 |     $(B Exception Handling): | 
 |  | 
 |     When at least one exception is thrown from inside a parallel foreach loop, | 
 |     the submission of additional `Task` objects is terminated as soon as | 
 |     possible, in a non-deterministic manner.  All executing or | 
 |     enqueued work units are allowed to complete.  Then, all exceptions that | 
 |     were thrown by any work unit are chained using `Throwable.next` and | 
 |     rethrown.  The order of the exception chaining is non-deterministic. | 
 |     */ | 
 |     ParallelForeach!R parallel(R)(R range, size_t workUnitSize) | 
 |     { | 
 |         import std.exception : enforce; | 
 |         enforce(workUnitSize > 0, "workUnitSize must be > 0."); | 
 |         alias RetType = ParallelForeach!R; | 
 |         return RetType(this, range, workUnitSize); | 
 |     } | 
 |  | 
 |  | 
 |     /// Ditto | 
 |     ParallelForeach!R parallel(R)(R range) | 
 |     { | 
 |         static if (hasLength!R) | 
 |         { | 
 |             // Default work unit size is such that we would use 4x as many | 
 |             // slots as are in this thread pool. | 
 |             size_t workUnitSize = defaultWorkUnitSize(range.length); | 
 |             return parallel(range, workUnitSize); | 
 |         } | 
 |         else | 
 |         { | 
 |             // Just use a really, really dumb guess if the user is too lazy to | 
 |             // specify. | 
 |             return parallel(range, 512); | 
 |         } | 
 |     } | 
 |  | 
 |     /// | 
 |     template amap(functions...) | 
 |     { | 
 |         /** | 
 |         Eager parallel map.  The eagerness of this function means it has less | 
 |         overhead than the lazily evaluated `TaskPool.map` and should be | 
 |         preferred where the memory requirements of eagerness are acceptable. | 
 |         `functions` are the functions to be evaluated, passed as template | 
 |         alias parameters in a style similar to | 
 |         $(REF map, std,algorithm,iteration). | 
 |         The first argument must be a random access range. For performance | 
 |         reasons, amap will assume the range elements have not yet been | 
 |         initialized. Elements will be overwritten without calling a destructor | 
 |         nor doing an assignment. As such, the range must not contain meaningful | 
 |         data$(DDOC_COMMENT not a section): either un-initialized objects, or | 
 |         objects in their `.init` state. | 
 |  | 
 |         --- | 
 |         auto numbers = iota(100_000_000.0); | 
 |  | 
 |         // Find the square roots of numbers. | 
 |         // | 
 |         // Timings on an Athlon 64 X2 dual core machine: | 
 |         // | 
 |         // Parallel eager map:                   0.802 s | 
 |         // Equivalent serial implementation:     1.768 s | 
 |         auto squareRoots = taskPool.amap!sqrt(numbers); | 
 |         --- | 
 |  | 
 |         Immediately after the range argument, an optional work unit size argument | 
 |         may be provided.  Work units as used by `amap` are identical to those | 
 |         defined for parallel foreach.  If no work unit size is provided, the | 
 |         default work unit size is used. | 
 |  | 
 |         --- | 
 |         // Same thing, but make work unit size 100. | 
 |         auto squareRoots = taskPool.amap!sqrt(numbers, 100); | 
 |         --- | 
 |  | 
 |         An output range for returning the results may be provided as the last | 
 |         argument.  If one is not provided, an array of the proper type will be | 
 |         allocated on the garbage collected heap.  If one is provided, it must be a | 
 |         random access range with assignable elements, must have reference | 
 |         semantics with respect to assignment to its elements, and must have the | 
 |         same length as the input range.  Writing to adjacent elements from | 
 |         different threads must be safe. | 
 |  | 
 |         --- | 
 |         // Same thing, but explicitly allocate an array | 
 |         // to return the results in.  The element type | 
 |         // of the array may be either the exact type | 
 |         // returned by functions or an implicit conversion | 
 |         // target. | 
 |         auto squareRoots = new float[numbers.length]; | 
 |         taskPool.amap!sqrt(numbers, squareRoots); | 
 |  | 
 |         // Multiple functions, explicit output range, and | 
 |         // explicit work unit size. | 
 |         auto results = new Tuple!(float, real)[numbers.length]; | 
 |         taskPool.amap!(sqrt, log)(numbers, 100, results); | 
 |         --- | 
 |  | 
 |         Note: | 
 |  | 
 |         A memory barrier is guaranteed to be executed after all results are written | 
 |         but before returning so that results produced by all threads are visible | 
 |         in the calling thread. | 
 |  | 
 |         Tips: | 
 |  | 
 |         To perform the mapping operation in place, provide the same range for the | 
 |         input and output range. | 
 |  | 
 |         To parallelize the copying of a range with expensive to evaluate elements | 
 |         to an array, pass an identity function (a function that just returns | 
 |         whatever argument is provided to it) to `amap`. | 
 |  | 
 |         $(B Exception Handling): | 
 |  | 
 |         When at least one exception is thrown from inside the map functions, | 
 |         the submission of additional `Task` objects is terminated as soon as | 
 |         possible, in a non-deterministic manner.  All currently executing or | 
 |         enqueued work units are allowed to complete.  Then, all exceptions that | 
 |         were thrown from any work unit are chained using `Throwable.next` and | 
 |         rethrown.  The order of the exception chaining is non-deterministic. | 
 |         */ | 
 |         auto amap(Args...)(Args args) | 
 |         if (isRandomAccessRange!(Args[0])) | 
 |         { | 
 |             import core.internal.lifetime : emplaceRef; | 
 |  | 
 |             alias fun = adjoin!(staticMap!(unaryFun, functions)); | 
 |  | 
 |             alias range = args[0]; | 
 |             immutable len = range.length; | 
 |  | 
 |             static if ( | 
 |                 Args.length > 1 && | 
 |                 randAssignable!(Args[$ - 1]) && | 
 |                 is(MapType!(Args[0], functions) : ElementType!(Args[$ - 1])) | 
 |                 ) | 
 |             { | 
 |                 import std.conv : text; | 
 |                 import std.exception : enforce; | 
 |  | 
 |                 alias buf = args[$ - 1]; | 
 |                 alias args2 = args[0..$ - 1]; | 
 |                 alias Args2 = Args[0..$ - 1]; | 
 |                 enforce(buf.length == len, | 
 |                         text("Can't use a user supplied buffer that's the wrong ", | 
 |                              "size.  (Expected  :", len, " Got:  ", buf.length)); | 
 |             } | 
 |             else static if (randAssignable!(Args[$ - 1]) && Args.length > 1) | 
 |             { | 
 |                 static assert(0, "Wrong buffer type."); | 
 |             } | 
 |             else | 
 |             { | 
 |                 import std.array : uninitializedArray; | 
 |  | 
 |                 auto buf = uninitializedArray!(MapType!(Args[0], functions)[])(len); | 
 |                 alias args2 = args; | 
 |                 alias Args2 = Args; | 
 |             } | 
 |  | 
 |             if (!len) return buf; | 
 |  | 
 |             static if (isIntegral!(Args2[$ - 1])) | 
 |             { | 
 |                 static assert(args2.length == 2); | 
 |                 auto workUnitSize = cast(size_t) args2[1]; | 
 |             } | 
 |             else | 
 |             { | 
 |                 static assert(args2.length == 1, Args); | 
 |                 auto workUnitSize = defaultWorkUnitSize(range.length); | 
 |             } | 
 |  | 
 |             alias R = typeof(range); | 
 |  | 
 |             if (workUnitSize > len) | 
 |             { | 
 |                 workUnitSize = len; | 
 |             } | 
 |  | 
 |             // Handle as a special case: | 
 |             if (size == 0) | 
 |             { | 
 |                 size_t index = 0; | 
 |                 foreach (elem; range) | 
 |                 { | 
 |                     emplaceRef(buf[index++], fun(elem)); | 
 |                 } | 
 |                 return buf; | 
 |             } | 
 |  | 
 |             // Effectively -1:  chunkIndex + 1 == 0: | 
 |             shared size_t workUnitIndex = size_t.max; | 
 |             shared bool shouldContinue = true; | 
 |  | 
 |             void doIt() | 
 |             { | 
 |                 import std.algorithm.comparison : min; | 
 |  | 
 |                 scope(failure) | 
 |                 { | 
 |                     // If an exception is thrown, all threads should bail. | 
 |                     atomicStore(shouldContinue, false); | 
 |                 } | 
 |  | 
 |                 while (atomicLoad(shouldContinue)) | 
 |                 { | 
 |                     immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1); | 
 |                     immutable start = workUnitSize * myUnitIndex; | 
 |                     if (start >= len) | 
 |                     { | 
 |                         atomicStore(shouldContinue, false); | 
 |                         break; | 
 |                     } | 
 |  | 
 |                     immutable end = min(len, start + workUnitSize); | 
 |  | 
 |                     static if (hasSlicing!R) | 
 |                     { | 
 |                         auto subrange = range[start .. end]; | 
 |                         foreach (i; start .. end) | 
 |                         { | 
 |                             emplaceRef(buf[i], fun(subrange.front)); | 
 |                             subrange.popFront(); | 
 |                         } | 
 |                     } | 
 |                     else | 
 |                     { | 
 |                         foreach (i; start .. end) | 
 |                         { | 
 |                             emplaceRef(buf[i], fun(range[i])); | 
 |                         } | 
 |                     } | 
 |                 } | 
 |             } | 
 |  | 
 |             submitAndExecute(this, &doIt); | 
 |             return buf; | 
 |         } | 
 |     } | 
 |  | 
 |     /// | 
    ///
    template map(functions...)
    {
        /**
        A semi-lazy parallel map that can be used for pipelining.  The map
        functions are evaluated for the first `bufSize` elements and stored in a
        buffer and made available to `popFront`.  Meanwhile, in the
        background a second buffer of the same size is filled.  When the first
        buffer is exhausted, it is swapped with the second buffer and filled while
        the values from what was originally the second buffer are read.  This
        implementation allows for elements to be written to the buffer without
        the need for atomic operations or synchronization for each write, and
        enables the mapping function to be evaluated efficiently in parallel.

        `map` has more overhead than the simpler procedure used by `amap`
        but avoids the need to keep all results in memory simultaneously and works
        with non-random access ranges.

        Params:

        source = The $(REF_ALTTEXT input range, isInputRange, std,range,primitives)
        to be mapped.  If `source` is not random
        access it will be lazily buffered to an array of size `bufSize` before
        the map function is evaluated.  (For an exception to this rule, see Notes.)

        bufSize = The size of the buffer to store the evaluated elements.

        workUnitSize = The number of elements to evaluate in a single
        `Task`.  Must be less than or equal to `bufSize`, and
        should be a fraction of `bufSize` such that all worker threads can be
        used.  If the default of size_t.max is used, workUnitSize will be set to
        the pool-wide default.

        Returns:  An input range representing the results of the map.  This range
                  has a length iff `source` has a length.

        Notes:

        If a range returned by `map` or `asyncBuf` is used as an input to
        `map`, then as an optimization the copying from the output buffer
        of the first range to the input buffer of the second range is elided, even
        though the ranges returned by `map` and `asyncBuf` are non-random
        access ranges.  This means that the `bufSize` parameter passed to the
        current call to `map` will be ignored and the size of the buffer
        will be the buffer size of `source`.

        Example:
        ---
        // Pipeline reading a file, converting each line
        // to a number, taking the logarithms of the numbers,
        // and performing the additions necessary to find
        // the sum of the logarithms.

        auto lineRange = File("numberList.txt").byLine();
        auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
        auto nums = taskPool.map!(to!double)(dupedLines);
        auto logs = taskPool.map!log10(nums);

        double sum = 0;
        foreach (elem; logs)
        {
            sum += elem;
        }
        ---

        $(B Exception Handling):

        Any exceptions thrown while iterating over `source`
        or computing the map function are re-thrown on a call to `popFront` or,
        if thrown during construction, are simply allowed to propagate to the
        caller.  In the case of exceptions thrown while computing the map function,
        the exceptions are chained as in `TaskPool.amap`.
        */
        auto
        map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
        if (isInputRange!S)
        {
            import std.exception : enforce;

            enforce(workUnitSize == size_t.max || workUnitSize <= bufSize,
                    "Work unit size must be smaller than buffer size.");
            alias fun = adjoin!(staticMap!(unaryFun, functions));

            static final class Map
            {
                // This is a class because the task needs to be located on the
                // heap and in the non-random access case source needs to be on
                // the heap, too.

            private:
                // True iff source exposes the buf1/bufPos/doBufSwap machinery
                // of a range produced by map or asyncBuf, enabling the
                // buffer-swap optimization described in Notes above.
                enum bufferTrick = is(typeof(source.buf1)) &&
                is(typeof(source.bufPos)) &&
                is(typeof(source.doBufSwap()));

                alias E = MapType!(S, functions);
                E[] buf1, buf2;     // buf1: being read; buf2: being filled.
                S source;
                TaskPool pool;
                // Reused (never reallocated) task that fills buf2 in the background.
                Task!(run, E[] delegate(E[]), E[]) nextBufTask;
                size_t workUnitSize;
                size_t bufPos;      // Read position within buf1.
                bool lastTaskWaited;

            static if (isRandomAccessRange!S)
            {
                alias FromType = S;

                // Advance source past the elements just consumed into buf1.
                void popSource()
                {
                    import std.algorithm.comparison : min;

                    static if (__traits(compiles, source[0 .. source.length]))
                    {
                        source = source[min(buf1.length, source.length)..source.length];
                    }
                    else static if (__traits(compiles, source[0..$]))
                    {
                        source = source[min(buf1.length, source.length)..$];
                    }
                    else
                    {
                        static assert(0, "S must have slicing for Map."
                                      ~ "  " ~ S.stringof ~ " doesn't.");
                    }
                }
            }
            else static if (bufferTrick)
            {
                // Make sure we don't have the buffer recycling overload of
                // asyncBuf.
                static if (
                    is(typeof(source.source)) &&
                    isRoundRobin!(typeof(source.source))
                )
                {
                    static assert(0, "Cannot execute a parallel map on " ~
                                  "the buffer recycling overload of asyncBuf."
                                 );
                }

                alias FromType = typeof(source.buf1);
                FromType from;

                // Just swap our input buffer with source's output buffer.
                // No need to copy element by element.
                FromType dumpToFrom()
                {
                    import std.algorithm.mutation : swap;

                    assert(source.buf1.length <= from.length);
                    from.length = source.buf1.length;
                    swap(source.buf1, from);

                    // Just in case this source has been popped before
                    // being sent to map:
                    from = from[source.bufPos..$];

                    static if (is(typeof(source._length)))
                    {
                        // NOTE(review): from.length here is already the
                        // post-slice length (excluding bufPos), so subtracting
                        // bufPos again looks off when bufPos != 0 — confirm
                        // against the intended length bookkeeping.
                        source._length -= (from.length - source.bufPos);
                    }

                    source.doBufSwap();

                    return from;
                }
            }
            else
            {
                alias FromType = ElementType!S[];

                // The temporary array that data is copied to before being
                // mapped.
                FromType from;

                // Copy up to from.length elements out of source; returns the
                // filled (possibly shortened) slice.
                FromType dumpToFrom()
                {
                    assert(from !is null);

                    size_t i;
                    for (; !source.empty && i < from.length; source.popFront())
                    {
                        from[i++] = source.front;
                    }

                    from = from[0 .. i];
                    return from;
                }
            }

            static if (hasLength!S)
            {
                size_t _length;

                // Available iff hasLength!S; decremented on each popFront.
                public @property size_t length() const @safe pure nothrow
                {
                    return _length;
                }
            }

                this(S source, size_t bufSize, size_t workUnitSize, TaskPool pool)
                {
                    static if (bufferTrick)
                    {
                        // The caller's bufSize is ignored: reuse the buffer
                        // size of the upstream map/asyncBuf (see Notes).
                        bufSize = source.buf1.length;
                    }

                    buf1.length = bufSize;
                    buf2.length = bufSize;

                    static if (!isRandomAccessRange!S)
                    {
                        from.length = bufSize;
                    }

                    this.workUnitSize = (workUnitSize == size_t.max) ?
                            pool.defaultWorkUnitSize(bufSize) : workUnitSize;
                    this.source = source;
                    this.pool = pool;

                    static if (hasLength!S)
                    {
                        _length = source.length;
                    }

                    // Fill the first buffer eagerly, then start filling the
                    // second in the background.
                    buf1 = fillBuf(buf1);
                    submitBuf2();
                }

                // The from parameter is a dummy and ignored in the random access
                // case.
                E[] fillBuf(E[] buf)
                {
                    import std.algorithm.comparison : min;

                    static if (isRandomAccessRange!S)
                    {
                        import std.range : take;
                        auto toMap = take(source, buf.length);
                        scope(success) popSource();
                    }
                    else
                    {
                        auto toMap = dumpToFrom();
                    }

                    buf = buf[0 .. min(buf.length, toMap.length)];

                    // Handle as a special case: zero worker threads means
                    // evaluate serially in this thread.
                    if (pool.size == 0)
                    {
                        size_t index = 0;
                        foreach (elem; toMap)
                        {
                            buf[index++] = fun(elem);
                        }
                        return buf;
                    }

                    pool.amap!functions(toMap, workUnitSize, buf);

                    return buf;
                }

                void submitBuf2()
                in
                {
                    assert(nextBufTask.prev is null);
                    assert(nextBufTask.next is null);
                }
                do
                {
                    // Hack to reuse the task object.

                    nextBufTask = typeof(nextBufTask).init;
                    nextBufTask._args[0] = &fillBuf;
                    nextBufTask._args[1] = buf2;
                    pool.put(nextBufTask);
                }

                void doBufSwap()
                {
                    if (lastTaskWaited)
                    {
                        // Then the source is empty.  Signal it here.
                        buf1 = null;
                        buf2 = null;

                        static if (!isRandomAccessRange!S)
                        {
                            from = null;
                        }

                        return;
                    }

                    // Hand the consumed buffer over for refilling and wait for
                    // the background fill of the other buffer to complete.
                    buf2 = buf1;
                    buf1 = nextBufTask.yieldForce;
                    bufPos = 0;

                    if (source.empty)
                    {
                        lastTaskWaited = true;
                    }
                    else
                    {
                        submitBuf2();
                    }
                }

            public:
                @property auto front()
                {
                    return buf1[bufPos];
                }

                void popFront()
                {
                    static if (hasLength!S)
                    {
                        _length--;
                    }

                    bufPos++;
                    if (bufPos >= buf1.length)
                    {
                        doBufSwap();
                    }
                }

                static if (isInfinite!S)
                {
                    enum bool empty = false;
                }
                else
                {

                    bool empty() const @property
                    {
                        // doBufSwap() (called from popFront) sets buf1 to null
                        // once the source is exhausted.
                        return buf1.length == 0;
                    }
                }
            }
            return new Map(source, bufSize, workUnitSize, this);
        }
    }
 |  | 
 |     /** | 
 |     Given a `source` range that is expensive to iterate over, returns an | 
 |     $(REF_ALTTEXT input range, isInputRange, std,range,primitives) that | 
 |     asynchronously buffers the contents of `source` into a buffer of `bufSize` elements in a worker thread, | 
 |     while making previously buffered elements from a second buffer, also of size | 
 |     `bufSize`, available via the range interface of the returned | 
 |     object.  The returned range has a length iff `hasLength!S`. | 
 |     `asyncBuf` is useful, for example, when performing expensive operations | 
 |     on the elements of ranges that represent data on a disk or network. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     import std.conv, std.stdio; | 
 |  | 
 |     void main() | 
 |     { | 
 |         // Fetch lines of a file in a background thread | 
 |         // while processing previously fetched lines, | 
 |         // dealing with byLine's buffer recycling by | 
 |         // eagerly duplicating every line. | 
 |         auto lines = File("foo.txt").byLine(); | 
 |         auto duped = std.algorithm.map!"a.idup"(lines); | 
 |  | 
 |         // Fetch more lines in the background while we | 
 |         // process the lines already read into memory | 
 |         // into a matrix of doubles. | 
 |         double[][] matrix; | 
 |         auto asyncReader = taskPool.asyncBuf(duped); | 
 |  | 
 |         foreach (line; asyncReader) | 
 |         { | 
 |             auto ls = line.split("\t"); | 
 |             matrix ~= to!(double[])(ls); | 
 |         } | 
 |     } | 
 |     --- | 
 |  | 
 |     $(B Exception Handling): | 
 |  | 
 |     Any exceptions thrown while iterating over `source` are re-thrown on a | 
 |     call to `popFront` or, if thrown during construction, simply | 
 |     allowed to propagate to the caller. | 
 |     */ | 
    auto asyncBuf(S)(S source, size_t bufSize = 100)
    if (isInputRange!S)
    {
        static final class AsyncBuf
        {
            // This is a class because the task and source both need to be on
            // the heap.

            // The element type of S.
            alias E = ElementType!S;  // Needs to be here b/c of forward ref bugs.

        private:
            E[] buf1, buf2;     // buf1: being read; buf2: being filled.
            S source;
            TaskPool pool;
            // Reused (never reallocated) task that fills buf2 in the background.
            Task!(run, E[] delegate(E[]), E[]) nextBufTask;
            size_t bufPos;      // Read position within buf1.
            bool lastTaskWaited;

            static if (hasLength!S)
            {
                size_t _length;

                // Available if hasLength!S.
                public @property size_t length() const @safe pure nothrow
                {
                    return _length;
                }
            }

            this(S source, size_t bufSize, TaskPool pool)
            {
                buf1.length = bufSize;
                buf2.length = bufSize;

                this.source = source;
                this.pool = pool;

                static if (hasLength!S)
                {
                    _length = source.length;
                }

                // Fill the first buffer eagerly, then start filling the
                // second in a worker thread.
                buf1 = fillBuf(buf1);
                submitBuf2();
            }

            // Copy up to buf.length elements out of source; returns the
            // filled (possibly shortened) slice.
            E[] fillBuf(E[] buf)
            {
                assert(buf !is null);

                size_t i;
                for (; !source.empty && i < buf.length; source.popFront())
                {
                    buf[i++] = source.front;
                }

                buf = buf[0 .. i];
                return buf;
            }

            void submitBuf2()
            in
            {
                assert(nextBufTask.prev is null);
                assert(nextBufTask.next is null);
            }
            do
            {
                // Hack to reuse the task object.

                nextBufTask = typeof(nextBufTask).init;
                nextBufTask._args[0] = &fillBuf;
                nextBufTask._args[1] = buf2;
                pool.put(nextBufTask);
            }

            void doBufSwap()
            {
                if (lastTaskWaited)
                {
                    // Then source is empty.  Signal it here.
                    buf1 = null;
                    buf2 = null;
                    return;
                }

                // Hand the consumed buffer over for refilling and wait for the
                // background fill of the other buffer to complete.
                buf2 = buf1;
                buf1 = nextBufTask.yieldForce;
                bufPos = 0;

                if (source.empty)
                {
                    lastTaskWaited = true;
                }
                else
                {
                    submitBuf2();
                }
            }

        public:
            E front() @property
            {
                return buf1[bufPos];
            }

            void popFront()
            {
                static if (hasLength!S)
                {
                    _length--;
                }

                bufPos++;
                if (bufPos >= buf1.length)
                {
                    doBufSwap();
                }
            }

            static if (isInfinite!S)
            {
                enum bool empty = false;
            }

            else
            {
                ///
                bool empty() @property
                {
                    // doBufSwap() (called from popFront) sets buf1 to null
                    // once source is exhausted:
                    return buf1.length == 0;
                }
            }
        }
        return new AsyncBuf(source, bufSize, this);
    }
 |  | 
 |     /** | 
 |     Given a callable object `next` that writes to a user-provided buffer and | 
 |     a second callable object `empty` that determines whether more data is | 
 |     available to write via `next`, returns an input range that | 
 |     asynchronously calls `next` with a set of size `nBuffers` of buffers | 
 |     and makes the results available in the order they were obtained via the | 
 |     input range interface of the returned object.  Similarly to the | 
 |     input range overload of `asyncBuf`, the first half of the buffers | 
 |     are made available via the range interface while the second half are | 
 |     filled and vice-versa. | 
 |  | 
 |     Params: | 
 |  | 
 |     next = A callable object that takes a single argument that must be an array | 
 |            with mutable elements.  When called, `next` writes data to | 
 |            the array provided by the caller. | 
 |  | 
 |     empty = A callable object that takes no arguments and returns a type | 
 |             implicitly convertible to `bool`.  This is used to signify | 
 |             that no more data is available to be obtained by calling `next`. | 
 |  | 
 |     initialBufSize = The initial size of each buffer.  If `next` takes its | 
 |                      array by reference, it may resize the buffers. | 
 |  | 
 |     nBuffers = The number of buffers to cycle through when calling `next`. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     // Fetch lines of a file in a background | 
 |     // thread while processing previously fetched | 
 |     // lines, without duplicating any lines. | 
 |     auto file = File("foo.txt"); | 
 |  | 
 |     void next(ref char[] buf) | 
 |     { | 
 |         file.readln(buf); | 
 |     } | 
 |  | 
 |     // Fetch more lines in the background while we | 
 |     // process the lines already read into memory | 
 |     // into a matrix of doubles. | 
 |     double[][] matrix; | 
 |     auto asyncReader = taskPool.asyncBuf(&next, &file.eof); | 
 |  | 
 |     foreach (line; asyncReader) | 
 |     { | 
 |         auto ls = line.split("\t"); | 
 |         matrix ~= to!(double[])(ls); | 
 |     } | 
 |     --- | 
 |  | 
 |     $(B Exception Handling): | 
 |  | 
    Any exceptions thrown while invoking `next` or `empty` are re-thrown on a
    call to `popFront`.
 |  | 
 |     Warning: | 
 |  | 
 |     Using the range returned by this function in a parallel foreach loop | 
 |     will not work because buffers may be overwritten while the task that | 
 |     processes them is in queue.  This is checked for at compile time | 
 |     and will result in a static assertion failure. | 
 |     */ | 
 |     auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100) | 
 |     if (is(typeof(C2.init()) : bool) && | 
 |         Parameters!C1.length == 1 && | 
 |         Parameters!C2.length == 0 && | 
 |         isArray!(Parameters!C1[0]) | 
 |     ) { | 
 |         auto roundRobin = RoundRobinBuffer!(C1, C2)(next, empty, initialBufSize, nBuffers); | 
 |         return asyncBuf(roundRobin, nBuffers / 2); | 
 |     } | 
 |  | 
 |     /// | 
 |     template reduce(functions...) | 
 |     { | 
 |         /** | 
 |         Parallel reduce on a random access range.  Except as otherwise noted, | 
 |         usage is similar to $(REF _reduce, std,algorithm,iteration).  There is | 
 |         also $(LREF fold) which does the same thing with a different parameter | 
 |         order. | 
 |  | 
 |         This function works by splitting the range to be reduced into work | 
 |         units, which are slices to be reduced in parallel.  Once the results | 
 |         from all work units are computed, a final serial reduction is performed | 
 |         on these results to compute the final answer. Therefore, care must be | 
 |         taken to choose the seed value appropriately. | 
 |  | 
 |         Because the reduction is being performed in parallel, `functions` | 
 |         must be associative.  For notational simplicity, let # be an | 
 |         infix operator representing `functions`.  Then, (a # b) # c must equal | 
 |         a # (b # c).  Floating point addition is not associative | 
 |         even though addition in exact arithmetic is.  Summing floating | 
 |         point numbers using this function may give different results than summing | 
 |         serially.  However, for many practical purposes floating point addition | 
 |         can be treated as associative. | 
 |  | 
 |         Note that, since `functions` are assumed to be associative, | 
 |         additional optimizations are made to the serial portion of the reduction | 
 |         algorithm. These take advantage of the instruction level parallelism of | 
 |         modern CPUs, in addition to the thread-level parallelism that the rest | 
 |         of this module exploits.  This can lead to better than linear speedups | 
 |         relative to $(REF _reduce, std,algorithm,iteration), especially for | 
 |         fine-grained benchmarks like dot products. | 
 |  | 
 |         An explicit seed may be provided as the first argument.  If | 
 |         provided, it is used as the seed for all work units and for the final | 
 |         reduction of results from all work units.  Therefore, if it is not the | 
        identity value for the operation being performed, results may differ
        from those generated by $(REF _reduce, std,algorithm,iteration), and
        may also vary depending on how many work units are used.  The next
        argument must be the range to be reduced.
 |         --- | 
 |         // Find the sum of squares of a range in parallel, using | 
 |         // an explicit seed. | 
 |         // | 
 |         // Timings on an Athlon 64 X2 dual core machine: | 
 |         // | 
 |         // Parallel reduce:                     72 milliseconds | 
 |         // Using std.algorithm.reduce instead:  181 milliseconds | 
 |         auto nums = iota(10_000_000.0f); | 
 |         auto sumSquares = taskPool.reduce!"a + b"( | 
 |             0.0, std.algorithm.map!"a * a"(nums) | 
 |         ); | 
 |         --- | 
 |  | 
 |         If no explicit seed is provided, the first element of each work unit | 
 |         is used as a seed.  For the final reduction, the result from the first | 
 |         work unit is used as the seed. | 
 |         --- | 
 |         // Find the sum of a range in parallel, using the first | 
 |         // element of each work unit as the seed. | 
 |         auto sum = taskPool.reduce!"a + b"(nums); | 
 |         --- | 
 |  | 
 |         An explicit work unit size may be specified as the last argument. | 
 |         Specifying too small a work unit size will effectively serialize the | 
 |         reduction, as the final reduction of the result of each work unit will | 
 |         dominate computation time.  If `TaskPool.size` for this instance | 
 |         is zero, this parameter is ignored and one work unit is used. | 
 |         --- | 
 |         // Use a work unit size of 100. | 
 |         auto sum2 = taskPool.reduce!"a + b"(nums, 100); | 
 |  | 
 |         // Work unit size of 100 and explicit seed. | 
 |         auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100); | 
 |         --- | 
 |  | 
 |         Parallel reduce supports multiple functions, like | 
 |         `std.algorithm.reduce`. | 
 |         --- | 
 |         // Find both the min and max of nums. | 
 |         auto minMax = taskPool.reduce!(min, max)(nums); | 
 |         assert(minMax[0] == reduce!min(nums)); | 
 |         assert(minMax[1] == reduce!max(nums)); | 
 |         --- | 
 |  | 
 |         $(B Exception Handling): | 
 |  | 
 |         After this function is finished executing, any exceptions thrown | 
 |         are chained together via `Throwable.next` and rethrown.  The chaining | 
 |         order is non-deterministic. | 
 |  | 
 |         See_Also: | 
 |  | 
 |             $(LREF fold) is functionally equivalent to $(LREF _reduce) except the | 
 |             range parameter comes first and there is no need to use | 
 |             $(REF_ALTTEXT `tuple`,tuple,std,typecons) for multiple seeds. | 
 |          */ | 
        auto reduce(Args...)(Args args)
        {
            import core.exception : OutOfMemoryError;
            import core.internal.lifetime : emplaceRef;
            import std.exception : enforce;

            // fun applies the adjoined reduction function(s) to an
            // (accumulator, element) pair; finishFun merges the results of
            // separate work units into the final answer.
            alias fun = reduceAdjoin!functions;
            alias finishFun = reduceFinish!functions;

            // An integral last argument is the explicit work unit size;
            // strip it off so args2/Args2 hold only the seed (if any) and
            // the range.
            static if (isIntegral!(Args[$ - 1]))
            {
                size_t workUnitSize = cast(size_t) args[$ - 1];
                alias args2 = args[0..$ - 1];
                alias Args2 = Args[0..$ - 1];
            }
            else
            {
                alias args2 = args;
                alias Args2 = Args;
            }

            // Builds an initial accumulator from a single element: the
            // element itself for one function, or a tuple holding one copy
            // of the element per function for multiple functions.
            auto makeStartValue(Type)(Type e)
            {
                static if (functions.length == 1)
                {
                    return e;
                }
                else
                {
                    typeof(adjoin!(staticMap!(binaryFun, functions))(e, e)) seed = void;
                    foreach (i, T; seed.Types)
                    {
                        emplaceRef(seed.expand[i], e);
                    }

                    return seed;
                }
            }

            // Two remaining arguments: explicit seed followed by the range.
            static if (args2.length == 2)
            {
                static assert(isInputRange!(Args2[1]));
                alias range = args2[1];
                alias seed = args2[0];
                enum explicitSeed = true;

                static if (!is(typeof(workUnitSize)))
                {
                    size_t workUnitSize = defaultWorkUnitSize(range.length);
                }
            }
            else
            {
                // One remaining argument: just the range; its first element
                // becomes the seed and is consumed here.
                static assert(args2.length == 1);
                alias range = args2[0];

                static if (!is(typeof(workUnitSize)))
                {
                    size_t workUnitSize = defaultWorkUnitSize(range.length);
                }

                enforce(!range.empty,
                    "Cannot reduce an empty range with first element as start value.");

                auto seed = makeStartValue(range.front);
                enum explicitSeed = false;
                range.popFront();
            }

            alias E = typeof(seed);
            alias R = typeof(range);

            // Sequentially reduces range[lowerBound .. upperBound].  One
            // instance of this runs per work unit (the first inline in the
            // current thread, the rest typically in worker threads).
            E reduceOnRange(R range, size_t lowerBound, size_t upperBound)
            {
                // This is for exploiting instruction level parallelism by
                // using multiple accumulator variables within each thread,
                // since we're assuming functions are associative anyhow.

                // This is so that loops can be unrolled automatically.
                enum ilpTuple = AliasSeq!(0, 1, 2, 3, 4, 5);
                enum nILP = ilpTuple.length;
                immutable subSize = (upperBound - lowerBound) / nILP;

                if (subSize <= 1)
                {
                    // Handle as a special case: too few elements to be worth
                    // splitting across multiple accumulators.
                    static if (explicitSeed)
                    {
                        E result = seed;
                    }
                    else
                    {
                        E result = makeStartValue(range[lowerBound]);
                        lowerBound++;
                    }

                    foreach (i; lowerBound .. upperBound)
                    {
                        result = fun(result, range[i]);
                    }

                    return result;
                }

                assert(subSize > 1);
                E[nILP] results;
                size_t[nILP] offsets;

                // Each accumulator i walks its own contiguous sub-chunk
                // starting at lowerBound + subSize * i.
                foreach (i; ilpTuple)
                {
                    offsets[i] = lowerBound + subSize * i;

                    static if (explicitSeed)
                    {
                        results[i] = seed;
                    }
                    else
                    {
                        results[i] = makeStartValue(range[offsets[i]]);
                        offsets[i]++;
                    }
                }

                // When the seed came from the range, one element per
                // accumulator was already consumed above.
                immutable nLoop = subSize - (!explicitSeed);
                foreach (i; 0 .. nLoop)
                {
                    foreach (j; ilpTuple)
                    {
                        results[j] = fun(results[j], range[offsets[j]]);
                        offsets[j]++;
                    }
                }

                // Finish the remainder.
                foreach (i; nILP * subSize + lowerBound .. upperBound)
                {
                    results[$ - 1] = fun(results[$ - 1], range[i]);
                }

                // Merge the per-accumulator partial results in order.
                foreach (i; ilpTuple[1..$])
                {
                    results[0] = finishFun(results[0], results[i]);
                }

                return results[0];
            }

            immutable len = range.length;
            if (len == 0)
            {
                return seed;
            }

            // No worker threads: reduce everything inline in one unit.
            if (this.size == 0)
            {
                return finishFun(seed, reduceOnRange(range, 0, len));
            }

            // Unlike the rest of the functions here, I can't use the Task object
            // recycling trick here because this has to work on non-commutative
            // operations.  After all the tasks are done executing, fun() has to
            // be applied on the results of these to get a final result, but
            // it can't be evaluated out of order.

            if (workUnitSize > len)
            {
                workUnitSize = len;
            }

            immutable size_t nWorkUnits = (len / workUnitSize) + ((len % workUnitSize == 0) ? 0 : 1);
            assert(nWorkUnits * workUnitSize >= len);

            alias RTask = Task!(run, typeof(&reduceOnRange), R, size_t, size_t);
            RTask[] tasks;

            // Can't use alloca() due to https://issues.dlang.org/show_bug.cgi?id=3753
            // Use a fixed buffer backed by malloc().
            enum maxStack = 2_048;
            byte[maxStack] buf = void;
            immutable size_t nBytesNeeded = nWorkUnits * RTask.sizeof;

            import core.stdc.stdlib : malloc, free;
            if (nBytesNeeded <= maxStack)
            {
                // Small task arrays live in the on-stack buffer; larger ones
                // fall back to malloc below.
                tasks = (cast(RTask*) buf.ptr)[0 .. nWorkUnits];
            }
            else
            {
                auto ptr = cast(RTask*) malloc(nBytesNeeded);
                if (!ptr)
                {
                    throw new OutOfMemoryError(
                        "Out of memory in std.parallelism."
                    );
                }

                tasks = ptr[0 .. nWorkUnits];
            }

            // Free the malloc'd buffer (if used) once all tasks have been
            // forced below.
            scope(exit)
            {
                if (nBytesNeeded > maxStack)
                {
                    free(tasks.ptr);
                }
            }

            // Hack to take the address of a nested function w/o
            // making a closure.
            static auto scopedAddress(D)(scope D del) @system
            {
                auto tmp = del;
                return tmp;
            }

            size_t curPos = 0;
            void useTask(ref RTask task)
            {
                import std.algorithm.comparison : min;
                import core.lifetime : emplace;

                // Private constructor, so can't feed its arguments directly
                // to emplace
                emplace(&task, RTask
                (
                    scopedAddress(&reduceOnRange),
                    range,
                    curPos, // lower bound.
                    cast() min(len, curPos + workUnitSize)  // upper bound.
                ));

                task.pool = this;

                curPos += workUnitSize;
            }

            foreach (ref task; tasks)
            {
                useTask(task);
            }

            // Link tasks[1 .. $] into a doubly linked chain so they can be
            // submitted to the queue as a single group.
            foreach (i; 1 .. tasks.length - 1)
            {
                tasks[i].next = tasks[i + 1].basePtr;
                tasks[i + 1].prev = tasks[i].basePtr;
            }

            if (tasks.length > 1)
            {
                queueLock();
                scope(exit) queueUnlock();

                abstractPutGroupNoSync(
                    tasks[1].basePtr,
                    tasks[$ - 1].basePtr
                );
            }

            // Run the first work unit inline in the current thread while the
            // workers chew on the rest.
            if (tasks.length > 0)
            {
                try
                {
                    tasks[0].job();
                }
                catch (Throwable e)
                {
                    tasks[0].exception = e;
                }
                tasks[0].taskStatus = TaskStatus.done;

                // Try to execute each of these in the current thread
                foreach (ref task; tasks[1..$])
                {
                    tryDeleteExecute(task.basePtr);
                }
            }

            // Now that we've tried to execute every task, they're all either
            // done or in progress.  Force all of them.
            E result = seed;

            Throwable firstException;

            foreach (ref task; tasks)
            {
                try
                {
                    task.yieldForce;
                }
                catch (Throwable e)
                {
                    /* Chain e to front because order doesn't matter and because
                     * e is not likely to be a chain itself (so fewer traversals)
                     */
                    firstException = Throwable.chainTogether(e, firstException);
                    continue;
                }

                // Once any task has failed, skip result merging; the chained
                // exceptions are rethrown below instead.
                if (!firstException) result = finishFun(result, task.returnVal);
            }

            if (firstException) throw firstException;

            return result;
        }
 |     } | 
 |  | 
 |     /// | 
 |     template fold(functions...) | 
 |     { | 
 |         /** Implements the homonym function (also known as `accumulate`, `compress`, | 
 |             `inject`, or `foldl`) present in various programming languages of | 
 |             functional flavor. | 
 |  | 
 |             `fold` is functionally equivalent to $(LREF reduce) except the range | 
 |             parameter comes first and there is no need to use $(REF_ALTTEXT | 
 |             `tuple`,tuple,std,typecons) for multiple seeds. | 
 |  | 
 |             There may be one or more callable entities (`functions` argument) to | 
 |             apply. | 
 |  | 
 |             Params: | 
 |                 args = Just the range to _fold over; or the range and one seed | 
 |                        per function; or the range, one seed per function, and | 
 |                        the work unit size | 
 |  | 
 |             Returns: | 
 |                 The accumulated result as a single value for single function and | 
 |                 as a tuple of values for multiple functions | 
 |  | 
 |             See_Also: | 
 |             Similar to $(REF _fold, std,algorithm,iteration), `fold` is a wrapper around $(LREF reduce). | 
 |  | 
 |             Example: | 
 |             --- | 
 |             static int adder(int a, int b) | 
 |             { | 
 |                 return a + b; | 
 |             } | 
 |             static int multiplier(int a, int b) | 
 |             { | 
 |                 return a * b; | 
 |             } | 
 |  | 
 |             // Just the range | 
 |             auto x = taskPool.fold!adder([1, 2, 3, 4]); | 
 |             assert(x == 10); | 
 |  | 
 |             // The range and the seeds (0 and 1 below; also note multiple | 
 |             // functions in this example) | 
 |             auto y = taskPool.fold!(adder, multiplier)([1, 2, 3, 4], 0, 1); | 
 |             assert(y[0] == 10); | 
 |             assert(y[1] == 24); | 
 |  | 
 |             // The range, the seed (0), and the work unit size (20) | 
 |             auto z = taskPool.fold!adder([1, 2, 3, 4], 0, 20); | 
 |             assert(z == 10); | 
 |             --- | 
 |         */ | 
 |         auto fold(Args...)(Args args) | 
 |         { | 
 |             static assert(isInputRange!(Args[0]), "First argument must be an InputRange"); | 
 |  | 
 |             alias range = args[0]; | 
 |  | 
 |             static if (Args.length == 1) | 
 |             { | 
 |                 // Just the range | 
 |                 return reduce!functions(range); | 
 |             } | 
 |             else static if (Args.length == 1 + functions.length || | 
 |                             Args.length == 1 + functions.length + 1) | 
 |             { | 
 |                 static if (functions.length == 1) | 
 |                 { | 
 |                     alias seeds = args[1]; | 
 |                 } | 
 |                 else | 
 |                 { | 
 |                     auto seeds() | 
 |                     { | 
 |                         import std.typecons : tuple; | 
 |                         return tuple(args[1 .. functions.length+1]); | 
 |                     } | 
 |                 } | 
 |  | 
 |                 static if (Args.length == 1 + functions.length) | 
 |                 { | 
 |                     // The range and the seeds | 
 |                     return reduce!functions(seeds, range); | 
 |                 } | 
 |                 else static if (Args.length == 1 + functions.length + 1) | 
 |                 { | 
 |                     // The range, the seeds, and the work unit size | 
 |                     static assert(isIntegral!(Args[$-1]), "Work unit size must be an integral type"); | 
 |                     return reduce!functions(seeds, range, args[$-1]); | 
 |                 } | 
 |             } | 
 |             else | 
 |             { | 
 |                 import std.conv : text; | 
 |                 static assert(0, "Invalid number of arguments (" ~ Args.length.text ~ "): Should be an input range, " | 
 |                               ~ functions.length.text ~ " optional seed(s), and an optional work unit size."); | 
 |             } | 
 |         } | 
 |     } | 
 |  | 
 |     // This test is not included in the documentation because even though these | 
 |     // examples are for the inner fold() template, with their current location, | 
 |     // they would appear under the outer one. (We can't move this inside the | 
 |     // outer fold() template because then dmd runs out of memory possibly due to | 
 |     // recursive template instantiation, which is surprisingly not caught.) | 
 |     @system unittest | 
 |     { | 
 |         // Just the range | 
 |         auto x = taskPool.fold!"a + b"([1, 2, 3, 4]); | 
 |         assert(x == 10); | 
 |  | 
 |         // The range and the seeds (0 and 1 below; also note multiple | 
 |         // functions in this example) | 
 |         auto y = taskPool.fold!("a + b", "a * b")([1, 2, 3, 4], 0, 1); | 
 |         assert(y[0] == 10); | 
 |         assert(y[1] == 24); | 
 |  | 
 |         // The range, the seed (0), and the work unit size (20) | 
 |         auto z = taskPool.fold!"a + b"([1, 2, 3, 4], 0, 20); | 
 |         assert(z == 10); | 
 |     } | 
 |  | 
 |     /** | 
 |     Gets the index of the current thread relative to this `TaskPool`.  Any | 
 |     thread not in this pool will receive an index of 0.  The worker threads in | 
 |     this pool receive unique indices of 1 through `this.size`. | 
 |  | 
 |     This function is useful for maintaining worker-local resources. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     // Execute a loop that computes the greatest common | 
 |     // divisor of every number from 0 through 999 with | 
 |     // 42 in parallel.  Write the results out to | 
 |     // a set of files, one for each thread.  This allows | 
 |     // results to be written out without any synchronization. | 
 |  | 
 |     import std.conv, std.range, std.numeric, std.stdio; | 
 |  | 
 |     void main() | 
 |     { | 
        auto fileHandles = new File[taskPool.size + 1];
 |         scope(exit) { | 
 |             foreach (ref handle; fileHandles) | 
 |             { | 
 |                 handle.close(); | 
 |             } | 
 |         } | 
 |  | 
 |         foreach (i, ref handle; fileHandles) | 
 |         { | 
            handle = File("workerResults" ~ to!string(i) ~ ".txt", "w");
 |         } | 
 |  | 
 |         foreach (num; parallel(iota(1_000))) | 
 |         { | 
 |             auto outHandle = fileHandles[taskPool.workerIndex]; | 
 |             outHandle.writeln(num, '\t', gcd(num, 42)); | 
 |         } | 
 |     } | 
 |     --- | 
 |     */ | 
 |     size_t workerIndex() @property @safe const nothrow | 
 |     { | 
 |         immutable rawInd = threadIndex; | 
 |         return (rawInd >= instanceStartIndex && rawInd < instanceStartIndex + size) ? | 
 |                 (rawInd - instanceStartIndex + 1) : 0; | 
 |     } | 
 |  | 
 |     /** | 
 |     Struct for creating worker-local storage.  Worker-local storage is | 
 |     thread-local storage that exists only for worker threads in a given | 
 |     `TaskPool` plus a single thread outside the pool.  It is allocated on the | 
 |     garbage collected heap in a way that avoids _false sharing, and doesn't | 
 |     necessarily have global scope within any thread.  It can be accessed from | 
 |     any worker thread in the `TaskPool` that created it, and one thread | 
 |     outside this `TaskPool`.  All threads outside the pool that created a | 
 |     given instance of worker-local storage share a single slot. | 
 |  | 
 |     Since the underlying data for this struct is heap-allocated, this struct | 
 |     has reference semantics when passed between functions. | 
 |  | 
 |     The main uses cases for `WorkerLocalStorage` are: | 
 |  | 
 |     1.  Performing parallel reductions with an imperative, as opposed to | 
 |         functional, programming style.  In this case, it's useful to treat | 
 |         `WorkerLocalStorage` as local to each thread for only the parallel | 
 |         portion of an algorithm. | 
 |  | 
 |     2.  Recycling temporary buffers across iterations of a parallel foreach loop. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     // Calculate pi as in our synopsis example, but | 
 |     // use an imperative instead of a functional style. | 
 |     immutable n = 1_000_000_000; | 
 |     immutable delta = 1.0L / n; | 
 |  | 
 |     auto sums = taskPool.workerLocalStorage(0.0L); | 
 |     foreach (i; parallel(iota(n))) | 
 |     { | 
 |         immutable x = ( i - 0.5L ) * delta; | 
 |         immutable toAdd = delta / ( 1.0 + x * x ); | 
 |         sums.get += toAdd; | 
 |     } | 
 |  | 
 |     // Add up the results from each worker thread. | 
 |     real pi = 0; | 
 |     foreach (threadResult; sums.toRange) | 
 |     { | 
 |         pi += 4.0L * threadResult; | 
 |     } | 
 |     --- | 
 |      */ | 
 |     static struct WorkerLocalStorage(T) | 
 |     { | 
 |     private: | 
 |         TaskPool pool; | 
 |         size_t size; | 
 |  | 
 |         size_t elemSize; | 
 |         bool* stillThreadLocal; | 
 |  | 
 |         static size_t roundToLine(size_t num) pure nothrow | 
 |         { | 
 |             if (num % cacheLineSize == 0) | 
 |             { | 
 |                 return num; | 
 |             } | 
 |             else | 
 |             { | 
 |                 return ((num / cacheLineSize) + 1) * cacheLineSize; | 
 |             } | 
 |         } | 
 |  | 
 |         void* data; | 
 |  | 
 |         void initialize(TaskPool pool) | 
 |         { | 
 |             this.pool = pool; | 
 |             size = pool.size + 1; | 
 |             stillThreadLocal = new bool; | 
 |             *stillThreadLocal = true; | 
 |  | 
 |             // Determines whether the GC should scan the array. | 
 |             auto blkInfo = (typeid(T).flags & 1) ? | 
 |                            cast(GC.BlkAttr) 0 : | 
 |                            GC.BlkAttr.NO_SCAN; | 
 |  | 
 |             immutable nElem = pool.size + 1; | 
 |             elemSize = roundToLine(T.sizeof); | 
 |  | 
 |             // The + 3 is to pad one full cache line worth of space on either side | 
 |             // of the data structure to make sure false sharing with completely | 
 |             // unrelated heap data is prevented, and to provide enough padding to | 
 |             // make sure that data is cache line-aligned. | 
 |             data = GC.malloc(elemSize * (nElem + 3), blkInfo) + elemSize; | 
 |  | 
 |             // Cache line align data ptr. | 
 |             data = cast(void*) roundToLine(cast(size_t) data); | 
 |  | 
 |             foreach (i; 0 .. nElem) | 
 |             { | 
 |                 this.opIndex(i) = T.init; | 
 |             } | 
 |         } | 
 |  | 
 |         ref opIndex(this Qualified)(size_t index) | 
 |         { | 
 |             import std.conv : text; | 
 |             assert(index < size, text(index, '\t', uint.max)); | 
 |             return *(cast(CopyTypeQualifiers!(Qualified, T)*) (data + elemSize * index)); | 
 |         } | 
 |  | 
 |         void opIndexAssign(T val, size_t index) | 
 |         { | 
 |             assert(index < size); | 
 |             *(cast(T*) (data + elemSize * index)) = val; | 
 |         } | 
 |  | 
 |     public: | 
 |         /** | 
 |         Get the current thread's instance.  Returns by ref. | 
 |         Note that calling `get` from any thread | 
 |         outside the `TaskPool` that created this instance will return the | 
 |         same reference, so an instance of worker-local storage should only be | 
 |         accessed from one thread outside the pool that created it.  If this | 
 |         rule is violated, undefined behavior will result. | 
 |  | 
 |         If assertions are enabled and `toRange` has been called, then this | 
 |         WorkerLocalStorage instance is no longer worker-local and an assertion | 
 |         failure will result when calling this method.  This is not checked | 
 |         when assertions are disabled for performance reasons. | 
 |          */ | 
 |         ref get(this Qualified)() @property | 
 |         { | 
 |             assert(*stillThreadLocal, | 
 |                 "Cannot call get() on this instance of WorkerLocalStorage " ~ | 
 |                 "because it is no longer worker-local." | 
 |             ); | 
 |             return opIndex(pool.workerIndex); | 
 |         } | 
 |  | 
 |         /** | 
 |         Assign a value to the current thread's instance.  This function has | 
 |         the same caveats as its overload. | 
 |         */ | 
 |         void get(T val) @property | 
 |         { | 
 |             assert(*stillThreadLocal, | 
 |                 "Cannot call get() on this instance of WorkerLocalStorage " ~ | 
 |                 "because it is no longer worker-local." | 
 |             ); | 
 |  | 
 |             opIndexAssign(val, pool.workerIndex); | 
 |         } | 
 |  | 
 |         /** | 
 |         Returns a range view of the values for all threads, which can be used | 
 |         to further process the results of each thread after running the parallel | 
 |         part of your algorithm.  Do not use this method in the parallel portion | 
 |         of your algorithm. | 
 |  | 
 |         Calling this function sets a flag indicating that this struct is no | 
 |         longer worker-local, and attempting to use the `get` method again | 
 |         will result in an assertion failure if assertions are enabled. | 
 |          */ | 
 |         WorkerLocalStorageRange!T toRange() @property | 
 |         { | 
 |             if (*stillThreadLocal) | 
 |             { | 
 |                 *stillThreadLocal = false; | 
 |  | 
 |                 // Make absolutely sure results are visible to all threads. | 
 |                 // This is probably not necessary since some other | 
 |                 // synchronization primitive will be used to signal that the | 
 |                 // parallel part of the algorithm is done, but the | 
 |                 // performance impact should be negligible, so it's better | 
 |                 // to be safe. | 
 |                 ubyte barrierDummy; | 
 |                 atomicSetUbyte(barrierDummy, 1); | 
 |             } | 
 |  | 
 |             return WorkerLocalStorageRange!T(this); | 
 |         } | 
 |     } | 
 |  | 
 |     /** | 
 |     Range primitives for worker-local storage.  The purpose of this is to | 
 |     access results produced by each worker thread from a single thread once you | 
 |     are no longer using the worker-local storage from multiple threads. | 
 |     Do not use this struct in the parallel portion of your algorithm. | 
 |  | 
 |     The proper way to instantiate this object is to call | 
 |     `WorkerLocalStorage.toRange`.  Once instantiated, this object behaves | 
 |     as a finite random-access range with assignable, lvalue elements and | 
 |     a length equal to the number of worker threads in the `TaskPool` that | 
 |     created it plus 1. | 
 |      */ | 
 |     static struct WorkerLocalStorageRange(T) | 
 |     { | 
 |     private: | 
 |         WorkerLocalStorage!T workerLocalStorage; | 
 |  | 
 |         size_t _length; | 
 |         size_t beginOffset; | 
 |  | 
 |         this(WorkerLocalStorage!T wl) | 
 |         { | 
 |             this.workerLocalStorage = wl; | 
 |             _length = wl.size; | 
 |         } | 
 |  | 
 |     public: | 
 |         ref front(this Qualified)() @property | 
 |         { | 
 |             return this[0]; | 
 |         } | 
 |  | 
 |         ref back(this Qualified)() @property | 
 |         { | 
 |             return this[_length - 1]; | 
 |         } | 
 |  | 
 |         void popFront() | 
 |         { | 
 |             if (_length > 0) | 
 |             { | 
 |                 beginOffset++; | 
 |                 _length--; | 
 |             } | 
 |         } | 
 |  | 
 |         void popBack() | 
 |         { | 
 |             if (_length > 0) | 
 |             { | 
 |                 _length--; | 
 |             } | 
 |         } | 
 |  | 
 |         typeof(this) save() @property | 
 |         { | 
 |             return this; | 
 |         } | 
 |  | 
 |         ref opIndex(this Qualified)(size_t index) | 
 |         { | 
 |             assert(index < _length); | 
 |             return workerLocalStorage[index + beginOffset]; | 
 |         } | 
 |  | 
 |         void opIndexAssign(T val, size_t index) | 
 |         { | 
 |             assert(index < _length); | 
 |             workerLocalStorage[index] = val; | 
 |         } | 
 |  | 
 |         typeof(this) opSlice(size_t lower, size_t upper) | 
 |         { | 
 |             assert(upper <= _length); | 
 |             auto newWl = this.workerLocalStorage; | 
 |             newWl.data += lower * newWl.elemSize; | 
 |             newWl.size = upper - lower; | 
 |             return typeof(this)(newWl); | 
 |         } | 
 |  | 
 |         bool empty() const @property | 
 |         { | 
 |             return length == 0; | 
 |         } | 
 |  | 
 |         size_t length() const @property | 
 |         { | 
 |             return _length; | 
 |         } | 
 |     } | 
 |  | 
 |     /** | 
 |     Creates an instance of worker-local storage, initialized with a given | 
 |     value.  The value is `lazy` so that you can, for example, easily | 
 |     create one instance of a class for each worker.  For usage example, | 
 |     see the `WorkerLocalStorage` struct. | 
 |      */ | 
 |     WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init) | 
 |     { | 
 |         WorkerLocalStorage!T ret; | 
 |         ret.initialize(this); | 
 |         foreach (i; 0 .. size + 1) | 
 |         { | 
 |             ret[i] = initialVal; | 
 |         } | 
 |  | 
 |         // Memory barrier to make absolutely sure that what we wrote is | 
 |         // visible to worker threads. | 
 |         ubyte barrierDummy; | 
 |         atomicSetUbyte(barrierDummy, 0); | 
 |  | 
 |         return ret; | 
 |     } | 
 |  | 
 |     /** | 
 |     Signals to all worker threads to terminate as soon as they are finished | 
 |     with their current `Task`, or immediately if they are not executing a | 
 |     `Task`.  `Task`s that were in queue will not be executed unless | 
 |     a call to `Task.workForce`, `Task.yieldForce` or `Task.spinForce` | 
 |     causes them to be executed. | 
 |  | 
 |     Use only if you have waited on every `Task` and therefore know the | 
 |     queue is empty, or if you speculatively executed some tasks and no longer | 
 |     need the results. | 
 |      */ | 
 |     void stop() @trusted | 
 |     { | 
 |         queueLock(); | 
 |         scope(exit) queueUnlock(); | 
 |         atomicSetUbyte(status, PoolState.stopNow); | 
 |         notifyAll(); | 
 |     } | 
 |  | 
 |     /** | 
 |     Signals worker threads to terminate when the queue becomes empty. | 
 |  | 
 |     If blocking argument is true, wait for all worker threads to terminate | 
 |     before returning.  This option might be used in applications where | 
 |     task results are never consumed-- e.g. when `TaskPool` is employed as a | 
 |     rudimentary scheduler for tasks which communicate by means other than | 
 |     return values. | 
 |  | 
 |     Warning:  Calling this function with $(D blocking = true) from a worker | 
 |               thread that is a member of the same `TaskPool` that | 
 |               `finish` is being called on will result in a deadlock. | 
 |      */ | 
    void finish(bool blocking = false) @trusted
    {
        {
            // Transition running -> finishing under the queue lock; the CAS
            // is a no-op if the pool is already finishing or stopped.
            queueLock();
            scope(exit) queueUnlock();
            atomicCasUbyte(status, PoolState.running, PoolState.finishing);
            // Wake sleeping workers so they notice the state change.
            notifyAll();
        }
        if (blocking)
        {
            // Use this thread as a worker until everything is finished.
            executeWorkLoop();

            foreach (t; pool)
            {
                // Maybe there should be something here to prevent a thread
                // from calling join() on itself if this function is called
                // from a worker thread in the same pool, but:
                //
                // 1.  Using an if statement to skip join() would result in
                //     finish() returning without all tasks being finished.
                //
                // 2.  If an exception were thrown, it would bubble up to the
                //     Task from which finish() was called and likely be
                //     swallowed.
                t.join();
            }
        }
    }
 |  | 
    /// Returns the number of worker threads in the pool.
    @property size_t size() @safe const pure nothrow
    {
        // `pool` is the array of worker threads; its length is what callers
        // mean by the pool's "size".  No locking: this is a plain read.
        return pool.length;
    }
 |  | 
 |     /** | 
 |     Put a `Task` object on the back of the task queue.  The `Task` | 
 |     object may be passed by pointer or reference. | 
 |  | 
 |     Example: | 
 |     --- | 
 |     import std.file; | 
 |  | 
 |     // Create a task. | 
 |     auto t = task!read("foo.txt"); | 
 |  | 
 |     // Add it to the queue to be executed. | 
 |     taskPool.put(t); | 
 |     --- | 
 |  | 
 |     Notes: | 
 |  | 
 |     @trusted overloads of this function are called for `Task`s if | 
 |     $(REF hasUnsharedAliasing, std,traits) is false for the `Task`'s | 
 |     return type or the function the `Task` executes is `pure`. | 
 |     `Task` objects that meet all other requirements specified in the | 
 |     `@trusted` overloads of `task` and `scopedTask` may be created | 
 |     and executed from `@safe` code via `Task.executeInNewThread` but | 
 |     not via `TaskPool`. | 
 |  | 
 |     While this function takes the address of variables that may | 
 |     be on the stack, some overloads are marked as @trusted. | 
 |     `Task` includes a destructor that waits for the task to complete | 
 |     before destroying the stack frame it is allocated on.  Therefore, | 
 |     it is impossible for the stack frame to be destroyed before the task is | 
 |     complete and no longer referenced by a `TaskPool`. | 
 |     */ | 
    void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (!isSafeReturn!(typeof(task)))
    {
        // Record the owning pool on the Task, then enqueue its type-erased
        // base node on the task queue.
        task.pool = this;
        abstractPut(task.basePtr);
    }
 |  | 
 |     /// Ditto | 
    void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (!isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        // Reject null up front with a clear message rather than
        // dereferencing it below.
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
 |  | 
    // @trusted overload, selected when isSafeReturn holds for the Task
    // (see the Notes in the documentation comment above).
    @trusted void put(alias fun, Args...)(ref Task!(fun, Args) task)
    if (isSafeReturn!(typeof(task)))
    {
        // Same as the non-@trusted overload: record the owning pool, then
        // enqueue the type-erased base node.
        task.pool = this;
        abstractPut(task.basePtr);
    }
 |  | 
    // @trusted pointer overload, selected when isSafeReturn holds.
    @trusted void put(alias fun, Args...)(Task!(fun, Args)* task)
    if (isSafeReturn!(typeof(*task)))
    {
        import std.exception : enforce;
        // Reject null up front with a clear message, then delegate to the
        // ref overload.
        enforce(task !is null, "Cannot put a null Task on a TaskPool queue.");
        put(*task);
    }
 |  | 
 |     /** | 
 |     These properties control whether the worker threads are daemon threads. | 
 |     A daemon thread is automatically terminated when all non-daemon threads | 
 |     have terminated.  A non-daemon thread will prevent a program from | 
 |     terminating as long as it has not terminated. | 
 |  | 
 |     If any `TaskPool` with non-daemon threads is active, either `stop` | 
 |     or `finish` must be called on it before the program can terminate. | 
 |  | 
    The worker threads in the `TaskPool` instance returned by the
 |     `taskPool` property are daemon by default.  The worker threads of | 
 |     manually instantiated task pools are non-daemon by default. | 
 |  | 
 |     Note:  For a size zero pool, the getter arbitrarily returns true and the | 
 |            setter has no effect. | 
 |     */ | 
 |     bool isDaemon() @property @trusted | 
 |     { | 
 |         queueLock(); | 
 |         scope(exit) queueUnlock(); | 
 |         return (size == 0) ? true : pool[0].isDaemon; | 
 |     } | 
 |  | 
 |     /// Ditto | 
 |     void isDaemon(bool newVal) @property @trusted | 
 |     { | 
 |         queueLock(); | 
 |         scope(exit) queueUnlock(); | 
 |         foreach (thread; pool) | 
 |         { | 
 |             thread.isDaemon = newVal; | 
 |         } | 
 |     } | 
 |  | 
 |     /** | 
 |     These functions allow getting and setting the OS scheduling priority of | 
 |     the worker threads in this `TaskPool`.  They forward to | 
 |     `core.thread.Thread.priority`, so a given priority value here means the | 
 |     same thing as an identical priority value in `core.thread`. | 
 |  | 
 |     Note:  For a size zero pool, the getter arbitrarily returns | 
 |            `core.thread.Thread.PRIORITY_MIN` and the setter has no effect. | 
 |     */ | 
 |     int priority() @property @trusted | 
 |     { | 
 |         return (size == 0) ? core.thread.Thread.PRIORITY_MIN : | 
 |         pool[0].priority; | 
 |     } | 
 |  | 
 |     /// Ditto | 
 |     void priority(int newPriority) @property @trusted | 
 |     { | 
 |         if (size > 0) | 
 |         { | 
 |             foreach (t; pool) | 
 |             { | 
 |                 t.priority = newPriority; | 
 |             } | 
 |         } | 
 |     } | 
 | } | 
 |  | 
 | @system unittest | 
 | { | 
 |     import std.algorithm.iteration : sum; | 
 |     import std.range : iota; | 
 |     import std.typecons : tuple; | 
 |  | 
 |     enum N = 100; | 
 |     auto r = iota(1, N + 1); | 
 |     const expected = r.sum(); | 
 |  | 
 |     // Just the range | 
 |     assert(taskPool.fold!"a + b"(r) == expected); | 
 |  | 
 |     // Range and seeds | 
 |     assert(taskPool.fold!"a + b"(r, 0) == expected); | 
 |     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0) == tuple(expected, expected)); | 
 |  | 
 |     // Range, seeds, and work unit size | 
 |     assert(taskPool.fold!"a + b"(r, 0, 42) == expected); | 
 |     assert(taskPool.fold!("a + b", "a + b")(r, 0, 0, 42) == tuple(expected, expected)); | 
 | } | 
 |  | 
 | // Issue 16705 | 
 | @system unittest | 
 | { | 
 |     struct MyIota | 
 |     { | 
 |         size_t front; | 
 |         void popFront()(){front++;} | 
 |         auto empty(){return front >= 25;} | 
 |         auto opIndex(size_t i){return front+i;} | 
 |         auto length(){return 25-front;} | 
 |     } | 
 |  | 
 |     auto mySum = taskPool.reduce!"a + b"(MyIota()); | 
 | } | 
 |  | 
 | /** | 
 | Returns a lazily initialized global instantiation of `TaskPool`. | 
 | This function can safely be called concurrently from multiple non-worker | 
 | threads.  The worker threads in this pool are daemon threads, meaning that it | 
 | is not necessary to call `TaskPool.stop` or `TaskPool.finish` before | 
 | terminating the main thread. | 
 | */ | 
@property TaskPool taskPool() @trusted
{
    import std.concurrency : initOnce;
    // Lazily constructed singleton; initOnce guarantees the delegate runs
    // exactly once even under concurrent first calls.
    __gshared TaskPool pool;
    return initOnce!pool({
        auto p = new TaskPool(defaultPoolThreads);
        // Daemon workers do not keep the program alive on their own.
        p.isDaemon = true;
        return p;
    }());
}
 |  | 
 | private shared uint _defaultPoolThreads = uint.max; | 
 |  | 
 | /** | 
 | These properties get and set the number of worker threads in the `TaskPool` | 
 | instance returned by `taskPool`.  The default value is `totalCPUs` - 1. | 
Calling the setter after the first call to `taskPool` does not change the
number of worker threads in the instance returned by `taskPool`.
 | */ | 
 | @property uint defaultPoolThreads() @trusted | 
 | { | 
 |     const local = atomicLoad(_defaultPoolThreads); | 
 |     return local < uint.max ? local : totalCPUs - 1; | 
 | } | 
 |  | 
 | /// Ditto | 
@property void defaultPoolThreads(uint newVal) @trusted
{
    // Atomic store: may race with concurrent reads in the getter.  Only
    // affects pools created after this call (see documentation above).
    atomicStore(_defaultPoolThreads, newVal);
}
 |  | 
 | /** | 
Convenience functions that forward to `taskPool.parallel`.  The
 | purpose of these is to make parallel foreach less verbose and more | 
 | readable. | 
 |  | 
 | Example: | 
 | --- | 
 | // Find the logarithm of every number from | 
 | // 1 to 1_000_000 in parallel, using the | 
 | // default TaskPool instance. | 
 | auto logs = new double[1_000_000]; | 
 |  | 
 | foreach (i, ref elem; parallel(logs)) | 
 | { | 
 |     elem = log(i + 1.0); | 
 | } | 
 | --- | 
 |  | 
 | */ | 
ParallelForeach!R parallel(R)(R range)
{
    // Forward to the global taskPool, which picks a default work unit size.
    return taskPool.parallel(range);
}
 |  | 
 | /// Ditto | 
ParallelForeach!R parallel(R)(R range, size_t workUnitSize)
{
    // Forward to the global taskPool with an explicit work unit size.
    return taskPool.parallel(range, workUnitSize);
}
 |  | 
 | //  `each` should be usable with parallel | 
 | // https://issues.dlang.org/show_bug.cgi?id=17019 | 
@system unittest
{
    import std.algorithm.iteration : each, sum;
    import std.range : iota;

    // check behavior with parallel: the one-argument ref lambda mutates
    // each element in place.
    auto arr = new int[10];
    parallel(arr).each!((ref e) => e += 1);
    assert(arr.sum == 10);

    // The two-argument form also receives the element index.
    auto arrIndex = new int[10];
    parallel(arrIndex).each!((i, ref e) => e += i);
    assert(arrIndex.sum == 10.iota.sum);
}
 |  | 
 | // https://issues.dlang.org/show_bug.cgi?id=22745 | 
@system unittest
{
    // A zero-thread pool must iterate an empty range without hanging.
    auto pool = new TaskPool(0);
    int[] empty;
    foreach (i; pool.parallel(empty)) {}
    pool.finish();
}
 |  | 
// Thrown when a parallel foreach loop is broken from.
class ParallelForeachError : Error
{
    this()
    {
        // Loop bodies run concurrently across threads, so non-local exits
        // (break/return/labeled continue/goto) have no meaningful semantics.
        super("Cannot break from a parallel foreach loop using break, return, "
              ~ "labeled break/continue or goto statements.");
    }
}
 |  | 
 | /*------Structs that implement opApply for parallel foreach.------------------*/ | 
private template randLen(R)
{
    // True for ranges supporting both random access and .length -- the
    // requirements for the index-partitioned parallel foreach path.
    enum randLen = isRandomAccessRange!R && hasLength!R;
}
 |  | 
 | private void submitAndExecute( | 
 |     TaskPool pool, | 
 |     scope void delegate() doIt | 
 | ) | 
 | { | 
 |     import core.exception : OutOfMemoryError; | 
 |     immutable nThreads = pool.size + 1; | 
 |  | 
 |     alias PTask = typeof(scopedTask(doIt)); | 
 |     import core.stdc.stdlib : malloc, free; | 
 |     import core.stdc.string : memcpy; | 
 |  | 
 |     // The logical thing to do would be to just use alloca() here, but that | 
 |     // causes problems on Windows for reasons that I don't understand | 
 |     // (tentatively a compiler bug) and definitely doesn't work on Posix due | 
 |     // to https://issues.dlang.org/show_bug.cgi?id=3753. | 
 |     // Therefore, allocate a fixed buffer and fall back to `malloc()` if | 
 |     // someone's using a ridiculous amount of threads. | 
 |     // Also, the using a byte array instead of a PTask array as the fixed buffer | 
 |     // is to prevent d'tors from being called on uninitialized excess PTask | 
 |     // instances. | 
 |     enum nBuf = 64; | 
 |     byte[nBuf * PTask.sizeof] buf = void; | 
 |     PTask[] tasks; | 
 |     if (nThreads <= nBuf) | 
 |     { | 
 |         tasks = (cast(PTask*) buf.ptr)[0 .. nThreads]; | 
 |     } | 
 |     else | 
 |     { | 
 |         auto ptr = cast(PTask*) malloc(nThreads * PTask.sizeof); | 
 |         if (!ptr) throw new OutOfMemoryError("Out of memory in std.parallelism."); | 
 |         tasks = ptr[0 .. nThreads]; | 
 |     } | 
 |  | 
 |     scope(exit) | 
 |     { | 
 |         if (nThreads > nBuf) | 
 |         { | 
 |             free(tasks.ptr); | 
 |         } | 
 |     } | 
 |  | 
 |     foreach (ref t; tasks) | 
 |     { | 
 |         import core.stdc.string : memcpy; | 
 |  | 
 |         // This silly looking code is necessary to prevent d'tors from being | 
 |         // called on uninitialized objects. | 
 |         auto temp = scopedTask(doIt); | 
 |         memcpy(&t, &temp, PTask.sizeof); | 
 |  | 
 |         // This has to be done to t after copying, not temp before copying. | 
 |         // Otherwise, temp's destructor will sit here and wait for the | 
 |         // task to finish. | 
 |         t.pool = pool; | 
 |     } | 
 |  | 
 |     foreach (i; 1 .. tasks.length - 1) | 
 |     { | 
 |         tasks[i].next = tasks[i + 1].basePtr; | 
 |         tasks[i + 1].prev = tasks[i].basePtr; | 
 |     } | 
 |  | 
 |     if (tasks.length > 1) | 
 |     { | 
 |         pool.queueLock(); | 
 |         scope(exit) pool.queueUnlock(); | 
 |  | 
 |         pool.abstractPutGroupNoSync( | 
 |             tasks[1].basePtr, | 
 |             tasks[$ - 1].basePtr | 
 |         ); | 
 |     } | 
 |  | 
 |     if (tasks.length > 0) | 
 |     { | 
 |         try | 
 |         { | 
 |             tasks[0].job(); | 
 |         } | 
 |         catch (Throwable e) | 
 |         { | 
 |             tasks[0].exception = e; // nocoverage | 
 |         } | 
 |         tasks[0].taskStatus = TaskStatus.done; | 
 |  | 
 |         // Try to execute each of these in the current thread | 
 |         foreach (ref task; tasks[1..$]) | 
 |         { | 
 |             pool.tryDeleteExecute(task.basePtr); | 
 |         } | 
 |     } | 
 |  | 
 |     Throwable firstException; | 
 |  | 
 |     foreach (i, ref task; tasks) | 
 |     { | 
 |         try | 
 |         { | 
 |             task.yieldForce; | 
 |         } | 
 |         catch (Throwable e) | 
 |         { | 
 |             /* Chain e to front because order doesn't matter and because | 
 |              * e is not likely to be a chain itself (so fewer traversals) | 
 |              */ | 
 |             firstException = Throwable.chainTogether(e, firstException); | 
 |             continue; | 
 |         } | 
 |     } | 
 |  | 
 |     if (firstException) throw firstException; | 
 | } | 
 |  | 
// Called by the parallel foreach implementations when a loop body returns
// nonzero (i.e. attempts a break/return/goto out of the loop).
void foreachErr()
{
    throw new ParallelForeachError();
}
 |  | 
 | int doSizeZeroCase(R, Delegate)(ref ParallelForeach!R p, Delegate dg) | 
 | { | 
 |     with(p) | 
 |     { | 
 |         int res = 0; | 
 |         size_t index = 0; | 
 |  | 
 |         // The explicit ElementType!R in the foreach loops is necessary for | 
 |         // correct behavior when iterating over strings. | 
 |         static if (hasLvalueElements!R) | 
 |         { | 
 |             foreach (ref ElementType!R elem; range) | 
 |             { | 
 |                 static if (Parameters!dg.length == 2) | 
 |                 { | 
 |                     res = dg(index, elem); | 
 |                 } | 
 |                 else | 
 |                 { | 
 |                     res = dg(elem); | 
 |                 } | 
 |                 if (res) break; | 
 |                 index++; | 
 |             } | 
 |         } | 
 |         else | 
 |         { | 
 |             foreach (ElementType!R elem; range) | 
 |             { | 
 |                 static if (Parameters!dg.length == 2) | 
 |                 { | 
 |                     res = dg(index, elem); | 
 |                 } | 
 |                 else | 
 |                 { | 
 |                     res = dg(elem); | 
 |                 } | 
 |                 if (res) break; | 
 |                 index++; | 
 |             } | 
 |         } | 
 |         if (res) foreachErr; | 
 |         return res; | 
 |     } | 
 | } | 
 |  | 
// Mixin implementing parallel foreach over random-access ranges with length:
// fixed-size index intervals are claimed via an atomic counter.  Expects
// `pool`, `range`, `workUnitSize`, and `dg` in the mixing-in scope.
private enum string parallelApplyMixinRandomAccess = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    shared size_t workUnitIndex = size_t.max;  // Effectively -1:  chunkIndex + 1 == 0
    immutable len = range.length;
    if (!len) return 0;

    shared bool shouldContinue = true;

    // Executed concurrently by every worker (and the submitting thread) via
    // submitAndExecute:  repeatedly claim the next work unit and process it.
    void doIt()
    {
        import std.algorithm.comparison : min;

        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        while (atomicLoad(shouldContinue))
        {
            // Atomically claim the next work unit index.
            immutable myUnitIndex = atomicOp!"+="(workUnitIndex, 1);
            immutable start = workUnitSize * myUnitIndex;
            if (start >= len)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            // The final work unit may be shorter than workUnitSize.
            immutable end = min(len, start + workUnitSize);

            foreach (i; start .. end)
            {
                static if (withIndex)
                {
                    if (dg(i, range[i])) foreachErr();
                }
                else
                {
                    if (dg(range[i])) foreachErr();
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
 |  | 
// Mixin implementing parallel foreach over general input ranges:  elements
// are popped off `range` in workUnitSize batches under a mutex, then each
// batch is processed without locking.  Expects `pool`, `range`,
// `workUnitSize`, and `dg` in the mixing-in scope.
enum string parallelApplyMixinInputRange = q{
    // Handle empty thread pool as special case.
    if (pool.size == 0)
    {
        return doSizeZeroCase(this, dg);
    }

    // Whether iteration is with or without an index variable.
    enum withIndex = Parameters!(typeof(dg)).length == 2;

    // This protects the range while copying it.
    auto rangeMutex = new Mutex();

    shared bool shouldContinue = true;

    // The total number of elements that have been popped off range.
    // This is updated only while protected by rangeMutex.
    size_t nPopped = 0;

    static if (
        is(typeof(range.buf1)) &&
        is(typeof(range.bufPos)) &&
        is(typeof(range.doBufSwap()))
    )
    {
        // Make sure we don't have the buffer recycling overload of
        // asyncBuf.
        static if (
            is(typeof(range.source)) &&
            isRoundRobin!(typeof(range.source))
        )
        {
            static assert(0, "Cannot execute a parallel foreach loop on " ~
            "the buffer recycling overload of asyncBuf.");
        }

        // The range looks like an asyncBuf:  we can steal its internal
        // buffer wholesale instead of copying elements one at a time.
        enum bool bufferTrick = true;
    }
    else
    {
        enum bool bufferTrick = false;
    }

    // Executed concurrently by every worker (and the submitting thread).
    void doIt()
    {
        scope(failure)
        {
            // If an exception is thrown, all threads should bail.
            atomicStore(shouldContinue, false);
        }

        static if (hasLvalueElements!R)
        {
            // Store pointers so the loop body can mutate elements in place.
            alias Temp = ElementType!R*[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            size_t makeTemp()
            {
                import std.algorithm.internal : addressOf;
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = addressOf(range.front);
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

        }
        else
        {

            alias Temp = ElementType!R[];
            Temp temp;

            // Returns:  The previous value of nPopped.
            static if (!bufferTrick) size_t makeTemp()
            {
                import std.array : uninitializedArray;

                if (temp is null)
                {
                    temp = uninitializedArray!Temp(workUnitSize);
                }

                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                size_t i = 0;
                for (; i < workUnitSize && !range.empty; range.popFront(), i++)
                {
                    temp[i] = range.front;
                }

                temp = temp[0 .. i];
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }

            static if (bufferTrick) size_t makeTemp()
            {
                import std.algorithm.mutation : swap;
                rangeMutex.lock();
                scope(exit) rangeMutex.unlock();

                // Elide copying by just swapping buffers.
                temp.length = range.buf1.length;
                swap(range.buf1, temp);

                // This is necessary in case popFront() has been called on
                // range before entering the parallel foreach loop.
                temp = temp[range.bufPos..$];

                static if (is(typeof(range._length)))
                {
                    range._length -= (temp.length - range.bufPos);
                }

                range.doBufSwap();
                auto ret = nPopped;
                nPopped += temp.length;
                return ret;
            }
        }

        while (atomicLoad(shouldContinue))
        {
            auto overallIndex = makeTemp();
            if (temp.empty)
            {
                atomicStore(shouldContinue, false);
                break;
            }

            foreach (i; 0 .. temp.length)
            {
                // Only advance the logical index if the body didn't throw.
                scope(success) overallIndex++;

                static if (hasLvalueElements!R)
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, *temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(*temp[i])) foreachErr();
                    }
                }
                else
                {
                    static if (withIndex)
                    {
                        if (dg(overallIndex, temp[i])) foreachErr();
                    }
                    else
                    {
                        if (dg(temp[i])) foreachErr();
                    }
                }
            }
        }
    }

    submitAndExecute(pool, &doIt);

    return 0;
};
 |  | 
 |  | 
// Aggregate returned by TaskPool.parallel / the free parallel() functions;
// its opApply overloads drive the parallel foreach machinery.
private struct ParallelForeach(R)
{
    TaskPool pool;        // Pool that will execute the loop bodies.
    R range;              // The range being iterated.
    size_t workUnitSize;  // Number of elements per work unit.
    alias E = ElementType!R;

    // Delegate types for the loop body, with and without an index parameter.
    // Elements are passed by ref only when the range has lvalue elements.
    static if (hasLvalueElements!R)
    {
        alias NoIndexDg = int delegate(ref E);
        alias IndexDg = int delegate(size_t, ref E);
    }
    else
    {
        alias NoIndexDg = int delegate(E);
        alias IndexDg = int delegate(size_t, E);
    }

    int opApply(scope NoIndexDg dg)
    {
        // Random-access ranges with length use index partitioning;
        // everything else falls back to mutex-protected batch popping.
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }

    int opApply(scope IndexDg dg)
    {
        static if (randLen!R)
        {
            mixin(parallelApplyMixinRandomAccess);
        }
        else
        {
            mixin(parallelApplyMixinInputRange);
        }
    }
}
 |  | 
 | /* | 
 | This struct buffers the output of a callable that outputs data into a | 
 | user-supplied buffer into a set of buffers of some fixed size.  It allows these | 
 | buffers to be accessed with an input range interface.  This is used internally | 
 | in the buffer-recycling overload of TaskPool.asyncBuf, which creates an | 
 | instance and forwards it to the input range overload of asyncBuf. | 
 | */ | 
private struct RoundRobinBuffer(C1, C2)
{
    // No need for constraints because they're already checked for in asyncBuf.

    alias Array = Parameters!(C1.init)[0];
    alias T = typeof(Array.init[0]);

    T[][] bufs;    // Fixed set of reusable buffers, cycled round-robin.
    size_t index;  // Index of the buffer currently exposed as front.
    C1 nextDel;    // Callable that fills a buffer with the next chunk.
    C2 emptyDel;   // Callable that reports whether the source is exhausted.
    bool _empty;   // Set once the source reports exhaustion.
    bool primed;   // Whether bufs[index] currently holds valid data.

    this(
        C1 nextDel,
        C2 emptyDel,
        size_t initialBufSize,
        size_t nBuffers
    ) {
        this.nextDel = nextDel;
        this.emptyDel = emptyDel;
        bufs.length = nBuffers;

        foreach (ref buf; bufs)
        {
            buf.length = initialBufSize;
        }
    }

    // Fills the current buffer via nextDel.  primed is only set on success,
    // so a throwing nextDel leaves the buffer unprimed.
    void prime()
    in
    {
        assert(!empty);
    }
    do
    {
        scope(success) primed = true;
        nextDel(bufs[index]);
    }


    // Primes lazily on first access, so construction itself does no work.
    T[] front() @property
    in
    {
        assert(!empty);
    }
    do
    {
        if (!primed) prime();
        return bufs[index];
    }

    void popFront()
    {
        // emptyDel() is consulted on advance:  once the source reports it is
        // done, the range becomes empty instead of moving to a new buffer.
        if (empty || emptyDel())
        {
            _empty = true;
            return;
        }

        index = (index + 1) % bufs.length;
        primed = false;
    }

    bool empty() @property const @safe pure nothrow
    {
        return _empty;
    }
}
 |  | 
 | version (StdUnittest) | 
 | { | 
 |     // This was the only way I could get nested maps to work. | 
 |     private __gshared TaskPool poolInstance; | 
 | } | 
 |  | 
 | // These test basic functionality but don't stress test for threading bugs. | 
 | // These are the tests that should be run every time Phobos is compiled. | 
 | @system unittest | 
 | { | 
 |     import std.algorithm.comparison : equal, min, max; | 
 |     import std.algorithm.iteration : filter, map, reduce; | 
 |     import std.array : split; | 
 |     import std.conv : text; | 
 |     import std.exception : assertThrown; | 
 |     import std.math.operations : isClose; | 
 |     import std.math.algebraic : sqrt, abs; | 
 |     import std.math.exponential : log; | 
 |     import std.range : indexed, iota, join; | 
 |     import std.typecons : Tuple, tuple; | 
 |     import std.stdio; | 
 |  | 
 |     poolInstance = new TaskPool(2); | 
 |     scope(exit) poolInstance.stop(); | 
 |  | 
 |     // The only way this can be verified is manually. | 
 |     debug(std_parallelism) stderr.writeln("totalCPUs = ", totalCPUs); | 
 |  | 
 |     auto oldPriority = poolInstance.priority; | 
 |     poolInstance.priority = Thread.PRIORITY_MAX; | 
 |     assert(poolInstance.priority == Thread.PRIORITY_MAX); | 
 |  | 
 |     poolInstance.priority = Thread.PRIORITY_MIN; | 
 |     assert(poolInstance.priority == Thread.PRIORITY_MIN); | 
 |  | 
 |     poolInstance.priority = oldPriority; | 
 |     assert(poolInstance.priority == oldPriority); | 
 |  | 
 |     static void refFun(ref uint num) | 
 |     { | 
 |         num++; | 
 |     } | 
 |  | 
 |     uint x; | 
 |  | 
 |     // Test task(). | 
 |     auto t = task!refFun(x); | 
 |     poolInstance.put(t); | 
 |     t.yieldForce; | 
 |     assert(t.args[0] == 1); | 
 |  | 
 |     auto t2 = task(&refFun, x); | 
 |     poolInstance.put(t2); | 
 |     t2.yieldForce; | 
 |     assert(t2.args[0] == 1); | 
 |  | 
 |     // Test scopedTask(). | 
 |     auto st = scopedTask!refFun(x); | 
 |     poolInstance.put(st); | 
 |     st.yieldForce; | 
 |     assert(st.args[0] == 1); | 
 |  | 
 |     auto st2 = scopedTask(&refFun, x); | 
 |     poolInstance.put(st2); | 
 |     st2.yieldForce; | 
 |     assert(st2.args[0] == 1); | 
 |  | 
 |     // Test executeInNewThread(). | 
 |     auto ct = scopedTask!refFun(x); | 
 |     ct.executeInNewThread(Thread.PRIORITY_MAX); | 
 |     ct.yieldForce; | 
 |     assert(ct.args[0] == 1); | 
 |  | 
 |     // Test ref return. | 
 |     uint toInc = 0; | 
 |     static ref T makeRef(T)(ref T num) | 
 |     { | 
 |         return num; | 
 |     } | 
 |  | 
 |     auto t3 = task!makeRef(toInc); | 
 |     taskPool.put(t3); | 
 |     assert(t3.args[0] == 0); | 
 |     t3.spinForce++; | 
 |     assert(t3.args[0] == 1); | 
 |  | 
 |     static void testSafe() @safe { | 
 |         static int bump(int num) | 
 |         { | 
 |             return num + 1; | 
 |         } | 
 |  | 
 |         auto safePool = new TaskPool(0); | 
 |         auto t = task(&bump, 1); | 
 |         taskPool.put(t); | 
 |         assert(t.yieldForce == 2); | 
 |  | 
 |         auto st = scopedTask(&bump, 1); | 
 |         taskPool.put(st); | 
 |         assert(st.yieldForce == 2); | 
 |         safePool.stop(); | 
 |     } | 
 |  | 
 |     auto arr = [1,2,3,4,5]; | 
 |     auto nums = new uint[5]; | 
 |     auto nums2 = new uint[5]; | 
 |  | 
 |     foreach (i, ref elem; poolInstance.parallel(arr)) | 
 |     { | 
 |         elem++; | 
 |         nums[i] = cast(uint) i + 2; | 
 |         nums2[i] = elem; | 
 |     } | 
 |  | 
 |     assert(nums == [2,3,4,5,6], text(nums)); | 
 |     assert(nums2 == nums, text(nums2)); | 
 |     assert(arr == nums, text(arr)); | 
 |  | 
 |     // Test const/immutable arguments. | 
 |     static int add(int lhs, int rhs) | 
 |     { | 
 |         return lhs + rhs; | 
 |     } | 
 |     immutable addLhs = 1; | 
 |     immutable addRhs = 2; | 
 |     auto addTask = task(&add, addLhs, addRhs); | 
 |     auto addScopedTask = scopedTask(&add, addLhs, addRhs); | 
 |     poolInstance.put(addTask); | 
 |     poolInstance.put(addScopedTask); | 
 |     assert(addTask.yieldForce == 3); | 
 |     assert(addScopedTask.yieldForce == 3); | 
 |  | 
 |     // Test parallel foreach with non-random access range. | 
 |     auto range = filter!"a != 666"([0, 1, 2, 3, 4]); | 
 |  | 
 |     foreach (i, elem; poolInstance.parallel(range)) | 
 |     { | 
 |         nums[i] = cast(uint) i; | 
 |     } | 
 |  | 
 |     assert(nums == [0,1,2,3,4]); | 
 |  | 
 |     auto logs = new double[1_000_000]; | 
 |     foreach (i, ref elem; poolInstance.parallel(logs)) | 
 |     { | 
 |         elem = log(i + 1.0); | 
 |     } | 
 |  | 
 |     foreach (i, elem; logs) | 
 |     { | 
 |         assert(isClose(elem, log(double(i + 1)))); | 
 |     } | 
 |  | 
 |     assert(poolInstance.amap!"a * a"([1,2,3,4,5]) == [1,4,9,16,25]); | 
 |     assert(poolInstance.amap!"a * a"([1,2,3,4,5], new long[5]) == [1,4,9,16,25]); | 
 |     assert(poolInstance.amap!("a * a", "-a")([1,2,3]) == | 
 |            [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); | 
 |  | 
 |     auto tupleBuf = new Tuple!(int, int)[3]; | 
 |     poolInstance.amap!("a * a", "-a")([1,2,3], tupleBuf); | 
 |     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); | 
 |     poolInstance.amap!("a * a", "-a")([1,2,3], 5, tupleBuf); | 
 |     assert(tupleBuf == [tuple(1, -1), tuple(4, -2), tuple(9, -3)]); | 
 |  | 
 |     // Test amap with a non-array buffer. | 
 |     auto toIndex = new int[5]; | 
 |     auto ind = indexed(toIndex, [3, 1, 4, 0, 2]); | 
 |     poolInstance.amap!"a * 2"([1, 2, 3, 4, 5], ind); | 
 |     assert(equal(ind, [2, 4, 6, 8, 10])); | 
 |     assert(equal(toIndex, [8, 4, 10, 2, 6])); | 
 |     poolInstance.amap!"a / 2"(ind, ind); | 
 |     assert(equal(ind, [1, 2, 3, 4, 5])); | 
 |     assert(equal(toIndex, [4, 2, 5, 1, 3])); | 
 |  | 
 |     auto buf = new int[5]; | 
 |     poolInstance.amap!"a * a"([1,2,3,4,5], buf); | 
 |     assert(buf == [1,4,9,16,25]); | 
 |     poolInstance.amap!"a * a"([1,2,3,4,5], 4, buf); | 
 |     assert(buf == [1,4,9,16,25]); | 
 |  | 
 |     assert(poolInstance.reduce!"a + b"([1]) == 1); | 
 |     assert(poolInstance.reduce!"a + b"([1,2,3,4]) == 10); | 
 |     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4]) == 10); | 
 |     assert(poolInstance.reduce!"a + b"(0.0, [1,2,3,4], 1) == 10); | 
 |     assert(poolInstance.reduce!(min, max)([1,2,3,4]) == tuple(1, 4)); | 
 |     assert(poolInstance.reduce!("a + b", "a * b")(tuple(0, 1), [1,2,3,4]) == | 
 |            tuple(10, 24)); | 
 |  | 
 |     immutable serialAns = reduce!"a + b"(iota(1000)); | 
 |     assert(poolInstance.reduce!"a + b"(0, iota(1000)) == serialAns); | 
 |     assert(poolInstance.reduce!"a + b"(iota(1000)) == serialAns); | 
 |  | 
 |     // Test worker-local storage. | 
 |     auto wl = poolInstance.workerLocalStorage(0); | 
 |     foreach (i; poolInstance.parallel(iota(1000), 1)) | 
 |     { | 
 |         wl.get = wl.get + i; | 
 |     } | 
 |  | 
 |     auto wlRange = wl.toRange; | 
 |     auto parallelSum = poolInstance.reduce!"a + b"(wlRange); | 
 |     assert(parallelSum == 499500); | 
 |     assert(wlRange[0 .. 1][0] == wlRange[0]); | 
 |     assert(wlRange[1 .. 2][0] == wlRange[1]); | 
 |  | 
 |     // Test finish() | 
 |     { | 
 |         static void slowFun() { Thread.sleep(dur!"msecs"(1)); } | 
 |  | 
 |         auto pool1 = new TaskPool(); | 
 |         auto tSlow = task!slowFun(); | 
 |         pool1.put(tSlow); | 
 |         pool1.finish(); | 
 |         tSlow.yieldForce; | 
 |         // Can't assert that pool1.status == PoolState.stopNow because status | 
 |         // doesn't change until after the "done" flag is set and the waiting | 
 |         // thread is woken up. | 
 |  | 
 |         auto pool2 = new TaskPool(); | 
 |         auto tSlow2 = task!slowFun(); | 
 |         pool2.put(tSlow2); | 
 |         pool2.finish(true); // blocking | 
 |         assert(tSlow2.done); | 
 |  | 
 |         // Test fix for https://issues.dlang.org/show_bug.cgi?id=8582 by making pool size zero. | 
 |         auto pool3 = new TaskPool(0); | 
 |         auto tSlow3 = task!slowFun(); | 
 |         pool3.put(tSlow3); | 
 |         pool3.finish(true); // blocking | 
 |         assert(tSlow3.done); | 
 |  | 
 |         // This is correct because no thread will terminate unless pool2.status | 
 |         // and pool3.status have already been set to stopNow. | 
 |         assert(pool2.status == TaskPool.PoolState.stopNow); | 
 |         assert(pool3.status == TaskPool.PoolState.stopNow); | 
 |     } | 
 |  | 
 |     // Test default pool stuff. | 
 |     assert(taskPool.size == totalCPUs - 1); | 
 |  | 
 |     nums = new uint[1000]; | 
 |     foreach (i; parallel(iota(1000))) | 
 |     { | 
 |         nums[i] = cast(uint) i; | 
 |     } | 
 |     assert(equal(nums, iota(1000))); | 
 |  | 
 |     assert(equal( | 
 |                poolInstance.map!"a * a"(iota(3_000_001), 10_000), | 
 |                map!"a * a"(iota(3_000_001)) | 
 |            )); | 
 |  | 
 |     // The filter is to kill random access and test the non-random access | 
 |     // branch. | 
 |     assert(equal( | 
 |                poolInstance.map!"a * a"( | 
 |                    filter!"a == a"(iota(3_000_001) | 
 |                                   ), 10_000, 1000), | 
 |                map!"a * a"(iota(3_000_001)) | 
 |            )); | 
 |  | 
 |     assert( | 
 |         reduce!"a + b"(0UL, | 
 |                        poolInstance.map!"a * a"(iota(300_001), 10_000) | 
 |                       ) == | 
 |         reduce!"a + b"(0UL, | 
 |                        map!"a * a"(iota(300_001)) | 
 |                       ) | 
 |     ); | 
 |  | 
 |     assert(equal( | 
 |                iota(1_000_002), | 
 |                poolInstance.asyncBuf(filter!"a == a"(iota(1_000_002))) | 
 |            )); | 
 |  | 
 |     { | 
 |         import std.conv : to; | 
 |         import std.file : deleteme; | 
 |  | 
 |         string temp_file = deleteme ~ "-tempDelMe.txt"; | 
 |         auto file = File(temp_file, "wb"); | 
 |         scope(exit) | 
 |         { | 
 |             file.close(); | 
 |             import std.file; | 
 |             remove(temp_file); | 
 |         } | 
 |  | 
 |         auto written = [[1.0, 2, 3], [4.0, 5, 6], [7.0, 8, 9]]; | 
 |         foreach (row; written) | 
 |         { | 
 |             file.writeln(join(to!(string[])(row), "\t")); | 
 |         } | 
 |  | 
 |         file = File(temp_file); | 
 |  | 
 |         void next(ref char[] buf) | 
 |         { | 
 |             file.readln(buf); | 
 |             import std.string : chomp; | 
 |             buf = chomp(buf); | 
 |         } | 
 |  | 
 |         double[][] read; | 
 |         auto asyncReader = taskPool.asyncBuf(&next, &file.eof); | 
 |  | 
 |         foreach (line; asyncReader) | 
 |         { | 
 |             if (line.length == 0) continue; | 
 |             auto ls = line.split("\t"); | 
 |             read ~= to!(double[])(ls); | 
 |         } | 
 |  | 
 |         assert(read == written); | 
 |         file.close(); | 
 |     } | 
 |  | 
 |     // Test Map/AsyncBuf chaining. | 
 |  | 
 |     auto abuf = poolInstance.asyncBuf(iota(-1.0, 3_000_000), 100); | 
 |     auto temp = poolInstance.map!sqrt( | 
 |                     abuf, 100, 5 | 
 |                 ); | 
 |     auto lmchain = poolInstance.map!"a * a"(temp, 100, 5); | 
 |     lmchain.popFront(); | 
 |  | 
 |     int ii; | 
 |     foreach ( elem; (lmchain)) | 
 |     { | 
 |         if (!isClose(elem, ii)) | 
 |         { | 
 |             stderr.writeln(ii, '\t', elem); | 
 |         } | 
 |         ii++; | 
 |     } | 
 |  | 
 |     // Test buffer trick in parallel foreach. | 
 |     abuf = poolInstance.asyncBuf(iota(-1.0, 1_000_000), 100); | 
 |     abuf.popFront(); | 
 |     auto bufTrickTest = new size_t[abuf.length]; | 
 |     foreach (i, elem; parallel(abuf)) | 
 |     { | 
 |         bufTrickTest[i] = i; | 
 |     } | 
 |  | 
 |     assert(equal(iota(1_000_000), bufTrickTest)); | 
 |  | 
 |     auto myTask = task!(abs)(-1); | 
 |     taskPool.put(myTask); | 
 |     assert(myTask.spinForce == 1); | 
 |  | 
 |     // Test that worker local storage from one pool receives an index of 0 | 
 |     // when the index is queried w.r.t. another pool.  The only way to do this | 
 |     // is non-deterministically. | 
 |     foreach (i; parallel(iota(1000), 1)) | 
 |     { | 
 |         assert(poolInstance.workerIndex == 0); | 
 |     } | 
 |  | 
 |     foreach (i; poolInstance.parallel(iota(1000), 1)) | 
 |     { | 
 |         assert(taskPool.workerIndex == 0); | 
 |     } | 
 |  | 
 |     // Test exception handling. | 
 |     static void parallelForeachThrow() | 
 |     { | 
 |         foreach (elem; parallel(iota(10))) | 
 |         { | 
 |             throw new Exception(""); | 
 |         } | 
 |     } | 
 |  | 
 |     assertThrown!Exception(parallelForeachThrow()); | 
 |  | 
 |     static int reduceException(int a, int b) | 
 |     { | 
 |         throw new Exception(""); | 
 |     } | 
 |  | 
 |     assertThrown!Exception( | 
 |         poolInstance.reduce!reduceException(iota(3)) | 
 |     ); | 
 |  | 
 |     static int mapException(int a) | 
 |     { | 
 |         throw new Exception(""); | 
 |     } | 
 |  | 
 |     assertThrown!Exception( | 
 |         poolInstance.amap!mapException(iota(3)) | 
 |     ); | 
 |  | 
 |     static void mapThrow() | 
 |     { | 
 |         auto m = poolInstance.map!mapException(iota(3)); | 
 |         m.popFront(); | 
 |     } | 
 |  | 
 |     assertThrown!Exception(mapThrow()); | 
 |  | 
 |     struct ThrowingRange | 
 |     { | 
 |         @property int front() | 
 |         { | 
 |             return 1; | 
 |         } | 
 |         void popFront() | 
 |         { | 
 |             throw new Exception(""); | 
 |         } | 
 |         enum bool empty = false; | 
 |     } | 
 |  | 
 |     assertThrown!Exception(poolInstance.asyncBuf(ThrowingRange.init)); | 
 | } | 
 |  | 
 | //version = parallelismStressTest; | 
 |  | 
// These are more like stress tests than real unit tests.  They print out
// tons of stuff and should not be run every time `make unittest` is run.
 | version (parallelismStressTest) | 
 | { | 
    // Stress test: repeatedly (10 attempts x pool sizes 0 and 4) exercises
    // parallel foreach, amap, reduce and Task submission, printing progress
    // to stderr and waiting for user input at the end.
    @system unittest
    {
        import std.stdio : stderr, writeln, readln;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;

        size_t attempt;
        for (; attempt < 10; attempt++)
            foreach (poolSize; [0, 4])
        {

            // Fresh pool per iteration; a size of 0 means all work runs in
            // the submitting thread.
            poolInstance = new TaskPool(poolSize);

            uint[] numbers = new uint[1_000];

            // Parallel foreach over a random-access range of indices.
            foreach (i; poolInstance.parallel( iota(0, numbers.length)) )
            {
                numbers[i] = cast(uint) i;
            }

            // Make sure it works.
            foreach (i; 0 .. numbers.length)
            {
                assert(numbers[i] == i);
            }

            stderr.writeln("Done creating nums.");


            // filter kills random access, forcing the non-random-access
            // parallel foreach code path.
            auto myNumbers = filter!"a % 7 > 0"( iota(0, 1000));
            foreach (num; poolInstance.parallel(myNumbers))
            {
                assert(num % 7 > 0 && num < 1000);
            }
            stderr.writeln("Done modulus test.");

            // Eager parallel map with an explicit work-unit size of 100.
            uint[] squares = poolInstance.amap!"a * a"(numbers, 100);
            assert(squares.length == numbers.length);
            foreach (i, number; numbers)
            {
                assert(squares[i] == number * number);
            }
            stderr.writeln("Done squares.");

            // Future/promise style: submit a summing task, then poll for it.
            auto sumFuture = task!( reduce!"a + b" )(numbers);
            poolInstance.put(sumFuture);

            // NOTE(review): sumSquares is computed here but never asserted
            // against anything; it only keeps this thread busy while
            // sumFuture runs.
            ulong sumSquares = 0;
            foreach (elem; numbers)
            {
                sumSquares += elem * elem;
            }

            // 0 + 1 + ... + 999 == 999 * 1000 / 2 (Gauss sum).
            uint mySum = sumFuture.spinForce();
            assert(mySum == 999 * 1000 / 2);

            // The pool's own reduce must agree with the task-based result.
            auto mySumParallel = poolInstance.reduce!"a + b"(numbers);
            assert(mySum == mySumParallel);
            stderr.writeln("Done sums.");

            // Task from a delegate literal; synchronized serializes the
            // writeln output across threads.
            auto myTask = task(
            {
                synchronized writeln("Our lives are parallel...Our lives are parallel.");
            });
            poolInstance.put(myTask);

            // Nested parallel foreach: outer over a string, inner over a
            // numeric range, both with work-unit size 1.
            auto nestedOuter = "abcd";
            auto nestedInner =  iota(0, 10, 2);

            foreach (i, letter; poolInstance.parallel(nestedOuter, 1))
            {
                foreach (j, number; poolInstance.parallel(nestedInner, 1))
                {
                    synchronized writeln(i, ": ", letter, "  ", j, ": ", number);
                }
            }

            // Terminate this iteration's pool before creating the next one.
            poolInstance.stop();
        }

        assert(attempt == 10);
        writeln("Press enter to go to next round of unittests.");
        readln();
    }
 |  | 
 |     // These unittests are intended more for actual testing and not so much | 
 |     // as examples. | 
    // Stress test: worker indexing, worker-local storage accounting,
    // workload-distribution reporting, nested pool use via amap/reduce,
    // and parallel foreach over a non-random-access range with ref fronts.
    @system unittest
    {
        import std.stdio : stderr;
        import std.range : iota;
        import std.algorithm.iteration : filter, reduce;
        import std.math.algebraic : sqrt;
        import std.math.operations : isClose;
        import std.math.traits : isNaN;
        import std.conv : text;

        foreach (attempt; 0 .. 10)
        foreach (poolSize; [0, 4])
        {
            poolInstance = new TaskPool(poolSize);

            // Test indexing.  The creating thread always has workerIndex 0.
            stderr.writeln("Creator Raw Index:  ", poolInstance.threadIndex);
            assert(poolInstance.workerIndex() == 0);

            // Test worker-local storage.  Each of the (size + 1) slots starts
            // at 1, and the loop performs 1_000_000 increments in total, so
            // the sum over all slots is 1_000_000 + poolInstance.size + 1.
            auto workerLocalStorage = poolInstance.workerLocalStorage!uint(1);
            foreach (i; poolInstance.parallel(iota(0U, 1_000_000)))
            {
                workerLocalStorage.get++;
            }
            assert(reduce!"a + b"(workerLocalStorage.toRange) ==
            1_000_000 + poolInstance.size + 1);

            // Make sure work is reasonably balanced among threads.  This test is
            // non-deterministic and is more of a sanity check than something that
            // has an absolute pass/fail.
            shared(uint)[void*] nJobsByThread;
            foreach (thread; poolInstance.pool)
            {
                nJobsByThread[cast(void*) thread] = 0;
            }
            nJobsByThread[ cast(void*) Thread.getThis()] = 0;

            // atomicOp is needed because multiple threads update the shared
            // counters concurrently.
            foreach (i; poolInstance.parallel( iota(0, 1_000_000), 100 ))
            {
                atomicOp!"+="( nJobsByThread[ cast(void*) Thread.getThis() ], 1);
            }

            stderr.writeln("\nCurrent thread is:  ",
            cast(void*) Thread.getThis());
            stderr.writeln("Workload distribution:  ");
            foreach (k, v; nJobsByThread)
            {
                stderr.writeln(k, '\t', v);
            }

            // Test whether amap can be nested.
            real[][] matrix = new real[][](1000, 1000);
            foreach (i; poolInstance.parallel( iota(0, matrix.length) ))
            {
                foreach (j; poolInstance.parallel( iota(0, matrix[0].length) ))
                {
                    matrix[i][j] = i * j;
                }
            }

            // Get around weird bugs having to do w/ sqrt being an intrinsic:
            static real mySqrt(real num)
            {
                return sqrt(num);
            }

            // Maps sqrt over one row; called from the outer amap below, so
            // the pool is used re-entrantly.
            static real[] parallelSqrt(real[] nums)
            {
                return poolInstance.amap!mySqrt(nums);
            }

            real[][] sqrtMatrix = poolInstance.amap!parallelSqrt(matrix);

            foreach (i, row; sqrtMatrix)
            {
                foreach (j, elem; row)
                {
                    real shouldBe = sqrt( cast(real) i * j);
                    assert(isClose(shouldBe, elem));
                    // Overwrite with the exact serial value so the summation
                    // check below has a stable input.
                    sqrtMatrix[i][j] = shouldBe;
                }
            }

            auto saySuccess = task(
            {
                stderr.writeln(
                    "Success doing matrix stuff that involves nested pool use.");
            });
            poolInstance.put(saySuccess);
            saySuccess.workForce();

            // A more thorough test of amap, reduce:  Find the sum of the square roots of
            // matrix.

            static real parallelSum(real[] input)
            {
                return poolInstance.reduce!"a + b"(input);
            }

            auto sumSqrt = poolInstance.reduce!"a + b"(
                               poolInstance.amap!parallelSum(
                                   sqrtMatrix
                               )
                           );

            // Loose tolerance: floating-point summation order varies with
            // the parallel partitioning.
            assert(isClose(sumSqrt, 4.437e8, 1e-2));
            stderr.writeln("Done sum of square roots.");

            // Test whether tasks work with function pointers.
            /+ // This part is buggy and needs to be fixed...
            auto nanTask = task(&isNaN, 1.0L);
            poolInstance.put(nanTask);
            assert(nanTask.spinForce == false);

            if (poolInstance.size > 0)
            {
                // Test work waiting.
                static void uselessFun()
                {
                    foreach (i; 0 .. 1_000_000) {}
                }

                auto uselessTasks = new typeof(task(&uselessFun))[1000];
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask = task(&uselessFun);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    poolInstance.put(uselessTask);
                }
                foreach (ref uselessTask; uselessTasks)
                {
                    uselessTask.workForce();
                }
            }
             +/

            // Test the case of non-random access + ref returns.
            int[] nums = [1,2,3,4,5];
            // Wraps an array but exposes only the input-range primitives,
            // hiding random access; front returns by ref so writes through
            // the parallel foreach must reach the underlying array.
            static struct RemoveRandom
            {
                int[] arr;

                ref int front()
                {
                    return arr.front;
                }
                void popFront()
                {
                    arr.popFront();
                }
                bool empty()
                {
                    return arr.empty;
                }
            }

            auto refRange = RemoveRandom(nums);
            foreach (ref elem; poolInstance.parallel(refRange))
            {
                elem++;
            }
            // Every increment must have been written back through the ref.
            assert(nums == [2,3,4,5,6], text(nums));
            stderr.writeln("Nums:  ", nums);

            poolInstance.stop();
        }
    }
 | } | 
 |  | 
 | @system unittest | 
 | { | 
 |     static struct __S_12733 | 
 |     { | 
 |         invariant() { assert(checksum == 1_234_567_890); } | 
 |         this(ulong u){n = u;} | 
 |         void opAssign(__S_12733 s){this.n = s.n;} | 
 |         ulong n; | 
 |         ulong checksum = 1_234_567_890; | 
 |     } | 
 |  | 
 |     static auto __genPair_12733(ulong n) { return __S_12733(n); } | 
 |     immutable ulong[] data = [ 2UL^^59-1, 2UL^^59-1, 2UL^^59-1, 112_272_537_195_293UL ]; | 
 |  | 
 |     auto result = taskPool.amap!__genPair_12733(data); | 
 | } | 
 |  | 
 | @safe unittest | 
 | { | 
 |     import std.range : iota; | 
 |  | 
 |     // this test was in std.range, but caused cycles. | 
 |     assert(__traits(compiles, { foreach (i; iota(0, 100UL).parallel) {} })); | 
 | } | 
 |  | 
 | @safe unittest | 
 | { | 
 |     import std.algorithm.iteration : each; | 
 |  | 
 |     long[] arr; | 
 |     static assert(is(typeof({ | 
 |         arr.parallel.each!"a++"; | 
 |     }))); | 
 | } | 
 |  | 
 | // https://issues.dlang.org/show_bug.cgi?id=17539 | 
 | @system unittest | 
 | { | 
 |     import std.random : rndGen; | 
 |     // ensure compilation | 
 |     try foreach (rnd; rndGen.parallel) break; | 
 |     catch (ParallelForeachError e) {} | 
 | } |