blob: 37951061d93637df7d0d46546c163125503b106e [file] [log] [blame]
/**
* The event module provides a primitive for lightweight signaling of other threads
* (emulating Windows events on Posix)
*
* Copyright: Copyright (c) 2019 D Language Foundation
* License: Distributed under the
* $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
* (See accompanying file LICENSE)
* Authors: Rainer Schuetze
* Source: $(DRUNTIMESRC core/sync/event.d)
*/
module core.sync.event;
version (Windows)
{
import core.sys.windows.basetsd /+: HANDLE +/;
import core.sys.windows.winerror /+: WAIT_TIMEOUT +/;
import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent,
WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/;
}
else version (Posix)
{
import core.sys.posix.pthread;
import core.sys.posix.sys.types;
import core.sys.posix.time;
}
else
{
static assert(false, "Platform not supported");
}
import core.time;
import core.internal.abort : abort;
/**
* Represents an event. Clients of an event are suspended while waiting
* for the event to be "signaled".
*
* Implemented using `pthread_mutex` and `pthread_cond` on Posix and
* `CreateEvent` and `SetEvent` on Windows.
---
import core.sync.event, core.thread, std.file;
struct ProcessFile
{
ThreadGroup group;
Event event;
void[] buffer;
void doProcess()
{
event.wait();
// process buffer
}
void process(string filename)
{
event.initialize(true, false);
group = new ThreadGroup;
for (int i = 0; i < 10; ++i)
group.create(&doProcess);
buffer = std.file.read(filename);
event.set();
group.joinAll();
event.terminate();
}
}
---
*/
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }

    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     * Aborts the process if one of the underlying OS primitives cannot be created.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initalized)
                return;
            pthread_mutex_init(cast(pthread_mutex_t*) &m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                // Use the monotonic clock for timed waits where the platform
                // offers pthread_condattr_setclock.
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initalized = true;
        }
    }

    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);

    /// Releases the OS resources held by the event (see `terminate`).
    ~this()
    {
        terminate();
    }

    /**
     * deinitialize event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
     */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initalized = false;
            }
        }
    }

    /// Set the event to "signaled", so that waiting clients are resumed.
    /// Does nothing if the event is not initialized.
    void set()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                // All waiters are woken; each one re-checks m_state under the
                // mutex in wait(), so an auto-reset event is consumed only once.
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /// Reset the event manually. Does nothing if the event is not initialized.
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occurred
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            // Duration.max is the sentinel for "no timeout" in wait(Duration).
            return wait(Duration.max);
        }
    }

    /**
     * Wait for the event to be signaled with timeout.
     *
     * Params:
     *  tmout = the maximum time to wait; a negative duration is treated as zero on Windows
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occurred
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;

            // A negative duration would wrap around in the cast to uint below
            // (-1 ms becomes 0xFFFFFFFF, i.e. INFINITE) and block instead of
            // returning immediately, so clamp it to "do not wait".
            if (tmout < Duration.zero)
                tmout = Duration.zero;

            // WaitForSingleObject takes a 32-bit millisecond count and reserves
            // uint.max for INFINITE; longer waits are split into chunks.
            auto maxWaitMillis = dur!("msecs")(uint.max - 1);
            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initalized)
                return false;

            pthread_mutex_lock(&m_mutex);

            int result = 0;
            if (!m_state)
            {
                // The waits are wrapped in a predicate loop: a condition wait
                // may return spuriously, and set() broadcasts to every waiter,
                // so a woken thread must confirm under the mutex that the
                // event is really (still) signaled before reporting success.
                if (tmout == Duration.max)
                {
                    while (!m_state && result == 0)
                        result = pthread_cond_wait(&m_cond, &m_mutex);
                }
                else
                {
                    import core.sync.config;

                    // The deadline is computed once, so re-waiting after a
                    // wakeup does not extend the total timeout.
                    timespec t = void;
                    mktspec(t, tmout);

                    while (!m_state && result == 0)
                        result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
                }
            }
            // Auto-reset: the thread that observed the signal consumes it.
            if (result == 0 && !m_manualReset)
                m_state = false;

            pthread_mutex_unlock(&m_mutex);

            return result == 0;
        }
    }

private:
    version (Windows)
    {
        HANDLE m_event;
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;
        pthread_cond_t m_cond;
        bool m_initalized;
        bool m_state;
        bool m_manualReset;
    }
}
// Exercise an Event from a single thread (no sharing involved).
@nogc nothrow unittest
{
    // Short timeout used wherever the wait is expected to time out.
    const brief = dur!"msecs"(1);

    // Auto-reset event that starts out nonsignaled: a successful wait
    // consumes the signal, so the following wait times out again.
    auto autoEvent = Event(false, false);
    assert(!autoEvent.wait(brief));
    autoEvent.set();
    assert(autoEvent.wait());
    assert(!autoEvent.wait(brief));

    // Manual-reset event that starts out signaled: it stays signaled
    // across waits until reset() is called explicitly.
    auto manualEvent = Event(true, true);
    assert(manualEvent.wait());
    assert(manualEvent.wait());
    manualEvent.reset();
    assert(!manualEvent.wait(brief));
}
// Several threads block on one manual-reset event and are all released by a
// single set().
unittest
{
    import core.thread, core.atomic;

    scope signal = new Event(true, false);
    int workerCount = 10;
    shared int finished = 0;

    // Each worker blocks on the event, then records that it got past the wait.
    void worker()
    {
        signal.wait(dur!"seconds"(8)); // timeout below limit for druntime test_runner
        atomicOp!"+="(finished, 1);
    }

    auto workers = new ThreadGroup;
    foreach (i; 0 .. workerCount)
        workers.create(&worker);

    auto begin = MonoTime.currTime;
    // Nobody may pass the wait before the event has been signaled.
    assert(finished == 0);

    signal.set();
    workers.joinAll();

    assert(finished == workerCount);
    // Finishing well before the 8 second wait timeout shows that the workers
    // were released by set() rather than by timing out.
    assert(MonoTime.currTime - begin < dur!"seconds"(5));
}