00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00034 #ifndef S3_SPSC_FIFO_QUEUE_H
00035 #define S3_SPSC_FIFO_QUEUE_H
00036
00037 #include <pthread.h>
00038 #include <semaphore.h>
00039
00040 #include <s3fc/s3_macros.h>
00041 #include <s3fc/s3_inplace_fifo_base.h>
00042
00048 template <class T>
00049 class s3_spsc_fifo_queue : public s3_inplace_fifo_base<T>
00050 {
00051 private:
00053 unsigned int buffer_size;
00055 T* buffer;
00060 unsigned int c_size;
00062 unsigned int input_index;
00064 unsigned int output_index;
00066 pthread_mutex_t access_lock_a;
00068 pthread_mutex_t access_lock_p;
00070 pthread_mutex_t access_lock_c;
00072 sem_t sem_read;
00074 sem_t sem_write;
00075 protected:
00076
00077 bool crit_empty() const
00078 {
00079 return (c_size == 0);
00080 }
00081
00082 bool crit_full() const
00083 {
00084 return (c_size == buffer_size);
00085 }
00086
00087 T* crit_open_input()
00088 {
00089
00090 sem_wait(&sem_write);
00091
00092 return (buffer+input_index);
00093 }
00094
00095 T* crit_open_output()
00096 {
00097
00098 sem_wait(&sem_read);
00099
00100 return (buffer+output_index);
00101 }
00102
00103 public:
00109 s3_spsc_fifo_queue(unsigned int n_buffer_size = 5) :
00110 buffer_size(n_buffer_size),
00111 buffer(0),
00112 c_size(0),
00113 input_index(0),
00114 output_index(0)
00115 {
00116
00117 buffer = new T[buffer_size];
00118 input_index = output_index = 0;
00119
00120 pthread_mutex_init(&access_lock_a, 0);
00121 pthread_mutex_init(&access_lock_p, 0);
00122 pthread_mutex_init(&access_lock_c, 0);
00123
00124 sem_init(&sem_write, 0, buffer_size);
00125 sem_init(&sem_read, 0, 0);
00126 }
00127
00132 virtual ~s3_spsc_fifo_queue()
00133 {
00134
00135 delete [] buffer;
00136
00137 sem_destroy(&sem_write);
00138 sem_destroy(&sem_read);
00139 pthread_mutex_destroy(&access_lock_a);
00140 pthread_mutex_destroy(&access_lock_p);
00141 pthread_mutex_destroy(&access_lock_c);
00142 }
00147 bool empty() const
00148 {
00149 bool is_empty;
00150 pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00151 is_empty = crit_empty();
00152 pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00153 return is_empty;
00154 }
00159 bool full() const
00160 {
00161 bool is_full;
00162 pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00163 is_full = crit_full();
00164 pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00165 return is_full;
00166 }
00171 unsigned int size() const
00172 {
00173 unsigned int s;
00174 pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00175 s = c_size;
00176 pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00177 return s;
00178 }
00185 T* open_input()
00186 {
00187 T* temp;
00188 pthread_mutex_lock(&access_lock_p);
00189 temp = crit_open_input();
00190 pthread_mutex_unlock(&access_lock_p);
00191 return temp;
00192 }
00196 T* nbl_open_input()
00197 {
00198 T* temp;
00199 pthread_mutex_lock(&access_lock_p);
00200 if ( crit_full() )
00201 {
00202 temp = 0;
00203 }
00204 else
00205 {
00206 temp = crit_open_input();
00207 }
00208 pthread_mutex_unlock(&access_lock_p);
00209 return temp;
00210 }
00211
00216 void close_input(const T*t = 0)
00217 {
00218 pthread_mutex_lock(&access_lock_p);
00219
00220 if (++input_index >= buffer_size)
00221 {
00222 input_index = 0;
00223 }
00224 c_size++;
00225 sem_post(&sem_read);
00226 notify_consumers();
00227 pthread_mutex_unlock(&access_lock_p);
00228 }
00235 T* open_output()
00236 {
00237 T* temp;
00238 pthread_mutex_lock(&access_lock_c);
00239 temp = crit_open_output();
00240 pthread_mutex_unlock(&access_lock_c);
00241 return temp;
00242 }
00246 T* nbl_open_output()
00247 {
00248 T* temp;
00249 pthread_mutex_lock(&access_lock_p);
00250 if ( crit_empty() )
00251 {
00252 temp = 0;
00253 }
00254 else
00255 {
00256 temp = crit_open_output();
00257 }
00258 pthread_mutex_unlock(&access_lock_p);
00259 return temp;
00260 }
00265 void close_output(const T*t = 0)
00266 {
00267 pthread_mutex_lock(&access_lock_c);
00268
00269 if (++output_index >= buffer_size)
00270 {
00271 output_index = 0;
00272 }
00273 c_size--;
00274 sem_post(&sem_write);
00275 notify_producers();
00276 pthread_mutex_unlock(&access_lock_c);
00277 }
00278 };
00279
00280 #endif // S3_SPSC_FIFO_QUEUE_H