blob: 9d9f728f2bc43b4a9cb629c8dab2067ea53f8ee0 [file] [log] [blame]
/* go-select.c -- implement select.
Copyright 2009 The Go Authors. All rights reserved.
Use of this source code is governed by a BSD-style
license that can be found in the LICENSE file. */
#include <pthread.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>
#include "config.h"
#include "go-assert.h"
#include "channel.h"
/* __go_select builds an array of these structures. */
struct select_channel
{
/* The channel being selected. */
struct __go_channel* channel;
/* If this channel is selected, the value to return. */
size_t retval;
/* If this channel is a duplicate of one which appears earlier in
the array, this is the array index of the earlier channel. This
is -1UL if this is not a dup. */
size_t dup_index;
/* An entry to put on the send or receive queue. */
struct __go_channel_select queue_entry;
/* True if selected for send. */
_Bool is_send;
/* True if channel is ready--it has data to receive or space to
send. */
_Bool is_ready;
};
/* This mutex controls access to __go_select_cond. This mutex may not
be acquired if any channel locks are held. */
static pthread_mutex_t __go_select_mutex = PTHREAD_MUTEX_INITIALIZER;
/* When we have to wait for channels, we tell them to trigger this
condition variable when they send or receive something. */
static pthread_cond_t __go_select_cond = PTHREAD_COND_INITIALIZER;
/* Sort the channels by address. This avoids deadlock when multiple
selects are running on overlapping sets of channels. */
static int
channel_sort (const void *p1, const void *p2)
{
const struct select_channel *c1 = (const struct select_channel *) p1;
const struct select_channel *c2 = (const struct select_channel *) p2;
if ((uintptr_t) c1->channel < (uintptr_t) c2->channel)
return -1;
else if ((uintptr_t) c1->channel > (uintptr_t) c2->channel)
return 1;
else
return 0;
}
/* Return whether there is an entry on QUEUE which can be used for a
synchronous send or receive. */
static _Bool
is_queue_ready (struct __go_channel_select *queue)
{
int x;
if (queue == NULL)
return 0;
x = pthread_mutex_lock (&__go_select_data_mutex);
__go_assert (x == 0);
while (queue != NULL)
{
if (*queue->selected == NULL)
break;
queue = queue->next;
}
x = pthread_mutex_unlock (&__go_select_data_mutex);
__go_assert (x == 0);
return queue != NULL;
}
/* Return whether CHAN is ready. If IS_SEND is true check whether it
has space to send, otherwise check whether it has a value to
receive. */
static _Bool
is_channel_ready (struct __go_channel* channel, _Bool is_send)
{
if (is_send)
{
if (channel->selected_for_send)
return 0;
if (channel->is_closed)
return 1;
if (channel->num_entries > 0)
{
/* An asynchronous channel is ready for sending if there is
room in the buffer. */
return ((channel->next_store + 1) % channel->num_entries
!= channel->next_fetch);
}
else
{
if (channel->waiting_to_send)
{
/* Some other goroutine is waiting to send on this
channel, so we can't. */
return 0;
}
if (channel->waiting_to_receive)
{
/* Some other goroutine is waiting to receive a value,
so we can send one. */
return 1;
}
if (is_queue_ready (channel->select_receive_queue))
{
/* There is a select statement waiting to synchronize
with this one. */
return 1;
}
return 0;
}
}
else
{
if (channel->selected_for_receive)
return 0;
if (channel->is_closed)
return 1;
if (channel->num_entries > 0)
{
/* An asynchronous channel is ready for receiving if there
is a value in the buffer. */
return channel->next_fetch != channel->next_store;
}
else
{
if (channel->waiting_to_receive)
{
/* Some other goroutine is waiting to receive from this
channel, so it is not ready for us to receive. */
return 0;
}
if (channel->next_store > 0)
{
/* There is data on the channel. */
return 1;
}
if (is_queue_ready (channel->select_send_queue))
{
/* There is a select statement waiting to synchronize
with this one. */
return 1;
}
return 0;
}
}
}
/* Mark a channel as selected. The channel is locked. IS_SELECTED is
true if the channel was selected for us by another goroutine. We
set *NEEDS_BROADCAST if we need to broadcast on the select
condition variable. Return true if we got it. */
static _Bool
mark_channel_selected (struct __go_channel *channel, _Bool is_send,
_Bool is_selected, _Bool *needs_broadcast)
{
if (channel->num_entries == 0)
{
/* This is a synchronous channel. If there is no goroutine
currently waiting, but there is another select waiting, then
we need to tell that select to use this channel. That may
fail--there may be no other goroutines currently waiting--as
a third goroutine may already have claimed the select. */
if (!is_selected
&& !channel->is_closed
&& (is_send
? !channel->waiting_to_receive
: channel->next_store == 0))
{
int x;
struct __go_channel_select *queue;
x = pthread_mutex_lock (&__go_select_data_mutex);
__go_assert (x == 0);
queue = (is_send
? channel->select_receive_queue
: channel->select_send_queue);
__go_assert (queue != NULL);
while (queue != NULL)
{
if (*queue->selected == NULL)
{
*queue->selected = channel;
*queue->is_read = !is_send;
break;
}
queue = queue->next;
}
x = pthread_mutex_unlock (&__go_select_data_mutex);
__go_assert (x == 0);
if (queue == NULL)
return 0;
if (is_send)
channel->selected_for_receive = 1;
else
channel->selected_for_send = 1;
/* We are going to have to tell the other select that there
is something to do. */
*needs_broadcast = 1;
}
}
if (is_send)
channel->selected_for_send = 1;
else
channel->selected_for_receive = 1;
return 1;
}
/* Mark a channel to indicate that a select is waiting. The channel
is locked. */
static void
mark_select_waiting (struct select_channel *sc,
struct __go_channel **selected_pointer,
_Bool *selected_for_read_pointer)
{
struct __go_channel *channel = sc->channel;
_Bool is_send = sc->is_send;
if (channel->num_entries == 0)
{
struct __go_channel_select **pp;
pp = (is_send
? &channel->select_send_queue
: &channel->select_receive_queue);
/* Add an entry to the queue of selects on this channel. */
sc->queue_entry.next = *pp;
sc->queue_entry.selected = selected_pointer;
sc->queue_entry.is_read = selected_for_read_pointer;
*pp = &sc->queue_entry;
}
channel->select_mutex = &__go_select_mutex;
channel->select_cond = &__go_select_cond;
/* We never actually clear the select_mutex and select_cond fields.
In order to clear them safely, we would need to have some way of
knowing when no select is waiting for the channel. Thus we
introduce a bit of inefficiency for every channel that select
needs to wait for. This is harmless other than the performance
cost. */
}
/* Remove the entry for this select waiting on this channel. The
channel is locked. We check both queues, because the channel may
be selected for both reading and writing. */
static void
clear_select_waiting (struct select_channel *sc,
struct __go_channel **selected_pointer)
{
struct __go_channel *channel = sc->channel;
if (channel->num_entries == 0)
{
_Bool found;
struct __go_channel_select **pp;
found = 0;
for (pp = &channel->select_send_queue; *pp != NULL; pp = &(*pp)->next)
{
if ((*pp)->selected == selected_pointer)
{
*pp = (*pp)->next;
found = 1;
break;
}
}
for (pp = &channel->select_receive_queue; *pp != NULL; pp = &(*pp)->next)
{
if ((*pp)->selected == selected_pointer)
{
*pp = (*pp)->next;
found = 1;
break;
}
}
__go_assert (found);
}
}
/* Look through the list of channels to see which ones are ready.
Lock each channels, and set the is_ready flag. Return the number
of ready channels. */
static size_t
lock_channels_find_ready (struct select_channel *channels, size_t count)
{
size_t ready_count;
size_t i;
ready_count = 0;
for (i = 0; i < count; ++i)
{
struct __go_channel *channel = channels[i].channel;
_Bool is_send = channels[i].is_send;
size_t dup_index = channels[i].dup_index;
int x;
if (channel == NULL)
continue;
if (dup_index != (size_t) -1UL)
{
if (channels[dup_index].is_ready)
{
channels[i].is_ready = 1;
++ready_count;
}
continue;
}
x = pthread_mutex_lock (&channel->lock);
__go_assert (x == 0);
if (is_channel_ready (channel, is_send))
{
channels[i].is_ready = 1;
++ready_count;
}
}
return ready_count;
}
/* The channel we are going to select has been forced by some other
goroutine. SELECTED_CHANNEL is the channel we will use,
SELECTED_FOR_READ is whether the other goroutine wants to read from
the channel. Note that the channel could be specified multiple
times in this select, so we must mark each appropriate entry for
this channel as ready. Every other channel is marked as not ready.
All the channels are locked before this routine is called. This
returns the number of ready channels. */
size_t
force_selected_channel_ready (struct select_channel *channels, size_t count,
struct __go_channel *selected_channel,
_Bool selected_for_read)
{
size_t ready_count;
size_t i;
ready_count = 0;
for (i = 0; i < count; ++i)
{
struct __go_channel *channel = channels[i].channel;
_Bool is_send = channels[i].is_send;
if (channel == NULL)
continue;
if (channel != selected_channel
|| (is_send ? !selected_for_read : selected_for_read))
channels[i].is_ready = 0;
else
{
channels[i].is_ready = 1;
++ready_count;
}
}
__go_assert (ready_count > 0);
return ready_count;
}
/* Unlock all the channels. */
static void
unlock_channels (struct select_channel *channels, size_t count)
{
size_t i;
int x;
for (i = 0; i < count; ++i)
{
struct __go_channel *channel = channels[i].channel;
if (channel == NULL)
continue;
if (channels[i].dup_index != (size_t) -1UL)
continue;
x = pthread_mutex_unlock (&channel->lock);
__go_assert (x == 0);
}
}
/* At least one channel is ready. Randomly pick a channel to return.
Unlock all the channels. IS_SELECTED is true if the channel was
picked for us by some other goroutine. If SELECTED_POINTER is not
NULL, remove it from the queue for all the channels. Return the
retval field of the selected channel. This will return 0 if we
can't use the selected channel, because it relied on synchronizing
with some other select, and that select already synchronized with a
different channel. */
static size_t
unlock_channels_and_select (struct select_channel *channels,
size_t count, size_t ready_count,
_Bool is_selected,
struct __go_channel **selected_pointer)
{
size_t selected;
size_t ret;
_Bool needs_broadcast;
size_t i;
int x;
/* Pick which channel we are going to return. */
#if defined(HAVE_RANDOM)
selected = (size_t) random () % ready_count;
#else
selected = (size_t) rand () % ready_count;
#endif
ret = 0;
needs_broadcast = 0;
/* Look at the channels in reverse order so that we don't unlock a
duplicated channel until we have seen all its dups. */
for (i = 0; i < count; ++i)
{
size_t j = count - i - 1;
struct __go_channel *channel = channels[j].channel;
_Bool is_send = channels[j].is_send;
if (channel == NULL)
continue;
if (channels[j].is_ready)
{
if (selected == 0)
{
if (mark_channel_selected (channel, is_send, is_selected,
&needs_broadcast))
ret = channels[j].retval;
}
--selected;
}
if (channels[j].dup_index == (size_t) -1UL)
{
if (selected_pointer != NULL)
clear_select_waiting (&channels[j], selected_pointer);
x = pthread_mutex_unlock (&channel->lock);
__go_assert (x == 0);
}
}
/* The NEEDS_BROADCAST variable is set if we are synchronizing with
some other select statement. We can't do the actual broadcast
until we have unlocked all the channels. */
if (needs_broadcast)
{
x = pthread_mutex_lock (&__go_select_mutex);
__go_assert (x == 0);
x = pthread_cond_broadcast (&__go_select_cond);
__go_assert (x == 0);
x = pthread_mutex_unlock (&__go_select_mutex);
__go_assert (x == 0);
}
return ret;
}
/* Mark all channels to show that we are waiting for them. This is
called with the select mutex held, but none of the channels are
locked. This returns true if some channel was found to be
ready. */
static _Bool
mark_all_channels_waiting (struct select_channel* channels, size_t count,
struct __go_channel **selected_pointer,
_Bool *selected_for_read_pointer)
{
_Bool ret;
int x;
size_t i;
ret = 0;
for (i = 0; i < count; ++i)
{
struct __go_channel *channel = channels[i].channel;
_Bool is_send = channels[i].is_send;
if (channel == NULL)
continue;
if (channels[i].dup_index != (size_t) -1UL)
{
size_t j;
/* A channel may be selected for both read and write. */
if (channels[channels[i].dup_index].is_send != is_send)
{
for (j = channels[i].dup_index + 1; j < i; ++j)
{
if (channels[j].channel == channel
&& channels[j].is_send == is_send)
break;
}
if (j < i)
continue;
}
}
x = pthread_mutex_lock (&channel->lock);
__go_assert (x == 0);
/* To avoid a race condition, we have to check again whether the
channel is ready. It may have become ready since we did the
first set of checks but before we acquired the select mutex.
If we don't check here, we could sleep forever on the select
condition variable. */
if (is_channel_ready (channel, is_send))
ret = 1;
/* If SELECTED_POINTER is NULL, then we have already marked the
channel as waiting. */
if (selected_pointer != NULL)
mark_select_waiting (&channels[i], selected_pointer,
selected_for_read_pointer);
x = pthread_mutex_unlock (&channel->lock);
__go_assert (x == 0);
}
return ret;
}
/* Implement select. This is called by the compiler-generated code
with pairs of arguments: a pointer to a channel, and an int which
is non-zero for send, zero for receive. */
size_t
__go_select (size_t count, _Bool has_default,
struct __go_channel **channel_args, _Bool *is_send_args)
{
struct select_channel stack_buffer[16];
struct select_channel *allocated_buffer;
struct select_channel *channels;
size_t i;
int x;
struct __go_channel *selected_channel;
_Bool selected_for_read;
_Bool is_queued;
if (count < sizeof stack_buffer / sizeof stack_buffer[0])
{
channels = &stack_buffer[0];
allocated_buffer = NULL;
}
else
{
allocated_buffer = ((struct select_channel *)
malloc (count * sizeof (struct select_channel)));
channels = allocated_buffer;
}
for (i = 0; i < count; ++i)
{
struct __go_channel *channel_arg = channel_args[i];
_Bool is_send = is_send_args[i];
channels[i].channel = (struct __go_channel*) channel_arg;
channels[i].retval = i + 1;
channels[i].dup_index = (size_t) -1UL;
channels[i].queue_entry.next = NULL;
channels[i].queue_entry.selected = NULL;
channels[i].is_send = is_send;
channels[i].is_ready = 0;
}
qsort (channels, count, sizeof (struct select_channel), channel_sort);
for (i = 0; i < count; ++i)
{
size_t j;
for (j = 0; j < i; ++j)
{
if (channels[j].channel == channels[i].channel)
{
channels[i].dup_index = j;
break;
}
}
}
/* SELECT_CHANNEL is used to select synchronized channels. If no
channels are ready, we store a pointer to this variable on the
select queue for each synchronized channel. Because the variable
may be set by channel operations running in other goroutines,
SELECT_CHANNEL may only be accessed when all the channels are
locked and/or when the select_data_mutex is locked. */
selected_channel = NULL;
/* SELECTED_FOR_READ is set to true if SELECTED_CHANNEL was set by a
goroutine which wants to read from the channel. The access
restrictions for this are like those for SELECTED_CHANNEL. */
selected_for_read = 0;
/* IS_QUEUED is true if we have queued up this select on the queues
for any associated synchronous channels. We only do this if no
channels are ready the first time around the loop. */
is_queued = 0;
while (1)
{
int ready_count;
_Bool is_selected;
/* Lock all channels, identify which ones are ready. */
ready_count = lock_channels_find_ready (channels, count);
/* All the channels are locked, so we can look at
SELECTED_CHANNEL. If it is not NULL, then our choice has
been forced by some other goroutine. This can only happen
after the first time through the loop. */
is_selected = selected_channel != NULL;
if (is_selected)
ready_count = force_selected_channel_ready (channels, count,
selected_channel,
selected_for_read);
if (ready_count > 0)
{
size_t ret;
ret = unlock_channels_and_select (channels, count, ready_count,
is_selected,
(is_queued
? &selected_channel
: NULL));
/* If RET is zero, it means that the channel we picked
turned out not to be ready, because some other select
grabbed it during our traversal. Loop around and try
again. */
if (ret == 0)
{
is_queued = 0;
/* We are no longer on any channel queues, so it is safe
to touch SELECTED_CHANNEL here. It must be NULL,
because otherwise that would somebody has promised to
synch up with us and then failed to do so. */
__go_assert (selected_channel == NULL);
continue;
}
if (allocated_buffer != NULL)
free (allocated_buffer);
return ret;
}
/* No channels were ready. */
unlock_channels (channels, count);
if (has_default)
{
/* Use the default clause. */
if (allocated_buffer != NULL)
free (allocated_buffer);
return 0;
}
/* This is a blocking select. Grab the select lock, tell all
the channels to notify us when something happens, and wait
for something to happen. */
x = pthread_mutex_lock (&__go_select_mutex);
__go_assert (x == 0);
/* Check whether CHANNEL_SELECTED was set while the channels
were unlocked. If it was set, then we can simply loop around
again. We need to check this while the select mutex is held.
It is possible that something will set CHANNEL_SELECTED while
we mark the channels as waiting. If this happens, that
goroutine is required to signal the select condition
variable, which means acquiring the select mutex. Since we
have the select mutex locked ourselves, we can not miss that
signal. */
x = pthread_mutex_lock (&__go_select_data_mutex);
__go_assert (x == 0);
is_selected = selected_channel != NULL;
x = pthread_mutex_unlock (&__go_select_data_mutex);
__go_assert (x == 0);
if (!is_selected)
{
/* Mark the channels as waiting, and check whether they have
become ready. */
if (!mark_all_channels_waiting (channels, count,
(is_queued
? NULL
: &selected_channel),
(is_queued
? NULL
: &selected_for_read)))
{
x = pthread_cond_wait (&__go_select_cond, &__go_select_mutex);
__go_assert (x == 0);
}
is_queued = 1;
}
x = pthread_mutex_unlock (&__go_select_mutex);
__go_assert (x == 0);
}
}