00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00032 template<class T_Data>
00033 s3_txport_local<T_Data>::s3_txport_local( unsigned int queue_size)
00034 : rx_queue(queue_size), tx_queue(queue_size),connect_id(0), pn(500)
00035 {
00036 start();
00037 }
00038
00039 template<class T_Data>
00040 s3_txport_local<T_Data>::~s3_txport_local()
00041 {
00042 if (is_running())
00043 {
00044 terminate.post();
00045 wait_on_exit();
00046 }
00047 disconnect_all();
00048 }
00049
00050 template<class T_Data>
00051 s3_fifo_queue<s3_txport_data<T_Data> >& s3_txport_local<T_Data>::get_tx_queue()
00052 {
00053 return tx_queue;
00054 }
00055
00056 template<class T_Data>
00057 s3_fifo_queue<s3_txport_data<T_Data> >& s3_txport_local<T_Data>::get_rx_queue()
00058 {
00059 return rx_queue;
00060 }
00061
00062 template<class T_Data>
00063 std::list<s3_txport_client_id>
00064 s3_txport_local<T_Data>::get_client_id_list() const
00065 {
00066 std::list<s3_txport_client_id> ids;
00067 typename c_map::const_iterator ii;
00068 connection_mutex.lock();
00069 for ( ii = connections.begin(); ii!= connections.end(); ii++)
00070 {
00071 ids.push_back(ii->first);
00072 }
00073 connection_mutex.unlock();
00074 return ids;
00075 }
00076
00077 template<class T_Data>
00078 void s3_txport_local<T_Data>::send_metadata(s3_txport_client_id id,
00079 const std::string& metadata)
00080 {
00081 connection_mutex.lock();
00082 if (connections.count(id) != 0)
00083 {
00084 connections[id]->dispatch_event(
00085 s3_txport_event(s3_txport_event::metadata, id,
00086 metadata));
00087 }
00088 connection_mutex.unlock();
00089
00090 }
00091
00092 template<class T_Data>
00093 bool s3_txport_local<T_Data>::is_connected_by_id(s3_txport_client_id id)
00094 {
00095 bool ret;
00096 connection_mutex.lock();
00097 ret = connections.count(id) != 0;
00098 connection_mutex.unlock();
00099 return ret;
00100 }
00101
00102 template<class T_Data>
00103 bool s3_txport_local<T_Data>::disconnect_by_id(s3_txport_client_id id)
00104 {
00105 bool ret;
00106 connection_mutex.lock();
00107 if (connections.count(id) != 0 )
00108 {
00109
00110 connections[id]->deregister_me(this);
00111 connections.erase(id);
00112 dispatch_event(s3_txport_event(s3_txport_event::disconnect,id,
00113 "Disconnected locally"));
00114 ret = true;
00115 }
00116 else
00117 {
00118 ret = false;
00119 }
00120 connection_mutex.unlock();
00121 return ret;
00122 }
00123
00124 template<class T_Data>
00125 void s3_txport_local<T_Data>::disconnect_all()
00126 {
00127 typename c_map::iterator ii = connections.begin();
00128 typename c_map::iterator next_ii;
00129
00130 while ( ii != connections.end() )
00131 {
00132 next_ii = ii;
00133 next_ii++;
00134 disconnect_by_id(ii->first);
00135 ii = next_ii;
00136 }
00137 }
00138
00139 template<class T_Data>
00140 bool s3_txport_local<T_Data>::listening() const
00141 {
00142 return is_running();
00143 }
00144
00145 template<class T_Data>
00146 s3_txport_client_id s3_txport_local<T_Data>::connect(
00147 s3_txport_local<T_Data> *listener)
00148 {
00149 if (listener == 0)
00150 {
00151 throw s3_generic_exception(
00152 "s3_txport_local::connect(s3_txport_local *)",
00153 "Remote listener does not exist (null pointer)");
00154 }
00155 s3_txport_local<T_Data> *remote = listener->register_me(this);
00156 if (remote == 0)
00157 {
00158
00159 dispatch_event(s3_txport_event(s3_txport_event::error,
00160 0,"Already connected to this remote"));
00161 return static_cast<s3_txport_client_id>(0);
00162 }
00163 s3_txport_client_id id = next_id();
00164 connection_mutex.lock();
00165 connections[id] = remote;
00166 connection_mutex.unlock();
00167 dispatch_event(s3_txport_event(s3_txport_event::connect,
00168 id,"Locally established connection"));
00169 return id;
00170 }
00171
00172 template<class T_Data>
00173 s3_txport_local<T_Data>* s3_txport_local<T_Data>::register_me(
00174 s3_txport_local<T_Data> *remote)
00175 {
00176
00177 typename c_map::iterator ii;
00178 bool registered = false;
00179 connection_mutex.lock();
00180 for ( ii = connections.begin(); ii!= connections.end(); ii++)
00181 {
00182 if (ii->second == remote)
00183 {
00184 registered = true;
00185 }
00186 }
00187 connection_mutex.unlock();
00188 if (registered)
00189 {
00190 dispatch_event(s3_txport_event(s3_txport_event::error,0,
00191 "Remote attempted duplicate connect"));
00192 return 0;
00193 }
00194
00195 s3_txport_client_id id = next_id();
00196 connection_mutex.lock();
00197 connections[id] = remote;
00198 connection_mutex.unlock();
00199 dispatch_event(s3_txport_event(s3_txport_event::connect,id,
00200 "Remote connetion received"));
00201 return this;
00202 }
00203
00204 template<class T_Data>
00205 void s3_txport_local<T_Data>::deregister_me(
00206 const s3_txport_local<T_Data> *remote)
00207 {
00208 typename c_map::iterator ii;
00209 connection_mutex.lock();
00210
00211 ii = connections.begin();
00212 while( ii != connections.end() )
00213 {
00214 typename c_map::iterator next_ii;
00215 next_ii = ii;
00216 next_ii++;
00217
00218
00219 if (ii->second == remote)
00220 {
00221 dispatch_event(s3_txport_event(s3_txport_event::disconnect,
00222 ii->first,
00223 "Remote has deregistered"));
00224 connections.erase(ii);
00225 }
00226 ii = next_ii;
00227 }
00228 connection_mutex.unlock();
00229 }
00230
00231 template<class T_Data>
00232 bool s3_txport_local<T_Data>::listen()
00233 {
00234 return true;
00235 }
00236
00237 template<class T_Data>
00238 void s3_txport_local<T_Data>::cleanup()
00239 {
00240 pn.disable();
00241 pn.unsubscribe(sem);
00242 tx_queue.unsubscribe_consumer(sem);
00243 S3FC_DBG(std::cout << "s3_txport_local:: cleanup() cleaning up"
00244 << std::endl);
00245 }
00246
00247 template<class T_Data>
00248 void s3_txport_local<T_Data>::main_loop()
00249 {
00250 typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00251 std::list<s3_txport_client_id> id_list;
00252 pn.subscribe(sem);
00253 pn.enable();
00254 tx_queue.subscribe_consumer(sem);
00255
00256 while (1)
00257 {
00258 sem.wait();
00259 if (terminate.try_wait())
00260 {
00261 return;
00262 }
00263 d = tx_queue.nbl_open_output();
00264 if (d == 0) continue;
00265
00266 id_list = d->get_id_list();
00267
00268 if (!id_list.empty())
00269 {
00270
00271 for ( std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00272 ii != id_list.end(); ii++)
00273 {
00274 connection_mutex.lock();
00275 if (connections.count(*ii) != 0 )
00276 {
00277 connections[*ii]->get_rx_queue().push(*d);
00278 connection_mutex.unlock();
00279 }
00280 else
00281 {
00282 connection_mutex.unlock();
00283 dispatch_event(s3_txport_event(s3_txport_event::error,
00284 *ii, "Data sent to non-existant client ID"));
00285 }
00286 }
00287 }
00288 tx_queue.close_output(d);
00289 }
00290 }
00291 template<class T_Data>
00292 int s3_txport_local<T_Data>::num_connections() const
00293 {
00294 int c;
00295 connection_mutex.lock();
00296 c = connections.size();
00297 connection_mutex.unlock();
00298 return c;
00299 }