S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_txport_tcp.tcc

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  *
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00024 #include <s3fc/s3_periodic_notifier.h>
00025 #include <s3fc/s3_event.h>
00026 #include <s3fc/s3_txport_base.h>
00027 #include <s3fc/s3_socket_tcp.h>
00028 #include <cassert>
00029 #include <memory>
00030 
00031 #ifndef _WIN32
00032 #include <netinet/in.h>
00033 #endif
00034 
00064 template<class T_Data, class T_Socket>
00065 class s3_txport_tcp_tx_task : public s3_thread_base
00066 {
00068    s3_semaphore sem;
00070    s3_fifo_queue<s3_txport_data<T_Data> >& in_q;
00072    s3_pack_buffer b;
00074    s3_txport_tcp<T_Data, T_Socket> *parent;
00076    s3_semaphore terminate;
00078    s3_periodic_notifier pn;
00080    std::list<s3_txport_client_id> id_list;
00081    
00083    s3_txport_tcp_tx_task(s3_fifo_queue<s3_txport_data<T_Data> >& a_q,
00084           s3_txport_tcp<T_Data, T_Socket> *a_parent);
00085    
00087    ~s3_txport_tcp_tx_task()
00088    {
00089       pn.unsubscribe(sem);
00090    }
00091    
00093    void handle_failure(s3_txport_client_id id, std::string err_msg)
00094    {
00095       // we just report errors
00096       parent->dispatch_event(
00097     s3_txport_event(s3_txport_event::error, id, err_msg));
00098       parent->disconnect_by_id(id);
00099    }
00100 
00102    void cleanup();
00103    
00104  public:
00106    void main_loop();
00107 
00108    void set_terminate()
00109    {
00110       terminate.post();
00111    }
00112 
00113    friend class s3_txport_tcp<T_Data, T_Socket>;
00114 };
00115 
00116 template<class T_Data, class T_Socket>
00117 s3_txport_tcp_tx_task<T_Data, T_Socket>::s3_txport_tcp_tx_task(
00118    s3_fifo_queue<s3_txport_data<T_Data> >& a_q,
00119    s3_txport_tcp<T_Data, T_Socket> *a_parent) :
00120    in_q(a_q), parent(a_parent)
00121 {
00122    // subscribe the queues we read to the notification semaphore.
00123    pn.set_period(500);
00124    pn.subscribe(sem);
00125 }
00126 
00127 template<class T_Data, class T_Socket>
00128 void s3_txport_tcp_tx_task<T_Data, T_Socket>::cleanup()
00129 {
00130    pn.disable();
00131    parent->meta_queue.unsubscribe_consumer(sem);
00132    in_q.unsubscribe_consumer(sem);
00133 }
00134 
00135 template<class T_Data, class T_Socket>
00136 void s3_txport_tcp_tx_task<T_Data, T_Socket>::main_loop()
00137 {
00138    typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00139    unsigned int sz = 0;
00140 
00141    pn.enable();
00142    parent->meta_queue.subscribe_consumer(sem);
00143    in_q.subscribe_consumer(sem);
00144    while (1)
00145    {
00146       if (terminate.try_wait()) return;
00147       b.reset();
00148       // Weed out any dead tasks
00149       parent->reap();
00150       // Now wait for something to happen
00151       sem.wait();
00152       
00153       // If we have metadata, send it. No while loop as a
00154       // semaphore will have been posted for each event.
00155       if (!parent->meta_queue.empty())
00156       {
00157     typename s3_txport_tcp<T_Data, T_Socket>::metadata_type m =
00158        parent->meta_queue.pop();
00159     S3FC_DBG( std::cout << "Sending metadata to client " << m.id
00160          << " " << m.metadata << std::endl );
00161          if (parent->is_connected_by_id(m.id))
00162          {
00163             parent->set_critical_by_id(m.id);
00164             s3_streamable::pack(m.metadata,b);
00165             // Shift size left and set metadata bit
00166             sz = b.get_size() << 1 | 1;
00167        T_Socket* sock = parent->get_socket_by_id(m.id);
00168        assert( sock != 0 );
00169        if ( sock->is_closed() )
00170             {
00171                parent->clear_critical_by_id(m.id);
00172                continue;
00173             }
00174             if ( !sock->write(&sz, sizeof(sz), -1, &terminate) )
00175             {
00176                handle_failure(m.id, sock->get_error());
00177                parent->clear_critical_by_id(m.id);
00178                if (sock->get_errno() == -2) return;
00179                continue;
00180             }
00181             if ( !sock->write(b.get_buffer(), sz >> 1, -1, &terminate) )
00182             {
00183                handle_failure(m.id, sock->get_error());
00184                parent->clear_critical_by_id(m.id);
00185                if (sock->get_errno() == -2) return;
00186                continue;
00187             }
00188             parent->clear_critical_by_id(m.id);
00189          }
00190          else
00191          {
00192              std::ostringstream msg;
00193              msg << " Metadata sent to non-existant client ID";
00194              parent->dispatch_event(
00195       s3_txport_event(s3_txport_event::error, m.id, msg.str()));
00196          }
00197          continue;
00198       }
00199       
00200       // Check the data queue for real stuff to transmit
00201       d = in_q.nbl_open_output();
00202       if (d == 0) continue; // Loop because the hit was something else
00203       
00204       s3_streamable::pack(d->data,b);
00205       id_list = d->get_id_list();
00206       in_q.close_output(d);
00207       
00208       if (!id_list.empty())
00209       {
00210          if (terminate.try_wait()) return;
00211          // Now send the same data to each client in the list
00212          for ( std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00213                ii != id_list.end(); ii++) //foreach destination
00214          {
00215             s3_txport_client_id id = *ii;
00216             // Check that the connection still exists before sending data.
00217             // If the set_critical_by_id returns true, it implies that the
00218             // connection is present
00219             parent->set_critical_by_id(id);
00220             if (parent->is_connected_by_id(id))
00221             {
00222                sz = b.get_size();
00223                if (sz >= (1u << (static_cast<unsigned int>(sizeof(sz))*8u-1u)))
00224                {
00225         throw s3_generic_exception(
00226            "s3_txport_tcp_tx_task::main_loop()",
00227            "Maximum packet size exceeded");
00228                }
00229                // Shift size over by one bit. If the LSB is zero then the
00230                // data is regular payload and not metadata.
00231                sz <<= 1;
00232           T_Socket* sock = parent->get_socket_by_id(id);
00233           assert( sock != 0 );
00234           if ( sock->is_closed() )
00235                {
00236                   parent->clear_critical_by_id(id);
00237                   continue;
00238                }
00239                if ( !sock->write(&sz, sizeof(sz), -1, &terminate) )
00240                {
00241                   handle_failure(id, sock->get_error());
00242                   parent->clear_critical_by_id(id);
00243                   if (sock->get_errno() == -2) return;
00244                   continue;
00245                }
00246           if ( !sock->write(b.get_buffer(), sz >> 1, -1, &terminate) )
00247                {
00248                   handle_failure(id, sock->get_error());
00249                   parent->clear_critical_by_id(id);
00250                   if (sock->get_errno() == -2) return;
00251                   continue;
00252                }
00253                parent->clear_critical_by_id(id);
00254             }
00255             else // ignore data and inform event handlers
00256             {
00257                std::ostringstream msg;
00258                msg << " Data sent to non-existant client ID";
00259                parent->dispatch_event(
00260         s3_txport_event(s3_txport_event::error, id, msg.str()));
00261           parent->clear_critical_by_id(id);
00262             }
00263          }
00264       }
00265       else
00266       {
00267     std::ostringstream msg;
00268     msg << " No connected clients to send to";
00269     parent->dispatch_event(
00270        s3_txport_event(s3_txport_event::error, -1, msg.str()));
00271       }
00272    }
00273    // we will never get here.
00274    throw (s3_generic_exception("s3_txport_tx_task::main_loop()",
00275                 "Instruction pointer cannot get here!"));
00276 }
00277 
00288 template<class T_Data, class T_Socket>
00289 class s3_txport_tcp_rx_task : public s3_thread_base
00290 {
00292    s3_semaphore sem;
00294    s3_pack_buffer b;
00296    T_Socket* sock;
00298    s3_txport_client_id id;
00300    std::string client_ip;
00302    std::string message;
00304    bool is_connected;
00306    s3_txport_tcp<T_Data, T_Socket> *parent;
00308    s3_semaphore terminate;
00310    s3_periodic_notifier pn;
00312    s3_mutex c_lock;
00313 
00314    void set_message(const std::string m)
00315    {
00316       message = m;
00317    }
00318    
00319   protected:
00320    void cleanup()
00321    {
00322 #if 0      
00323       S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::cleanup(): "
00324       << "Cleaning up thread with socket "
00325       << sock.get_fd() << " and ID " << int(id)
00326       << std::endl << std::flush );
00327 #endif      
00328       
00329       c_lock.lock();
00330       is_connected = false;
00331       c_lock.unlock();
00332       
00333       lock_state();
00334       sock->close();
00335       delete sock;
00336       sock = 0;
00337       unlock_state();
00338       
00339       parent->dying_tasklets.push(id);
00340       parent->rx_queue.unsubscribe_producer(sem);
00341       pn.disable();
00342    }
00343    
00344    // Constructor
00345    s3_txport_tcp_rx_task(
00346       s3_txport_client_id a_id, T_Socket* a_sock,
00347       const std::string& a_client_ip,
00348       s3_txport_tcp<T_Data,T_Socket> *a_parent) :
00349       sock(a_sock), id(a_id), client_ip(a_client_ip), message(""),
00350       is_connected(true), parent(a_parent), pn(100)
00351    {
00352       pn.subscribe(sem);
00353    }
00354 
00356    ~s3_txport_tcp_rx_task()
00357    {
00358       pn.unsubscribe(sem);
00359    }
00360 
00361  public:
00363    void set_terminate()
00364    {
00365       terminate.post();
00366    }
00367 
00369    std::string get_message() const
00370    {
00371       return message;
00372    }
00373 
00375    std::string get_client_ip() const
00376    {
00377       return client_ip;
00378    }
00379 
00381    bool connected() const
00382    {
00383       bool ret;
00384       c_lock.lock();
00385       ret = is_connected;
00386       c_lock.unlock();
00387       return ret;
00388    }
00389 
00391    T_Socket* get_socket()
00392    {
00393        return sock;
00394    }
00395 
00396    void main_loop()
00397    {
00398       pn.enable();
00399       parent->rx_queue.subscribe_producer(sem);
00400 
00401       typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00402 #if 0      
00403       S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00404       << "Receive thread for socket " << sock.get_fd()
00405       << " with client ID " << int(id) << " started"
00406       << std::endl << std::flush );
00407 #endif      
00408       bool meta = false;
00409       while (1)
00410       {
00411          int size = 0;
00412          meta = false;
00413     
00414          if (terminate.try_wait())
00415     {
00416        S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00417             << "termination semaphore posted, terminating"
00418             << std::endl << std::flush );
00419        return;
00420     }
00421 #if 0  
00422     S3FC_DBG( std::cout << "survived termination semaphore"
00423          << " (socket " << sock.get_fd() << ")"
00424          << std::endl << std::flush );
00425 #endif    
00426     if ( !sock->read(&size, sizeof(size), &terminate) )
00427     {
00428             set_message(sock->get_error());
00429             return;
00430          }
00431 
00432     S3FC_DBG( std::cout << "read succeeded" << std::endl << std::flush );
00433 
00434          if (size & 1) meta = true;
00435          size >>= 1; // Shift right to remove meta bit.
00436 
00437          // Plump up the streamable for our next move
00438          b.set_capacity(size);
00439          if ( !sock->read(b.get_buffer(), size, &terminate) )
00440          {
00441             set_message(sock->get_error());
00442             return;
00443          }
00444          if (!meta)
00445          {
00446             // Send data to the queue, checking if
00447             do
00448             {
00449                d = parent->rx_queue.nbl_open_input();
00450                if (d == 0 ) // Blocking...
00451                {
00452                   if (terminate.try_wait()) return;
00453                   // Just wait for something to happen
00454                   sem.wait();
00455                }
00456             }
00457             while (d == 0);
00458             s3_streamable::unpack(d->data,b);
00459             // Tag data with our socket (client ID)
00460             d->set_id(id);
00461             parent->rx_queue.close_input(d);
00462          }
00463          else
00464          {
00465             std::string m;
00466             s3_streamable::unpack(m,b);
00467             S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00468             << "Metadata " << m << " received for id "
00469             << int(id) << std::endl << std::flush );
00470             parent->dispatch_event(
00471           s3_txport_event(s3_txport_event::metadata, id, m));
00472     }
00473          b.reset();
00474       }
00475    }
00476 
00477    friend class s3_txport_tcp<T_Data,T_Socket>;
00478 };
00479 
00480 template<class T_Data, class T_Socket>
00481 s3_txport_tcp<T_Data, T_Socket>::s3_txport_tcp(
00482    int queue_slots, std::string network, std::string mask) :
00483    rx_queue(queue_slots), tx_queue(queue_slots), connect_id(0),
00484    sock(INVALID_SOCKET), tx_thread(tx_queue, this),  network_mask(0),
00485    network_number(0)
00486 {
00487 // initialise WS2_32.DLL as per MS manual
00488 #ifdef _WIN32
00489    WSADATA wsaData;
00490    /*
00491     * If multiple s3_txport_tcp objects are created in a single
00492     * process space the WSAStartup call will be called multiple times.
00493     * This is legal, you just have to call WSACleanup() an equal
00494     * number of times.`
00495     * 
00496     * See ~s3_txport_tcp() for more comments on this.
00497     * 
00498     * [jpool]
00499     */
00500    WSAStartup(0x0101, &wsaData);
00501 #endif
00502    struct in_addr ip_addr_temp;
00503 #ifdef _WIN32
00504    /*
00505     * inet_pton does not exist in Win32 API. The older version inet_addr
00506     * does, so I'm using that instead.
00507     *
00508     * [jpool] 09/09/2002
00509     */
00510    ip_addr_temp.s_addr = inet_addr( network.c_str() );   
00511    if ( ip_addr_temp.s_addr == INADDR_NONE )
00512    {
00513       throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00514              "Invalid network number string");
00515    }
00516 #else
00517    int ret = inet_pton(AF_INET,network.c_str(),&ip_addr_temp);
00518    if (ret <= 0)
00519    {
00520       throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00521              "Invalid network number string");
00522    }
00523 #endif
00524    network_number = ntohl(ip_addr_temp.s_addr);
00525 #ifdef _WIN32
00526    ip_addr_temp.s_addr = inet_addr( mask.c_str() );   
00527    if ( ip_addr_temp.s_addr == INADDR_NONE )
00528    {
00529       throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00530              "Invalid network mask string");
00531    }
00532 #else
00533    ret = inet_pton(AF_INET,mask.c_str(),&ip_addr_temp);
00534    if (ret <= 0)
00535    {
00536       throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00537              "Invalid network mask string");
00538    }
00539 #endif
00540    network_mask = ntohl(ip_addr_temp.s_addr);
00541 
00542    tx_thread.start();
00543 }
00544 
00545 template<class T_Data, class T_Socket>
00546 s3_txport_tcp<T_Data, T_Socket>::~s3_txport_tcp()
00547 {
00548    stop();
00549    // de-initialise WS2_32.DLL
00550 #ifdef _WIN32
00551    /*
00552     * For each WSAStartup() we must have a WSACleanup().  Each time
00553     * WSACleanup() is called it will just decrement an internal
00554     * counter. Only the last call will free the resources.
00555     *
00556     * [jpool]
00557     */
00558    WSACleanup();
00559 #endif   
00560 }
00561 
00562 template<class T_Data, class T_Socket>
00563 void s3_txport_tcp<T_Data, T_Socket>::restart()
00564 {
00565    if (!tx_thread.is_running())
00566    {
00567       tx_thread.start();
00568    }
00569 }
00570 
00571 template<class T_Data, class T_Socket>
00572 bool s3_txport_tcp<T_Data, T_Socket>::disconnect_by_id(s3_txport_client_id id)
00573 {
00574    S3FC_DBG( std::cout << "s3_txport_tcp::disconnect_by_id(id="
00575         << int(id) << ") called" << std::endl << std::flush );
00576    
00577    bool retval = false;
00578    
00579    // This will cause the thread to die, and trigger its removal from
00580    // the list of active clients.
00581    connection_mutex.lock();
00582    if (connections.count(id) != 0)
00583    {
00584       if (connections[id]->is_running())
00585       {
00586          connections[id]->set_terminate();
00587          retval = true;
00588 
00589     S3FC_DBG( std::cout << "s3_txport_tcp::disconnect_by_id(): "
00590          "set_terminate() on connection done" << std::endl
00591          << std::flush );
00592       }
00593    }
00594    connection_mutex.unlock();
00595    
00596    return retval;
00597 }
00598 
00599 template<class T_Data, class T_Socket>
00600 void s3_txport_tcp<T_Data, T_Socket>::disconnect_all()
00601 {
00602    while (num_connections() != 0)
00603    {
00604       std::list<s3_txport_client_id> id_list = get_client_id_list();
00605       
00606       for (std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00607       ii != id_list.end(); ii++)
00608       {
00609          disconnect_by_id(*ii);
00610       }
00611       S3FC_DBG(std::cout << "s3_txport_tcp::disconnect_all() reaping"
00612           << std::endl);
00613       reap();
00614       if (num_connections() != 0)
00615       {
00616          S3FC_DBG(std::cout
00617         << "s3_txport_tcp::disconnect_all() num_connections()="
00618         << num_connections() << std::endl);
00619          S3FC_DBG(std::cout
00620         << "s3_txport_tcp::disconnect_all() dying task count="
00621         << dying_tasklets.size() << std::endl);
00622 #ifdef _WIN32
00623     // Sleep in ms. Must include <winbase.h>. [rfanner]
00624     Sleep(1000);
00625 #else
00626     // POSIX sleep in. Must include <netinet/in.h>. [rfanner]
00627     sleep(1);
00628 #endif
00629       }
00630    }
00631 }
00632 
00633 template<class T_Data, class T_Socket>
00634 void s3_txport_tcp<T_Data, T_Socket>::stop()
00635 {
00636    S3FC_DBG( std::cout << "s3_txport_tcp::stop() on TID " << get_thread_id()
00637         << std::endl << std::flush );
00638    
00639    // Terminate the server to client tx thread.
00640    if (tx_thread.is_running())
00641    {
00642       tx_thread.set_terminate();
00643       tx_thread.wait_on_exit();
00644    }
00645 
00646    // Kill the listener.
00647    if (is_running())
00648    {
00649       set_terminate();
00650       wait_on_exit();
00651    }
00652    disconnect_all();
00653    
00654    S3FC_DBG( std::cout << "s3_txport_tcp::stop(): num_connections()="
00655         << num_connections() << std::endl );
00656    S3FC_DBG( std::cout << "s3_txport_tcp::stop(): dying task count="
00657         << dying_tasklets.size() << std::endl );
00658    S3FC_DBG( std::cout << "s3_txport_tcp::stop(): thread " << get_thread_id()
00659         << " is done" << std::endl << std::flush );
00660 }
00661 
00662 template<class T_Data, class T_Socket>
00663 void 
00664 s3_txport_tcp<T_Data, T_Socket>::set_critical_by_id(s3_txport_client_id id)
00665 {
00666    connection_mutex.lock();
00667    // If the connection is registered and it is alive, make it critical
00668    if (connections.count(id) != 0 )
00669    {
00670       connections[id]->lock_state();
00671    }
00672    connection_mutex.unlock();
00673 }
00674 
00675 template<class T_Data, class T_Socket>
00676 void 
00677 s3_txport_tcp<T_Data, T_Socket>::clear_critical_by_id(s3_txport_client_id id)
00678 {
00679    connection_mutex.lock();
00680    if (connections.count(id) != 0)
00681    {
00682       connections[id]->unlock_state();
00683    }
00684    connection_mutex.unlock();
00685 }
00686 
00687 template<class T_Data, class T_Socket>
00688 T_Socket* s3_txport_tcp<T_Data, T_Socket>::get_socket_by_id(
00689    s3_txport_client_id id)
00690 {
00691    T_Socket* ret = 0;
00692    connection_mutex.lock();
00693    if (connections.count(id) != 0)
00694    {
00695       ret = connections[id]->get_socket();
00696    }
00697    connection_mutex.unlock();
00698    
00699    return ret;
00700 }
00701 
00702 template<class T_Data, class T_Socket>
00703 bool 
00704 s3_txport_tcp<T_Data, T_Socket>::is_connected_by_id(s3_txport_client_id id)
00705 {
00706    bool ret = false;
00707    
00708    connection_mutex.lock();
00709    if (connections.count(id) != 0)
00710    {
00711       ret = connections[id]->connected();
00712    }
00713    connection_mutex.unlock();
00714    
00715    return ret;
00716 }
00717 
00718 template<class T_Data, class T_Socket>
00719 std::string 
00720 s3_txport_tcp<T_Data, T_Socket>::get_ip_address_by_id(s3_txport_client_id id)
00721 {
00722    std::string ret;
00723    
00724    connection_mutex.lock();
00725    if (connections.count(id) != 0)
00726    {
00727       ret = connections[id]->get_client_ip();
00728    }
00729    connection_mutex.unlock();
00730    
00731    return ret;
00732 }
00733 
00734 template<class T_Data, class T_Socket>
00735 std::list<s3_txport_client_id>
00736 s3_txport_tcp<T_Data, T_Socket>::get_client_id_list() const
00737 {
00738    typename thread_map::const_iterator ii;
00739    std::list<s3_txport_client_id> current_list;
00740    
00741    connection_mutex.lock();
00742    for (ii = connections.begin(); ii != connections.end(); ii++)
00743    {
00744       current_list.push_back(ii->first);
00745    }
00746    connection_mutex.unlock();
00747    
00748    return current_list;
00749 }
00750 
00751 template<class T_Data, class T_Socket>
00752 void 
00753 s3_txport_tcp<T_Data, T_Socket>::send_metadata(s3_txport_client_id id, const std::string& mdata)
00754 {
00755    meta_queue.push(metadata_type(id, mdata));
00756 }
00757 
00758 template<class T_Data, class T_Socket>
00759 bool s3_txport_tcp<T_Data, T_Socket>::listen(const unsigned int a_port)
00760 {
00761    port = a_port;
00762    
00763    if (!is_running())
00764    {
00765       if ( !sock.is_valid() )
00766       {
00767 #if 0  
00768     S3FC_DBG2_("s3_txport_tcp::listen()", "socket=" << sock.get_fd());
00769 #endif    
00770 
00771     throw s3_generic_exception(
00772        "s3_txport_tcp::listen()",
00773        "Failed to open a socket: " + sock.get_error());
00774       }
00775       if ( !sock.bind(INADDR_ANY, port) )
00776       {
00777          std::ostringstream msg;
00778          msg << "s3_txport_tcp::listen():"
00779              << " Failed to connect to port " << port
00780              << " because " << sock.get_error();
00781          s3_event_dispatcher<s3_txport_event>::dispatch_event(
00782        s3_txport_event(s3_txport_event::error, -1, msg.str()));
00783          return false;
00784       }
00785       if ( !sock.listen(SOMAXCONN) )
00786       {
00787          std::ostringstream msg;
00788          msg << " Failed to listen on socket"  << port
00789              << " because " << sock.get_error();
00790          throw s3_generic_exception("s3_txport_tcp::listen()", msg.str());
00791       }
00792 
00793       start();
00794    }
00795    if (!is_running()) // If we get here, the thread must be running.
00796    {
00797       throw s3_generic_exception(
00798     "s3_txport_tcp::listen()", "Listener thread failed to start.");
00799    }
00800    
00801    return true;
00802 }
00803 
00804 template<class T_Data, class T_Socket>
00805 void s3_txport_tcp<T_Data, T_Socket>::cleanup()
00806 {
00807    // Close the socket we have bound to.
00808    sock.close();
00809 }
00810 
00811 template<class T_Data, class T_Socket>
00812 void s3_txport_tcp<T_Data, T_Socket>::reap()
00813 {
00814    reap_mutex.lock();
00815    while (!dying_tasklets.empty())
00816    {
00817       S3FC_DBG(std::cout << "s3_txport_tcp::reap(): Reaping dead tasks"
00818           << std::endl << std::flush );
00819       s3_txport_client_id tid = dying_tasklets.pop();
00820       
00821       connection_mutex.lock();
00822       S3FC_DBG( std::cout << "Reaper has lock" << std::endl << std::flush );
00823       if (connections.count(tid) == 0) // See if the connection is still here
00824       {
00825     connection_mutex.unlock();
00826     S3FC_DBG( std::cout << "Reaper released lock (client vanished)"
00827          << std::endl << std::flush );
00828     continue; // It has gone so just ignore it.
00829       }
00830       if (!connections[tid]->test_state()) // will return false if critical
00831       {
00832          // Must be critical so just push the id back into the queue
00833          dying_tasklets.push(tid);
00834          connection_mutex.unlock();
00835          S3FC_DBG( std::cout << "Reaper released lock (rx was critical)"
00836          << std::endl << std::flush );
00837          sched_yield();
00838          continue;
00839       }
00840       // If we get here, we have a lock we don't need, so ditch it.
00841       connections[tid]->unlock_state();
00842       S3FC_DBG( std::cout << "Waiting on client thread " << int(tid)
00843            << std::endl << std::flush );
00844       // Some tasks may not have been terminated, so we do it if required.
00845       //if (connections[tid]->is_running()) connections[tid]->terminate();
00846       connections[tid]->wait_on_exit();
00847       S3FC_DBG( std::cout << "Client " << int(tid) << " terminated"
00848       << std::endl << std::flush );
00849       // We need to store the reason for this death for use when we
00850       // generate an event, as it would be unwise to dispatch the
00851       // event holding the mutex...
00852       std::string reason = connections[tid]->get_client_ip() + " " +
00853             connections[tid]->get_message();
00854       delete connections[tid];
00855       // Wipe the thread out of the map
00856       connections.erase(tid);
00857       connection_mutex.unlock();
00858       S3FC_DBG(std::cout << "Reaper released lock" << std::endl);
00859       s3_event_dispatcher<s3_txport_event>::dispatch_event(
00860     s3_txport_event(s3_txport_event::disconnect, tid, reason));
00861    }
00862    reap_mutex.unlock();
00863 }
00864 
00865 template<class T_Data, class T_Socket>
00866 s3_txport_client_id 
00867 s3_txport_tcp<T_Data, T_Socket>::connect(const std::string& IP_address, const unsigned int port)
00868 {
00869    // Create a TCP socket
00870    std::auto_ptr<T_Socket> lsock(new T_Socket(sock));
00871 
00872    if ( !lsock->is_valid() ) // This is also fatal, so throw
00873    {
00874       throw s3_generic_exception(
00875     "s3_txport_tcp::connect()", "Failed to open a socket");
00876    }
00877 
00878    S3FC_DBG2_("s3_txport_tcp::connect()", "connecting with lsock");
00879    
00880    if ( !lsock->connect(IP_address, port) )
00881    {
00882       std::ostringstream msg;
00883       msg << "s3_txport_tcp::connect():"
00884      << " Failed to connect to " << IP_address << ":" << port
00885      << " because " << lsock->get_error();
00886       s3_event_dispatcher<s3_txport_event>::dispatch_event(
00887     s3_txport_event(s3_txport_event::error, -1, msg.str()));
00888       lsock->close();
00889       return static_cast<s3_txport_client_id>(0);
00890    }
00891    S3FC_DBG( std::cout << "s3_txport_client::s3_txport_client(): "
00892         << "connected, spawning handler" << std::endl << std::flush );
00893    // If we get here, we are connected. All comms takes place via
00894    // the socket.  Post two semaphores and wait until both have
00895    // been waited on.
00896    return spawn_handler(lsock.release(), IP_address);
00897 }
00898 
00899 // The thread that will handle the data from the connections.
00900 template<class T_Data, class T_Socket>
00901 void s3_txport_tcp<T_Data, T_Socket>::main_loop()
00902 {
00903    S3FC_DBG(std::cout
00904        << "Daemon configured. Awaiting client connections..." 
00905        << std::endl << std::flush);
00906    while (1)
00907    {
00908       if (terminate.try_wait()) return;
00909       
00910       //test_cancel();
00911       if ( sock.select_rd(1) )
00912       {
00913     S3FC_DBG2_("s3_txport_tcp::main_loop()",
00914           "incoming connection. making server-side socket.");
00915 
00916     struct sockaddr_in clientname;
00917     std::auto_ptr<T_Socket> active_sock(new T_Socket());
00918     if ( sock.accept(*active_sock, &clientname) )
00919          {
00920        S3FC_DBG2_("s3_txport_tcp::main_loop()", "active_sock ok");
00921 
00922        int ip_no = ntohl(clientname.sin_addr.s_addr);
00923        // Record the name of the connecting producer for the logger.
00924        char *tmp = (char *)&ip_no;
00925        std::ostringstream hostname;
00926        hostname << static_cast<unsigned int>(tmp[3]) << "."
00927            << static_cast<unsigned int>(tmp[2]) << "."
00928            << static_cast<unsigned int>(tmp[1]) << "."
00929            << static_cast<unsigned int>(tmp[0]);
00930        
00931        if ( (ip_no & network_mask) == network_number )
00932        {
00933           S3FC_DBG( std::cout << "s3_txport_tcp::main_loop(): "
00934           << "Connection established, spawning handler"
00935           << std::endl << std::flush );
00936           spawn_handler(active_sock.release(), hostname.str());
00937        }
00938        else
00939        {
00940           active_sock->close();
00941           S3FC_DBG( std::cout << "s3_txport_tcp::main_loop(): "
00942           << "Connection from " << hostname << " rejected"
00943           << std::endl << std::flush );
00944           
00945 s3_event_dispatcher<s3_txport_event>::		  dispatch_event(s3_txport_event(s3_txport_event::reject,
00946                    -1, hostname.str()));
00947        }
00948          }
00949     else
00950     {
00951        
00952 s3_event_dispatcher<s3_txport_event>::	       dispatch_event(s3_txport_event(s3_txport_event::error,
00953                      -1, sock.get_error()));
00954     }
00955       }
00956    }
00957 }
00958 
00959 template<class T_Data, class T_Socket>
00960 s3_txport_client_id 
00961 s3_txport_tcp<T_Data, T_Socket>::spawn_handler(T_Socket* active_sock, std::string hostname)
00962 {
00963    connection_mutex.lock();
00964    s3_txport_client_id id = next_id();
00965    connections[id] = new s3_txport_tcp_rx_task<T_Data, T_Socket>(
00966       connect_id, active_sock, hostname, this);
00967    // now start the thread to receive the client's data.
00968    connections[id]->start();
00969    connection_mutex.unlock();
00970 
00971    s3_event_dispatcher<s3_txport_event>::dispatch_event(
00972       s3_txport_event(s3_txport_event::connect, id, hostname));
00973 #if 0   
00974    S3FC_DBG( std::cout << "Socket " << active_sock.get_fd()
00975         << " created for ID " << int(id) << std::endl << std::flush );
00976 #endif   
00977    return id;
00978 }
00979 
00980 template<class T_Data, class T_Socket>
00981 int s3_txport_tcp<T_Data, T_Socket>::num_connections() const
00982 {
00983    int retval = 0;
00984 
00985    connection_mutex.lock();
00986    retval = connections.size();
00987    connection_mutex.unlock();
00988    
00989    return retval;
00990 }

Send comments to: s3fc@stonethree.com SourceForge Logo