#include #include #include #include "queue.h" //#define INC( q, x) ++(x) %= (q)->_queue_size; #define INC( q, x) ({ ++(x); (x) %= (q)->_queue_size; (x); }) //#define pthread_mutex_lock( a) { pthread_mutex_lock( a); printf( "%p: got mutex\n", pthread_self()); fflush( stdout); } static queue_elem_t * queue_create_elem( process_func_t func_, void * data_) { queue_elem_t * qe = (queue_elem_t *)malloc( sizeof( queue_elem_t)); if ( !qe) return NULL; qe->_data = data_; qe->_process = func_; qe->_done = 0; pthread_mutex_init( &qe->_elem_lock, NULL); return qe; } static queue_elem_t * queue_get_next( queue_t * q_) { size_t head; queue_elem_t * qe = NULL; pthread_mutex_lock( &q_->_queue_lock); while ( q_->_size != q_->_queue_size && q_->_next == q_->_head && !q_->_quit) pthread_cond_wait( &q_->_queue_insert, &q_->_queue_lock); if ( !q_->_quit) { qe = q_->_queue[ q_->_next]; INC( q_, q_->_next); } pthread_mutex_unlock( &q_->_queue_lock); return qe; } static void * queue_worker( void * arg_) { queue_t * q = (queue_t *)arg_; while ( !q->_quit) { queue_elem_t * qe = queue_get_next( q); if ( !qe) continue; pthread_mutex_lock( &qe->_elem_lock); if ( qe->_process && qe->_data) qe->_process( qe->_data); qe->_done = 1; pthread_cond_signal( &q->_elem_done); pthread_mutex_unlock( &qe->_elem_lock); } return NULL; } static void * queue_reaper( void * arg_) { queue_t * q = (queue_t *)arg_; while ( !q->_quit) { queue_elem_t * qe = NULL; pthread_mutex_lock( &q->_queue_lock); while ( q->_tail == q->_next && !q->_quit) pthread_cond_wait( &q->_elem_done, &q->_queue_lock); if ( !q->_quit) { qe = q->_queue[ q->_tail]; q->_queue[ q->_tail] = NULL; INC( q, q->_tail); --q->_size; pthread_cond_signal( &q->_queue_remove); } pthread_mutex_unlock( &q->_queue_lock); if ( !qe) continue; // XXX: should sync if _elem_done was not tail elem. pthread_mutex_lock( &qe->_elem_lock); assert( qe->_done); if ( !qe->_process && !qe->_data) q->_last = 1; else q->_reap( qe->_data); pthread_mutex_unlock( &qe->_elem_lock); } return NULL; } void queue_insert( queue_t * q_, process_func_t func_, void * data_) { size_t head; queue_elem_t * qe = queue_create_elem( func_, data_); if ( !qe) return; pthread_mutex_lock( &q_->_queue_lock); while ( q_->_size && q_->_tail == q_->_head && !q_->_quit) pthread_cond_wait( &q_->_queue_remove, &q_->_queue_lock); if ( !q_->_quit) { q_->_last = 0; assert( q_->_queue[ q_->_head] == NULL); q_->_queue[ q_->_head] = qe; INC( q_, q_->_head); ++q_->_size; pthread_cond_signal( &q_->_queue_insert); } pthread_mutex_unlock( &q_->_queue_lock); } void queue_insert_last( queue_t * q_) { queue_insert( q_, NULL, NULL); } void queue_wait_last( queue_t * q_) { pthread_mutex_lock( &q_->_queue_lock); while ( !q_->_last) pthread_cond_wait( &q_->_queue_remove, &q_->_queue_lock); pthread_mutex_unlock( &q_->_queue_lock); } queue_t * queue_create( size_t queue_size_, size_t workers_, process_func_t reaper_) { queue_t * q; int i; int rc; if ( workers_ < 1 || queue_size_ < workers_ || !reaper_) return NULL; q = (queue_t *)malloc( sizeof( queue_t)); if ( !q) return NULL; q->_queue_size = queue_size_; q->_queue = (queue_elem_t **)malloc( queue_size_ * sizeof( queue_elem_t *)); if ( !q->_queue) { free( q); return NULL; } for ( i=0; i_queue[i] = NULL; q->_head = q->_tail = q->_next = q->_size = 0; q->_quit = 0; q->_reap = reaper_; q->_last = 0; pthread_mutex_init( &q->_queue_lock, NULL); pthread_cond_init( &q->_queue_insert, NULL); pthread_cond_init( &q->_queue_remove, NULL); pthread_cond_init( &q->_elem_done, NULL); q->_workers = workers_; q->_tid_workers = (pthread_t *)malloc( workers_ * sizeof( pthread_t)); if ( !q->_tid_workers) { free( q->_queue); free( q); return NULL; } pthread_mutex_lock( &q->_queue_lock); for ( i=0; i_tid_workers[i], NULL, queue_worker, (void *)q); pthread_create( &q->_tid_reaper, NULL, queue_reaper, (void *)q); pthread_mutex_unlock( &q->_queue_lock); return q; } void queue_destruct( queue_t * q_) { int i; q_->_quit = 1; for ( i=0; i_workers; i++) { pthread_cond_broadcast( &q_->_queue_insert); pthread_join( q_->_tid_workers[i], NULL); } pthread_cond_signal( &q_->_elem_done); pthread_join( q_->_tid_reaper, NULL); pthread_mutex_destroy( &q_->_queue_lock); pthread_cond_destroy( &q_->_queue_insert); pthread_cond_destroy( &q_->_queue_remove); pthread_cond_destroy( &q_->_elem_done); free( q_->_tid_workers); free( q_->_queue); free( q_); }