00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00034 #ifndef S3_FIFO_QUEUE_H
00035 #define S3_FIFO_QUEUE_H
00036
00037 #include <pthread.h>
00038
00039 #include <s3fc/s3_inplace_fifo_base.h>
00040 #include <s3fc/s3_macros.h>
00041
// s3_fifo_queue::show_status() writes to std::cerr whether or not tracing is
// enabled, so <iostream> has to be visible unconditionally. It used to be
// included only from the disabled tracing branch below.
#include <iostream>

// Tracing switch: change the condition to 1 to make DEBUG_STR(x) expand to x,
// which compiles the show_status() calls embedded in the open/close methods.
// With the condition at 0, DEBUG_STR(x) expands to nothing.
#if 0
#define DEBUG_STR(x) x
#else
#define DEBUG_STR(x)
#endif
00048
00049
00060 template<typename T>
00061 class s3_fifo_queue : public s3_inplace_fifo_base<T>
00062 {
00063 protected:
00067 unsigned int num_slots;
00071 pthread_mutex_t state_lock;
00076 bool* p_rdy;
00081 bool* c_rdy;
00086 pthread_cond_t p_rdy_changed;
00091 pthread_cond_t c_rdy_changed;
00095 unsigned int p_candidate;
00099 unsigned int c_candidate;
00103 T* slots;
00104 public:
00109 s3_fifo_queue(unsigned int n_num_slots) :
00110 num_slots(n_num_slots),
00111 p_rdy(new bool[n_num_slots]),
00112 c_rdy(new bool[n_num_slots]),
00113 p_candidate(0),
00114 c_candidate(0),
00115 slots(new T[n_num_slots])
00116 {
00117
00118 for ( unsigned int i = 0; i < n_num_slots; i++ )
00119 {
00120 p_rdy[i] = true;
00121 c_rdy[i] = false;
00122 }
00123
00124 pthread_mutex_init(&state_lock, 0);
00125 pthread_cond_init(&p_rdy_changed, 0);
00126 pthread_cond_init(&c_rdy_changed, 0);
00127 }
00132 virtual ~s3_fifo_queue()
00133 {
00134 pthread_mutex_destroy(&state_lock);
00135 pthread_cond_destroy(&p_rdy_changed);
00136 pthread_cond_destroy(&c_rdy_changed);
00137 delete[] slots;
00138 delete[] p_rdy;
00139 delete[] c_rdy;
00140 }
00146 bool empty() const
00147 {
00148 bool isempty = true;
00149 pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00150 isempty = (c_rdy[c_candidate] == false);
00151 pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00152 return isempty;
00153 }
00159 bool full() const
00160 {
00161 bool isfull = true;
00162 pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00163 isfull = (p_rdy[p_candidate] == false);
00164 pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00165 return isfull;
00166 }
00172 unsigned int size() const
00173 {
00174 unsigned int cnt = 0;
00175 pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00176 for (unsigned int i = 0; i < num_slots; i++)
00177 {
00178 if (c_rdy[i]) cnt++;
00179 }
00180 pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00181 return cnt;
00182 }
00183 protected:
00213 T* open_input(bool blocking)
00214 {
00215 T* ptr = 0;
00216
00217 pthread_cleanup_push(s3_fifo_base<T>::unlock_mutex, &state_lock);
00218 pthread_mutex_lock(&state_lock);
00219 while (true)
00220 {
00221 if ( p_rdy[p_candidate] )
00222 {
00223 p_rdy[p_candidate] = false;
00224 ptr = &slots[p_candidate];
00225 p_candidate = (p_candidate+1) % num_slots;
00226 break;
00227 }
00228 else
00229 {
00230 if (blocking)
00231 {
00232
00233 pthread_cond_wait(&p_rdy_changed, &state_lock);
00234 }
00235 else
00236 {
00237 ptr = 0;
00238 break;
00239 }
00240 }
00241 }
00242 DEBUG_STR( show_status("P", ptr-slots) );
00243
00244 pthread_cleanup_pop(0);
00245 pthread_mutex_unlock(&state_lock);
00246
00247 return ptr;
00248 }
00249 public:
00253 T* open_input()
00254 {
00255 return open_input(true);
00256 }
00260 T* nbl_open_input()
00261 {
00262 return open_input(false);
00263 }
00275 void close_input(const T* ptr)
00276 {
00277
00278 if ( ! ptr )
00279 {
00280 return;
00281 }
00282 pthread_mutex_lock(&state_lock);
00283 p_rdy[ptr - slots] = false;
00284 c_rdy[ptr - slots] = true;
00285 pthread_cond_broadcast(&c_rdy_changed);
00286 s3_fifo_base<T>::notify_consumers();
00287 DEBUG_STR( show_status("p", ptr-slots) );
00288 pthread_mutex_unlock(&state_lock);
00289 }
00290 protected:
00316 T* open_output(bool blocking)
00317 {
00318 T* ptr = 0;
00319
00320 pthread_cleanup_push(s3_fifo_base<T>::unlock_mutex, &state_lock);
00321 pthread_mutex_lock(&state_lock);
00322
00323 while (true)
00324 {
00325 if ( c_rdy[c_candidate] )
00326 {
00327 c_rdy[c_candidate] = false;
00328 ptr = &slots[c_candidate];
00329 c_candidate = (c_candidate+1) % num_slots;
00330 break;
00331 }
00332 else
00333 {
00334 if (blocking)
00335 {
00336
00337 pthread_cond_wait(&c_rdy_changed, &state_lock);
00338 }
00339 else
00340 {
00341 ptr = 0;
00342 break;
00343 }
00344 }
00345 }
00346
00347 DEBUG_STR( show_status("C", ptr-slots) );
00348
00349 pthread_cleanup_pop(0);
00350 pthread_mutex_unlock(&state_lock);
00351
00352 return ptr;
00353 }
00354 public:
00358 T* open_output()
00359 {
00360 return open_output(true);
00361 }
00365 T* nbl_open_output()
00366 {
00367 return open_output(false);
00368 }
00380 void close_output(const T* ptr)
00381 {
00382
00383 if ( ! ptr )
00384 {
00385 return;
00386 }
00387 pthread_mutex_lock(&state_lock);
00388 c_rdy[ptr - slots] = false;
00389 p_rdy[ptr - slots] = true;
00390 pthread_cond_broadcast(&p_rdy_changed);
00391 s3_fifo_base<T>::notify_producers();
00392 DEBUG_STR( show_status("c", ptr-slots) );
00393 pthread_mutex_unlock(&state_lock);
00394 }
00404 void show_status(const char* str = 0, int slot = -1) const
00405 {
00406 std::cerr << std::endl << std::flush;
00407 if ( str ) std::cerr << str << std::flush;
00408 if ( slot != -1) std::cerr << "[" << slot << "]" << std::flush;
00409 std::cerr << " " << std::flush;
00410 for (unsigned int i = 0; i < num_slots; i++)
00411 {
00412 if (p_rdy[i])
00413 {
00414 std::cerr << ((i == p_candidate) ? 'P' : 'p') << std::flush;
00415 }
00416 else if (c_rdy[i])
00417 {
00418 std::cerr << (( i == c_candidate) ? 'C' : 'c') << std::flush;
00419 }
00420 else
00421 {
00422 std::cerr << '!' << std::flush;
00423 }
00424 }
00425 }
00426
00430 unsigned int get_capacity( void ) const
00431 {
00432 return num_slots;
00433 }
00434 };
00435
00436 #endif