S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_txport_local.tcc

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  *
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00032 template<class T_Data>
00033 s3_txport_local<T_Data>::s3_txport_local( unsigned int queue_size)
00034   : rx_queue(queue_size), tx_queue(queue_size),connect_id(0), pn(500)
00035 {
00036    start();
00037 }
00038 
00039 template<class T_Data>
00040 s3_txport_local<T_Data>::~s3_txport_local()
00041 {
00042    if (is_running())
00043    {
00044       terminate.post();
00045       wait_on_exit();
00046    }
00047    disconnect_all();
00048 }
00049 
00050 template<class T_Data>
00051 s3_fifo_queue<s3_txport_data<T_Data> >& s3_txport_local<T_Data>::get_tx_queue()
00052 {
00053    return tx_queue;
00054 }
00055 
00056 template<class T_Data>
00057 s3_fifo_queue<s3_txport_data<T_Data> >& s3_txport_local<T_Data>::get_rx_queue()
00058 {
00059    return rx_queue;
00060 }
00061 
00062 template<class T_Data>
00063 std::list<s3_txport_client_id>
00064 s3_txport_local<T_Data>::get_client_id_list() const
00065 {
00066    std::list<s3_txport_client_id> ids;
00067    typename c_map::const_iterator ii;
00068    connection_mutex.lock();
00069    for ( ii = connections.begin(); ii!= connections.end(); ii++)
00070    {
00071       ids.push_back(ii->first);
00072    }
00073    connection_mutex.unlock();
00074    return ids;
00075 }
00076 
00077 template<class T_Data>
00078 void s3_txport_local<T_Data>::send_metadata(s3_txport_client_id id,
00079                    const std::string& metadata)
00080 {
00081    connection_mutex.lock();
00082    if (connections.count(id) != 0)
00083    {
00084        connections[id]->dispatch_event(
00085      s3_txport_event(s3_txport_event::metadata, id,
00086            metadata));
00087    }
00088    connection_mutex.unlock();
00089 
00090 }
00091 
00092 template<class T_Data>
00093 bool s3_txport_local<T_Data>::is_connected_by_id(s3_txport_client_id id)
00094 {
00095    bool ret;
00096    connection_mutex.lock();
00097    ret = connections.count(id) != 0;
00098    connection_mutex.unlock();
00099    return ret;
00100 }
00101 
00102 template<class T_Data>
00103 bool s3_txport_local<T_Data>::disconnect_by_id(s3_txport_client_id id)
00104 {
00105    bool ret;
00106    connection_mutex.lock();
00107    if (connections.count(id) != 0 )
00108    {
00109        // Remove us from the offending remote
00110        connections[id]->deregister_me(this);
00111        connections.erase(id);
00112        dispatch_event(s3_txport_event(s3_txport_event::disconnect,id,
00113             "Disconnected locally"));
00114        ret = true;
00115    }
00116    else
00117    {
00118       ret = false;
00119    }
00120    connection_mutex.unlock();
00121    return ret;
00122 }
00123 
00124 template<class T_Data>
00125 void s3_txport_local<T_Data>::disconnect_all()
00126 {
00127    typename c_map::iterator ii = connections.begin();
00128    typename c_map::iterator next_ii;
00129    
00130    while ( ii != connections.end() ) 
00131    {
00132       next_ii = ii;
00133       next_ii++;
00134       disconnect_by_id(ii->first);
00135       ii = next_ii;
00136    }
00137 }
00138 
00139 template<class T_Data>
00140 bool s3_txport_local<T_Data>::listening() const
00141 {
00142    return is_running();
00143 }
00144 
00145 template<class T_Data>
00146 s3_txport_client_id s3_txport_local<T_Data>::connect(
00147    s3_txport_local<T_Data> *listener)
00148 {
00149    if (listener == 0)
00150    {
00151        throw s3_generic_exception(
00152      "s3_txport_local::connect(s3_txport_local *)",
00153      "Remote listener does not exist (null pointer)");
00154    }
00155    s3_txport_local<T_Data> *remote = listener->register_me(this);
00156    if (remote == 0)
00157    {
00158       // Already registered so buzz off.
00159       dispatch_event(s3_txport_event(s3_txport_event::error,
00160                  0,"Already connected to this remote"));
00161       return static_cast<s3_txport_client_id>(0);
00162    }
00163    s3_txport_client_id id = next_id();
00164    connection_mutex.lock();
00165    connections[id] = remote;
00166    connection_mutex.unlock();
00167    dispatch_event(s3_txport_event(s3_txport_event::connect,
00168               id,"Locally established connection"));
00169    return id;
00170 }
00171 
00172 template<class T_Data>
00173 s3_txport_local<T_Data>* s3_txport_local<T_Data>::register_me(
00174    s3_txport_local<T_Data> *remote)
00175 {
00176    // Check if we are already registered!!
00177    typename c_map::iterator ii;
00178    bool registered = false;
00179    connection_mutex.lock();
00180    for ( ii = connections.begin(); ii!= connections.end(); ii++)
00181    {
00182       if (ii->second == remote)
00183       {
00184          registered = true;
00185       }
00186    }
00187    connection_mutex.unlock();
00188    if (registered)
00189    {
00190       dispatch_event(s3_txport_event(s3_txport_event::error,0,
00191          "Remote attempted duplicate connect"));
00192       return 0;
00193    }
00194 
00195    s3_txport_client_id id = next_id();
00196    connection_mutex.lock();
00197    connections[id] = remote;
00198    connection_mutex.unlock();
00199    dispatch_event(s3_txport_event(s3_txport_event::connect,id,
00200               "Remote connetion received"));
00201    return this;
00202 }
00203 
00204 template<class T_Data>
00205 void s3_txport_local<T_Data>::deregister_me(
00206    const s3_txport_local<T_Data> *remote)
00207 {
00208    typename c_map::iterator ii;
00209    connection_mutex.lock();
00210 
00211    ii = connections.begin();
00212    while( ii != connections.end() )
00213    {
00214       typename c_map::iterator next_ii;
00215       next_ii = ii;
00216       next_ii++;
00217 
00218       // Remove if correct value (this invalidates ii).
00219       if (ii->second == remote)
00220       {
00221          dispatch_event(s3_txport_event(s3_txport_event::disconnect,
00222                ii->first,
00223                "Remote has deregistered"));
00224          connections.erase(ii);
00225       }
00226       ii = next_ii;      
00227    }
00228    connection_mutex.unlock();
00229 }
00230 
00231 template<class T_Data>
00232 bool s3_txport_local<T_Data>::listen()
00233 {
00234    return true;
00235 }
00236 
00237 template<class T_Data>
00238 void s3_txport_local<T_Data>::cleanup()
00239 {
00240    pn.disable();
00241    pn.unsubscribe(sem);
00242    tx_queue.unsubscribe_consumer(sem);
00243    S3FC_DBG(std::cout << "s3_txport_local:: cleanup() cleaning up"
00244        << std::endl);
00245 }
00246 
00247 template<class T_Data>
00248 void s3_txport_local<T_Data>::main_loop()
00249 {
00250    typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00251    std::list<s3_txport_client_id> id_list;
00252    pn.subscribe(sem);
00253    pn.enable();
00254    tx_queue.subscribe_consumer(sem);
00255 
00256    while (1)
00257    {
00258       sem.wait();
00259       if (terminate.try_wait())
00260       {
00261          return; // kill thread.
00262       }
00263       d = tx_queue.nbl_open_output();
00264       if (d == 0)  continue; // Loop because the hit was something else
00265 
00266       id_list = d->get_id_list();
00267 
00268       if (!id_list.empty())
00269       {
00270          // Now send the same data to each client in the list
00271          for ( std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00272                ii != id_list.end(); ii++)
00273          {
00274             connection_mutex.lock();
00275             if (connections.count(*ii) != 0 )
00276             {
00277                connections[*ii]->get_rx_queue().push(*d);
00278                connection_mutex.unlock();
00279             }
00280             else
00281             {
00282                connection_mutex.unlock();
00283                dispatch_event(s3_txport_event(s3_txport_event::error,
00284          *ii, "Data sent to non-existant client ID"));
00285             }
00286          }
00287       }
00288       tx_queue.close_output(d);
00289    }
00290 }
00291 template<class T_Data>
00292 int s3_txport_local<T_Data>::num_connections() const
00293 {
00294    int c;
00295    connection_mutex.lock();
00296    c = connections.size();
00297    connection_mutex.unlock();
00298    return c;
00299 }

Send comments to: s3fc@stonethree.com SourceForge Logo