S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_message.h

Go to the documentation of this file.
00001 /*
00002  * Stone Three Foundation Class (s3fc) provides a number of utility classes.
00003  * Copyright (C) 2001 by Stone Three Signal Processing (Pty) Ltd.
00004  *
00005  * Authored by Stone Three Signal Processing (Pty) Ltd.
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00020  * 
00021  * Please see the file 'COPYING' in the source root directory.
00022  */
00023 
00033 #ifndef S3_MESSAGE_H
00034 #define S3_MESSAGE_H
00035 
00297 /*
00298  * - The message is distributed to all clients.
00299  * Currently, messages are routed according to an @e extremely simple (and 
00300  * admittedly ineffective, inelegant and really stupid) algorithm:
00301  * - Any received message is forwarded to all connections in the connection list 
00302  *   of that destination, except the one on which it was received.
00303  */
00304 
00305 //#define S3_HAS_LOCAL_PO_SWITCH 1
00306 
00307 #include <iostream>
00308 #include <string>
00309 #include <vector>
00310 #include <list>
00311 #include <s3fc/s3_streamable.h>
00312 #include <s3fc/s3_fifo_queue.h>
00313 #include <s3fc/s3_growable_fifo_queue.h>
00314 #include <s3fc/s3_txport_base.h>
00315 
00316 #if S3_HAS_LOCAL_PO_SWITCH
00317    #include <s3fc/s3_txport_local.h>
00318 
00319 #else
00320    #include <s3fc/s3_txport_tcp.h>
00321 #endif   
00322 
00323 #include <s3fc/s3_mutex.h>
00324 #include <s3fc/s3_semaphore.h>
00325 
// Forward declaration of s3_message_box; the full class definition appears
// further down in this header.
00326 class s3_message_box;
00327 
// s3_message: one addressed message. It holds sender, recipient and body
// strings, a message class and two message ids, and derives from
// s3_streamable, overriding pack() and unpack() on an s3_pack_buffer.
00338 class s3_message : public s3_streamable
00339 {
00340   public:
         // Message identifiers are plain strings.
00341    typedef std::string msg_id_t;
         // Class of a message. The meaning of each value is defined by the code
         // that routes messages, which is not visible in this header.
         // NOTE(review): PRIORITY presumably corresponds to the 'priority'
         // flag of s3_message_box::send_msg() - confirm in the implementation.
00345    enum msg_class_t
00346    {
00350       NORMAL,
00354       PRIORITY,
00358       CONTROL
00359    };
00360   protected:
         // Flag reported by is_empty(); presumably set for default-constructed
         // messages - TODO confirm against the constructor bodies.
00364    bool empty;
         // Sender name.
00368    std::string from;
         // Recipient (destination) name.
00372    std::string to;
         // Message payload.
00376    std::string body;
         // Message class, see msg_class_t.
00380    msg_class_t mc;
         // Identifier of this message.
00384    msg_id_t id;
         // Identifier consulted by is_reply(); presumably the id of the message
         // this one answers - TODO confirm.
00388    msg_id_t reply_id;
00389   public:
         // Project macro defined outside this file; its expansion is not
         // visible here.
00390    S3_DECLARE_MAGIC_STRING;
         // Default constructor.
00395    s3_message();
         // Constructs a message from explicit field values: sender, recipient,
         // body, class and id.
00412    s3_message(const std::string& n_from,
00413          const std::string& n_to,
00414          const std::string& n_body,
00415          const msg_class_t& n_mc,
00416          const msg_id_t& n_id);
         // Constructs a message from a sender, an existing message, a body and
         // an id. NOTE(review): looks like the "reply to msg" constructor
         // (recipient and reply_id derived from msg) - confirm in the .cpp.
00436    s3_message(const std::string& n_from,
00437          const s3_message& msg,
00438          const std::string& n_body,
00439          const msg_id_t& n_id);
         // Virtual destructor.
00443    virtual ~s3_message();
         // s3_streamable interface: pack() is const on the message and takes a
         // mutable buffer; unpack() takes a const buffer and modifies the
         // message.
00448    virtual void pack(s3_pack_buffer &b) const;
00453    virtual void unpack(const s3_pack_buffer &b);
         // Returns whether the message is empty (see the 'empty' member).
00459    bool is_empty() const;
         // Tests this message against the given message id; presumably true
         // when reply_id equals n_id - TODO confirm.
00467    bool is_reply(const msg_id_t& n_id) const;
         // Accessors. Each returns its result by value, i.e. a copy.
         // No accessor for id or reply_id is declared in this class.
00472    std::string get_from() const;
00477    std::string get_to() const;
00482    std::string get_body() const;
00487    msg_class_t get_class() const;
00488    // diagnostic purposes - return a formatted string containing
00489    // all the field values
00490    std::string get_debug_str() const;
00491 };
00492 
// Project macro (defined outside this file) applied to the message-class enum.
// NOTE(review): presumably registers msg_class_t as a plain-old-data type for
// the s3 pack/unpack machinery - confirm against the macro definition.
00493 S3_TYPEGROUP_TRAITS_POD(s3_message::msg_class_t);
00494 
00495 // forward decl: s3_message_box names s3_post_office as a friend and keeps a
      // pointer to it, before s3_post_office itself is defined.
