00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include <s3fc/s3_periodic_notifier.h>
00025 #include <s3fc/s3_event.h>
00026 #include <s3fc/s3_txport_base.h>
00027 #include <s3fc/s3_socket_tcp.h>
00028 #include <cassert>
00029 #include <memory>
00030
00031 #ifndef _WIN32
00032 #include <netinet/in.h>
00033 #endif
00034
00064 template<class T_Data, class T_Socket>
00065 class s3_txport_tcp_tx_task : public s3_thread_base
00066 {
00068 s3_semaphore sem;
00070 s3_fifo_queue<s3_txport_data<T_Data> >& in_q;
00072 s3_pack_buffer b;
00074 s3_txport_tcp<T_Data, T_Socket> *parent;
00076 s3_semaphore terminate;
00078 s3_periodic_notifier pn;
00080 std::list<s3_txport_client_id> id_list;
00081
00083 s3_txport_tcp_tx_task(s3_fifo_queue<s3_txport_data<T_Data> >& a_q,
00084 s3_txport_tcp<T_Data, T_Socket> *a_parent);
00085
00087 ~s3_txport_tcp_tx_task()
00088 {
00089 pn.unsubscribe(sem);
00090 }
00091
00093 void handle_failure(s3_txport_client_id id, std::string err_msg)
00094 {
00095
00096 parent->dispatch_event(
00097 s3_txport_event(s3_txport_event::error, id, err_msg));
00098 parent->disconnect_by_id(id);
00099 }
00100
00102 void cleanup();
00103
00104 public:
00106 void main_loop();
00107
00108 void set_terminate()
00109 {
00110 terminate.post();
00111 }
00112
00113 friend class s3_txport_tcp<T_Data, T_Socket>;
00114 };
00115
00116 template<class T_Data, class T_Socket>
00117 s3_txport_tcp_tx_task<T_Data, T_Socket>::s3_txport_tcp_tx_task(
00118 s3_fifo_queue<s3_txport_data<T_Data> >& a_q,
00119 s3_txport_tcp<T_Data, T_Socket> *a_parent) :
00120 in_q(a_q), parent(a_parent)
00121 {
00122
00123 pn.set_period(500);
00124 pn.subscribe(sem);
00125 }
00126
00127 template<class T_Data, class T_Socket>
00128 void s3_txport_tcp_tx_task<T_Data, T_Socket>::cleanup()
00129 {
00130 pn.disable();
00131 parent->meta_queue.unsubscribe_consumer(sem);
00132 in_q.unsubscribe_consumer(sem);
00133 }
00134
00135 template<class T_Data, class T_Socket>
00136 void s3_txport_tcp_tx_task<T_Data, T_Socket>::main_loop()
00137 {
00138 typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00139 unsigned int sz = 0;
00140
00141 pn.enable();
00142 parent->meta_queue.subscribe_consumer(sem);
00143 in_q.subscribe_consumer(sem);
00144 while (1)
00145 {
00146 if (terminate.try_wait()) return;
00147 b.reset();
00148
00149 parent->reap();
00150
00151 sem.wait();
00152
00153
00154
00155 if (!parent->meta_queue.empty())
00156 {
00157 typename s3_txport_tcp<T_Data, T_Socket>::metadata_type m =
00158 parent->meta_queue.pop();
00159 S3FC_DBG( std::cout << "Sending metadata to client " << m.id
00160 << " " << m.metadata << std::endl );
00161 if (parent->is_connected_by_id(m.id))
00162 {
00163 parent->set_critical_by_id(m.id);
00164 s3_streamable::pack(m.metadata,b);
00165
00166 sz = b.get_size() << 1 | 1;
00167 T_Socket* sock = parent->get_socket_by_id(m.id);
00168 assert( sock != 0 );
00169 if ( sock->is_closed() )
00170 {
00171 parent->clear_critical_by_id(m.id);
00172 continue;
00173 }
00174 if ( !sock->write(&sz, sizeof(sz), -1, &terminate) )
00175 {
00176 handle_failure(m.id, sock->get_error());
00177 parent->clear_critical_by_id(m.id);
00178 if (sock->get_errno() == -2) return;
00179 continue;
00180 }
00181 if ( !sock->write(b.get_buffer(), sz >> 1, -1, &terminate) )
00182 {
00183 handle_failure(m.id, sock->get_error());
00184 parent->clear_critical_by_id(m.id);
00185 if (sock->get_errno() == -2) return;
00186 continue;
00187 }
00188 parent->clear_critical_by_id(m.id);
00189 }
00190 else
00191 {
00192 std::ostringstream msg;
00193 msg << " Metadata sent to non-existant client ID";
00194 parent->dispatch_event(
00195 s3_txport_event(s3_txport_event::error, m.id, msg.str()));
00196 }
00197 continue;
00198 }
00199
00200
00201 d = in_q.nbl_open_output();
00202 if (d == 0) continue;
00203
00204 s3_streamable::pack(d->data,b);
00205 id_list = d->get_id_list();
00206 in_q.close_output(d);
00207
00208 if (!id_list.empty())
00209 {
00210 if (terminate.try_wait()) return;
00211
00212 for ( std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00213 ii != id_list.end(); ii++)
00214 {
00215 s3_txport_client_id id = *ii;
00216
00217
00218
00219 parent->set_critical_by_id(id);
00220 if (parent->is_connected_by_id(id))
00221 {
00222 sz = b.get_size();
00223 if (sz >= (1u << (static_cast<unsigned int>(sizeof(sz))*8u-1u)))
00224 {
00225 throw s3_generic_exception(
00226 "s3_txport_tcp_tx_task::main_loop()",
00227 "Maximum packet size exceeded");
00228 }
00229
00230
00231 sz <<= 1;
00232 T_Socket* sock = parent->get_socket_by_id(id);
00233 assert( sock != 0 );
00234 if ( sock->is_closed() )
00235 {
00236 parent->clear_critical_by_id(id);
00237 continue;
00238 }
00239 if ( !sock->write(&sz, sizeof(sz), -1, &terminate) )
00240 {
00241 handle_failure(id, sock->get_error());
00242 parent->clear_critical_by_id(id);
00243 if (sock->get_errno() == -2) return;
00244 continue;
00245 }
00246 if ( !sock->write(b.get_buffer(), sz >> 1, -1, &terminate) )
00247 {
00248 handle_failure(id, sock->get_error());
00249 parent->clear_critical_by_id(id);
00250 if (sock->get_errno() == -2) return;
00251 continue;
00252 }
00253 parent->clear_critical_by_id(id);
00254 }
00255 else
00256 {
00257 std::ostringstream msg;
00258 msg << " Data sent to non-existant client ID";
00259 parent->dispatch_event(
00260 s3_txport_event(s3_txport_event::error, id, msg.str()));
00261 parent->clear_critical_by_id(id);
00262 }
00263 }
00264 }
00265 else
00266 {
00267 std::ostringstream msg;
00268 msg << " No connected clients to send to";
00269 parent->dispatch_event(
00270 s3_txport_event(s3_txport_event::error, -1, msg.str()));
00271 }
00272 }
00273
00274 throw (s3_generic_exception("s3_txport_tx_task::main_loop()",
00275 "Instruction pointer cannot get here!"));
00276 }
00277
00288 template<class T_Data, class T_Socket>
00289 class s3_txport_tcp_rx_task : public s3_thread_base
00290 {
00292 s3_semaphore sem;
00294 s3_pack_buffer b;
00296 T_Socket* sock;
00298 s3_txport_client_id id;
00300 std::string client_ip;
00302 std::string message;
00304 bool is_connected;
00306 s3_txport_tcp<T_Data, T_Socket> *parent;
00308 s3_semaphore terminate;
00310 s3_periodic_notifier pn;
00312 s3_mutex c_lock;
00313
00314 void set_message(const std::string m)
00315 {
00316 message = m;
00317 }
00318
00319 protected:
00320 void cleanup()
00321 {
00322 #if 0
00323 S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::cleanup(): "
00324 << "Cleaning up thread with socket "
00325 << sock.get_fd() << " and ID " << int(id)
00326 << std::endl << std::flush );
00327 #endif
00328
00329 c_lock.lock();
00330 is_connected = false;
00331 c_lock.unlock();
00332
00333 lock_state();
00334 sock->close();
00335 delete sock;
00336 sock = 0;
00337 unlock_state();
00338
00339 parent->dying_tasklets.push(id);
00340 parent->rx_queue.unsubscribe_producer(sem);
00341 pn.disable();
00342 }
00343
00344
00345 s3_txport_tcp_rx_task(
00346 s3_txport_client_id a_id, T_Socket* a_sock,
00347 const std::string& a_client_ip,
00348 s3_txport_tcp<T_Data,T_Socket> *a_parent) :
00349 sock(a_sock), id(a_id), client_ip(a_client_ip), message(""),
00350 is_connected(true), parent(a_parent), pn(100)
00351 {
00352 pn.subscribe(sem);
00353 }
00354
00356 ~s3_txport_tcp_rx_task()
00357 {
00358 pn.unsubscribe(sem);
00359 }
00360
00361 public:
00363 void set_terminate()
00364 {
00365 terminate.post();
00366 }
00367
00369 std::string get_message() const
00370 {
00371 return message;
00372 }
00373
00375 std::string get_client_ip() const
00376 {
00377 return client_ip;
00378 }
00379
00381 bool connected() const
00382 {
00383 bool ret;
00384 c_lock.lock();
00385 ret = is_connected;
00386 c_lock.unlock();
00387 return ret;
00388 }
00389
00391 T_Socket* get_socket()
00392 {
00393 return sock;
00394 }
00395
00396 void main_loop()
00397 {
00398 pn.enable();
00399 parent->rx_queue.subscribe_producer(sem);
00400
00401 typename s3_fifo_queue<s3_txport_data<T_Data> >::s3_queue_data_t *d;
00402 #if 0
00403 S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00404 << "Receive thread for socket " << sock.get_fd()
00405 << " with client ID " << int(id) << " started"
00406 << std::endl << std::flush );
00407 #endif
00408 bool meta = false;
00409 while (1)
00410 {
00411 int size = 0;
00412 meta = false;
00413
00414 if (terminate.try_wait())
00415 {
00416 S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00417 << "termination semaphore posted, terminating"
00418 << std::endl << std::flush );
00419 return;
00420 }
00421 #if 0
00422 S3FC_DBG( std::cout << "survived termination semaphore"
00423 << " (socket " << sock.get_fd() << ")"
00424 << std::endl << std::flush );
00425 #endif
00426 if ( !sock->read(&size, sizeof(size), &terminate) )
00427 {
00428 set_message(sock->get_error());
00429 return;
00430 }
00431
00432 S3FC_DBG( std::cout << "read succeeded" << std::endl << std::flush );
00433
00434 if (size & 1) meta = true;
00435 size >>= 1;
00436
00437
00438 b.set_capacity(size);
00439 if ( !sock->read(b.get_buffer(), size, &terminate) )
00440 {
00441 set_message(sock->get_error());
00442 return;
00443 }
00444 if (!meta)
00445 {
00446
00447 do
00448 {
00449 d = parent->rx_queue.nbl_open_input();
00450 if (d == 0 )
00451 {
00452 if (terminate.try_wait()) return;
00453
00454 sem.wait();
00455 }
00456 }
00457 while (d == 0);
00458 s3_streamable::unpack(d->data,b);
00459
00460 d->set_id(id);
00461 parent->rx_queue.close_input(d);
00462 }
00463 else
00464 {
00465 std::string m;
00466 s3_streamable::unpack(m,b);
00467 S3FC_DBG( std::cout << "s3_txport_tcp_rx_task::main_loop(): "
00468 << "Metadata " << m << " received for id "
00469 << int(id) << std::endl << std::flush );
00470 parent->dispatch_event(
00471 s3_txport_event(s3_txport_event::metadata, id, m));
00472 }
00473 b.reset();
00474 }
00475 }
00476
00477 friend class s3_txport_tcp<T_Data,T_Socket>;
00478 };
00479
00480 template<class T_Data, class T_Socket>
00481 s3_txport_tcp<T_Data, T_Socket>::s3_txport_tcp(
00482 int queue_slots, std::string network, std::string mask) :
00483 rx_queue(queue_slots), tx_queue(queue_slots), connect_id(0),
00484 sock(INVALID_SOCKET), tx_thread(tx_queue, this), network_mask(0),
00485 network_number(0)
00486 {
00487
00488 #ifdef _WIN32
00489 WSADATA wsaData;
00490
00491
00492
00493
00494
00495
00496
00497
00498
00499
00500 WSAStartup(0x0101, &wsaData);
00501 #endif
00502 struct in_addr ip_addr_temp;
00503 #ifdef _WIN32
00504
00505
00506
00507
00508
00509
00510 ip_addr_temp.s_addr = inet_addr( network.c_str() );
00511 if ( ip_addr_temp.s_addr == INADDR_NONE )
00512 {
00513 throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00514 "Invalid network number string");
00515 }
00516 #else
00517 int ret = inet_pton(AF_INET,network.c_str(),&ip_addr_temp);
00518 if (ret <= 0)
00519 {
00520 throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00521 "Invalid network number string");
00522 }
00523 #endif
00524 network_number = ntohl(ip_addr_temp.s_addr);
00525 #ifdef _WIN32
00526 ip_addr_temp.s_addr = inet_addr( mask.c_str() );
00527 if ( ip_addr_temp.s_addr == INADDR_NONE )
00528 {
00529 throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00530 "Invalid network mask string");
00531 }
00532 #else
00533 ret = inet_pton(AF_INET,mask.c_str(),&ip_addr_temp);
00534 if (ret <= 0)
00535 {
00536 throw s3_generic_exception("s3_txport_tcp::s3_txport_tcp()",
00537 "Invalid network mask string");
00538 }
00539 #endif
00540 network_mask = ntohl(ip_addr_temp.s_addr);
00541
00542 tx_thread.start();
00543 }
00544
00545 template<class T_Data, class T_Socket>
00546 s3_txport_tcp<T_Data, T_Socket>::~s3_txport_tcp()
00547 {
00548 stop();
00549
00550 #ifdef _WIN32
00551
00552
00553
00554
00555
00556
00557
00558 WSACleanup();
00559 #endif
00560 }
00561
00562 template<class T_Data, class T_Socket>
00563 void s3_txport_tcp<T_Data, T_Socket>::restart()
00564 {
00565 if (!tx_thread.is_running())
00566 {
00567 tx_thread.start();
00568 }
00569 }
00570
00571 template<class T_Data, class T_Socket>
00572 bool s3_txport_tcp<T_Data, T_Socket>::disconnect_by_id(s3_txport_client_id id)
00573 {
00574 S3FC_DBG( std::cout << "s3_txport_tcp::disconnect_by_id(id="
00575 << int(id) << ") called" << std::endl << std::flush );
00576
00577 bool retval = false;
00578
00579
00580
00581 connection_mutex.lock();
00582 if (connections.count(id) != 0)
00583 {
00584 if (connections[id]->is_running())
00585 {
00586 connections[id]->set_terminate();
00587 retval = true;
00588
00589 S3FC_DBG( std::cout << "s3_txport_tcp::disconnect_by_id(): "
00590 "set_terminate() on connection done" << std::endl
00591 << std::flush );
00592 }
00593 }
00594 connection_mutex.unlock();
00595
00596 return retval;
00597 }
00598
00599 template<class T_Data, class T_Socket>
00600 void s3_txport_tcp<T_Data, T_Socket>::disconnect_all()
00601 {
00602 while (num_connections() != 0)
00603 {
00604 std::list<s3_txport_client_id> id_list = get_client_id_list();
00605
00606 for (std::list<s3_txport_client_id>::iterator ii = id_list.begin();
00607 ii != id_list.end(); ii++)
00608 {
00609 disconnect_by_id(*ii);
00610 }
00611 S3FC_DBG(std::cout << "s3_txport_tcp::disconnect_all() reaping"
00612 << std::endl);
00613 reap();
00614 if (num_connections() != 0)
00615 {
00616 S3FC_DBG(std::cout
00617 << "s3_txport_tcp::disconnect_all() num_connections()="
00618 << num_connections() << std::endl);
00619 S3FC_DBG(std::cout
00620 << "s3_txport_tcp::disconnect_all() dying task count="
00621 << dying_tasklets.size() << std::endl);
00622 #ifdef _WIN32
00623
00624 Sleep(1000);
00625 #else
00626
00627 sleep(1);
00628 #endif
00629 }
00630 }
00631 }
00632
00633 template<class T_Data, class T_Socket>
00634 void s3_txport_tcp<T_Data, T_Socket>::stop()
00635 {
00636 S3FC_DBG( std::cout << "s3_txport_tcp::stop() on TID " << get_thread_id()
00637 << std::endl << std::flush );
00638
00639
00640 if (tx_thread.is_running())
00641 {
00642 tx_thread.set_terminate();
00643 tx_thread.wait_on_exit();
00644 }
00645
00646
00647 if (is_running())
00648 {
00649 set_terminate();
00650 wait_on_exit();
00651 }
00652 disconnect_all();
00653
00654 S3FC_DBG( std::cout << "s3_txport_tcp::stop(): num_connections()="
00655 << num_connections() << std::endl );
00656 S3FC_DBG( std::cout << "s3_txport_tcp::stop(): dying task count="
00657 << dying_tasklets.size() << std::endl );
00658 S3FC_DBG( std::cout << "s3_txport_tcp::stop(): thread " << get_thread_id()
00659 << " is done" << std::endl << std::flush );
00660 }
00661
00662 template<class T_Data, class T_Socket>
00663 void
00664 s3_txport_tcp<T_Data, T_Socket>::set_critical_by_id(s3_txport_client_id id)
00665 {
00666 connection_mutex.lock();
00667
00668 if (connections.count(id) != 0 )
00669 {
00670 connections[id]->lock_state();
00671 }
00672 connection_mutex.unlock();
00673 }
00674
00675 template<class T_Data, class T_Socket>
00676 void
00677 s3_txport_tcp<T_Data, T_Socket>::clear_critical_by_id(s3_txport_client_id id)
00678 {
00679 connection_mutex.lock();
00680 if (connections.count(id) != 0)
00681 {
00682 connections[id]->unlock_state();
00683 }
00684 connection_mutex.unlock();
00685 }
00686
00687 template<class T_Data, class T_Socket>
00688 T_Socket* s3_txport_tcp<T_Data, T_Socket>::get_socket_by_id(
00689 s3_txport_client_id id)
00690 {
00691 T_Socket* ret = 0;
00692 connection_mutex.lock();
00693 if (connections.count(id) != 0)
00694 {
00695 ret = connections[id]->get_socket();
00696 }
00697 connection_mutex.unlock();
00698
00699 return ret;
00700 }
00701
00702 template<class T_Data, class T_Socket>
00703 bool
00704 s3_txport_tcp<T_Data, T_Socket>::is_connected_by_id(s3_txport_client_id id)
00705 {
00706 bool ret = false;
00707
00708 connection_mutex.lock();
00709 if (connections.count(id) != 0)
00710 {
00711 ret = connections[id]->connected();
00712 }
00713 connection_mutex.unlock();
00714
00715 return ret;
00716 }
00717
00718 template<class T_Data, class T_Socket>
00719 std::string
00720 s3_txport_tcp<T_Data, T_Socket>::get_ip_address_by_id(s3_txport_client_id id)
00721 {
00722 std::string ret;
00723
00724 connection_mutex.lock();
00725 if (connections.count(id) != 0)
00726 {
00727 ret = connections[id]->get_client_ip();
00728 }
00729 connection_mutex.unlock();
00730
00731 return ret;
00732 }
00733
00734 template<class T_Data, class T_Socket>
00735 std::list<s3_txport_client_id>
00736 s3_txport_tcp<T_Data, T_Socket>::get_client_id_list() const
00737 {
00738 typename thread_map::const_iterator ii;
00739 std::list<s3_txport_client_id> current_list;
00740
00741 connection_mutex.lock();
00742 for (ii = connections.begin(); ii != connections.end(); ii++)
00743 {
00744 current_list.push_back(ii->first);
00745 }
00746 connection_mutex.unlock();
00747
00748 return current_list;
00749 }
00750
00751 template<class T_Data, class T_Socket>
00752 void
00753 s3_txport_tcp<T_Data, T_Socket>::send_metadata(s3_txport_client_id id, const std::string& mdata)
00754 {
00755 meta_queue.push(metadata_type(id, mdata));
00756 }
00757
00758 template<class T_Data, class T_Socket>
00759 bool s3_txport_tcp<T_Data, T_Socket>::listen(const unsigned int a_port)
00760 {
00761 port = a_port;
00762
00763 if (!is_running())
00764 {
00765 if ( !sock.is_valid() )
00766 {
00767 #if 0
00768 S3FC_DBG2_("s3_txport_tcp::listen()", "socket=" << sock.get_fd());
00769 #endif
00770
00771 throw s3_generic_exception(
00772 "s3_txport_tcp::listen()",
00773 "Failed to open a socket: " + sock.get_error());
00774 }
00775 if ( !sock.bind(INADDR_ANY, port) )
00776 {
00777 std::ostringstream msg;
00778 msg << "s3_txport_tcp::listen():"
00779 << " Failed to connect to port " << port
00780 << " because " << sock.get_error();
00781 s3_event_dispatcher<s3_txport_event>::dispatch_event(
00782 s3_txport_event(s3_txport_event::error, -1, msg.str()));
00783 return false;
00784 }
00785 if ( !sock.listen(SOMAXCONN) )
00786 {
00787 std::ostringstream msg;
00788 msg << " Failed to listen on socket" << port
00789 << " because " << sock.get_error();
00790 throw s3_generic_exception("s3_txport_tcp::listen()", msg.str());
00791 }
00792
00793 start();
00794 }
00795 if (!is_running())
00796 {
00797 throw s3_generic_exception(
00798 "s3_txport_tcp::listen()", "Listener thread failed to start.");
00799 }
00800
00801 return true;
00802 }
00803
00804 template<class T_Data, class T_Socket>
00805 void s3_txport_tcp<T_Data, T_Socket>::cleanup()
00806 {
00807
00808 sock.close();
00809 }
00810
00811 template<class T_Data, class T_Socket>
00812 void s3_txport_tcp<T_Data, T_Socket>::reap()
00813 {
00814 reap_mutex.lock();
00815 while (!dying_tasklets.empty())
00816 {
00817 S3FC_DBG(std::cout << "s3_txport_tcp::reap(): Reaping dead tasks"
00818 << std::endl << std::flush );
00819 s3_txport_client_id tid = dying_tasklets.pop();
00820
00821 connection_mutex.lock();
00822 S3FC_DBG( std::cout << "Reaper has lock" << std::endl << std::flush );
00823 if (connections.count(tid) == 0)
00824 {
00825 connection_mutex.unlock();
00826 S3FC_DBG( std::cout << "Reaper released lock (client vanished)"
00827 << std::endl << std::flush );
00828 continue;
00829 }
00830 if (!connections[tid]->test_state())
00831 {
00832
00833 dying_tasklets.push(tid);
00834 connection_mutex.unlock();
00835 S3FC_DBG( std::cout << "Reaper released lock (rx was critical)"
00836 << std::endl << std::flush );
00837 sched_yield();
00838 continue;
00839 }
00840
00841 connections[tid]->unlock_state();
00842 S3FC_DBG( std::cout << "Waiting on client thread " << int(tid)
00843 << std::endl << std::flush );
00844
00845
00846 connections[tid]->wait_on_exit();
00847 S3FC_DBG( std::cout << "Client " << int(tid) << " terminated"
00848 << std::endl << std::flush );
00849
00850
00851
00852 std::string reason = connections[tid]->get_client_ip() + " " +
00853 connections[tid]->get_message();
00854 delete connections[tid];
00855
00856 connections.erase(tid);
00857 connection_mutex.unlock();
00858 S3FC_DBG(std::cout << "Reaper released lock" << std::endl);
00859 s3_event_dispatcher<s3_txport_event>::dispatch_event(
00860 s3_txport_event(s3_txport_event::disconnect, tid, reason));
00861 }
00862 reap_mutex.unlock();
00863 }
00864
00865 template<class T_Data, class T_Socket>
00866 s3_txport_client_id
00867 s3_txport_tcp<T_Data, T_Socket>::connect(const std::string& IP_address, const unsigned int port)
00868 {
00869
00870 std::auto_ptr<T_Socket> lsock(new T_Socket(sock));
00871
00872 if ( !lsock->is_valid() )
00873 {
00874 throw s3_generic_exception(
00875 "s3_txport_tcp::connect()", "Failed to open a socket");
00876 }
00877
00878 S3FC_DBG2_("s3_txport_tcp::connect()", "connecting with lsock");
00879
00880 if ( !lsock->connect(IP_address, port) )
00881 {
00882 std::ostringstream msg;
00883 msg << "s3_txport_tcp::connect():"
00884 << " Failed to connect to " << IP_address << ":" << port
00885 << " because " << lsock->get_error();
00886 s3_event_dispatcher<s3_txport_event>::dispatch_event(
00887 s3_txport_event(s3_txport_event::error, -1, msg.str()));
00888 lsock->close();
00889 return static_cast<s3_txport_client_id>(0);
00890 }
00891 S3FC_DBG( std::cout << "s3_txport_client::s3_txport_client(): "
00892 << "connected, spawning handler" << std::endl << std::flush );
00893
00894
00895
00896 return spawn_handler(lsock.release(), IP_address);
00897 }
00898
00899
00900 template<class T_Data, class T_Socket>
00901 void s3_txport_tcp<T_Data, T_Socket>::main_loop()
00902 {
00903 S3FC_DBG(std::cout
00904 << "Daemon configured. Awaiting client connections..."
00905 << std::endl << std::flush);
00906 while (1)
00907 {
00908 if (terminate.try_wait()) return;
00909
00910
00911 if ( sock.select_rd(1) )
00912 {
00913 S3FC_DBG2_("s3_txport_tcp::main_loop()",
00914 "incoming connection. making server-side socket.");
00915
00916 struct sockaddr_in clientname;
00917 std::auto_ptr<T_Socket> active_sock(new T_Socket());
00918 if ( sock.accept(*active_sock, &clientname) )
00919 {
00920 S3FC_DBG2_("s3_txport_tcp::main_loop()", "active_sock ok");
00921
00922 int ip_no = ntohl(clientname.sin_addr.s_addr);
00923
00924 char *tmp = (char *)&ip_no;
00925 std::ostringstream hostname;
00926 hostname << static_cast<unsigned int>(tmp[3]) << "."
00927 << static_cast<unsigned int>(tmp[2]) << "."
00928 << static_cast<unsigned int>(tmp[1]) << "."
00929 << static_cast<unsigned int>(tmp[0]);
00930
00931 if ( (ip_no & network_mask) == network_number )
00932 {
00933 S3FC_DBG( std::cout << "s3_txport_tcp::main_loop(): "
00934 << "Connection established, spawning handler"
00935 << std::endl << std::flush );
00936 spawn_handler(active_sock.release(), hostname.str());
00937 }
00938 else
00939 {
00940 active_sock->close();
00941 S3FC_DBG( std::cout << "s3_txport_tcp::main_loop(): "
00942 << "Connection from " << hostname << " rejected"
00943 << std::endl << std::flush );
00944
00945 s3_event_dispatcher<s3_txport_event>:: dispatch_event(s3_txport_event(s3_txport_event::reject,
00946 -1, hostname.str()));
00947 }
00948 }
00949 else
00950 {
00951
00952 s3_event_dispatcher<s3_txport_event>:: dispatch_event(s3_txport_event(s3_txport_event::error,
00953 -1, sock.get_error()));
00954 }
00955 }
00956 }
00957 }
00958
00959 template<class T_Data, class T_Socket>
00960 s3_txport_client_id
00961 s3_txport_tcp<T_Data, T_Socket>::spawn_handler(T_Socket* active_sock, std::string hostname)
00962 {
00963 connection_mutex.lock();
00964 s3_txport_client_id id = next_id();
00965 connections[id] = new s3_txport_tcp_rx_task<T_Data, T_Socket>(
00966 connect_id, active_sock, hostname, this);
00967
00968 connections[id]->start();
00969 connection_mutex.unlock();
00970
00971 s3_event_dispatcher<s3_txport_event>::dispatch_event(
00972 s3_txport_event(s3_txport_event::connect, id, hostname));
00973 #if 0
00974 S3FC_DBG( std::cout << "Socket " << active_sock.get_fd()
00975 << " created for ID " << int(id) << std::endl << std::flush );
00976 #endif
00977 return id;
00978 }
00979
00980 template<class T_Data, class T_Socket>
00981 int s3_txport_tcp<T_Data, T_Socket>::num_connections() const
00982 {
00983 int retval = 0;
00984
00985 connection_mutex.lock();
00986 retval = connections.size();
00987 connection_mutex.unlock();
00988
00989 return retval;
00990 }