S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_fifo_queue.h

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  * 
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00034 #ifndef S3_FIFO_QUEUE_H
00035 #define S3_FIFO_QUEUE_H
00036 
00037 #include <pthread.h>
00038 
00039 #include <s3fc/s3_inplace_fifo_base.h>
00040 #include <s3fc/s3_macros.h>
00041 
00042 #if 0
00043 #define DEBUG_STR(x) x
00044 #include <iostream>
00045 #else
00046 #define DEBUG_STR(x) 
00047 #endif
00048 
00049 
00060 template<typename T>
00061 class s3_fifo_queue : public s3_inplace_fifo_base<T>
00062 {
00063   protected:
00067    unsigned int num_slots;
00071    pthread_mutex_t state_lock;
00076    bool* p_rdy;
00081    bool* c_rdy;
00086    pthread_cond_t p_rdy_changed;
00091    pthread_cond_t c_rdy_changed;
00095    unsigned int p_candidate;
00099    unsigned int c_candidate;
00103    T* slots;
00104   public:
00109    s3_fifo_queue(unsigned int n_num_slots) :
00110       num_slots(n_num_slots),
00111       p_rdy(new bool[n_num_slots]),
00112       c_rdy(new bool[n_num_slots]),
00113       p_candidate(0),
00114       c_candidate(0),
00115       slots(new T[n_num_slots])
00116    {
00117       // init flags
00118       for ( unsigned int i = 0; i < n_num_slots; i++ ) 
00119       {
00120     p_rdy[i] = true;
00121     c_rdy[i] = false;
00122       }
00123       // init threadstuff
00124       pthread_mutex_init(&state_lock, 0);
00125       pthread_cond_init(&p_rdy_changed, 0);
00126       pthread_cond_init(&c_rdy_changed, 0);
00127    }
00132    virtual ~s3_fifo_queue()
00133    {
00134       pthread_mutex_destroy(&state_lock);
00135       pthread_cond_destroy(&p_rdy_changed);
00136       pthread_cond_destroy(&c_rdy_changed);
00137       delete[] slots;
00138       delete[] p_rdy;
00139       delete[] c_rdy;
00140    }
00146    bool empty() const 
00147    {
00148       bool isempty = true;
00149       pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00150       isempty = (c_rdy[c_candidate] == false);
00151       pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00152       return isempty;
00153    }
00159    bool full() const
00160    {
00161       bool isfull = true;
00162       pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00163       isfull = (p_rdy[p_candidate] == false);
00164       pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00165       return isfull;
00166    }
00172    unsigned int size() const 
00173    {
00174       unsigned int cnt = 0;
00175       pthread_mutex_lock(NONCONST_MUTEXP(&state_lock));
00176       for (unsigned int i = 0; i < num_slots; i++) 
00177       {
00178     if (c_rdy[i]) cnt++;
00179       }
00180       pthread_mutex_unlock(NONCONST_MUTEXP(&state_lock));
00181       return cnt;
00182    }
00183   protected:
00213    T* open_input(bool blocking) 
00214    {
00215       T* ptr = 0;
00216       // set unlock handler and lock the state
00217       pthread_cleanup_push(s3_fifo_base<T>::unlock_mutex, &state_lock);
00218       pthread_mutex_lock(&state_lock);
00219       while (true)
00220       {
00221     if ( p_rdy[p_candidate] ) 
00222     {
00223        p_rdy[p_candidate] = false;
00224        ptr = &slots[p_candidate];
00225        p_candidate = (p_candidate+1) % num_slots;
00226        break;
00227     }
00228     else 
00229     {
00230        if (blocking)
00231        {
00232           // wait on p_rdy changed
00233           pthread_cond_wait(&p_rdy_changed, &state_lock);
00234        }
00235        else
00236        {
00237           ptr = 0;
00238           break;
00239        }
00240     }
00241       }
00242       DEBUG_STR( show_status("P", ptr-slots) );
00243       // remove handler AND unlock mutex
00244       pthread_cleanup_pop(0);
00245       pthread_mutex_unlock(&state_lock);
00246       // return slot data
00247       return ptr;
00248    }
00249   public:
00253    T* open_input()
00254    {
00255       return open_input(true);
00256    }
00260    T* nbl_open_input()
00261    {
00262       return open_input(false);
00263    }
00275    void close_input(const T* ptr) 
00276    {
00277       // return on null pointer
00278       if ( ! ptr ) 
00279       {
00280     return;
00281       }
00282       pthread_mutex_lock(&state_lock);
00283       p_rdy[ptr - slots] = false;
00284       c_rdy[ptr - slots] = true;
00285       pthread_cond_broadcast(&c_rdy_changed);
00286       s3_fifo_base<T>::notify_consumers();   // let all the consumers know
00287       DEBUG_STR( show_status("p", ptr-slots) );
00288       pthread_mutex_unlock(&state_lock);
00289    }
00290   protected:
00316    T* open_output(bool blocking) 
00317    {
00318       T* ptr = 0;
00319       // set unlock handler and lock the state
00320       pthread_cleanup_push(s3_fifo_base<T>::unlock_mutex, &state_lock);
00321       pthread_mutex_lock(&state_lock);
00322       // test availability
00323       while (true)
00324       {
00325     if ( c_rdy[c_candidate] ) 
00326     {
00327        c_rdy[c_candidate] = false;
00328        ptr = &slots[c_candidate];
00329        c_candidate = (c_candidate+1) % num_slots;
00330        break;
00331     }
00332     else 
00333     {
00334        if (blocking)
00335        {
00336           // wait on c_rdy changed
00337           pthread_cond_wait(&c_rdy_changed, &state_lock);
00338        }
00339        else
00340        {
00341           ptr = 0;
00342           break;
00343        }
00344     }
00345       }
00346       // we're done - give lock back
00347       DEBUG_STR( show_status("C", ptr-slots) );
00348       // remove handler AND unlock mutex
00349       pthread_cleanup_pop(0);
00350       pthread_mutex_unlock(&state_lock);
00351       // return slot data
00352       return ptr;
00353    }
00354   public:
00358    T* open_output()
00359    {
00360       return open_output(true);
00361    }
00365    T* nbl_open_output()
00366    {
00367       return open_output(false);
00368    }
00380    void close_output(const T* ptr) 
00381    {
00382       // return on null pointer
00383       if ( ! ptr ) 
00384       {
00385     return;
00386       }
00387       pthread_mutex_lock(&state_lock);
00388       c_rdy[ptr - slots] = false;
00389       p_rdy[ptr - slots] = true;
00390       pthread_cond_broadcast(&p_rdy_changed);
00391       s3_fifo_base<T>::notify_producers();   // let all the producers know
00392       DEBUG_STR( show_status("c", ptr-slots) );
00393       pthread_mutex_unlock(&state_lock);
00394    }
00404    void show_status(const char* str = 0, int slot = -1) const
00405    {
00406       std::cerr << std::endl << std::flush;
00407       if ( str ) std::cerr << str << std::flush;
00408       if ( slot != -1) std::cerr << "[" << slot << "]" << std::flush;
00409       std::cerr << " " << std::flush;
00410       for (unsigned int i = 0; i < num_slots; i++) 
00411       {
00412     if (p_rdy[i])
00413     {
00414        std::cerr << ((i == p_candidate) ? 'P' : 'p') << std::flush;
00415     }
00416     else if (c_rdy[i])
00417     {
00418        std::cerr << (( i == c_candidate) ? 'C' : 'c') << std::flush;
00419     }
00420     else 
00421     {
00422        std::cerr << '!' << std::flush;
00423     }
00424       }
00425    }
00426 
00430    unsigned int get_capacity( void ) const
00431    {  
00432       return num_slots;
00433    }
00434 };
00435 
00436 #endif

Send comments to: s3fc@stonethree.com SourceForge Logo