S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_message.cc

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  * 
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00033 #include <s3fc/s3_message.h>
00034 #include <s3fc/s3_exception.h>
00035 #include <s3fc/s3_config_node.h>
00036 #include <sstream>
00037 #include <algorithm>
00038 
00039 S3_DEFINE_MAGIC_STRING(s3_message);
00040 
00041 // dftl ctor - empty message
00042 s3_message::s3_message() :
00043    empty(true),
00044    from(""),
00045    to(""),
00046    body(""),
00047    mc(NORMAL),
00048    id(""),
00049    reply_id("")
00050 {
00051 }
00052 
00053 // init ctor - never empty
00054 s3_message::s3_message(const std::string& n_from,
00055              const std::string& n_to,
00056              const std::string& n_body,
00057              const msg_class_t& n_mc,
00058              const msg_id_t& n_id) :
00059    empty(false),
00060    from(n_from),
00061    to(n_to),
00062    body(n_body),
00063    mc(n_mc),
00064    id(n_id),
00065    reply_id("")
00066 {
00067 }
00068 
00069 //
00070 s3_message::s3_message(const std::string& n_from,
00071              const s3_message& msg,
00072              const std::string& n_body,
00073              const msg_id_t& n_id) :
00074    empty(false),
00075    from(n_from),
00076    to(msg.from),
00077    body(n_body),
00078    mc(msg.mc),
00079    id(n_id),
00080    reply_id(msg.id)
00081 {
00082 }
00083 
00084 // virtual dtor required by compiler
00085 s3_message::~s3_message()
00086 {
00087 }
00088 
00089 // pack all fields into buffer 
00090 void s3_message::pack(s3_pack_buffer &b) const
00091 {
00092    s3_streamable::pack(empty, b);
00093    s3_streamable::pack(from, b);
00094    s3_streamable::pack(to, b);
00095    s3_streamable::pack(body, b);
00096    s3_streamable::pack(mc, b);
00097    s3_streamable::pack(id, b);
00098    s3_streamable::pack(reply_id, b);
00099 }
00100 
00101 // unpack all fields from buffer
00102 void s3_message::unpack(const s3_pack_buffer &b) 
00103 {
00104    s3_streamable::unpack(empty, b);
00105    s3_streamable::unpack(from, b);
00106    s3_streamable::unpack(to, b);
00107    s3_streamable::unpack(body, b);
00108    s3_streamable::unpack(mc, b);
00109    s3_streamable::unpack(id, b);
00110    s3_streamable::unpack(reply_id, b);
00111 }
00112 
00113 //
00114 bool s3_message::is_empty() const
00115 {
00116    return empty;
00117 }
00118 
00119 //
00120 bool s3_message::is_reply(const msg_id_t& n_reply_id) const
00121 {
00122    return (reply_id.size() > 0) && (n_reply_id == reply_id);
00123 }
00124 
00125 //
00126 std::string s3_message::get_from() const
00127 {
00128    return from;
00129 }
00130 
00131 //
00132 std::string s3_message::get_to() const
00133 {
00134    return to;
00135 }
00136 
00137 //
00138 std::string s3_message::get_body() const
00139 {
00140    return body;
00141 }
00142 
00143 //
00144 s3_message::msg_class_t s3_message::get_class() const
00145 {
00146    return mc;
00147 }
00148 
00149 //
00150 std::string s3_message::get_debug_str() const
00151 {
00152    std::stringstream str;
00153    str << "[ " << from << " -> " << to << " ]";
00154    switch(mc)
00155    {
00156    case NORMAL:
00157       str << ".";
00158       break;
00159    case PRIORITY:
00160       str << "P";
00161       break;
00162    case CONTROL:
00163       str << "C";
00164       break;
00165    default:
00166       str << " ";
00167       break;
00168    }
00169    if ( reply_id.size() )
00170    {
00171       str << "R";
00172    }
00173    else if ( is_empty() )
00174    {
00175       str << "E";
00176    }
00177    else
00178    {
00179       str << " ";
00180    }
00181    str << " (id: " << id << ") ";
00182    if ( reply_id.size() )
00183    {
00184       str << "(reply_id: " << reply_id << ")";
00185    }
00186    if ( ! is_empty() )
00187    {
00188       str << std::endl;
00189       str << "     '" << body << "'";
00190    }
00191    else 
00192    {
00193       str << " <empty>";
00194    }
00195    return str.str();
00196 }
00197 
00198 // init the static
00199 s3_post_office* s3_message_box::poffice = 0;
00200 unsigned int s3_message_box::dropped_msg_count = 0;
00201 
00202 //
00203 s3_message_box::s3_message_box(const std::string& n_name,
00204                 unsigned int n_rx_queue_len) :
00205    connected(false),
00206    name(n_name),
00207    rx_queue_len(n_rx_queue_len),
00208    rx_queue(0),
00209    subscription_list(0),
00210    seq_nr(0)
00211 {
00212 }
00213 
00214 //
00215 s3_message_box::~s3_message_box()
00216 {
00217    // disconnect if still connected
00218    if (connected)
00219    {
00220       disconnect();
00221    }
00222 }
00223 
00224 //
00225 void s3_message_box::init(const std::string& ip_addr,
00226            int port)
00227 {
00228    // test pre-condition
00229    if ( initialised() )
00230    {
00231       throw s3_generic_exception("s3_message_box::init()",
00232              "Precondition initialised() == false violated");
00233    }
00234    poffice = new s3_post_office();
00235    try
00236    {
00237 #if S3_HAS_LOCAL_PO_SWITCH
00238       poffice->connect();
00239 #else
00240       poffice->connect(ip_addr, port);
00241 #endif
00242    }
00243    catch( const s3_exception& e)
00244    {
00245       // cleanup and rethrow
00246       delete poffice;
00247       poffice = 0;
00248       throw e;
00249    }
00250    
00251 }
00252 
00253 //
00254 void s3_message_box::de_init()
00255 {
00256    if ( ! initialised() )
00257    {
00258       throw s3_generic_exception("s3_message_box::de_init()",
00259              "Precondition initialised() == "
00260              "true violated");
00261    }
00262    // disconnect and stop - this throws if we still have clients
00263    poffice->disconnect();
00264    poffice->request_terminate(); // Use soft terminate <jp>
00265    poffice->wait_on_exit();
00266    delete poffice;
00267    poffice = 0;
00268 }
00269 
00270 //
00271 bool s3_message_box::initialised()
00272 {
00273    S3FC_DBG( std::cerr << "s3_message_box::initialised() poffice = "
00274         << poffice << std::endl << std::flush );
00275 
00276    return (poffice != 0 &&
00277       poffice->is_connected());
00278 }
00279 
00280 //
00281 void s3_message_box::set_name(const std::string& n_name)
00282 {
00283    if ( connected ) 
00284    {
00285       throw s3_generic_exception("s3_message_box::set_name()",
00286              err_str("Connection exists."));
00287    }
00288    name = n_name;
00289 }
00290 
00291 //
00292 void s3_message_box::connect()
00293 {
00294    if ( connected ) 
00295    {
00296       throw s3_generic_exception("s3_message_box::connect()",
00297              err_str("Connection exists."));
00298    }
00299    if ( name.empty() )
00300    {
00301       std::stringstream what;
00302       what << "Invalid name '" << name << "'";
00303       throw s3_generic_exception("s3_message_box::connect()", 
00304              what.str());
00305    }
00306    // register with the post office
00307    poffice->add_dst(name, *this, false);
00308    connected = true;
00309 }
00310 
00311 //
00312 void s3_message_box::disconnect()
00313 {
00314    if ( ! connected )
00315    {
00316       throw s3_generic_exception("s3_message_box::disconnect()",
00317              err_str("No connection."));
00318    }
00319    // unsubscribe from all groups
00320    while ( ! subscription_list.empty() )
00321    {
00322       unsubscribe_group(subscription_list.front());
00323       //subscription_list.pop_front();
00324    }
00325    // deregister
00326    poffice->remove_dst(name, *this, false);
00327    connected = false;
00328 }
00329 
00330 //
00331 void s3_message_box::subscribe_group(const std::string& group)
00332 {
00333    if ( ! connected )
00334    {
00335       throw s3_generic_exception("s3_message_box::subscribe_group()",
00336              err_str("No connection."));
00337    }
00338    // see if we're already subscribed
00339    for ( std::list<std::string>::const_iterator ptr = subscription_list.begin();
00340     ptr != subscription_list.end();
00341     ptr++)
00342    {
00343       if (*ptr == group)
00344       {
00345     std::stringstream str;
00346     str << "Already subscribed to group: '" << group << "'";
00347     throw s3_generic_exception("s3_message_box::subscribe_group()",
00348                 err_str(str.str()));
00349       }
00350    }
00351    poffice->add_dst(group, *this, true);
00352    subscription_list.push_back(group);
00353 }
00354 
00355 //
00356 void s3_message_box::unsubscribe_group(const std::string& group)
00357 {
00358    if ( ! connected )
00359    {
00360       throw s3_generic_exception("s3_message_box::unsubscribe_group()",
00361              err_str("No connection."));
00362    }
00363    // check subscription
00364    std::list<std::string>::iterator ptr;
00365    ptr = std::find(subscription_list.begin(), subscription_list.end(), group);
00366    // not found ?
00367    if ( ptr == subscription_list.end() )
00368    {
00369       std::stringstream str;
00370       str << "Not subscribed to group: '" << group << "'";
00371       throw s3_generic_exception("s3_message_box::unsubscribe_group()",
00372              err_str(str.str()));
00373    }
00374    poffice->remove_dst(group, *this, true);
00375    subscription_list.erase(ptr);
00376 }
00377 
00378 //
00379 void s3_message_box::subscribe_arrival(const s3_semaphore& sem)
00380 {
00381    std::list<s3_semaphore* >::const_iterator ptr = 
00382       std::find(arrival_notification_list.begin(),
00383       arrival_notification_list.end(),
00384       &sem);
00385    if ( ptr != arrival_notification_list.end()) 
00386    {
00387       std::stringstream str;
00388       str << "Already subscribed for arrival: '" << &sem << "'";
00389       throw s3_generic_exception("s3_message_box::subscribe_arrival()",
00390              err_str(str.str()));      
00391    }
00392    arrival_notification_list.push_back(const_cast<s3_semaphore*>(&sem));
00393 }
00394 
00395 //
00396 s3_message::msg_id_t s3_message_box::send_msg(const std::string& dst,
00397                      const std::string& body,
00398                      bool priority)
00399 {
00400    if ( ! connected )
00401    {
00402       throw s3_generic_exception("s3_message_box::send_msg()",
00403              err_str("No connection."));
00404    }
00405    s3_message::msg_id_t id = get_next_message_id();
00406    s3_message msg(name, dst, body, (priority ? 
00407                 s3_message::PRIORITY : 
00408                 s3_message::NORMAL),
00409         id);
00410    poffice->dispatch_msg(msg, *this);
00411    return id;
00412 }
00413 
00414 //
00415 s3_message::msg_id_t s3_message_box::reply_msg(const s3_message& msg,
00416                       const std::string& body)
00417 {
00418    s3_message::msg_id_t id = get_next_message_id();
00419    s3_message reply_msg(name, msg, body, id);
00420    poffice->dispatch_msg(reply_msg, *this);
00421    return id;
00422 }
00423 
00424 
00425 // the rx_queue is always sorted candidate-first by the scheme
00426 // implemented in deliver_message
00427 s3_message s3_message_box::get_msg()
00428 {
00429    // need locks here as we might run concurrently with a post office
00430    // doing a delivery
00431    state_lock.lock();
00432 
00433    s3_message msg;
00434    // candidate is always the first in the queue
00435    if (! rx_queue.empty() )
00436    {
00437       msg = rx_queue.front();
00438       rx_queue.pop_front();
00439    }
00440    else
00441    {
00442       // empty message
00443       msg = s3_message();
00444    }
00445    state_lock.unlock();
00446    return msg;
00447 }
00448 
00449 // insert the new message into the queue in such a way that
00450 // the first is always the candidate
00451 void s3_message_box::deliver_msg(const s3_message& msg)
00452 {
00453    // need locks here as we might run concurrently with a user app
00454    // doing a get_msg
00455    state_lock.lock();
00456    bool rx_changed = false;
00457    // full queue: remove the oldest non-priority message unless
00458    //  all queued messages are priority and the new messages is also
00459    //  priority, in which case the oldest is removed
00460    if ( rx_queue.size() == rx_queue_len )
00461    {
00462       T_RX_Queue::iterator ptr;
00463       // find oldest (first) non-prio (oldest at beginning)
00464       for (ptr = rx_queue.begin(); 
00465       ptr != rx_queue.end();
00466       ptr++)
00467       {
00468     if ((*ptr).get_class() == s3_message::NORMAL) break;
00469       }
00470       // no non-prio in queue: remove the oldest prio
00471       if ( ptr == rx_queue.end() )
00472       {
00473     ptr = rx_queue.begin();
00474       }
00475       // remove it
00476       if ( ptr != rx_queue.end() )
00477       {
00478          rx_queue.erase(ptr);
00479          rx_changed = true;
00480          dropped_msg_count++;
00481       }
00482    }
00483    
00484    // add new message
00485    // prio: add to end of prio messages
00486    if ( msg.get_class() == s3_message::PRIORITY )
00487    {
00488       T_RX_Queue::iterator ptr;
00489       // find first non-prio - just after newest prio
00490       for (ptr = rx_queue.begin(); 
00491       ptr != rx_queue.end();
00492       ptr++)
00493       {
00494     if ((*ptr).get_class() == s3_message::NORMAL) break;
00495       }
00496       // no non-prio: insert new at end
00497       if ( ptr == rx_queue.end() ) {
00498     rx_queue.push_back(msg);
00499     rx_changed = true;
00500       }
00501       // add before start of non-prios (i.e. at end of prios)
00502       else
00503       {
00504     rx_queue.insert(ptr, msg);
00505     rx_changed = true;
00506       }
00507    }
00508    // non-prio: add to end
00509    else
00510    {
00511       rx_queue.push_back(msg);
00512       rx_changed = true;
00513    }
00514    // notify everyone in arrival_notification_list
00515    if ( rx_changed )
00516    {
00517       for ( std::list<s3_semaphore*>::const_iterator ptr = 
00518           arrival_notification_list.begin();
00519        ptr != arrival_notification_list.end();
00520        ptr++ )
00521       {
00522     (*ptr)->post();
00523       }
00524    }
00525    state_lock.unlock();
00526 }
00527 
00528 // format error string - embed name
00529 std::string s3_message_box::err_str(const std::string& err)
00530 {
00531    std::stringstream str;
00532    str << "[name = '" << name << "']: "
00533        << err;
00534    return str.str();
00535 }
00536 
00537 //
00538 std::string s3_message_box::get_next_message_id()
00539 {
00540    std::stringstream str;
00541    str << name << "/" << seq_nr;
00542    seq_nr++;
00543    return str.str();
00544 }
00545 
00546 //
00547 s3_post_office::s3_post_office() :
00548    conn(0),
00549    ctrl_rx_queue(10),
00550    client_rx_queue(10),
00551    seq_nr(0)
00552 {
00553 }
00554 
00555 // destructor
00556 s3_post_office::~s3_post_office()
00557 {
00558    if ( is_connected() )
00559    {
00560       disconnect();
00561    }   
00562 }
00563 
00564 #if S3_HAS_LOCAL_PO_SWITCH
00565 //
00566 void s3_post_office::connect( void )
00567 {
00568    if (conn)
00569    {
00570       throw s3_generic_exception("s3_post_office::connect()",
00571              "Already connected");
00572    }
00573    conn = new s3_txport_local<s3_message>;
00574 
00575    if ( ! s3_post_office_switch::po_switch_txp_ptr  )
00576    {
00577       throw s3_generic_exception("s3_post_office::connect()",
00578                                  "post office switch not running");
00579 
00580    }
00581    conn_id = conn->connect( s3_post_office_switch::po_switch_txp_ptr );
00582    
00583    if (! conn->is_connected_by_id(conn_id))
00584    {
00585       // we may go out of scope here, so we should delete conn.
00586       delete conn;
00587       conn = 0;
00588       throw s3_generic_exception("s3_post_office::connect()",
00589              "Connection failed");
00590    } 
00591 
00592    // start myself
00593    start();
00594    // wait for actual start before returning
00595    started.wait();
00596 }
00597 #else
00598 //
00599 void s3_post_office::connect(const std::string& ip,
00600               int port)
00601 {
00602    if (conn)
00603    {
00604       throw s3_generic_exception("s3_post_office::connect()",
00605              "Already connected");
00606    }
00607    conn = new s3_txport_tcp<s3_message>;
00608    conn_id = conn->connect(ip, port);
00609 
00610    if (! conn->is_connected_by_id(conn_id))
00611    {
00612       // we may go out of scope here, so we should delete conn.
00613       delete conn;
00614       conn = 0;
00615       throw s3_generic_exception("s3_post_office::connect()",
00616              "Connection failed");
00617    }
00618    // start myself
00619    start();
00620    // wait for actual start before returning
00621    started.wait();
00622 }
00623 #endif
00624 
00625 
00626 //
00627 void s3_post_office::disconnect()
00628 {
00629    // just do nothing if we're not connected (not a sin)
00630    if ( ! conn ) return;
00631    // throw if there are still connected clients.
00632    if ( ! dst_map.empty() )
00633    {
00634       throw s3_generic_exception("s3_post_office::disconnect()",
00635              "Connected clients present");
00636    }
00637    // stop event loop, disconnect and deallocate
00638    try
00639    {
00640       conn->stop();
00641       conn->wait_on_exit();
00642       delete conn;
00643       conn = 0;      
00644    }
00645    catch(const std::exception& e)
00646    {
00647       std::cerr << "s3_post_office::disconnect() " << e.what()
00648       << std::endl << std::flush;
00649    }
00650 }
00651 
00652 //
00653 void s3_post_office::add_dst(const std::string& dst,
00654               s3_message_box& mbox,
00655               bool is_group)
00656 {
00657    // atomic register and add operation
00658    dst_lock.lock();
00659    // if no connection, throw
00660    if ( ! is_connected() ) 
00661    {
00662       dst_lock.unlock();
00663       throw s3_generic_exception("s3_post_office::add_dst()",
00664              "No connection");
00665    }
00666    bool present = (dst_map.count(dst) == 1);
00667    // if destination is not a group and already in map, throw  [/g p]  
00668    if ((! is_group) && present)
00669    {
00670       dst_lock.unlock();
00671       throw s3_generic_exception("s3_post_office::add_dst()",
00672              "Name clash (local)");
00673    }
00674    // if destination is not present in map register with switch, check for
00675    // uniqueness and add to map  [ g /p || /g /p = /p ]
00676    if (! present)
00677    {
00678       // this should be able to proceed *with* a state lock in place
00679       if ( add_dst_switch(dst, is_group) ) 
00680       {
00681     // atomic map update
00682     lock_state();
00683     dst_map[dst] = dst_data(std::list<s3_message_box*>(1, &mbox), 
00684              is_group);
00685     unlock_state();
00686       }
00687       else
00688       {
00689     dst_lock.unlock();
00690     throw s3_generic_exception("s3_post_office::add_dst()",
00691                 "Name clash");
00692       }
00693    }
00694    // group and in map - uniqueness already verified and already registered
00695    // with switch [g p]
00696    else
00697    {
00698       // atomic map update
00699       lock_state();
00700       dst_map[dst].clients.push_back(&mbox);
00701       unlock_state();
00702    }
00703    dst_lock.unlock();
00704 }
00705 
00706 //
00707 void s3_post_office::remove_dst(const std::string& dst,
00708             s3_message_box& mbox,
00709             bool is_group )
00710 {
00711    // atomic deregister and remove operation
00712    dst_lock.lock();
00713    // if no connection, throw
00714    if ( ! is_connected() ) 
00715    {
00716       dst_lock.unlock();
00717       throw s3_generic_exception("s3_post_office::remove_dst()",
00718              "No connection");
00719    }
00720    bool present = (dst_map.count(dst) == 1);
00721    // if destination is not in map, throw
00722    if (! present)
00723    {
00724       dst_lock.unlock();
00725       throw s3_generic_exception("s3_post_office::remove_dst()",
00726              "Destination not registered");
00727    }
00728    // if only single client in list unregister with switch and remove entry
00729    // from map
00730    if ( dst_map[dst].clients.size() == 1 )
00731    {
00732       remove_dst_switch(dst, is_group);
00733       // atomic map update
00734       lock_state();
00735       dst_map.erase(dst);
00736       unlock_state();
00737    }
00738    // remove client from list
00739    else
00740    {
00741       // atomic map update
00742       lock_state();
00743       std::list<s3_message_box*>::iterator c =
00744     std::find(dst_map[dst].clients.begin(),
00745          dst_map[dst].clients.end(),
00746          &mbox);
00747       dst_map[dst].clients.erase(c);
00748       unlock_state();
00749    }
00750    dst_lock.unlock();
00751 }
00752 
00753 //
00754 void s3_post_office::main_loop()
00755 {
00756    if (! is_connected() )
00757    {
00758       throw s3_generic_exception("s3_post_office::main_loop()",
00759              "No connection");
00760    }
00761 
00762    // incoming queue from post office switch
00763    s3_fifo_queue<s3_txport_data<s3_message> >& switch_rx_queue =
00764       conn->get_rx_queue();
00765    s3_fifo_queue<s3_txport_data<s3_message> >& switch_tx_queue =
00766       conn->get_tx_queue();  
00767 
00768    // semaphore used to synch to queue entries. subscribe this to
00769    // switch_rx_queue and client_rx_queue as consumers
00770    s3_semaphore rx_rdy;
00771 
00772    switch_rx_queue.subscribe_consumer(rx_rdy);
00773    client_rx_queue.subscribe_consumer(rx_rdy);
00774   
00775    /*
00776     * This periodic notifier is used to dislodge the rx_rdy semaphore.
00777     * The semaphore is posted when switch_rx_queue and client_rx_queue
00778     * received data.
00779     *
00780     * Without the notifier, rx_rdy.wait() will block the thread. This
00781     * poses a problem in the WIN32 port, where phtread_cancel does not
00782     * work correctly.
00783     *
00784     * THe periodic notifier is not required on the Unix platform,
00785     * but it provides a cleaner termination strategy.
00786     *
00787     * The periodic notifier's timeout determines what the maximum time
00788     * is that you will wait after request_terminate() has been
00789     * called. Feel free to experiment.
00790     * 
00791     * This notifier is a requirement for the WIN32 port.
00792     * 30 May 2002: Jan Pool <jpool@stonethree.com>
00793     */
00794    s3_periodic_notifier term_pn(1000);
00795    term_pn.subscribe(rx_rdy);
00796    term_pn.enable();
00797 
00798    // message and dispatcher
00799    //s3_message msg;
00800    //s3_message_box* org = 0;
00801    client_msg cm;
00802    // use for alternating between input queues
00803    bool twiddle = true;
00804 
00805    // signal all is go and spin
00806    started.post();
00807    // Use test_terminate to protect is_connected. [jpool]
00808    while ( !test_terminate() && is_connected() ) 
00809    {
00810       // Modified the while to make the termination easier. [jpool]
00811 
00812       int oper = 0;
00813 
00814       // wait for queue activity
00815       rx_rdy.wait();      
00816       
00817       /*
00818        * We must test for termination here, since some of the attributes
00819        * do not exist anymore. We must therefore break out before trying
00820        * to access any of them.
00821        * [jpool]
00822        */
00823       if ( test_terminate() )
00824       {
00825     break;   
00826       } // if test_terminate()       
00827        
00828       if ( twiddle )
00829       {
00830     if ( ! switch_rx_queue.empty() ) 
00831     {
00832        oper = 1;
00833        twiddle = false;
00834     }
00835     else if ( ! client_rx_queue.empty() ) 
00836     {
00837        oper = 2;
00838        twiddle = true;
00839     }
00840       }
00841       else
00842       {
00843     if ( ! client_rx_queue.empty() ) 
00844     {
00845        oper = 2;
00846        twiddle = true;
00847     }
00848     else if ( ! switch_rx_queue.empty() ) 
00849     {
00850        oper = 1;
00851        twiddle = false;
00852     } 
00853       }      
00854       
00855       // If we received data, we process it here.
00856       if ( oper != 0)
00857       {
00858     // Got data.
00859     if ( oper == 1 ) // read switch
00860     {
00861        cm.msg = switch_rx_queue.pop().data;
00862        cm.client = 0;
00863        S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00864             << "] " << "reading from switch queue" << std::endl
00865             << std::flush);
00866     } // oper == 1
00867     else if ( oper == 2 )  // read client
00868     {
00869        cm = client_rx_queue.pop();
00870        S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00871             << "] " << "reading from " << "client queue"
00872             << std::endl << std::flush);
00873     } // oper = 2
00874     
00875     // cm now effectively contains the next message removed from
00876     // rx_queue, with its originator
00877     S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00878          << "] " << "message received (" << cm.client << "): "
00879          << std::endl << "  " << cm.msg.get_debug_str()
00880          << std::endl << std::flush );
00881     
00882     // message routed into control queue
00883     if (cm.msg.get_class() == s3_message::CONTROL)
00884     {
00885        S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00886             << "] " << "routing msg into ctrl_rx_queue"
00887             << std::endl << std::flush );
00888        ctrl_rx_queue.push(cm.msg);
00889     }
00890     // regular message
00891     else 
00892     {
00893             // protect the map while reading it
00894             lock_state();
00895 
00896        const bool present = dst_map.count( cm.msg.get_to() ) > 0;
00897             const bool group = present ?
00898                dst_map[cm.msg.get_to()].group : false;
00899 
00900             const std::list<s3_message_box*>& clients = present ?
00901                dst_map[cm.msg.get_to()].clients : std::list<s3_message_box*>();
00902        
00903             // origin is switch: forward to all in clients
00904        if ( cm.client == 0 )
00905        {
00906           deliver_all_but_one(cm.msg, clients, 0);
00907        } // if cm.client
00908        // origin is client
00909        else
00910        {
00911           // dest not present: forward to switch
00912           if ( ! present )
00913           {
00914         S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00915              << this << "] " << "forwarding msg to (switch)"
00916              << std::endl << std::flush );
00917         switch_tx_queue.push(s3_txport_data<s3_message>(cm.msg,
00918                           conn_id));
00919           }
00920           // dest is present and not group: forward to single list entry
00921           else if ( ! group )
00922           {
00923         // sanity check
00924         if ( clients.size() != 1 )
00925         {
00926            throw s3_generic_exception("s3_post_office::main_loop()",
00927                   "Internal error: "
00928                   "Non-group dest with "
00929                   "clients.size()> 1 ");
00930            
00931         }
00932         S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00933              << this << "] " << "forwarding msg to (client "
00934              << clients.front() << ")" << std::endl
00935              << std::flush );
00936         clients.front()->deliver_msg(cm.msg);
00937           }
00938           // dest is present and group: forward to clients in list except
00939           //   originating AND forward to switch
00940           else
00941           {
00942         deliver_all_but_one(cm.msg, clients, cm.client);
00943         S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00944              << this << "] forwarding msg to " << "(switch)"
00945              << std::endl << std::flush );
00946         switch_tx_queue.push(
00947            s3_txport_data<s3_message>(cm.msg, conn_id));
00948           }
00949        }
00950             unlock_state();
00951          //   dst_lock.unlock();
00952     } 
00953       }
00954    }
00955    
00956    // cleanup periodic notifier stuff. [jpool]
00957    term_pn.disable();
00958    term_pn.unsubscribe(rx_rdy);
00959 
00960    return;
00961 }
00962 
00963 
00964 //
00965 void s3_post_office::dispatch_msg(const s3_message& msg,
00966               s3_message_box& mbox)
00967 {
00968    client_rx_queue.push(client_msg(msg, &mbox));
00969 }
00970 
00971 //
00972 bool s3_post_office::is_connected() const
00973 {
00974    // we love short-circuiting boolean expressions
00975    return (conn && conn->is_connected_by_id(conn_id));
00976 }
00977 
00978 //
00979 bool s3_post_office::add_dst_switch(const std::string& name,
00980                 bool is_group)
00981 {
00982 
00983    std::stringstream body;
00984    body << "add_dst" 
00985    << " " << (is_group ? "group" : "individual") 
00986    << " " << name;
00987     
00988    // create control messages requesting add_dst and dispatch
00989    s3_message req("__post_office__", 
00990         "__post_office_switch__", 
00991         body.str(),
00992         s3_message::CONTROL,
00993         get_next_message_id());
00994    S3FC_DBG( std::cerr
00995         << "s3_post_office::add_dst_switch() Sending control message: "
00996         << std::endl << "  "
00997         << req.get_debug_str() << std::endl << std::flush);
00998    conn->get_tx_queue().push(s3_txport_data<s3_message>(req, conn_id));
00999 
01000    // wait for response
01001    s3_message resp;
01002    resp = ctrl_rx_queue.pop();
01003 
01004    return (resp.get_body() == "success");
01005 }
01006 
01007 //
01008 bool s3_post_office::remove_dst_switch(const std::string& name,
01009                    bool is_group)
01010 {
01011    std::stringstream body;
01012    body << "remove_dst" 
01013    << " " << (is_group ? "group" : "individual") 
01014    << " " << name;
01015     
01016    // create control messages requesting add_dst and dispatch
01017    s3_message req("__post_office__", 
01018         "__post_office_switch__", 
01019         body.str(),
01020         s3_message::CONTROL,
01021         get_next_message_id());
01022    S3FC_DBG( std::cerr << "s3_post_office::remove_dst_switch() "
01023         << "Sending control message: " << std::endl << "  "
01024         << req.get_debug_str() << std::endl << std::flush);
01025    conn->get_tx_queue().push(s3_txport_data<s3_message>(req, conn_id));
01026 
01027    // wait for response
01028    s3_message resp;
01029    resp = ctrl_rx_queue.pop();
01030 
01031    return (resp.get_body() == "success");
01032 }
01033 
01034 //
01035 void s3_post_office::deliver_all_but_one(
01036    const s3_message& msg,
01037    const std::list<s3_message_box*>& clients,
01038    const s3_message_box* except)
01039 {
01040    for (std::list<s3_message_box*>::const_iterator ptr = clients.begin();
01041    ptr != clients.end();
01042    ptr++)
01043    {
01044       if ( *ptr != except ) 
01045       {
01046     S3FC_DBG( std::cerr << "s3_post_office::main_loop() "
01047          << "forwarding msg to (client "
01048          << *ptr << ")" << std::endl << std::flush );
01049 
01050     (*ptr)->deliver_msg(msg);
01051       }
01052    }
01053 }
01054 
01055 //
01056 std::string s3_post_office::get_next_message_id()
01057 {
01058    std::stringstream str;
01059    str << "__post_office__" << "/" << seq_nr;
01060    seq_nr++;
01061    return str.str();
01062 }
01063 
01064 #if S3_HAS_LOCAL_PO_SWITCH
01065 s3_txport_local<s3_message>* s3_post_office_switch::po_switch_txp_ptr = 0;
01066 #endif
01067 
01068 //
01069 s3_post_office_switch::txp_event_queuer::~txp_event_queuer()
01070 {
01071 }
01072 
01073 //
01074 void s3_post_office_switch::txp_event_queuer::operator()(
01075    const s3_txport_event& event)
01076 {
01077    // this can (and should) never block
01078    ev_queue.push(event);
01079 }
01080 
01081 //
01082 s3_post_office_switch::dst_data::dst_data()
01083 {
01084 }
01085 
01086 //
01087 s3_post_office_switch::dst_data::dst_data(
01088    const std::list<client_id_t>& n_clients,
01089    bool n_group) :
01090    clients(n_clients),
01091    group(n_group)
01092 {
01093 }
01094 
01095 //
01096 s3_post_office_switch::s3_post_office_switch(int n_port) :
01097    conn(0),
01098    seq_nr(0)
01099 {
01100 #if S3_HAS_LOCAL_PO_SWITCH
01101    conn = new s3_txport_local<s3_message>;
01102    po_switch_txp_ptr = conn;
01103 #else
01104    conn = new s3_txport_tcp<s3_message>;
01105    conn->listen(n_port);
01106 
01107  
01108    if ( ! conn->listening() )
01109    {
01110       // check status - throw if problem
01111       delete conn;
01112       conn = 0;
01113       throw s3_generic_exception("s3_post_office_switch"
01114              "::s3_post_office_switch()",
01115              "Init failed, port may be in use.");
01116    }
01117 #endif
01118 
01119    // attach event queuer to transport
01120    conn->subscribe_handler(txp_event_queuer);
01121    // start myself
01122    start();
01123    // wait for start to be indicated
01124    started.wait();
01125 }
01126 
01127 //
01128 s3_post_office_switch::~s3_post_office_switch()
01129 {
01130    if (is_running())
01131    {
01132       // Use soft terminate. [jpool]
01133       request_terminate();
01134       wait_on_exit();
01135    }
01136    conn->stop();
01137 }
01138 
01139 //
01140 void s3_post_office_switch::main_loop()
01141 {
01142    // queue through which we receive messages
01143    queue_t& rx_queue = conn->get_rx_queue();
01144    // this should get posted if any of the rx queues (message or transport
01145    // event) receive something
01146    s3_semaphore rdy;
01147    rx_queue.subscribe_consumer(rdy);
01148    txp_event_queuer.ev_queue.subscribe_consumer(rdy);
01149    
01150    /*
01151     * This periodic notifier is used to dislodge the rdy semaphore.
01152     * The semaphore is posted when rx_queue and txp_event_queuer
01153     * received data.
01154     *
01155     * Without the notifier, rdy.wait() will block the thread. This
01156     * poses a problem in the WIN32 port, where phtread_cancel does not
01157     * work correctly.
01158     *
01159     * THe periodic notifier is not required on the Unix platform,
01160     * but it provides a cleaner termination strategy.
01161     *
01162     * The periodic notifier's timeout determines what the maximum time
01163     * is that you will wait after request_terminate() has been
01164     * called. Feel free to experiment.
01165     * 
01166     * This notifier is a requirement for the WIN32 port.
01167     * 30 May 2002: Jan Pool <jpool@stonethree.com>
01168     */   
01169    s3_periodic_notifier term_pn(1000);
01170    term_pn.subscribe(rdy);
01171    term_pn.enable();
01172 
01173    // indicate that we have started
01174    started.post();
01175    
01176    // run until the transport dies or we get stopped
01177    while ( conn->listening() )
01178    {
01179       rdy.wait();
01180 
01181       /*
01182        * We must test for termination here, since some of the attributes
01183        * do not exist anymore. We must therefore break out before trying
01184        * to access any of them.
01185        * [jpool]
01186        */
01187       if ( test_terminate() )  
01188       {
01189     break;
01190       } // if test_terminate() 
01191 
01192       // service all pending transport events
01193       if (! txp_event_queuer.ev_queue.empty())
01194       {
01195     handle_transport_events();
01196       }
01197       // service the rx queue
01198       if (! rx_queue.empty())
01199       {
01200     // we know there is something in the queue - never block
01201     s3_txport_data<s3_message> packet = rx_queue.pop();
01202     S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop() "
01203          << "recvd message from (" << packet.get_id() << "): "
01204          << std::endl << "  " << packet.data.get_debug_str()
01205          << std::endl << std::flush );
01206     
01207     // control message ?
01208     if ( packet.data.get_class() == s3_message::CONTROL) 
01209     {
01210        std::vector<std::string> cmd = 
01211           s3_conversion::string_to_vec_string(packet.data.get_body());
01212        // 
01213        if ( cmd.size() != 3 )
01214        {
01215           std::stringstream lmsg;
01216           lmsg << "Invalid command length: size = " << cmd.size();
01217           for ( unsigned int i = 0; i < cmd.size(); i++ )
01218           {
01219         lmsg << " cmd[" << i << "]: '" << cmd[i] << "'";
01220           }
01221           lmsg << "Message dump: " << packet.data.get_debug_str()
01222           << std::endl;
01223           log_msg("main_loop()", lmsg.str());
01224        }
01225        bool success = false;
01226        
01227        bool is_group = (cmd[1] == "group");
01228        std::string dst = cmd[2];
01229        client_id_t id = packet.get_id();
01230        if ( cmd[0] == "add_dst" )
01231        {
01232           success = add_dst(dst, id, is_group);
01233           S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()" 
01234           << " add_dst said: " << success
01235           << std::endl << std::flush );
01236        }
01237        else if ( cmd[0] == "remove_dst" )
01238        {
01239           success = remove_dst(dst, id, is_group);
01240           S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()" 
01241           << " remove_dst said: " << success
01242           << std::endl << std::flush );
01243        }
01244        else 
01245        {
01246           success = false;
01247           std::stringstream lmsg;
01248           lmsg << "main_loop()" 
01249           << " unknown command: '" 
01250           << cmd[1] << "'";
01251           log_msg("main_loop",
01252              lmsg.str());
01253        }
01254        
01255        if ( success ) 
01256        {
01257           reply_success(packet);
01258        }
01259        else
01260        {
01261           reply_failure(packet);
01262        }
01263     }
01264     // normal packets are routed between client connections
01265     else 
01266     {
01267        // where it came from
01268        client_id_t src_client = packet.get_id();
01269        // where it should go
01270        std::list<client_id_t> dst_clients;
01271        bool present = false;
01272        //
01273        lock_state();
01274        if ( dst_map.count(packet.data.get_to()) )
01275        {
01276           dst_clients = dst_map[packet.data.get_to()].clients;
01277           present = true;
01278        }
01279        unlock_state();
01280        // only forward if destination present in map
01281        if ( present ) 
01282        {
01283           S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop() " 
01284           << dst_clients.size() << " clients in list"
01285           << std::endl << std::flush );
01286           lock_state();
01287           for (std::list<client_id_t>::const_iterator ptr =
01288              dst_clients.begin();
01289           ptr != dst_clients.end();
01290           ptr++)
01291           {
01292         // don't forward to originating client
01293         if ( *ptr != src_client )
01294         {
01295            S3FC_DBG( std::cerr
01296                 << "s3_post_office_switch::main_loop()" 
01297                 << " forwarding to (" << packet.get_id()
01298                 << ")" << std::endl << std::flush );
01299            forward_msg(packet.data, *ptr);
01300         }
01301           }
01302           unlock_state();
01303        }
01304        else
01305        {
01306           S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()" 
01307           << " dropped message - no clients "
01308           << std::endl << std::flush );
01309        }
01310     }
01311       }
01312    }
01313 
01314    // Clean up periodic notifier stuff. [jpool]
01315    term_pn.unsubscribe(rdy);
01316    term_pn.disable();
01317 
01318    return;
01319 }
01320 
01321 // associate client with dst. check for ambiguous names (if not 
01322 // a group).
01323 // 
01324 bool s3_post_office_switch::add_dst(const std::string& dst,
01325                 client_id_t client,
01326                 bool is_group)
01327 {
01328    bool success = false;
01329    bool present = (dst_map.count(dst) == 1);
01330 
01331    // if destination is a group OR an individual that is NOT present,
01332    // we may proceed - else someone already added individual with 
01333    // existing name and we should fail
01334    if ( is_group || ! present )
01335    {
01336       // dst already there - client should be added to list if its
01337       // not already there
01338       if ( present )
01339       {
01340     std::list<client_id_t>& c_clients = dst_map[dst].clients;
01341     // only add if client not already in list
01342     if ( std::find(c_clients.begin(), c_clients.end(), client) ==
01343          c_clients.end() )
01344     {
01345        c_clients.push_back(client);
01346     }
01347       }
01348       // create entry for new destination in the map and add client as
01349       // the only one in the client list
01350       else
01351       {
01352 
01353     dst_map[dst] = dst_data(std::list<client_id_t>(1, client),
01354              is_group);
01355       }
01356       success = true;
01357    }
01358    // log failure
01359    if ( ! success )
01360    {
01361       std::stringstream lmsg;
01362       lmsg << "Failure: "
01363       << "[dst = '" << dst << "'"
01364       << ", client = '" << client << "'"
01365       << ", is_group = '" << "'"
01366       << ", present = '" << present << "'"
01367       << "]";
01368       log_msg("add_dst()", lmsg.str());
01369    }
01370 
01371    return success;
01372 }
01373 
01374 // remove an association between dst and client. if the client is the
01375 // only one in the client list, the destination is entirely removed from
01376 // the map. 
01377 // a check is first performed to see whether the association actually 
01378 // exists and has the supplied type - if this fails, then no action is 
01379 // taken, but a true is returned
01380 bool s3_post_office_switch::remove_dst(const std::string& dst,
01381                    client_id_t client,
01382                    bool is_group)
01383 {
01384    bool success = false;
01385    bool present = (dst_map.count(dst) == 1);
01386    
01387    // only attempt remove if present
01388    if (present)
01389    {
01390       // check whether registered name coresponds to passed arguments,
01391       // i.e. in map and with correct type (indiv/group)
01392       if(dst_map[dst].group == is_group)
01393       {
01394     // iterator referencing client associated with dst
01395     std::list<client_id_t>::iterator cp =
01396        std::find(dst_map[dst].clients.begin(),
01397             dst_map[dst].clients.end(),
01398             client);
01399     if ( cp != dst_map[dst].clients.end() )
01400     {
01401        dst_map[dst].clients.erase(cp);
01402        // if dst entry in dst_map has no more clients in list, it should
01403        // be removed from the map
01404        if ( dst_map[dst].clients.size() == 0 )
01405        {
01406           dst_map.erase(dst);
01407        }
01408        success = true;
01409     }
01410       }
01411    }
01412    // 
01413    else
01414    {
01415       success = true;
01416    }
01417    // log failure
01418    if ( ! success )
01419    {
01420       std::stringstream lmsg;
01421       lmsg << "Failure: "
01422       << "[dst = '" << dst << "'"
01423       << ", client = '" << client << "'"
01424       << ", is_group = '" << "'"
01425       << ", present = '" << present << "'"
01426       << ", dst_map[dst].group = '" << dst_map[dst].group << "'"
01427       << "]";
01428       log_msg("remove_dst()", lmsg.str());
01429    }
01430    return success;
01431 }
01432 
01433 //
01434 void s3_post_office_switch::remove_client(client_id_t client)
01435 {
01436    // iterate through map, checking for the presence of client in each
01437    // entry's client list and then removing it. if this results in the
01438    // list being empty - push associated destination key into the prune list
01439    std::vector<std::string> prune_list;
01440    for (std::map<std::string, dst_data>::iterator map_iter = dst_map.begin();
01441    map_iter != dst_map.end();
01442    ++map_iter)
01443    {
01444       // remove elements with client in client_list corresponding to map
01445       // entry at map_iter
01446       map_iter->second.clients.remove(client);
01447       // add to prune list if empty
01448       if (map_iter->second.clients.size() == 0)
01449       {
01450     prune_list.push_back(map_iter->first);
01451       }
01452    }
01453    // remove destinations with empty client lists
01454    for (std::vector<std::string>::iterator iter = prune_list.begin();
01455    iter != prune_list.end();
01456    ++iter)
01457    {
01458       dst_map.erase(*iter);
01459    }
01460 }
01461 
01462 // 
01463 void 
01464 s3_post_office_switch::reply_success(const s3_txport_data<s3_message>& packet)
01465 {
01466    const s3_message& msg = packet.data;
01467    s3_message reply_msg("__post_office_switch__",
01468          msg,
01469          "success",
01470          get_next_message_id());
01471    forward_msg(reply_msg, packet.get_id());
01472 }
01473 
01474 // 
01475 void 
01476 s3_post_office_switch::reply_failure(const s3_txport_data<s3_message>& packet)
01477 {
01478    const s3_message& msg = packet.data;
01479    s3_message reply_msg("__post_office_switch__",
01480          msg,
01481          "failure",
01482          get_next_message_id());
01483    forward_msg(msg, packet.get_id());
01484 }
01485 
01486 //
01487 void s3_post_office_switch::forward_msg(const s3_message& msg,
01488                const client_id_t client)
01489 {
01490    S3FC_DBG( std::cerr << "s3_post_office_switch::forward_msg() "
01491         << "forwarding to (" << client << "): "
01492         << std::endl << "  "
01493         << msg.get_debug_str() << std::endl << std::flush );
01494    conn->get_tx_queue().push(s3_txport_data<s3_message>(msg, client));
01495 }
01496 
01497 // 
01498 void s3_post_office_switch::log_msg(const std::string& where,
01499                 const std::string& what)
01500 {
01501    std::cerr << "s3_post_office_switch::" << where << " [LOGMSG] [" 
01502         << what << "]" << std::endl << std::flush;
01503 }
01504 
01505 //
01506 std::string s3_post_office_switch::get_next_message_id()
01507 {
01508    std::stringstream str;
01509    str << "__post_office_switch__" << "/" << seq_nr;
01510    seq_nr++;
01511    return str.str();
01512 }
01513 
01514 void s3_post_office_switch::handle_transport_events()
01515 {
01516    // handle all pending events
01517    while (! txp_event_queuer.ev_queue.empty())
01518    {
01519       s3_txport_event ev = txp_event_queuer.ev_queue.pop();
01520       // only handle disconnect events - ignore the rest
01521       if (ev.event == s3_txport_event::disconnect)
01522       {
01523     remove_client(ev.id);
01524       }
01525    }
01526 }

Send comments to: s3fc@stonethree.com SourceForge Logo