| /* go-rec-small.c -- receive something smaller than 64 bits on a channel. |
| |
| Copyright 2009 The Go Authors. All rights reserved. |
| Use of this source code is governed by a BSD-style |
| license that can be found in the LICENSE file. */ |
| |
| #include <stdint.h> |
| |
| #include "go-assert.h" |
| #include "go-panic.h" |
| #include "channel.h" |
| |
| /* This mutex controls access to the selected field of struct |
| __go_channel_select. While this mutex is held, no other mutexes |
| may be acquired. */ |
| |
| pthread_mutex_t __go_select_data_mutex = PTHREAD_MUTEX_INITIALIZER; |
| |
| /* Try to synchronize with a select waiting on a sychronized channel. |
| This is used by a send or receive. The channel is locked. This |
| returns true if it was able to synch. */ |
| |
| _Bool |
| __go_synch_with_select (struct __go_channel *channel, _Bool is_send) |
| { |
| struct __go_channel_select *p; |
| int i; |
| |
| __go_assert (channel->num_entries == 0); |
| |
| i = pthread_mutex_lock (&__go_select_data_mutex); |
| __go_assert (i == 0); |
| |
| for (p = (is_send |
| ? channel->select_receive_queue |
| : channel->select_send_queue); |
| p != NULL; |
| p = p->next) |
| { |
| if (*p->selected == NULL) |
| { |
| *p->selected = channel; |
| *p->is_read = !is_send; |
| if (is_send) |
| channel->selected_for_receive = 1; |
| else |
| channel->selected_for_send = 1; |
| break; |
| } |
| } |
| |
| i = pthread_mutex_unlock (&__go_select_data_mutex); |
| __go_assert (i == 0); |
| |
| /* The caller is responsible for signalling the select condition |
| variable so that the other select knows that something has |
| changed. We can't signal it here because we can't acquire the |
| select mutex while we hold a channel lock. */ |
| |
| return p != NULL; |
| } |
| |
| /* If we synch with a select, then we need to signal the select that |
| something has changed. This requires grabbing the select mutex, |
| which can only be done when the channel is unlocked. This routine |
| does the signalling. It is called with the channel locked. It |
| unlocks the channel, broadcasts the signal and relocks the |
| channel. */ |
| |
| void |
| __go_broadcast_to_select (struct __go_channel *channel) |
| { |
| pthread_mutex_t *select_mutex; |
| pthread_cond_t *select_cond; |
| int i; |
| |
| select_mutex = channel->select_mutex; |
| select_cond = channel->select_cond; |
| |
| i = pthread_mutex_unlock (&channel->lock); |
| __go_assert (i == 0); |
| |
| __go_assert (select_mutex != NULL && select_cond != NULL); |
| |
| i = pthread_mutex_lock (select_mutex); |
| __go_assert (i == 0); |
| |
| i = pthread_cond_broadcast (select_cond); |
| __go_assert (i == 0); |
| |
| i = pthread_mutex_unlock (select_mutex); |
| __go_assert (i == 0); |
| |
| i = pthread_mutex_lock (&channel->lock); |
| __go_assert (i == 0); |
| } |
| |
| /* Prepare to receive something on a channel. Return true if the |
| channel is acquired, false if it is closed. */ |
| |
| _Bool |
| __go_receive_acquire (struct __go_channel *channel, _Bool for_select) |
| { |
| int i; |
| _Bool my_wait_lock; |
| _Bool synched_with_select; |
| |
| my_wait_lock = 0; |
| synched_with_select = 0; |
| |
| i = pthread_mutex_lock (&channel->lock); |
| __go_assert (i == 0); |
| |
| while (1) |
| { |
| _Bool need_broadcast; |
| |
| need_broadcast = 0; |
| |
| /* Check whether the channel is closed. */ |
| if (channel->is_closed |
| && (channel->num_entries == 0 |
| ? channel->next_store == 0 |
| : channel->next_fetch == channel->next_store)) |
| { |
| if (channel->saw_close) |
| { |
| ++channel->closed_op_count; |
| if (channel->closed_op_count >= MAX_CLOSED_OPERATIONS) |
| __go_panic_msg ("too many operations on closed channel"); |
| } |
| channel->saw_close = 1; |
| channel->selected_for_receive = 0; |
| __go_unlock_and_notify_selects (channel); |
| return 0; |
| } |
| |
| /* If somebody else has the channel locked for receiving, we |
| have to wait. If FOR_SELECT is true, then we are the one |
| with the lock. */ |
| if (!channel->selected_for_receive || for_select) |
| { |
| if (channel->num_entries == 0) |
| { |
| /* If somebody else is waiting to receive, we have to |
| wait. */ |
| if (!channel->waiting_to_receive || my_wait_lock) |
| { |
| _Bool was_marked; |
| |
| /* Lock the channel so that we get to receive |
| next. */ |
| was_marked = channel->waiting_to_receive; |
| channel->waiting_to_receive = 1; |
| my_wait_lock = 1; |
| |
| /* See if there is a value to receive. */ |
| if (channel->next_store > 0) |
| return 1; |
| |
| /* If we haven't already done so, try to synch with |
| a select waiting to send on this channel. If we |
| have already synched with a select, we are just |
| looping until the select eventually causes |
| something to be sent. */ |
| if (!synched_with_select && !for_select) |
| { |
| if (__go_synch_with_select (channel, 0)) |
| { |
| synched_with_select = 1; |
| need_broadcast = 1; |
| } |
| } |
| |
| /* If we marked the channel as waiting, we need to |
| signal, because something changed. It needs to |
| be a broadcast since there might be other |
| receivers waiting. */ |
| if (!was_marked) |
| { |
| i = pthread_cond_broadcast (&channel->cond); |
| __go_assert (i == 0); |
| } |
| } |
| } |
| else |
| { |
| /* If there is a value on the channel, we are OK. */ |
| if (channel->next_fetch != channel->next_store) |
| return 1; |
| } |
| } |
| |
| /* If we just synched with a select, then we need to signal the |
| select condition variable. We can only do that if we unlock |
| the channel. So we need to unlock, signal, lock, and go |
| around the loop again without waiting. */ |
| if (need_broadcast) |
| { |
| __go_broadcast_to_select (channel); |
| continue; |
| } |
| |
| /* Wait for something to change, then loop around and try |
| again. */ |
| |
| i = pthread_cond_wait (&channel->cond, &channel->lock); |
| __go_assert (i == 0); |
| } |
| } |
| |
| /* Finished receiving something on a channel. */ |
| |
| void |
| __go_receive_release (struct __go_channel *channel) |
| { |
| int i; |
| |
| if (channel->num_entries != 0) |
| channel->next_fetch = (channel->next_fetch + 1) % channel->num_entries; |
| else |
| { |
| /* For a synchronous receiver, we tell the sender that we picked |
| up the value by setting the next_store field back to 0. |
| Using the mutexes should implement a memory barrier. */ |
| __go_assert (channel->next_store == 1); |
| channel->next_store = 0; |
| |
| channel->waiting_to_receive = 0; |
| } |
| |
| channel->selected_for_receive = 0; |
| |
| /* This is a broadcast to make sure that a synchronous sender sees |
| it. */ |
| i = pthread_cond_broadcast (&channel->cond); |
| __go_assert (i == 0); |
| |
| __go_unlock_and_notify_selects (channel); |
| } |
| |
| /* Unlock a channel and notify any waiting selects that something |
| happened. */ |
| |
| void |
| __go_unlock_and_notify_selects (struct __go_channel *channel) |
| { |
| pthread_mutex_t* select_mutex; |
| pthread_cond_t* select_cond; |
| int i; |
| |
| select_mutex = channel->select_mutex; |
| select_cond = channel->select_cond; |
| |
| i = pthread_mutex_unlock (&channel->lock); |
| __go_assert (i == 0); |
| |
| if (select_mutex != NULL) |
| { |
| i = pthread_mutex_lock (select_mutex); |
| __go_assert (i == 0); |
| i = pthread_cond_broadcast (select_cond); |
| __go_assert (i == 0); |
| i = pthread_mutex_unlock (select_mutex); |
| __go_assert (i == 0); |
| } |
| } |
| |
| /* Receive something 64 bits or smaller on a channel. */ |
| |
| uint64_t |
| __go_receive_small (struct __go_channel *channel, _Bool for_select) |
| { |
| uint64_t ret; |
| |
| if (channel == NULL) |
| __go_panic_msg ("receive from nil channel"); |
| |
| __go_assert (channel->element_size <= sizeof (uint64_t)); |
| |
| if (!__go_receive_acquire (channel, for_select)) |
| return 0; |
| |
| ret = channel->data[channel->next_fetch]; |
| |
| __go_receive_release (channel); |
| |
| return ret; |
| } |