2 #include "ccan/list/list.h" 4 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
5 static VALUE rb_eClosedQueueError;
13 #define MUTEX_ALLOW_TRAP FL_USER1 16 wakeup_one(
struct list_head *head)
20 list_for_each_safe(head, cur, next,
node) {
21 list_del_init(&cur->
node);
32 wakeup_all(
struct list_head *head)
36 list_for_each_safe(head, cur, next,
node) {
37 list_del_init(&cur->
node);
50 struct list_head waitq;
53 #if defined(HAVE_WORKING_FORK) 54 static void rb_mutex_abandon_all(
rb_mutex_t *mutexes);
55 static void rb_mutex_abandon_keeping_mutexes(
rb_thread_t *
th);
84 #define GetMutexPtr(obj, tobj) \ 85 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 87 #define mutex_mark NULL 103 mutex_free(
void *ptr)
108 const char *
err = rb_mutex_unlock_th(mutex, mutex->
th);
109 if (err)
rb_bug(
"%s", err);
115 mutex_memsize(
const void *ptr)
138 mutex_alloc(
VALUE klass)
144 list_head_init(&mutex->
waitq);
155 mutex_initialize(
VALUE self)
163 return mutex_alloc(rb_cMutex);
206 if (mutex->
th == 0) {
211 mutex_locked(th,
self);
247 if (mutex->
th == th) {
253 while (mutex->
th != th) {
256 struct timeval tv = { 0, 100000 };
266 if ((vm_living_thread_num(th->
vm) == th->
vm->
sleeper) &&
273 native_sleep(th, timeout);
279 if (patrol_thread == th)
280 patrol_thread =
NULL;
284 rb_check_deadlock(th->
vm);
291 if (mutex->
th == th) mutex_locked(th,
self);
325 if (mutex->
th == 0) {
326 err =
"Attempt to unlock a mutex which is not locked";
328 else if (mutex->
th != th) {
329 err =
"Attempt to unlock a mutex which is locked by another thread";
336 list_for_each_safe(&mutex->
waitq, cur, next,
node) {
337 list_del_init(&cur->
node);
344 rb_bug(
"unexpected THREAD_STOPPED");
347 rb_bug(
"unexpected THREAD_KILLED");
352 while (*th_mutex != mutex) {
376 err = rb_mutex_unlock_th(mutex,
GET_THREAD());
382 #if defined(HAVE_WORKING_FORK) 401 rb_mutex_abandon_all(mutex);
415 list_head_init(&mutex->
waitq);
421 rb_mutex_sleep_forever(
VALUE time)
423 rb_thread_sleep_deadly_allow_spurious_wakeup();
428 rb_mutex_wait_for(
VALUE time)
441 if (!
NIL_P(timeout)) {
446 if (
NIL_P(timeout)) {
502 rb_mutex_synchronize_m(
VALUE self,
VALUE args)
523 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq) 525 struct list_head waitq;
530 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq) 531 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq) 534 int num_waiting_push;
535 struct list_head pushq;
540 queue_mark(
void *ptr)
542 struct rb_queue *q = ptr;
549 queue_memsize(
const void *ptr)
551 return sizeof(
struct rb_queue);
561 queue_alloc(
VALUE klass)
571 static struct rb_queue *
580 #define QUEUE_CLOSED FL_USER5 583 szqueue_mark(
void *ptr)
585 struct rb_szqueue *sq = ptr;
591 szqueue_memsize(
const void *ptr)
593 return sizeof(
struct rb_szqueue);
603 szqueue_alloc(
VALUE klass)
605 struct rb_szqueue *sq;
607 &szqueue_data_type, sq);
613 static struct rb_szqueue *
614 szqueue_ptr(
VALUE obj)
616 struct rb_szqueue *sq;
638 queue_length(
VALUE self,
struct rb_queue *q)
644 queue_closed_p(
VALUE self)
650 raise_closed_queue_error(
VALUE self)
652 rb_raise(rb_eClosedQueueError,
"queue closed");
656 queue_closed_result(
VALUE self,
struct rb_queue *q)
658 assert(queue_length(
self, q) == 0);
702 rb_queue_initialize(
VALUE self)
704 struct rb_queue *q = queue_ptr(
self);
711 queue_do_push(
VALUE self,
struct rb_queue *q,
VALUE obj)
713 if (queue_closed_p(
self)) {
714 raise_closed_queue_error(
self);
753 rb_queue_close(
VALUE self)
755 struct rb_queue *q = queue_ptr(
self);
757 if (!queue_closed_p(
self)) {
774 rb_queue_closed_p(
VALUE self)
792 return queue_do_push(
self, queue_ptr(
self), obj);
796 queue_sleep(
VALUE arg)
798 rb_thread_sleep_deadly_allow_spurious_wakeup();
806 struct rb_szqueue *
sq;
811 queue_sleep_done(
VALUE p)
815 list_del(&qw->
w.
node);
816 qw->
as.
q->num_waiting--;
822 szqueue_sleep_done(
VALUE p)
826 list_del(&qw->
w.
node);
827 qw->
as.
sq->num_waiting_push--;
833 queue_do_pop(
VALUE self,
struct rb_queue *q,
int should_block)
835 check_array(
self, q->que);
841 else if (queue_closed_p(
self)) {
842 return queue_closed_result(
self, q);
848 assert(queue_closed_p(
self) == 0);
852 list_add_tail(&qw.
as.
q->waitq, &qw.
w.
node);
853 qw.
as.
q->num_waiting++;
865 int should_block = 1;
868 should_block = !
RTEST(argv[0]);
888 rb_queue_pop(
int argc,
VALUE *argv,
VALUE self)
890 int should_block = queue_pop_should_block(argc, argv);
891 return queue_do_pop(
self, queue_ptr(
self), should_block);
902 rb_queue_empty_p(
VALUE self)
904 return queue_length(
self, queue_ptr(
self)) == 0 ?
Qtrue :
Qfalse;
914 rb_queue_clear(
VALUE self)
916 struct rb_queue *q = queue_ptr(
self);
932 rb_queue_length(
VALUE self)
934 return LONG2NUM(queue_length(
self, queue_ptr(
self)));
944 rb_queue_num_waiting(
VALUE self)
946 struct rb_queue *q = queue_ptr(
self);
948 return INT2NUM(q->num_waiting);
971 struct rb_szqueue *sq = szqueue_ptr(
self);
999 rb_szqueue_close(
VALUE self)
1001 if (!queue_closed_p(
self)) {
1002 struct rb_szqueue *sq = szqueue_ptr(
self);
1018 rb_szqueue_max_get(
VALUE self)
1020 return LONG2NUM(szqueue_ptr(
self)->max);
1035 struct rb_szqueue *sq = szqueue_ptr(
self);
1040 if (max > sq->max) {
1041 diff = max - sq->max;
1051 szqueue_push_should_block(
int argc,
const VALUE *argv)
1053 int should_block = 1;
1056 should_block = !
RTEST(argv[1]);
1058 return should_block;
1076 rb_szqueue_push(
int argc,
VALUE *argv,
VALUE self)
1078 struct rb_szqueue *sq = szqueue_ptr(
self);
1079 int should_block = szqueue_push_should_block(argc, argv);
1081 while (queue_length(
self, &sq->q) >= sq->max) {
1082 if (!should_block) {
1085 else if (queue_closed_p(
self)) {
1094 list_add_tail(pushq, &qw.
w.
node);
1095 sq->num_waiting_push++;
1101 if (queue_closed_p(
self)) {
1103 raise_closed_queue_error(
self);
1106 return queue_do_push(
self, &sq->q, argv[0]);
1110 szqueue_do_pop(
VALUE self,
int should_block)
1112 struct rb_szqueue *sq = szqueue_ptr(
self);
1113 VALUE retval = queue_do_pop(
self, &sq->q, should_block);
1115 if (queue_length(
self, &sq->q) < sq->max) {
1137 rb_szqueue_pop(
int argc,
VALUE *argv,
VALUE self)
1139 int should_block = queue_pop_should_block(argc, argv);
1140 return szqueue_do_pop(
self, should_block);
1150 rb_szqueue_clear(
VALUE self)
1152 struct rb_szqueue *sq = szqueue_ptr(
self);
1160 rb_szqueue_length(
VALUE self)
1162 struct rb_szqueue *sq = szqueue_ptr(
self);
1164 return LONG2NUM(queue_length(
self, &sq->q));
1174 rb_szqueue_num_waiting(
VALUE self)
1176 struct rb_szqueue *sq = szqueue_ptr(
self);
1178 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1189 rb_szqueue_empty_p(
VALUE self)
1191 struct rb_szqueue *sq = szqueue_ptr(
self);
1193 return queue_length(
self, &sq->q) == 0 ?
Qtrue :
Qfalse;
1200 struct list_head waitq;
1232 condvar_memsize(
const void *ptr)
1244 condvar_ptr(
VALUE self)
1254 condvar_alloc(
VALUE klass)
1260 list_head_init(&cv->
waitq);
1272 rb_condvar_initialize(
VALUE self)
1275 list_head_init(&cv->
waitq);
1287 do_sleep(
VALUE args)
1312 rb_condvar_wait(
int argc,
VALUE *argv,
VALUE self)
1315 VALUE mutex, timeout;
1337 rb_condvar_signal(
VALUE self)
1340 wakeup_one(&cv->
waitq);
1351 rb_condvar_broadcast(
VALUE self)
1354 wakeup_all(&cv->
waitq);
1360 undumpable(
VALUE obj)
1367 alias_global_const(
const char *
name,
VALUE klass)
1373 Init_thread_sync(
void)
1429 rb_define_method(rb_cSizedQueue,
"num_waiting", rb_szqueue_num_waiting, 0);
1444 rb_define_method(rb_cConditionVariable,
"initialize", rb_condvar_initialize, 0);
1449 rb_define_method(rb_cConditionVariable,
"broadcast", rb_condvar_broadcast, 0);
1451 #define ALIAS_GLOBAL_CONST(name) \ 1452 alias_global_const(#name, rb_c##name) #define GetMutexPtr(obj, tobj)
VALUE rb_mutex_lock(VALUE self)
struct timeval rb_time_interval(VALUE num)
VALUE rb_mutex_sleep(VALUE self, VALUE timeout)
void rb_bug(const char *fmt,...)
struct rb_mutex_struct * next_mutex
#define RUBY_TYPED_FREE_IMMEDIATELY
#define szqueue_pushq(sq)
VALUE rb_mutex_synchronize(VALUE mutex, VALUE(*func)(VALUE arg), VALUE arg)
int rb_block_given_p(void)
Determines if the current method is given a block.
VALUE rb_mutex_trylock(VALUE self)
void rb_raise(VALUE exc, const char *fmt,...)
VALUE rb_ary_shift(VALUE ary)
#define TypedData_Get_Struct(obj, type, data_type, sval)
VALUE rb_mutex_unlock(VALUE self)
struct rb_thread_struct volatile * th
VALUE rb_ary_push(VALUE ary, VALUE item)
VALUE rb_ary_tmp_new(long capa)
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
VALUE rb_ary_clear(VALUE ary)
void rb_define_alloc_func(VALUE, rb_alloc_func_t)
void rb_mutex_allow_trap(VALUE self, int val)
#define FL_TEST_RAW(x, f)
void rb_gc_mark(VALUE ptr)
VALUE rb_ensure(VALUE(*b_proc)(ANYARGS), VALUE data1, VALUE(*e_proc)(ANYARGS), VALUE data2)
An equivalent to ensure clause.
void rb_undef_method(VALUE klass, const char *name)
int rb_typeddata_is_kind_of(VALUE obj, const rb_data_type_t *data_type)
VALUE rb_obj_class(VALUE)
call-seq: obj.class -> class
#define RB_TYPE_P(obj, type)
#define RUBY_VM_CHECK_INTS_BLOCKING(th)
#define RUBY_TYPED_WB_PROTECTED
#define szqueue_waitq(sq)
RUBY_EXTERN VALUE rb_cObject
VALUE rb_define_class(const char *name, VALUE super)
Defines a top-level class.
void rb_define_const(VALUE, const char *, VALUE)
void rb_define_alias(VALUE klass, const char *name1, const char *name2)
Defines an alias of a method.
#define ALIAS_GLOBAL_CONST(name)
struct rb_mutex_struct * keeping_mutexes
VALUE rb_obj_is_mutex(VALUE obj)
int rb_scan_args(int argc, const VALUE *argv, const char *fmt,...)
VALUE rb_mutex_owned_p(VALUE self)
RUBY_EXTERN VALUE rb_cThread
enum rb_thread_status status
#define RB_OBJ_WRITE(a, slot, b)
unsigned long interrupt_mask
#define TypedData_Make_Struct(klass, type, data_type, sval)
#define FL_UNSET_RAW(x, f)
VALUE rb_mutex_locked_p(VALUE self)
#define RUBY_VM_INTERRUPTED(th)
struct rb_mutex_struct rb_mutex_t
union queue_waiter::@119 as
#define RUBY_TYPED_DEFAULT_FREE
PACKED_STRUCT_UNALIGNED(struct rb_queue { struct list_head waitq;const VALUE que;int num_waiting;})
void rb_threadptr_interrupt(rb_thread_t *th)
void rb_define_method(VALUE klass, const char *name, VALUE(*func)(ANYARGS), int argc)
void rb_provide(const char *)
#define Check_TypedStruct(v, t)