00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00033 #ifndef S3_MESSAGE_H
00034 #define S3_MESSAGE_H
00035
00297
00298
00299
00300
00301
00302
00303
00304
00305
00306
00307 #include <iostream>
00308 #include <string>
00309 #include <vector>
00310 #include <list>
00311 #include <s3fc/s3_streamable.h>
00312 #include <s3fc/s3_fifo_queue.h>
00313 #include <s3fc/s3_growable_fifo_queue.h>
00314 #include <s3fc/s3_txport_base.h>
00315
00316 #if S3_HAS_LOCAL_PO_SWITCH
00317 #include <s3fc/s3_txport_local.h>
00318
00319 #else
00320 #include <s3fc/s3_txport_tcp.h>
00321 #endif
00322
00323 #include <s3fc/s3_mutex.h>
00324 #include <s3fc/s3_semaphore.h>
00325
00326 class s3_message_box;
00327
00338 class s3_message : public s3_streamable
00339 {
00340 public:
00341 typedef std::string msg_id_t;
00345 enum msg_class_t
00346 {
00350 NORMAL,
00354 PRIORITY,
00358 CONTROL
00359 };
00360 protected:
00364 bool empty;
00368 std::string from;
00372 std::string to;
00376 std::string body;
00380 msg_class_t mc;
00384 msg_id_t id;
00388 msg_id_t reply_id;
00389 public:
00390 S3_DECLARE_MAGIC_STRING;
00395 s3_message();
00412 s3_message(const std::string& n_from,
00413 const std::string& n_to,
00414 const std::string& n_body,
00415 const msg_class_t& n_mc,
00416 const msg_id_t& n_id);
00436 s3_message(const std::string& n_from,
00437 const s3_message& msg,
00438 const std::string& n_body,
00439 const msg_id_t& n_id);
00443 virtual ~s3_message();
00448 virtual void pack(s3_pack_buffer &b) const;
00453 virtual void unpack(const s3_pack_buffer &b);
00459 bool is_empty() const;
00467 bool is_reply(const msg_id_t& n_id) const;
00472 std::string get_from() const;
00477 std::string get_to() const;
00482 std::string get_body() const;
00487 msg_class_t get_class() const;
00488
00489
00490 std::string get_debug_str() const;
00491 };
00492
00493 S3_TYPEGROUP_TRAITS_POD(s3_message::msg_class_t);
00494
00495
00496 class s3_post_office;
00497
00513 class s3_message_box
00514 {
00515
00516 friend class s3_post_office;
00517 public:
00518 typedef std::list<s3_message> T_RX_Queue;
00519
00524 static unsigned int dropped_msg_count;
00525 private:
00531 static s3_post_office* poffice;
00535 bool connected;
00539 std::string name;
00543 unsigned int rx_queue_len;
00547 T_RX_Queue rx_queue;
00551 std::list<std::string> subscription_list;
00555 int seq_nr;
00559 std::list<s3_semaphore*> arrival_notification_list;
00563 s3_mutex state_lock;
00564 public:
00571 s3_message_box(const std::string& name = "",
00572 unsigned int n_queue_len = 20);
00576 ~s3_message_box();
00586 static void init(const std::string& ip_addr,
00587 int port);
00593 static void de_init();
00598 static bool initialised();
00606 void set_name(const std::string& name);
00616 void connect();
00621 void disconnect();
00629 void subscribe_group(const std::string& group);
00637 void unsubscribe_group(const std::string& group);
00646 void subscribe_arrival(const s3_semaphore& sem);
00655 s3_message::msg_id_t send_msg(const std::string& dst,
00656 const std::string& body,
00657 bool priority = false);
00672 s3_message::msg_id_t reply_msg(const s3_message& msg,
00673 const std::string& body = "");
00678 s3_message get_msg();
00679 protected:
00686 void deliver_msg(const s3_message& msg);
00690 std::string err_str(const std::string& err);
00694 std::string get_next_message_id();
00695
00696 };
00697
00712 class s3_post_office : public s3_thread_base
00713 {
00714 friend class s3_message_box;
00715 private:
00719 struct client_msg
00720 {
00721
00722 s3_message msg;
00723
00724 s3_message_box* client;
00725
00726 client_msg() { }
00727
00728 client_msg(const s3_message& n_msg,
00729 s3_message_box* n_client) :
00730 msg(n_msg),
00731 client(n_client) { }
00732 };
00737 struct dst_data
00738 {
00739 std::list<s3_message_box*> clients;
00740 bool group;
00741 dst_data() { }
00742 dst_data(const std::list<s3_message_box*> n_clients,
00743 bool n_group) :
00744 clients(n_clients),
00745 group(n_group) { }
00746 };
00747
00751 #if S3_HAS_LOCAL_PO_SWITCH
00752 s3_txport_local<s3_message>* conn;
00753 #else
00754 s3_txport_tcp<s3_message>* conn;
00755 #endif
00756
00760 s3_txport_client_id conn_id;
00765 std::map<std::string, dst_data> dst_map;
00770 s3_fifo_queue<s3_message> ctrl_rx_queue;
00774 s3_fifo_queue<client_msg> client_rx_queue;
00778 int seq_nr;
00786 s3_mutex dst_lock;
00790 s3_semaphore started;
00791 public:
00792
00796 ~s3_post_office();
00797
00801 s3_post_office();
00815 #if S3_HAS_LOCAL_PO_SWITCH
00816 void connect( void );
00817 #else
00818 void connect(const std::string& ip, int port);
00819 #endif
00820
00825 void disconnect();
00830 void add_dst(const std::string& dst,
00831 s3_message_box& mbox,
00832 bool is_group);
00837 void remove_dst(const std::string& dst,
00838 s3_message_box& mbox,
00839 bool is_group);
00845 virtual void main_loop();
00846 protected:
00852 void dispatch_msg(const s3_message& msg,
00853 s3_message_box& mbox);
00859 bool is_connected() const;
00871 bool add_dst_switch(const std::string& name,
00872 bool is_group);
00884 bool remove_dst_switch(const std::string& name,
00885 bool is_group);
00892 void deliver_all_but_one(const s3_message& msg,
00893 const std::list<s3_message_box*>& clients,
00894 const s3_message_box* except);
00898 std::string get_next_message_id();
00899 };
00900
00908 class s3_post_office_switch : public s3_thread_base
00909 {
00910 private:
00916 struct txp_event_queuer : public s3_event_handler<s3_txport_event>
00917 {
00918 public:
00919 virtual ~txp_event_queuer();
00927 typedef s3_growable_fifo_queue<s3_txport_event> ev_queue_t;
00928
00929 ev_queue_t ev_queue;
00934 virtual void operator()(const s3_txport_event& event);
00935 };
00940 typedef s3_txport_client_id client_id_t;
00941
00942 struct dst_data;
00943 friend struct dst_data;
00948 struct dst_data
00949 {
00950 std::list<client_id_t> clients;
00951 bool group;
00952
00953 dst_data();
00954
00955 dst_data(const std::list<client_id_t>& n_clients,
00956 bool n_group);
00957 };
00958 typedef s3_fifo_queue<s3_txport_data<s3_message> > queue_t;
00963 #if S3_HAS_LOCAL_PO_SWITCH
00964 s3_txport_local<s3_message>* conn;
00965 #else
00966 s3_txport_tcp<s3_message>* conn;
00967 #endif
00968
00971 txp_event_queuer txp_event_queuer;
00981 std::map<std::string, dst_data> dst_map;
00985 int seq_nr;
00989 s3_semaphore started;
00990 public:
00991 #if S3_HAS_LOCAL_PO_SWITCH
00992 static s3_txport_local<s3_message>* po_switch_txp_ptr;
00993 #endif
00994
01001 s3_post_office_switch(int n_port);
01002
01003 virtual ~s3_post_office_switch();
01009 virtual void main_loop();
01010 protected:
01011
01018 bool add_dst(const std::string& dst,
01019 client_id_t client,
01020 bool is_group);
01027 bool remove_dst(const std::string& dst,
01028 client_id_t client,
01029 bool is_group);
01036 void remove_client(client_id_t client);
01041 void reply_success(const s3_txport_data<s3_message>& packet);
01042
01047 void reply_failure(const s3_txport_data<s3_message>& packet);
01053 void forward_msg(const s3_message& msg,
01054 const client_id_t client);
01060 void log_msg(const std::string& where,
01061 const std::string& what);
01065 std::string get_next_message_id();
01069 void handle_transport_events();
01070 };
01071
01072 #endif