00496 class s3_post_office;
00497 
// s3_message_box: client-side endpoint for sending and receiving s3_message
// objects. All boxes share one static s3_post_office pointer; each box has
// its own name, receive queue, group subscription list and state mutex.
00513 class s3_message_box
00514 {
00515    // provide access to privates 
00516    friend class s3_post_office;
00517   public:
         // Container type used for the receive queue.
00518    typedef std::list<s3_message> T_RX_Queue;
00519 
         // Class-wide (static) counter. NOTE(review): presumably counts
         // messages dropped when a receive queue is full - confirm in the .cpp.
00524    static unsigned int dropped_msg_count;
00525   private:
         // Post office shared by every message box (static).
00531    static s3_post_office* poffice;
         // Connection flag; see connect() and disconnect().
00535    bool connected;
         // Name of this box; see set_name().
00539    std::string name;
         // Receive queue length setting; presumably initialised from the
         // constructor's n_queue_len argument - TODO confirm.
00543    unsigned int rx_queue_len;
         // Received messages.
00547    T_RX_Queue rx_queue;
         // Group names; see subscribe_group() and unsubscribe_group().
00551    std::list<std::string> subscription_list;
         // Sequence number; presumably used by get_next_message_id() - confirm.
00555    int seq_nr;
         // Semaphores registered through subscribe_arrival(). Stored as raw,
         // non-const pointers; nothing in this header shows the box owning them.
00559    std::list<s3_semaphore*> arrival_notification_list;
         // Mutex protecting this box's state (going by its name; usage is in
         // the implementation file).
00563    s3_mutex state_lock;
00564   public:
         // Constructs a box. The name defaults to the empty string and the
         // queue length to 20.
00571    s3_message_box(const std::string& name = "",
00572         unsigned int n_queue_len = 20);
00576    ~s3_message_box();
         // Class-wide set-up taking an IP address and port; presumably creates
         // and connects the shared post office - TODO confirm.
00586    static void init(const std::string& ip_addr,
00587           int port);
         // Class-wide counterpart of init().
00593    static void de_init();
         // Returns whether the class-wide state has been initialised.
00598    static bool initialised();
         // Sets the name of this box.
00606    void set_name(const std::string& name);
         // Connects / disconnects this box.
00616    void connect();
00621    void disconnect();
         // Subscribes to / unsubscribes from the named group.
00629    void subscribe_group(const std::string& group);
00637    void unsubscribe_group(const std::string& group);
         // Registers a semaphore for message-arrival notification.
         // NOTE(review): the parameter is a const reference, while
         // arrival_notification_list stores non-const pointers; storing &sem
         // would need a const_cast and requires sem to outlive the
         // registration - verify in the implementation.
00646    void subscribe_arrival(const s3_semaphore& sem);
         // Sends 'body' to destination 'dst' and returns a message id.
         // 'priority' defaults to false.
00655    s3_message::msg_id_t send_msg(const std::string& dst,
00656              const std::string& body,
00657              bool priority = false);
         // Sends a reply to 'msg' and returns a message id. 'body' defaults to
         // the empty string.
00672    s3_message::msg_id_t reply_msg(const s3_message& msg,
00673               const std::string& body = "");
         // Returns a message by value. NOTE(review): the behaviour on an empty
         // queue (blocking vs. returning an empty message) is not visible here.
00678    s3_message get_msg();
00679   protected:
         // Delivers a message to this box. Not public; s3_post_office can reach
         // it through the friend declaration above.
00686    void deliver_msg(const s3_message& msg);
         // Builds an error string from 'err'.
00690    std::string err_str(const std::string& err);
         // Returns the next message id as a string.
00694    std::string get_next_message_id();
00695    
00696 };
00697 
// s3_post_office: thread object (derives from s3_thread_base and overrides
// main_loop()) that sits between local s3_message_box clients and a transport
// connection. It keeps a map from destination name to the boxes registered
// for it, plus queues for control messages and client messages.
// NOTE(review): std::map and s3_thread_base are used below, but neither <map>
// nor a thread header appears among this file's visible includes; they are
// presumably pulled in through another s3fc header.
00712 class s3_post_office : public s3_thread_base
00713 {
00714    friend class s3_message_box;
00715   private:
         // A message paired with the box that dispatched it.
00719    struct client_msg
00720    {
00721       // message
00722       s3_message msg;
00723       // client that dispatched it
00724       s3_message_box* client;
00725       // Default constructor: the client pointer is nulled so that a
            // default-constructed entry never holds an indeterminate pointer.
00726       client_msg() : client(0) { }
00727       // Initialising constructor.
00728       client_msg(const s3_message& n_msg,
00729        s3_message_box* n_client) :
00730     msg(n_msg),
00731     client(n_client) { }
00732    };
         // Per-destination record: the boxes registered for the destination
         // and whether the destination is a group.
00737    struct dst_data
00738    {
00739       std::list<s3_message_box*> clients;
00740       bool group;
            // Default constructor: 'group' is set to false so that a
            // default-constructed record (e.g. one created by
            // std::map::operator[]) never carries an indeterminate flag.
00741       dst_data() : group(false) { }
            // Initialising constructor. The list is taken by const reference
            // (as in s3_post_office_switch::dst_data) to avoid an extra copy.
00742       dst_data(const std::list<s3_message_box*>& n_clients,
00743           bool n_group) :
00744     clients(n_clients),
00745     group(n_group)   {  }
00746    };
00747    
         // Transport connection; the concrete transport type is chosen at
         // compile time by S3_HAS_LOCAL_PO_SWITCH.
00751 #if S3_HAS_LOCAL_PO_SWITCH
00752    s3_txport_local<s3_message>* conn;
00753 #else
00754    s3_txport_tcp<s3_message>* conn;
00755 #endif
00756 
         // Transport client id associated with 'conn'.
00760    s3_txport_client_id conn_id;
         // Destination name -> registered boxes and group flag.
00765    std::map<std::string, dst_data> dst_map;
         // Queue of control messages.
00770    s3_fifo_queue<s3_message> ctrl_rx_queue;
         // Queue of messages from local clients, each tagged with its origin.
00774    s3_fifo_queue<client_msg> client_rx_queue;
         // Sequence number; presumably used by get_next_message_id() - confirm.
00778    int seq_nr;
         // Mutex guarding the destination data (going by its name).
00786    s3_mutex dst_lock;
         // Semaphore; presumably signalled once main_loop() is running - confirm.
00790    s3_semaphore started;
00791   public:
00792    
00796    ~s3_post_office();
00797    
00801    s3_post_office();
         // Connects the post office. The TCP build takes the peer's IP address
         // and port; the local build takes no arguments.
00815 #if S3_HAS_LOCAL_PO_SWITCH
00816    void connect( void );
00817 #else
00818    void connect(const std::string& ip, int port);
00819 #endif
00820 
         // Counterpart of connect().
00825    void disconnect();
         // Registers / unregisters 'mbox' for destination 'dst'; 'is_group'
         // marks the destination as a group.
00830    void add_dst(const std::string& dst,
00831       s3_message_box& mbox,
00832       bool is_group);
00837    void remove_dst(const std::string& dst,
00838          s3_message_box& mbox,
00839          bool is_group);
         // Thread body (virtual, called through s3_thread_base).
00845    virtual void main_loop();
00846   protected:
         // Accepts a message dispatched by the given box.
00852    void dispatch_msg(const s3_message& msg,
00853            s3_message_box& mbox);
         // Returns whether the post office is connected.
00859    bool is_connected() const;
         // Add / remove a destination "at the switch"; both return a bool.
         // NOTE(review): presumably a control-message round trip to
         // s3_post_office_switch, returning success - confirm in the .cpp.
00871    bool add_dst_switch(const std::string& name,
00872              bool is_group);
00884    bool remove_dst_switch(const std::string& name,
00885            bool is_group);
         // Delivers 'msg' to every box in 'clients' other than 'except'.
00892    void deliver_all_but_one(const s3_message& msg,
00893              const std::list<s3_message_box*>& clients,
00894              const s3_message_box* except);
         // Returns the next message id as a string.
00898    std::string get_next_message_id();
00899 };
00900 
// s3_post_office_switch: thread object (derives from s3_thread_base and
// overrides main_loop()) that keeps a map from destination name to transport
// client ids and declares helpers to add/remove destinations, forward
// messages and answer requests with success or failure.
// NOTE(review): std::map, s3_thread_base and s3_event_handler are used below,
// but no matching include is visible in this file; they are presumably pulled
// in through another s3fc header.
00908 class s3_post_office_switch : public s3_thread_base
00909 {
00910   private:
         // Handler for s3_txport_event objects that owns a growable event
         // queue. NOTE(review): operator() presumably appends the event to
         // ev_queue for later processing - confirm in the .cpp.
00916    struct txp_event_queuer : public s3_event_handler<s3_txport_event>
00917    {
00918      public:
00919       virtual ~txp_event_queuer();
00927       typedef s3_growable_fifo_queue<s3_txport_event> ev_queue_t;
00928       // event queue: 
00929       ev_queue_t ev_queue;
            // Invoked with each transport event.
00934       virtual void operator()(const s3_txport_event& event);
00935    };
         // Clients are identified by their transport client id.
00940    typedef s3_txport_client_id client_id_t;
00941    // forward decl before friend and friend before proper decl
00942    struct dst_data;
00943    friend struct dst_data;
         // Per-destination record: the client ids registered for the
         // destination and whether the destination is a group.
00948    struct dst_data
00949    {
00950       std::list<client_id_t> clients;
00951       bool group;
00952       // Default constructor - empty
00953       dst_data();
00954       // Initialising constructor.
00955       dst_data(const std::list<client_id_t>& n_clients,
00956           bool n_group);
00957    };
         // Queue type for transport packets carrying messages. It is not used
         // by any other declaration in this class.
00958    typedef s3_fifo_queue<s3_txport_data<s3_message> > queue_t;
         // Transport connection; the concrete transport type is chosen at
         // compile time by S3_HAS_LOCAL_PO_SWITCH.
00963 #if S3_HAS_LOCAL_PO_SWITCH
00964    s3_txport_local<s3_message>* conn;
00965 #else
00966    s3_txport_tcp<s3_message>* conn;
00967 #endif
00968 
         // Event handler instance.
         // NOTE(review): the member has the same name as its type, so the
         // name changes meaning within the class scope; newer compilers may
         // diagnose this. It cannot be renamed here without also changing the
         // implementation file.
00971    txp_event_queuer txp_event_queuer;
         // Destination name -> registered client ids and group flag.
00981    std::map<std::string, dst_data> dst_map;
         // Sequence number; presumably used by get_next_message_id() - confirm.
00985    int seq_nr;
         // Semaphore; presumably signalled once main_loop() is running - confirm.
00989    s3_semaphore started;
00990   public:
         // Static transport pointer, declared only in the local build.
00991 #if S3_HAS_LOCAL_PO_SWITCH
00992      static s3_txport_local<s3_message>* po_switch_txp_ptr;
00993 #endif
00994 
         // Constructs the switch for the given port.
01001    s3_post_office_switch(int n_port);
01002    // Destructor. Per the original author's note it performs stop() and wait_on_exit().
01003    virtual ~s3_post_office_switch();
         // Thread body (virtual, called through s3_thread_base).
01009    virtual void main_loop();
01010   protected:
01011 
         // Registers / unregisters 'client' for destination 'dst'; 'is_group'
         // marks the destination as a group. Both return a bool, presumably
         // indicating success - confirm.
01018    bool add_dst(const std::string& dst,
01019       client_id_t client,
01020       bool is_group);
01027    bool remove_dst(const std::string& dst,
01028          client_id_t client,
01029          bool is_group);
         // Removes the given client.
01036    void remove_client(client_id_t client);
         // Answer the given packet with a success / failure reply.
01041    void reply_success(const s3_txport_data<s3_message>& packet);
01042    
01047    void reply_failure(const s3_txport_data<s3_message>& packet);
         // Forwards 'msg'; 'client' identifies a transport client, presumably
         // the one the message was received from - confirm.
01053    void forward_msg(const s3_message& msg,
01054           const client_id_t client);
         // Logs 'what', tagged with the location 'where'.
01060    void log_msg(const std::string& where,
01061       const std::string& what);
         // Returns the next message id as a string.
01065    std::string get_next_message_id();
         // Processes pending transport events; presumably drains
         // txp_event_queuer.ev_queue - confirm.
01069    void handle_transport_events();
01070 };
01071 
01072 #endif

Send comments to: s3fc@stonethree.com SourceForge Logo