00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00037 #ifndef S3_TXPORT_TCP_H
00038 #define S3_TXPORT_TCP_H
00039
#include <list>
#include <map>
#include <string>
#include <s3fc/s3_macros.h>
#include <s3fc/s3_semaphore.h>
#include <s3fc/s3_thread_base.h>
#include <s3fc/s3_txport_base.h>
#include <s3fc/s3_streamable.h>
#include <s3fc/s3_growable_fifo_queue.h>
#include <s3fc/s3_socket_tcp.h>
#ifndef _WIN32
#include <s3fc/s3_socket_tcp_ssl.h>
#endif
00052
// Forward declarations of the transmit-task and receive-task templates.
// s3_txport_tcp names both as member/map-value types and as friends, so the
// names have to be visible before the class definition. Their definitions
// are not part of this header's visible text.
template<typename T_Data, typename T_Socket> class s3_txport_tcp_tx_task;
template<typename T_Data, typename T_Socket> class s3_txport_tcp_rx_task;
00055
00117 template<class T_Data, class T_Socket=s3_socket_tcp>
00118 class s3_txport_tcp : public s3_thread_base, public s3_txport_base<T_Data>
00119 {
00121 unsigned int port;
00122
00124 s3_fifo_queue<s3_txport_data<T_Data> > rx_queue;
00125
00127 s3_fifo_queue<s3_txport_data<T_Data> > tx_queue;
00128
00130 s3_txport_client_id connect_id;
00131
00133 T_Socket sock;
00134
00141 s3_growable_fifo_queue<s3_txport_client_id> dying_tasklets;
00142
00144 typedef std::map<s3_txport_client_id,
00145 s3_txport_tcp_rx_task<T_Data, T_Socket>*>
00146 thread_map;
00147
00149 thread_map connections;
00150
00152 s3_txport_tcp_tx_task<T_Data, T_Socket> tx_thread;
00153
00155 s3_mutex connection_mutex;
00156
00158 s3_mutex reap_mutex;
00159
00161 struct metadata_type
00162 {
00163 s3_txport_client_id id;
00164 std::string metadata;
00165 metadata_type(s3_txport_client_id a_id, const std::string& md) :
00166 id(a_id), metadata(md){};
00167 metadata_type(){};
00168 };
00169
00171 s3_growable_fifo_queue<metadata_type> meta_queue;
00172
00174 s3_semaphore terminate;
00175
00177 unsigned int network_mask;
00178
00183 unsigned int network_number;
00184
00185 protected:
00189 void set_critical_by_id(s3_txport_client_id id);
00190
00194 void clear_critical_by_id(s3_txport_client_id id);
00195
00197 T_Socket* get_socket_by_id(s3_txport_client_id id);
00198
00200 void cleanup();
00201
00203 void reap();
00204
00206 s3_txport_client_id next_id()
00207 {
00208
00209 if (connect_id > 10000000)
00210 {
00211 connect_id = 1;
00212 }
00213 else
00214 {
00215 connect_id++;
00216 }
00217 return connect_id;
00218 }
00220 s3_txport_client_id spawn_handler(T_Socket* active_socket,
00221 std::string hostname);
00222
00224 void set_terminate()
00225 {
00226 terminate.post();
00227 }
00228
00229 public:
00236 s3_txport_tcp( int queue_size = 4, std::string network = "0.0.0.0",
00237 std::string mask = "0.0.0.0");
00238
00240 ~s3_txport_tcp();
00241
00243 s3_fifo_queue<s3_txport_data<T_Data> >& get_rx_queue()
00244 {
00245 return rx_queue;
00246 }
00247
00249 s3_fifo_queue<s3_txport_data<T_Data> >& get_tx_queue()
00250 {
00251 return tx_queue;
00252 }
00253
00255 void send_metadata(s3_txport_client_id id, const std::string& mdata);
00256
00263 std::list<s3_txport_client_id> get_client_id_list() const;
00264
00266 bool is_connected_by_id(s3_txport_client_id id);
00267
00273 std::string get_ip_address_by_id(s3_txport_client_id id);
00274
00288 bool listen(const unsigned int port);
00289
00299 s3_txport_client_id connect(const std::string& IP_addr,
00300 const unsigned int port);
00301
00306 bool disconnect_by_id(s3_txport_client_id id);
00307
00309 void disconnect_all();
00310
00315 void main_loop();
00316
00318 int num_connections() const;
00319
00321 T_Socket& get_socket()
00322 {
00323 return sock;
00324 }
00325
00327 bool listening() const
00328 {
00329 return is_running();
00330 }
00331
00336 void stop();
00337
00342 void restart();
00343
00344
00345 friend class s3_txport_tcp_tx_task<T_Data, T_Socket>;
00346 friend class s3_txport_tcp_rx_task<T_Data, T_Socket>;
00347 };
00348
00349 #include <s3fc/s3_txport_tcp.tcc>
00350
00351 #endif