/**
 * The event module provides a primitive for lightweight signaling of other threads
 * (emulating Windows events on Posix)
 *
 * Copyright: Copyright (c) 2019 D Language Foundation
 * License: Distributed under the
 *    $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0).
 *    (See accompanying file LICENSE)
 * Authors: Rainer Schuetze
 * Source:    $(DRUNTIMESRC core/sync/event.d)
 */
| module core.sync.event; |
| |
| version (Windows) |
| { |
| import core.sys.windows.basetsd /+: HANDLE +/; |
| import core.sys.windows.winerror /+: WAIT_TIMEOUT +/; |
| import core.sys.windows.winbase /+: CreateEvent, CloseHandle, SetEvent, ResetEvent, |
| WaitForSingleObject, INFINITE, WAIT_OBJECT_0+/; |
| } |
| else version (Posix) |
| { |
| import core.sys.posix.pthread; |
| import core.sys.posix.sys.types; |
| import core.sys.posix.time; |
| } |
| else |
| { |
| static assert(false, "Platform not supported"); |
| } |
| |
| import core.time; |
| import core.internal.abort : abort; |
| |
/**
 * Represents an event. Clients of an event are suspended while waiting
 * for the event to be "signaled".
 *
 * Implemented using `pthread_mutex` and `pthread_cond` on Posix and
 * `CreateEvent` and `SetEvent` on Windows.
---
import core.sync.event, core.thread, std.file;

struct ProcessFile
{
    ThreadGroup group;
    Event event;
    void[] buffer;

    void doProcess()
    {
        event.wait();
        // process buffer
    }

    void process(string filename)
    {
        event.initialize(true, false);
        group = new ThreadGroup;
        for (int i = 0; i < 10; ++i)
            group.create(&doProcess);

        buffer = std.file.read(filename);
        event.set();
        group.joinAll();
        event.terminate();
    }
}
---
 */
struct Event
{
nothrow @nogc:
    /**
     * Creates an event object.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    this(bool manualReset, bool initialState)
    {
        initialize(manualReset, initialState);
    }

    /**
     * Initializes an event object. Does nothing if the event is already initialized.
     * Aborts the process if the underlying OS primitives cannot be created.
     *
     * Params:
     *  manualReset  = the state of the event is not reset automatically after resuming waiting clients
     *  initialState = initial state of the signal
     */
    void initialize(bool manualReset, bool initialState)
    {
        version (Windows)
        {
            if (m_event)
                return;
            m_event = CreateEvent(null, manualReset, initialState, null);
            m_event || abort("Error: CreateEvent failed.");
        }
        else version (Posix)
        {
            if (m_initalized)
                return;
            pthread_mutex_init(&m_mutex, null) == 0 ||
                abort("Error: pthread_mutex_init failed.");
            static if ( is( typeof( pthread_condattr_setclock ) ) )
            {
                // Bind the condition variable to the monotonic clock where
                // the platform offers pthread_condattr_setclock.
                pthread_condattr_t attr = void;
                pthread_condattr_init(&attr) == 0 ||
                    abort("Error: pthread_condattr_init failed.");
                pthread_condattr_setclock(&attr, CLOCK_MONOTONIC) == 0 ||
                    abort("Error: pthread_condattr_setclock failed.");
                pthread_cond_init(&m_cond, &attr) == 0 ||
                    abort("Error: pthread_cond_init failed.");
                pthread_condattr_destroy(&attr) == 0 ||
                    abort("Error: pthread_condattr_destroy failed.");
            }
            else
            {
                pthread_cond_init(&m_cond, null) == 0 ||
                    abort("Error: pthread_cond_init failed.");
            }
            m_state = initialState;
            m_manualReset = manualReset;
            m_initalized = true;
        }
    }

    // copying not allowed, can produce resource leaks
    @disable this(this);
    @disable void opAssign(Event);

    ~this()
    {
        terminate();
    }

