S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_spsc_fifo_queue.h

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  * 
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00034 #ifndef S3_SPSC_FIFO_QUEUE_H
00035 #define S3_SPSC_FIFO_QUEUE_H
00036 
00037 #include <pthread.h>
00038 #include <semaphore.h>
00039 
00040 #include <s3fc/s3_macros.h>
00041 #include <s3fc/s3_inplace_fifo_base.h>
00042 
00048 template <class T>
00049 class s3_spsc_fifo_queue : public s3_inplace_fifo_base<T>
00050 {
00051   private:
00053    unsigned int buffer_size;
00055    T* buffer;
00060    unsigned int c_size;
00062    unsigned int input_index;
00064    unsigned int output_index;
00066    pthread_mutex_t access_lock_a;
00068    pthread_mutex_t access_lock_p;
00070    pthread_mutex_t access_lock_c;
00072    sem_t sem_read;
00074    sem_t sem_write;
00075   protected:
00076    // critical section of empty()
00077    bool crit_empty() const
00078    {
00079       return (c_size == 0);        
00080    }
00081    // critical section of full()
00082    bool crit_full() const
00083    {
00084       return (c_size == buffer_size);     
00085    }
00086    // critical section of open_input()
00087    T* crit_open_input()
00088    {
00089       // Wait if buffers are full
00090       sem_wait(&sem_write);
00091       // Return a pointer to the next buffer available for production.
00092       return (buffer+input_index);     
00093    }
00094    // critical section of open_output()
00095    T* crit_open_output()
00096    {
00097       // Wait if buffers are empty
00098       sem_wait(&sem_read);
00099       // Return a pointer to the next buffer available for consumption.
00100       return (buffer+output_index);     
00101    }
00102    
00103   public:
00109    s3_spsc_fifo_queue(unsigned int n_buffer_size = 5) :
00110       buffer_size(n_buffer_size),
00111       buffer(0),
00112       c_size(0),
00113       input_index(0),
00114       output_index(0)
00115    {
00116       // Allocate buffer.
00117       buffer = new T[buffer_size];
00118       input_index = output_index = 0;
00119       // Init mutex
00120       pthread_mutex_init(&access_lock_a, 0);
00121       pthread_mutex_init(&access_lock_p, 0);
00122       pthread_mutex_init(&access_lock_c, 0);
00123       // Init semaphores.
00124       sem_init(&sem_write, 0, buffer_size);
00125       sem_init(&sem_read, 0, 0);
00126    }
00127    
00132    virtual ~s3_spsc_fifo_queue()
00133    {
00134       // Free willy
00135       delete [] buffer;
00136       // and the other fish
00137       sem_destroy(&sem_write);
00138       sem_destroy(&sem_read);
00139       pthread_mutex_destroy(&access_lock_a);
00140       pthread_mutex_destroy(&access_lock_p);
00141       pthread_mutex_destroy(&access_lock_c);
00142    }
00147    bool empty() const
00148    {
00149       bool is_empty;
00150       pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00151       is_empty = crit_empty();
00152       pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00153       return is_empty;
00154    }
00159    bool full() const
00160    {
00161       bool is_full;
00162       pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00163       is_full = crit_full();
00164       pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00165       return is_full;
00166    }
00171    unsigned int size() const
00172    {
00173       unsigned int s;
00174       pthread_mutex_lock(NONCONST_MUTEXP(&access_lock_a));
00175       s = c_size;
00176       pthread_mutex_unlock(NONCONST_MUTEXP(&access_lock_a));
00177       return s;
00178    }
00185    T* open_input()
00186    {
00187       T* temp;
00188       pthread_mutex_lock(&access_lock_p);     
00189       temp = crit_open_input();
00190       pthread_mutex_unlock(&access_lock_p);
00191       return temp;
00192    } 
00196    T* nbl_open_input()
00197    {
00198       T* temp;
00199       pthread_mutex_lock(&access_lock_p);     
00200       if ( crit_full() )   // return null pointer
00201       {
00202     temp = 0;
00203       }
00204       else
00205       {
00206     temp = crit_open_input();
00207       }
00208       pthread_mutex_unlock(&access_lock_p);
00209       return temp;
00210    }
00211    
00216    void close_input(const T*t = 0)
00217    {
00218       pthread_mutex_lock(&access_lock_p);
00219       // Advance input pointer and wrap around if needed
00220       if (++input_index >= buffer_size)
00221       {
00222     input_index = 0;
00223       }
00224       c_size++;
00225       sem_post(&sem_read);
00226       notify_consumers();  // let all the consumers know
00227       pthread_mutex_unlock(&access_lock_p);
00228    }
00235    T* open_output()
00236    {
00237       T* temp;
00238       pthread_mutex_lock(&access_lock_c);
00239       temp = crit_open_output();
00240       pthread_mutex_unlock(&access_lock_c);
00241       return temp;
00242    }
00246    T* nbl_open_output()
00247    {
00248       T* temp;
00249       pthread_mutex_lock(&access_lock_p);     
00250       if ( crit_empty() )  // return null pointer
00251       {
00252     temp = 0;
00253       }
00254       else
00255       {
00256     temp = crit_open_output();   // this never blocks
00257       }
00258       pthread_mutex_unlock(&access_lock_p);
00259       return temp;
00260    }
00265    void close_output(const T*t = 0)
00266    {
00267       pthread_mutex_lock(&access_lock_c);
00268       // Advance output pointer and wrap around if needed
00269       if (++output_index >= buffer_size)
00270       {
00271     output_index = 0;
00272       }
00273       c_size--;
00274       sem_post(&sem_write);
00275       notify_producers();  // let all the producers know
00276       pthread_mutex_unlock(&access_lock_c);
00277    }
00278 };
00279 
00280 #endif  // S3_SPSC_FIFO_QUEUE_H

Send comments to: s3fc@stonethree.com SourceForge Logo