00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00033 #include <s3fc/s3_message.h>
00034 #include <s3fc/s3_exception.h>
00035 #include <s3fc/s3_config_node.h>
00036 #include <sstream>
00037 #include <algorithm>
00038
00039 S3_DEFINE_MAGIC_STRING(s3_message);
00040
00041
00042 s3_message::s3_message() :
00043 empty(true),
00044 from(""),
00045 to(""),
00046 body(""),
00047 mc(NORMAL),
00048 id(""),
00049 reply_id("")
00050 {
00051 }
00052
00053
00054 s3_message::s3_message(const std::string& n_from,
00055 const std::string& n_to,
00056 const std::string& n_body,
00057 const msg_class_t& n_mc,
00058 const msg_id_t& n_id) :
00059 empty(false),
00060 from(n_from),
00061 to(n_to),
00062 body(n_body),
00063 mc(n_mc),
00064 id(n_id),
00065 reply_id("")
00066 {
00067 }
00068
00069
00070 s3_message::s3_message(const std::string& n_from,
00071 const s3_message& msg,
00072 const std::string& n_body,
00073 const msg_id_t& n_id) :
00074 empty(false),
00075 from(n_from),
00076 to(msg.from),
00077 body(n_body),
00078 mc(msg.mc),
00079 id(n_id),
00080 reply_id(msg.id)
00081 {
00082 }
00083
00084
00085 s3_message::~s3_message()
00086 {
00087 }
00088
00089
00090 void s3_message::pack(s3_pack_buffer &b) const
00091 {
00092 s3_streamable::pack(empty, b);
00093 s3_streamable::pack(from, b);
00094 s3_streamable::pack(to, b);
00095 s3_streamable::pack(body, b);
00096 s3_streamable::pack(mc, b);
00097 s3_streamable::pack(id, b);
00098 s3_streamable::pack(reply_id, b);
00099 }
00100
00101
00102 void s3_message::unpack(const s3_pack_buffer &b)
00103 {
00104 s3_streamable::unpack(empty, b);
00105 s3_streamable::unpack(from, b);
00106 s3_streamable::unpack(to, b);
00107 s3_streamable::unpack(body, b);
00108 s3_streamable::unpack(mc, b);
00109 s3_streamable::unpack(id, b);
00110 s3_streamable::unpack(reply_id, b);
00111 }
00112
00113
00114 bool s3_message::is_empty() const
00115 {
00116 return empty;
00117 }
00118
00119
00120 bool s3_message::is_reply(const msg_id_t& n_reply_id) const
00121 {
00122 return (reply_id.size() > 0) && (n_reply_id == reply_id);
00123 }
00124
00125
00126 std::string s3_message::get_from() const
00127 {
00128 return from;
00129 }
00130
00131
00132 std::string s3_message::get_to() const
00133 {
00134 return to;
00135 }
00136
00137
00138 std::string s3_message::get_body() const
00139 {
00140 return body;
00141 }
00142
00143
00144 s3_message::msg_class_t s3_message::get_class() const
00145 {
00146 return mc;
00147 }
00148
00149
00150 std::string s3_message::get_debug_str() const
00151 {
00152 std::stringstream str;
00153 str << "[ " << from << " -> " << to << " ]";
00154 switch(mc)
00155 {
00156 case NORMAL:
00157 str << ".";
00158 break;
00159 case PRIORITY:
00160 str << "P";
00161 break;
00162 case CONTROL:
00163 str << "C";
00164 break;
00165 default:
00166 str << " ";
00167 break;
00168 }
00169 if ( reply_id.size() )
00170 {
00171 str << "R";
00172 }
00173 else if ( is_empty() )
00174 {
00175 str << "E";
00176 }
00177 else
00178 {
00179 str << " ";
00180 }
00181 str << " (id: " << id << ") ";
00182 if ( reply_id.size() )
00183 {
00184 str << "(reply_id: " << reply_id << ")";
00185 }
00186 if ( ! is_empty() )
00187 {
00188 str << std::endl;
00189 str << " '" << body << "'";
00190 }
00191 else
00192 {
00193 str << " <empty>";
00194 }
00195 return str.str();
00196 }
00197
00198
00199 s3_post_office* s3_message_box::poffice = 0;
00200 unsigned int s3_message_box::dropped_msg_count = 0;
00201
00202
00203 s3_message_box::s3_message_box(const std::string& n_name,
00204 unsigned int n_rx_queue_len) :
00205 connected(false),
00206 name(n_name),
00207 rx_queue_len(n_rx_queue_len),
00208 rx_queue(0),
00209 subscription_list(0),
00210 seq_nr(0)
00211 {
00212 }
00213
00214
00215 s3_message_box::~s3_message_box()
00216 {
00217
00218 if (connected)
00219 {
00220 disconnect();
00221 }
00222 }
00223
00224
00225 void s3_message_box::init(const std::string& ip_addr,
00226 int port)
00227 {
00228
00229 if ( initialised() )
00230 {
00231 throw s3_generic_exception("s3_message_box::init()",
00232 "Precondition initialised() == false violated");
00233 }
00234 poffice = new s3_post_office();
00235 try
00236 {
00237 #if S3_HAS_LOCAL_PO_SWITCH
00238 poffice->connect();
00239 #else
00240 poffice->connect(ip_addr, port);
00241 #endif
00242 }
00243 catch( const s3_exception& e)
00244 {
00245
00246 delete poffice;
00247 poffice = 0;
00248 throw e;
00249 }
00250
00251 }
00252
00253
00254 void s3_message_box::de_init()
00255 {
00256 if ( ! initialised() )
00257 {
00258 throw s3_generic_exception("s3_message_box::de_init()",
00259 "Precondition initialised() == "
00260 "true violated");
00261 }
00262
00263 poffice->disconnect();
00264 poffice->request_terminate();
00265 poffice->wait_on_exit();
00266 delete poffice;
00267 poffice = 0;
00268 }
00269
00270
00271 bool s3_message_box::initialised()
00272 {
00273 S3FC_DBG( std::cerr << "s3_message_box::initialised() poffice = "
00274 << poffice << std::endl << std::flush );
00275
00276 return (poffice != 0 &&
00277 poffice->is_connected());
00278 }
00279
00280
00281 void s3_message_box::set_name(const std::string& n_name)
00282 {
00283 if ( connected )
00284 {
00285 throw s3_generic_exception("s3_message_box::set_name()",
00286 err_str("Connection exists."));
00287 }
00288 name = n_name;
00289 }
00290
00291
00292 void s3_message_box::connect()
00293 {
00294 if ( connected )
00295 {
00296 throw s3_generic_exception("s3_message_box::connect()",
00297 err_str("Connection exists."));
00298 }
00299 if ( name.empty() )
00300 {
00301 std::stringstream what;
00302 what << "Invalid name '" << name << "'";
00303 throw s3_generic_exception("s3_message_box::connect()",
00304 what.str());
00305 }
00306
00307 poffice->add_dst(name, *this, false);
00308 connected = true;
00309 }
00310
00311
00312 void s3_message_box::disconnect()
00313 {
00314 if ( ! connected )
00315 {
00316 throw s3_generic_exception("s3_message_box::disconnect()",
00317 err_str("No connection."));
00318 }
00319
00320 while ( ! subscription_list.empty() )
00321 {
00322 unsubscribe_group(subscription_list.front());
00323
00324 }
00325
00326 poffice->remove_dst(name, *this, false);
00327 connected = false;
00328 }
00329
00330
00331 void s3_message_box::subscribe_group(const std::string& group)
00332 {
00333 if ( ! connected )
00334 {
00335 throw s3_generic_exception("s3_message_box::subscribe_group()",
00336 err_str("No connection."));
00337 }
00338
00339 for ( std::list<std::string>::const_iterator ptr = subscription_list.begin();
00340 ptr != subscription_list.end();
00341 ptr++)
00342 {
00343 if (*ptr == group)
00344 {
00345 std::stringstream str;
00346 str << "Already subscribed to group: '" << group << "'";
00347 throw s3_generic_exception("s3_message_box::subscribe_group()",
00348 err_str(str.str()));
00349 }
00350 }
00351 poffice->add_dst(group, *this, true);
00352 subscription_list.push_back(group);
00353 }
00354
00355
00356 void s3_message_box::unsubscribe_group(const std::string& group)
00357 {
00358 if ( ! connected )
00359 {
00360 throw s3_generic_exception("s3_message_box::unsubscribe_group()",
00361 err_str("No connection."));
00362 }
00363
00364 std::list<std::string>::iterator ptr;
00365 ptr = std::find(subscription_list.begin(), subscription_list.end(), group);
00366
00367 if ( ptr == subscription_list.end() )
00368 {
00369 std::stringstream str;
00370 str << "Not subscribed to group: '" << group << "'";
00371 throw s3_generic_exception("s3_message_box::unsubscribe_group()",
00372 err_str(str.str()));
00373 }
00374 poffice->remove_dst(group, *this, true);
00375 subscription_list.erase(ptr);
00376 }
00377
00378
00379 void s3_message_box::subscribe_arrival(const s3_semaphore& sem)
00380 {
00381 std::list<s3_semaphore* >::const_iterator ptr =
00382 std::find(arrival_notification_list.begin(),
00383 arrival_notification_list.end(),
00384 &sem);
00385 if ( ptr != arrival_notification_list.end())
00386 {
00387 std::stringstream str;
00388 str << "Already subscribed for arrival: '" << &sem << "'";
00389 throw s3_generic_exception("s3_message_box::subscribe_arrival()",
00390 err_str(str.str()));
00391 }
00392 arrival_notification_list.push_back(const_cast<s3_semaphore*>(&sem));
00393 }
00394
00395
00396 s3_message::msg_id_t s3_message_box::send_msg(const std::string& dst,
00397 const std::string& body,
00398 bool priority)
00399 {
00400 if ( ! connected )
00401 {
00402 throw s3_generic_exception("s3_message_box::send_msg()",
00403 err_str("No connection."));
00404 }
00405 s3_message::msg_id_t id = get_next_message_id();
00406 s3_message msg(name, dst, body, (priority ?
00407 s3_message::PRIORITY :
00408 s3_message::NORMAL),
00409 id);
00410 poffice->dispatch_msg(msg, *this);
00411 return id;
00412 }
00413
00414
00415 s3_message::msg_id_t s3_message_box::reply_msg(const s3_message& msg,
00416 const std::string& body)
00417 {
00418 s3_message::msg_id_t id = get_next_message_id();
00419 s3_message reply_msg(name, msg, body, id);
00420 poffice->dispatch_msg(reply_msg, *this);
00421 return id;
00422 }
00423
00424
00425
00426
00427 s3_message s3_message_box::get_msg()
00428 {
00429
00430
00431 state_lock.lock();
00432
00433 s3_message msg;
00434
00435 if (! rx_queue.empty() )
00436 {
00437 msg = rx_queue.front();
00438 rx_queue.pop_front();
00439 }
00440 else
00441 {
00442
00443 msg = s3_message();
00444 }
00445 state_lock.unlock();
00446 return msg;
00447 }
00448
00449
00450
00451 void s3_message_box::deliver_msg(const s3_message& msg)
00452 {
00453
00454
00455 state_lock.lock();
00456 bool rx_changed = false;
00457
00458
00459
00460 if ( rx_queue.size() == rx_queue_len )
00461 {
00462 T_RX_Queue::iterator ptr;
00463
00464 for (ptr = rx_queue.begin();
00465 ptr != rx_queue.end();
00466 ptr++)
00467 {
00468 if ((*ptr).get_class() == s3_message::NORMAL) break;
00469 }
00470
00471 if ( ptr == rx_queue.end() )
00472 {
00473 ptr = rx_queue.begin();
00474 }
00475
00476 if ( ptr != rx_queue.end() )
00477 {
00478 rx_queue.erase(ptr);
00479 rx_changed = true;
00480 dropped_msg_count++;
00481 }
00482 }
00483
00484
00485
00486 if ( msg.get_class() == s3_message::PRIORITY )
00487 {
00488 T_RX_Queue::iterator ptr;
00489
00490 for (ptr = rx_queue.begin();
00491 ptr != rx_queue.end();
00492 ptr++)
00493 {
00494 if ((*ptr).get_class() == s3_message::NORMAL) break;
00495 }
00496
00497 if ( ptr == rx_queue.end() ) {
00498 rx_queue.push_back(msg);
00499 rx_changed = true;
00500 }
00501
00502 else
00503 {
00504 rx_queue.insert(ptr, msg);
00505 rx_changed = true;
00506 }
00507 }
00508
00509 else
00510 {
00511 rx_queue.push_back(msg);
00512 rx_changed = true;
00513 }
00514
00515 if ( rx_changed )
00516 {
00517 for ( std::list<s3_semaphore*>::const_iterator ptr =
00518 arrival_notification_list.begin();
00519 ptr != arrival_notification_list.end();
00520 ptr++ )
00521 {
00522 (*ptr)->post();
00523 }
00524 }
00525 state_lock.unlock();
00526 }
00527
00528
00529 std::string s3_message_box::err_str(const std::string& err)
00530 {
00531 std::stringstream str;
00532 str << "[name = '" << name << "']: "
00533 << err;
00534 return str.str();
00535 }
00536
00537
00538 std::string s3_message_box::get_next_message_id()
00539 {
00540 std::stringstream str;
00541 str << name << "/" << seq_nr;
00542 seq_nr++;
00543 return str.str();
00544 }
00545
00546
00547 s3_post_office::s3_post_office() :
00548 conn(0),
00549 ctrl_rx_queue(10),
00550 client_rx_queue(10),
00551 seq_nr(0)
00552 {
00553 }
00554
00555
00556 s3_post_office::~s3_post_office()
00557 {
00558 if ( is_connected() )
00559 {
00560 disconnect();
00561 }
00562 }
00563
00564 #if S3_HAS_LOCAL_PO_SWITCH
00565
00566 void s3_post_office::connect( void )
00567 {
00568 if (conn)
00569 {
00570 throw s3_generic_exception("s3_post_office::connect()",
00571 "Already connected");
00572 }
00573 conn = new s3_txport_local<s3_message>;
00574
00575 if ( ! s3_post_office_switch::po_switch_txp_ptr )
00576 {
00577 throw s3_generic_exception("s3_post_office::connect()",
00578 "post office switch not running");
00579
00580 }
00581 conn_id = conn->connect( s3_post_office_switch::po_switch_txp_ptr );
00582
00583 if (! conn->is_connected_by_id(conn_id))
00584 {
00585
00586 delete conn;
00587 conn = 0;
00588 throw s3_generic_exception("s3_post_office::connect()",
00589 "Connection failed");
00590 }
00591
00592
00593 start();
00594
00595 started.wait();
00596 }
00597 #else
00598
00599 void s3_post_office::connect(const std::string& ip,
00600 int port)
00601 {
00602 if (conn)
00603 {
00604 throw s3_generic_exception("s3_post_office::connect()",
00605 "Already connected");
00606 }
00607 conn = new s3_txport_tcp<s3_message>;
00608 conn_id = conn->connect(ip, port);
00609
00610 if (! conn->is_connected_by_id(conn_id))
00611 {
00612
00613 delete conn;
00614 conn = 0;
00615 throw s3_generic_exception("s3_post_office::connect()",
00616 "Connection failed");
00617 }
00618
00619 start();
00620
00621 started.wait();
00622 }
00623 #endif
00624
00625
00626
00627 void s3_post_office::disconnect()
00628 {
00629
00630 if ( ! conn ) return;
00631
00632 if ( ! dst_map.empty() )
00633 {
00634 throw s3_generic_exception("s3_post_office::disconnect()",
00635 "Connected clients present");
00636 }
00637
00638 try
00639 {
00640 conn->stop();
00641 conn->wait_on_exit();
00642 delete conn;
00643 conn = 0;
00644 }
00645 catch(const std::exception& e)
00646 {
00647 std::cerr << "s3_post_office::disconnect() " << e.what()
00648 << std::endl << std::flush;
00649 }
00650 }
00651
00652
00653 void s3_post_office::add_dst(const std::string& dst,
00654 s3_message_box& mbox,
00655 bool is_group)
00656 {
00657
00658 dst_lock.lock();
00659
00660 if ( ! is_connected() )
00661 {
00662 dst_lock.unlock();
00663 throw s3_generic_exception("s3_post_office::add_dst()",
00664 "No connection");
00665 }
00666 bool present = (dst_map.count(dst) == 1);
00667
00668 if ((! is_group) && present)
00669 {
00670 dst_lock.unlock();
00671 throw s3_generic_exception("s3_post_office::add_dst()",
00672 "Name clash (local)");
00673 }
00674
00675
00676 if (! present)
00677 {
00678
00679 if ( add_dst_switch(dst, is_group) )
00680 {
00681
00682 lock_state();
00683 dst_map[dst] = dst_data(std::list<s3_message_box*>(1, &mbox),
00684 is_group);
00685 unlock_state();
00686 }
00687 else
00688 {
00689 dst_lock.unlock();
00690 throw s3_generic_exception("s3_post_office::add_dst()",
00691 "Name clash");
00692 }
00693 }
00694
00695
00696 else
00697 {
00698
00699 lock_state();
00700 dst_map[dst].clients.push_back(&mbox);
00701 unlock_state();
00702 }
00703 dst_lock.unlock();
00704 }
00705
00706
00707 void s3_post_office::remove_dst(const std::string& dst,
00708 s3_message_box& mbox,
00709 bool is_group )
00710 {
00711
00712 dst_lock.lock();
00713
00714 if ( ! is_connected() )
00715 {
00716 dst_lock.unlock();
00717 throw s3_generic_exception("s3_post_office::remove_dst()",
00718 "No connection");
00719 }
00720 bool present = (dst_map.count(dst) == 1);
00721
00722 if (! present)
00723 {
00724 dst_lock.unlock();
00725 throw s3_generic_exception("s3_post_office::remove_dst()",
00726 "Destination not registered");
00727 }
00728
00729
00730 if ( dst_map[dst].clients.size() == 1 )
00731 {
00732 remove_dst_switch(dst, is_group);
00733
00734 lock_state();
00735 dst_map.erase(dst);
00736 unlock_state();
00737 }
00738
00739 else
00740 {
00741
00742 lock_state();
00743 std::list<s3_message_box*>::iterator c =
00744 std::find(dst_map[dst].clients.begin(),
00745 dst_map[dst].clients.end(),
00746 &mbox);
00747 dst_map[dst].clients.erase(c);
00748 unlock_state();
00749 }
00750 dst_lock.unlock();
00751 }
00752
00753
00754 void s3_post_office::main_loop()
00755 {
00756 if (! is_connected() )
00757 {
00758 throw s3_generic_exception("s3_post_office::main_loop()",
00759 "No connection");
00760 }
00761
00762
00763 s3_fifo_queue<s3_txport_data<s3_message> >& switch_rx_queue =
00764 conn->get_rx_queue();
00765 s3_fifo_queue<s3_txport_data<s3_message> >& switch_tx_queue =
00766 conn->get_tx_queue();
00767
00768
00769
00770 s3_semaphore rx_rdy;
00771
00772 switch_rx_queue.subscribe_consumer(rx_rdy);
00773 client_rx_queue.subscribe_consumer(rx_rdy);
00774
00775
00776
00777
00778
00779
00780
00781
00782
00783
00784
00785
00786
00787
00788
00789
00790
00791
00792
00793
00794 s3_periodic_notifier term_pn(1000);
00795 term_pn.subscribe(rx_rdy);
00796 term_pn.enable();
00797
00798
00799
00800
00801 client_msg cm;
00802
00803 bool twiddle = true;
00804
00805
00806 started.post();
00807
00808 while ( !test_terminate() && is_connected() )
00809 {
00810
00811
00812 int oper = 0;
00813
00814
00815 rx_rdy.wait();
00816
00817
00818
00819
00820
00821
00822
00823 if ( test_terminate() )
00824 {
00825 break;
00826 }
00827
00828 if ( twiddle )
00829 {
00830 if ( ! switch_rx_queue.empty() )
00831 {
00832 oper = 1;
00833 twiddle = false;
00834 }
00835 else if ( ! client_rx_queue.empty() )
00836 {
00837 oper = 2;
00838 twiddle = true;
00839 }
00840 }
00841 else
00842 {
00843 if ( ! client_rx_queue.empty() )
00844 {
00845 oper = 2;
00846 twiddle = true;
00847 }
00848 else if ( ! switch_rx_queue.empty() )
00849 {
00850 oper = 1;
00851 twiddle = false;
00852 }
00853 }
00854
00855
00856 if ( oper != 0)
00857 {
00858
00859 if ( oper == 1 )
00860 {
00861 cm.msg = switch_rx_queue.pop().data;
00862 cm.client = 0;
00863 S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00864 << "] " << "reading from switch queue" << std::endl
00865 << std::flush);
00866 }
00867 else if ( oper == 2 )
00868 {
00869 cm = client_rx_queue.pop();
00870 S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00871 << "] " << "reading from " << "client queue"
00872 << std::endl << std::flush);
00873 }
00874
00875
00876
00877 S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00878 << "] " << "message received (" << cm.client << "): "
00879 << std::endl << " " << cm.msg.get_debug_str()
00880 << std::endl << std::flush );
00881
00882
00883 if (cm.msg.get_class() == s3_message::CONTROL)
00884 {
00885 S3FC_DBG( std::cerr << "s3_post_office::main_loop() [" << this
00886 << "] " << "routing msg into ctrl_rx_queue"
00887 << std::endl << std::flush );
00888 ctrl_rx_queue.push(cm.msg);
00889 }
00890
00891 else
00892 {
00893
00894 lock_state();
00895
00896 const bool present = dst_map.count( cm.msg.get_to() ) > 0;
00897 const bool group = present ?
00898 dst_map[cm.msg.get_to()].group : false;
00899
00900 const std::list<s3_message_box*>& clients = present ?
00901 dst_map[cm.msg.get_to()].clients : std::list<s3_message_box*>();
00902
00903
00904 if ( cm.client == 0 )
00905 {
00906 deliver_all_but_one(cm.msg, clients, 0);
00907 }
00908
00909 else
00910 {
00911
00912 if ( ! present )
00913 {
00914 S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00915 << this << "] " << "forwarding msg to (switch)"
00916 << std::endl << std::flush );
00917 switch_tx_queue.push(s3_txport_data<s3_message>(cm.msg,
00918 conn_id));
00919 }
00920
00921 else if ( ! group )
00922 {
00923
00924 if ( clients.size() != 1 )
00925 {
00926 throw s3_generic_exception("s3_post_office::main_loop()",
00927 "Internal error: "
00928 "Non-group dest with "
00929 "clients.size()> 1 ");
00930
00931 }
00932 S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00933 << this << "] " << "forwarding msg to (client "
00934 << clients.front() << ")" << std::endl
00935 << std::flush );
00936 clients.front()->deliver_msg(cm.msg);
00937 }
00938
00939
00940 else
00941 {
00942 deliver_all_but_one(cm.msg, clients, cm.client);
00943 S3FC_DBG( std::cerr << "s3_post_office::main_loop() ["
00944 << this << "] forwarding msg to " << "(switch)"
00945 << std::endl << std::flush );
00946 switch_tx_queue.push(
00947 s3_txport_data<s3_message>(cm.msg, conn_id));
00948 }
00949 }
00950 unlock_state();
00951
00952 }
00953 }
00954 }
00955
00956
00957 term_pn.disable();
00958 term_pn.unsubscribe(rx_rdy);
00959
00960 return;
00961 }
00962
00963
00964
00965 void s3_post_office::dispatch_msg(const s3_message& msg,
00966 s3_message_box& mbox)
00967 {
00968 client_rx_queue.push(client_msg(msg, &mbox));
00969 }
00970
00971
00972 bool s3_post_office::is_connected() const
00973 {
00974
00975 return (conn && conn->is_connected_by_id(conn_id));
00976 }
00977
00978
00979 bool s3_post_office::add_dst_switch(const std::string& name,
00980 bool is_group)
00981 {
00982
00983 std::stringstream body;
00984 body << "add_dst"
00985 << " " << (is_group ? "group" : "individual")
00986 << " " << name;
00987
00988
00989 s3_message req("__post_office__",
00990 "__post_office_switch__",
00991 body.str(),
00992 s3_message::CONTROL,
00993 get_next_message_id());
00994 S3FC_DBG( std::cerr
00995 << "s3_post_office::add_dst_switch() Sending control message: "
00996 << std::endl << " "
00997 << req.get_debug_str() << std::endl << std::flush);
00998 conn->get_tx_queue().push(s3_txport_data<s3_message>(req, conn_id));
00999
01000
01001 s3_message resp;
01002 resp = ctrl_rx_queue.pop();
01003
01004 return (resp.get_body() == "success");
01005 }
01006
01007
01008 bool s3_post_office::remove_dst_switch(const std::string& name,
01009 bool is_group)
01010 {
01011 std::stringstream body;
01012 body << "remove_dst"
01013 << " " << (is_group ? "group" : "individual")
01014 << " " << name;
01015
01016
01017 s3_message req("__post_office__",
01018 "__post_office_switch__",
01019 body.str(),
01020 s3_message::CONTROL,
01021 get_next_message_id());
01022 S3FC_DBG( std::cerr << "s3_post_office::remove_dst_switch() "
01023 << "Sending control message: " << std::endl << " "
01024 << req.get_debug_str() << std::endl << std::flush);
01025 conn->get_tx_queue().push(s3_txport_data<s3_message>(req, conn_id));
01026
01027
01028 s3_message resp;
01029 resp = ctrl_rx_queue.pop();
01030
01031 return (resp.get_body() == "success");
01032 }
01033
01034
01035 void s3_post_office::deliver_all_but_one(
01036 const s3_message& msg,
01037 const std::list<s3_message_box*>& clients,
01038 const s3_message_box* except)
01039 {
01040 for (std::list<s3_message_box*>::const_iterator ptr = clients.begin();
01041 ptr != clients.end();
01042 ptr++)
01043 {
01044 if ( *ptr != except )
01045 {
01046 S3FC_DBG( std::cerr << "s3_post_office::main_loop() "
01047 << "forwarding msg to (client "
01048 << *ptr << ")" << std::endl << std::flush );
01049
01050 (*ptr)->deliver_msg(msg);
01051 }
01052 }
01053 }
01054
01055
01056 std::string s3_post_office::get_next_message_id()
01057 {
01058 std::stringstream str;
01059 str << "__post_office__" << "/" << seq_nr;
01060 seq_nr++;
01061 return str.str();
01062 }
01063
01064 #if S3_HAS_LOCAL_PO_SWITCH
01065 s3_txport_local<s3_message>* s3_post_office_switch::po_switch_txp_ptr = 0;
01066 #endif
01067
01068
01069 s3_post_office_switch::txp_event_queuer::~txp_event_queuer()
01070 {
01071 }
01072
01073
01074 void s3_post_office_switch::txp_event_queuer::operator()(
01075 const s3_txport_event& event)
01076 {
01077
01078 ev_queue.push(event);
01079 }
01080
01081
01082 s3_post_office_switch::dst_data::dst_data()
01083 {
01084 }
01085
01086
01087 s3_post_office_switch::dst_data::dst_data(
01088 const std::list<client_id_t>& n_clients,
01089 bool n_group) :
01090 clients(n_clients),
01091 group(n_group)
01092 {
01093 }
01094
01095
01096 s3_post_office_switch::s3_post_office_switch(int n_port) :
01097 conn(0),
01098 seq_nr(0)
01099 {
01100 #if S3_HAS_LOCAL_PO_SWITCH
01101 conn = new s3_txport_local<s3_message>;
01102 po_switch_txp_ptr = conn;
01103 #else
01104 conn = new s3_txport_tcp<s3_message>;
01105 conn->listen(n_port);
01106
01107
01108 if ( ! conn->listening() )
01109 {
01110
01111 delete conn;
01112 conn = 0;
01113 throw s3_generic_exception("s3_post_office_switch"
01114 "::s3_post_office_switch()",
01115 "Init failed, port may be in use.");
01116 }
01117 #endif
01118
01119
01120 conn->subscribe_handler(txp_event_queuer);
01121
01122 start();
01123
01124 started.wait();
01125 }
01126
01127
01128 s3_post_office_switch::~s3_post_office_switch()
01129 {
01130 if (is_running())
01131 {
01132
01133 request_terminate();
01134 wait_on_exit();
01135 }
01136 conn->stop();
01137 }
01138
01139
01140 void s3_post_office_switch::main_loop()
01141 {
01142
01143 queue_t& rx_queue = conn->get_rx_queue();
01144
01145
01146 s3_semaphore rdy;
01147 rx_queue.subscribe_consumer(rdy);
01148 txp_event_queuer.ev_queue.subscribe_consumer(rdy);
01149
01150
01151
01152
01153
01154
01155
01156
01157
01158
01159
01160
01161
01162
01163
01164
01165
01166
01167
01168
01169 s3_periodic_notifier term_pn(1000);
01170 term_pn.subscribe(rdy);
01171 term_pn.enable();
01172
01173
01174 started.post();
01175
01176
01177 while ( conn->listening() )
01178 {
01179 rdy.wait();
01180
01181
01182
01183
01184
01185
01186
01187 if ( test_terminate() )
01188 {
01189 break;
01190 }
01191
01192
01193 if (! txp_event_queuer.ev_queue.empty())
01194 {
01195 handle_transport_events();
01196 }
01197
01198 if (! rx_queue.empty())
01199 {
01200
01201 s3_txport_data<s3_message> packet = rx_queue.pop();
01202 S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop() "
01203 << "recvd message from (" << packet.get_id() << "): "
01204 << std::endl << " " << packet.data.get_debug_str()
01205 << std::endl << std::flush );
01206
01207
01208 if ( packet.data.get_class() == s3_message::CONTROL)
01209 {
01210 std::vector<std::string> cmd =
01211 s3_conversion::string_to_vec_string(packet.data.get_body());
01212
01213 if ( cmd.size() != 3 )
01214 {
01215 std::stringstream lmsg;
01216 lmsg << "Invalid command length: size = " << cmd.size();
01217 for ( unsigned int i = 0; i < cmd.size(); i++ )
01218 {
01219 lmsg << " cmd[" << i << "]: '" << cmd[i] << "'";
01220 }
01221 lmsg << "Message dump: " << packet.data.get_debug_str()
01222 << std::endl;
01223 log_msg("main_loop()", lmsg.str());
01224 }
01225 bool success = false;
01226
01227 bool is_group = (cmd[1] == "group");
01228 std::string dst = cmd[2];
01229 client_id_t id = packet.get_id();
01230 if ( cmd[0] == "add_dst" )
01231 {
01232 success = add_dst(dst, id, is_group);
01233 S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()"
01234 << " add_dst said: " << success
01235 << std::endl << std::flush );
01236 }
01237 else if ( cmd[0] == "remove_dst" )
01238 {
01239 success = remove_dst(dst, id, is_group);
01240 S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()"
01241 << " remove_dst said: " << success
01242 << std::endl << std::flush );
01243 }
01244 else
01245 {
01246 success = false;
01247 std::stringstream lmsg;
01248 lmsg << "main_loop()"
01249 << " unknown command: '"
01250 << cmd[1] << "'";
01251 log_msg("main_loop",
01252 lmsg.str());
01253 }
01254
01255 if ( success )
01256 {
01257 reply_success(packet);
01258 }
01259 else
01260 {
01261 reply_failure(packet);
01262 }
01263 }
01264
01265 else
01266 {
01267
01268 client_id_t src_client = packet.get_id();
01269
01270 std::list<client_id_t> dst_clients;
01271 bool present = false;
01272
01273 lock_state();
01274 if ( dst_map.count(packet.data.get_to()) )
01275 {
01276 dst_clients = dst_map[packet.data.get_to()].clients;
01277 present = true;
01278 }
01279 unlock_state();
01280
01281 if ( present )
01282 {
01283 S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop() "
01284 << dst_clients.size() << " clients in list"
01285 << std::endl << std::flush );
01286 lock_state();
01287 for (std::list<client_id_t>::const_iterator ptr =
01288 dst_clients.begin();
01289 ptr != dst_clients.end();
01290 ptr++)
01291 {
01292
01293 if ( *ptr != src_client )
01294 {
01295 S3FC_DBG( std::cerr
01296 << "s3_post_office_switch::main_loop()"
01297 << " forwarding to (" << packet.get_id()
01298 << ")" << std::endl << std::flush );
01299 forward_msg(packet.data, *ptr);
01300 }
01301 }
01302 unlock_state();
01303 }
01304 else
01305 {
01306 S3FC_DBG( std::cerr << "s3_post_office_switch::main_loop()"
01307 << " dropped message - no clients "
01308 << std::endl << std::flush );
01309 }
01310 }
01311 }
01312 }
01313
01314
01315 term_pn.unsubscribe(rdy);
01316 term_pn.disable();
01317
01318 return;
01319 }
01320
01321
01322
01323
01324 bool s3_post_office_switch::add_dst(const std::string& dst,
01325 client_id_t client,
01326 bool is_group)
01327 {
01328 bool success = false;
01329 bool present = (dst_map.count(dst) == 1);
01330
01331
01332
01333
01334 if ( is_group || ! present )
01335 {
01336
01337
01338 if ( present )
01339 {
01340 std::list<client_id_t>& c_clients = dst_map[dst].clients;
01341
01342 if ( std::find(c_clients.begin(), c_clients.end(), client) ==
01343 c_clients.end() )
01344 {
01345 c_clients.push_back(client);
01346 }
01347 }
01348
01349
01350 else
01351 {
01352
01353 dst_map[dst] = dst_data(std::list<client_id_t>(1, client),
01354 is_group);
01355 }
01356 success = true;
01357 }
01358
01359 if ( ! success )
01360 {
01361 std::stringstream lmsg;
01362 lmsg << "Failure: "
01363 << "[dst = '" << dst << "'"
01364 << ", client = '" << client << "'"
01365 << ", is_group = '" << "'"
01366 << ", present = '" << present << "'"
01367 << "]";
01368 log_msg("add_dst()", lmsg.str());
01369 }
01370
01371 return success;
01372 }
01373
01374
01375
01376
01377
01378
01379
01380 bool s3_post_office_switch::remove_dst(const std::string& dst,
01381 client_id_t client,
01382 bool is_group)
01383 {
01384 bool success = false;
01385 bool present = (dst_map.count(dst) == 1);
01386
01387
01388 if (present)
01389 {
01390
01391
01392 if(dst_map[dst].group == is_group)
01393 {
01394
01395 std::list<client_id_t>::iterator cp =
01396 std::find(dst_map[dst].clients.begin(),
01397 dst_map[dst].clients.end(),
01398 client);
01399 if ( cp != dst_map[dst].clients.end() )
01400 {
01401 dst_map[dst].clients.erase(cp);
01402
01403
01404 if ( dst_map[dst].clients.size() == 0 )
01405 {
01406 dst_map.erase(dst);
01407 }
01408 success = true;
01409 }
01410 }
01411 }
01412
01413 else
01414 {
01415 success = true;
01416 }
01417
01418 if ( ! success )
01419 {
01420 std::stringstream lmsg;
01421 lmsg << "Failure: "
01422 << "[dst = '" << dst << "'"
01423 << ", client = '" << client << "'"
01424 << ", is_group = '" << "'"
01425 << ", present = '" << present << "'"
01426 << ", dst_map[dst].group = '" << dst_map[dst].group << "'"
01427 << "]";
01428 log_msg("remove_dst()", lmsg.str());
01429 }
01430 return success;
01431 }
01432
01433
01434 void s3_post_office_switch::remove_client(client_id_t client)
01435 {
01436
01437
01438
01439 std::vector<std::string> prune_list;
01440 for (std::map<std::string, dst_data>::iterator map_iter = dst_map.begin();
01441 map_iter != dst_map.end();
01442 ++map_iter)
01443 {
01444
01445
01446 map_iter->second.clients.remove(client);
01447
01448 if (map_iter->second.clients.size() == 0)
01449 {
01450 prune_list.push_back(map_iter->first);
01451 }
01452 }
01453
01454 for (std::vector<std::string>::iterator iter = prune_list.begin();
01455 iter != prune_list.end();
01456 ++iter)
01457 {
01458 dst_map.erase(*iter);
01459 }
01460 }
01461
01462
01463 void
01464 s3_post_office_switch::reply_success(const s3_txport_data<s3_message>& packet)
01465 {
01466 const s3_message& msg = packet.data;
01467 s3_message reply_msg("__post_office_switch__",
01468 msg,
01469 "success",
01470 get_next_message_id());
01471 forward_msg(reply_msg, packet.get_id());
01472 }
01473
01474
01475 void
01476 s3_post_office_switch::reply_failure(const s3_txport_data<s3_message>& packet)
01477 {
01478 const s3_message& msg = packet.data;
01479 s3_message reply_msg("__post_office_switch__",
01480 msg,
01481 "failure",
01482 get_next_message_id());
01483 forward_msg(msg, packet.get_id());
01484 }
01485
01486
01487 void s3_post_office_switch::forward_msg(const s3_message& msg,
01488 const client_id_t client)
01489 {
01490 S3FC_DBG( std::cerr << "s3_post_office_switch::forward_msg() "
01491 << "forwarding to (" << client << "): "
01492 << std::endl << " "
01493 << msg.get_debug_str() << std::endl << std::flush );
01494 conn->get_tx_queue().push(s3_txport_data<s3_message>(msg, client));
01495 }
01496
01497
01498 void s3_post_office_switch::log_msg(const std::string& where,
01499 const std::string& what)
01500 {
01501 std::cerr << "s3_post_office_switch::" << where << " [LOGMSG] ["
01502 << what << "]" << std::endl << std::flush;
01503 }
01504
01505
01506 std::string s3_post_office_switch::get_next_message_id()
01507 {
01508 std::stringstream str;
01509 str << "__post_office_switch__" << "/" << seq_nr;
01510 seq_nr++;
01511 return str.str();
01512 }
01513
01514 void s3_post_office_switch::handle_transport_events()
01515 {
01516
01517 while (! txp_event_queuer.ev_queue.empty())
01518 {
01519 s3_txport_event ev = txp_event_queuer.ev_queue.pop();
01520
01521 if (ev.event == s3_txport_event::disconnect)
01522 {
01523 remove_client(ev.id);
01524 }
01525 }
01526 }