    /**
     * Deinitializes the event. Does nothing if the event is not initialized. There must not be
     * threads currently waiting for the event to be signaled.
     */
    void terminate()
    {
        version (Windows)
        {
            if (m_event)
                CloseHandle(m_event);
            m_event = null;
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_destroy(&m_mutex) == 0 ||
                    abort("Error: pthread_mutex_destroy failed.");
                pthread_cond_destroy(&m_cond) == 0 ||
                    abort("Error: pthread_cond_destroy failed.");
                m_initalized = false;
            }
        }
    }


    /// Set the event to "signaled", so that waiting clients are resumed.
    /// Does nothing if the event is not initialized.
    void set()
    {
        version (Windows)
        {
            if (m_event)
                SetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = true;
                // Record that a real signal happened so that blocked waiters
                // can tell this broadcast apart from a spurious wakeup, even
                // if reset() runs before they re-acquire the mutex.
                ++m_setCount;
                pthread_cond_broadcast(&m_cond);
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /// Reset the event manually. Does nothing if the event is not initialized.
    void reset()
    {
        version (Windows)
        {
            if (m_event)
                ResetEvent(m_event);
        }
        else version (Posix)
        {
            if (m_initalized)
            {
                pthread_mutex_lock(&m_mutex);
                m_state = false;
                pthread_mutex_unlock(&m_mutex);
            }
        }
    }

    /**
     * Wait for the event to be signaled without timeout.
     *
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event is uninitialized or another error occurred
     */
    bool wait()
    {
        version (Windows)
        {
            return m_event && WaitForSingleObject(m_event, INFINITE) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            // Duration.max is the "no timeout" sentinel of wait(Duration).
            return wait(Duration.max);
        }
    }

    /**
     * Wait for the event to be signaled with timeout.
     *
     * Params:
     *  tmout = the maximum time to wait; a zero or negative duration only
     *          checks the current state without blocking
     * Returns:
     *  `true` if the event is in signaled state, `false` if the event was nonsignaled for the given time or
     *  the event is uninitialized or another error occurred
     */
    bool wait(Duration tmout)
    {
        version (Windows)
        {
            if (!m_event)
                return false;

            // A negative duration would wrap around in the cast to uint
            // below (-1 ms becomes 0xFFFFFFFF, i.e. INFINITE) and block
            // forever, so treat it as "do not wait".
            if (tmout < Duration.zero)
                tmout = Duration.zero;

            // WaitForSingleObject takes a 32-bit millisecond count and
            // reserves uint.max for INFINITE, so longer waits are chunked.
            auto maxWaitMillis = dur!("msecs")(uint.max - 1);

            while (tmout > maxWaitMillis)
            {
                auto res = WaitForSingleObject(m_event, uint.max - 1);
                if (res != WAIT_TIMEOUT)
                    return res == WAIT_OBJECT_0;
                tmout -= maxWaitMillis;
            }
            auto ms = cast(uint)(tmout.total!"msecs");
            return WaitForSingleObject(m_event, ms) == WAIT_OBJECT_0;
        }
        else version (Posix)
        {
            if (!m_initalized)
                return false;

            pthread_mutex_lock(&m_mutex);

            int result = 0;
            if (!m_state)
            {
                // POSIX permits pthread_cond_wait/pthread_cond_timedwait to
                // return 0 without any broadcast (spurious wakeup). The
                // counter only changes inside set(), so keep waiting until
                // it differs from the value seen on entry.
                const generation = m_setCount;

                if (tmout == Duration.max)
                {
                    do
                    {
                        result = pthread_cond_wait(&m_cond, &m_mutex);
                    }
                    while (result == 0 && m_setCount == generation);
                }
                else
                {
                    import core.sync.config;

                    // pthread_cond_timedwait takes an absolute time, so
                    // the same timespec is reused when re-waiting and the
                    // overall deadline does not move.
                    timespec t = void;
                    mktspec(t, tmout);

                    do
                    {
                        result = pthread_cond_timedwait(&m_cond, &m_mutex, &t);
                    }
                    while (result == 0 && m_setCount == generation);
                }
            }
            // Auto-reset events return to the nonsignaled state once a wait
            // has succeeded.
            if (result == 0 && !m_manualReset)
                m_state = false;

            pthread_mutex_unlock(&m_mutex);

            return result == 0;
        }
    }

private:
    version (Windows)
    {
        HANDLE m_event;
    }
    else version (Posix)
    {
        pthread_mutex_t m_mutex;
        pthread_cond_t m_cond;
        bool m_initalized;
        bool m_state;
        bool m_manualReset;
        // Incremented by every set(); guarded by m_mutex. Lets wait()
        // distinguish a real signal from a spurious wakeup.
        uint m_setCount;
    }
}
| |
// Exercise an Event from a single thread (no sharing between threads).
@nogc nothrow unittest
{
    const shortWait = msecs(1);

    // Auto-reset event that starts out nonsignaled.
    auto autoEvent = Event(false, false);
    assert(!autoEvent.wait(shortWait));
    autoEvent.set();
    assert(autoEvent.wait());
    // The successful wait above put the event back into the nonsignaled state.
    assert(!autoEvent.wait(shortWait));

    // Manual-reset event that starts out signaled.
    auto manualEvent = Event(true, true);
    assert(manualEvent.wait());
    // Still signaled: only reset() clears a manual-reset event.
    assert(manualEvent.wait());
    manualEvent.reset();
    assert(!manualEvent.wait(shortWait));
}
| |
// Releases a group of threads blocked on one manual-reset event and checks
// that all of them resume promptly.
unittest
{
    import core.thread, core.atomic;

    scope event = new Event(true, false);
    immutable int threadCount = 10;
    shared int resumed = 0;

    // Thread body: block on the event, then record that this thread got past it.
    void waiter()
    {
        event.wait(seconds(8)); // timeout below limit for druntime test_runner
        atomicOp!"+="(resumed, 1);
    }

    auto workers = new ThreadGroup;

    foreach (_; 0 .. threadCount)
        workers.create(&waiter);

    const began = MonoTime.currTime;
    // Nobody may have passed the wait before the event is signaled.
    assert(atomicLoad(resumed) == 0);

    event.set();
    workers.joinAll();

    assert(atomicLoad(resumed) == threadCount);

    // All threads must have been released by set(), not by their own timeout.
    assert(MonoTime.currTime - began < seconds(5));
}