S3FC project page S3FC home page

Main Page   Class Hierarchy   Alphabetical List   Compound List   File List   Compound Members   File Members   Related Pages  

s3_rpc.tcc

Go to the documentation of this file.
00001 
00009 #include <s3fc/s3_rpc.h>
00010 #include <s3fc/s3_conversion.h>
00011 
00012 //
00013 template<typename T_obj>
00014 void s3_rpc_type_common<T_obj>::clear_callmap(callmap_t& cmap)
00015 {
00016    // clear the callmap
00017    while (! cmap.empty())
00018    {
00019       // deallocate storage 
00020       delete cmap.begin()->second;
00021       cmap.erase(cmap.begin());
00022    }
00023 }
00024 
00025 //
00026 template<typename T_obj>
00027 s3_rpc_exception s3_rpc_caller_m<T_obj>::err_invalid_args(
00028    const std::string& where,
00029    const std::vector<std::string>& args)
00030 {
00031    std::stringstream ss;
00032    ss << "Invalid arguments: '";
00033    if (args.size() > 0)
00034    {
00035       ss << args[0];
00036       for (unsigned int i = 1; i < args.size(); ++i)
00037       {
00038     ss << "|" << args[1];
00039       }
00040    }
00041    ss << "'";
00042    return s3_rpc_exception(where, ss.str());
00043 }
00044 
00045 //
00046 template<typename T_obj>
00047 s3_rpc_caller_m_v_v<T_obj>::s3_rpc_caller_m_v_v(
00048    methodptr_t n_method) :
00049    method(n_method) 
00050 {
00051 }
00052 
00053 //
00054 template<typename T_obj>
00055 void s3_rpc_caller_m_v_v<T_obj>::call(
00056    T_obj* obj,
00057    const std::vector<std::string>& args)
00058 {
00059    S3FC_DBG(
00060       std::cerr << "s3_rpc_caller_m_v<T_obj>::call()"
00061       << " obj: " << obj
00062       << " method: " << method
00063       << std::endl << std::flush;
00064       );
00065 
00066    if (args.size() != 0)
00067    {
00068       throw err_invalid_args(
00069     "s3_rpc_caller_m_v_t1::call()",
00070     args);
00071    }
00072    (obj->*method)();
00073 
00074    S3FC_DBG(
00075    std::cerr << "s3_rpc_caller_m_v<T_obj>::call() done"
00076         << std::endl << std::flush;
00077       );
00078 }
00079 
00080 //
00081 template<typename T_obj>
00082 unsigned int s3_rpc_caller_m_v_v<T_obj>::get_num_params()
00083 {
00084    return 0;
00085 }
00086 
00087 //
00088 template<typename T_obj, typename T_arg1>
00089 s3_rpc_caller_m_v_t1<T_obj, T_arg1>::s3_rpc_caller_m_v_t1(
00090    methodptr_t n_method) :
00091    method(n_method) 
00092 {
00093 }
00094 
00095 //
00096 template<typename T_obj, typename T_arg1>
00097 void s3_rpc_caller_m_v_t1<T_obj, T_arg1>::call(
00098    T_obj* obj,
00099    const std::vector<std::string>& args)
00100 {
00101    if (args.size() != 1)
00102    {
00103       throw err_invalid_args(
00104     "s3_rpc_caller_m_v_t1::call()",
00105     args);
00106    }
00107    (obj->*method)(s3_conversion::from_string<T_arg1>(args[0]));
00108 }
00109 
00110 //
00111 template<typename T_obj, typename T_arg1>
00112 unsigned int s3_rpc_caller_m_v_t1<T_obj, T_arg1>::get_num_params()
00113 {
00114    return 1;
00115 }
00116 
00117 //
00118 template<typename T_obj, typename T_arg1, typename T_arg2>
00119 s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::s3_rpc_caller_m_v_t2(
00120    methodptr_t n_method) :
00121    method(n_method) 
00122 {
00123 }
00124 
00125 //
00126 template<typename T_obj, typename T_arg1, typename T_arg2>
00127 void s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::call(
00128    T_obj* obj,
00129    const std::vector<std::string>& args)
00130 {
00131    if (args.size() != 2)
00132    {
00133       throw err_invalid_args(
00134     "s3_rpc_caller_m_v_t2::call()",
00135     args);
00136    }
00137    (obj->*method)(s3_conversion::from_string<T_arg1>(args[0]),
00138         s3_conversion::from_string<T_arg2>(args[1]));
00139 }
00140 
00141 //
00142 template<typename T_obj, typename T_arg1, typename T_arg2>
00143 unsigned int s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::get_num_params()
00144 {
00145    return 2;
00146 }
00147 
00148 //
00149 template<typename T_obj>
00150 s3_rpc_server<T_obj>::s3_rpc_server(const std::string& n_name,
00151                 T_obj& n_obj) :
00152    obj(&n_obj),
00153    cmap(s3_rpc_callmap<T_obj>::value())
00154 {
00155    std::string lname = s3_rpc_common::rpc_server_name(n_name);
00156    try
00157    {
00158       // init message box
00159       mb.set_name(lname);
00160       mb.connect();
00161       // start myself
00162       s3_thread_base::start();
00163       started.wait();
00164    }
00165    catch(const std::exception& e)
00166    {
00167       std::stringstream ss;
00168       ss << "s3_message_box::connect() threw exception (name: "
00169     << lname << "): [" << e.what() << "]";
00170       // rethrow as RPC exception
00171       throw s3_rpc_exception("s3_rpc_server::s3_rpc_server",
00172               ss.str());
00173    }
00174 }
00175 
00176 //
00177 template<typename T_obj>
00178 s3_rpc_server<T_obj>::~s3_rpc_server()
00179 {
00180    // stop the main loop
00181    s3_thread_base::request_terminate();
00182    s3_thread_base::wait_on_exit();
00183    mb.disconnect();
00184    // deallocate the callmap storage avoid leaks
00185    s3_rpc_type_common<T_obj>::clear_callmap(cmap);
00186 }
00187 
00188 //
00189 template<typename T_obj>
00190 void s3_rpc_server<T_obj>::main_loop()
00191 {
00192    started.post();
00193    // notify to state_changed when message received
00194    mb.subscribe_arrival(state_changed);
00195    while(! test_terminate())
00196    {
00197       wait_on_state_changed();
00198       S3FC_DBG(
00199     std::cerr << "s3_rpc_server<T_obj>::main_loop(): state_changed"
00200     << std::endl << std::flush;
00201     );
00202       handle_message(mb.get_msg());
00203    }
00204 }
00205 
00206 
00207 //
00208 template<typename T_obj>
00209 void s3_rpc_server<T_obj>::handle_message(const s3_message& msg)
00210 {
00211    if (! msg.is_empty()) 
00212    {
00213       std::string body = msg.get_body();
00214       // extract and handle command
00215       if (body.length() > 0)
00216       {
00217     std::string cmd = remove_first(body);
00218     // RPC call command
00219     if (cmd == s3_rpc_common::cmd_tag_call)
00220     {
00221        exec_cmd_call(msg, body);
00222     }
00223     // invalid command - send reply
00224     else
00225     {
00226        std::stringstream ss;
00227        ss << "s3_rpc_server::handle_message: RPC message error: "
00228           << "[Invalid RPC command: '" << cmd << "']";
00229        reply_client(mb, msg, ss.str(), true);
00230     }
00231       }
00232       // no command
00233       else
00234       {
00235     reply_client(mb, msg,
00236             "s3_rpc_server::handle_message: RPC message error: "
00237             "[No RPC command]",
00238             true);
00239       }
00240    }
00241 }
00242 
00243 //
00244 template<typename T_obj>
00245 void s3_rpc_server<T_obj>::exec_cmd_call(
00246    const s3_message& msg,
00247    std::string body)
00248 {
00249    // extract and handle method
00250    if (body.length() > 0)
00251    {
00252       std::string method = remove_first(body);
00253       // 
00254       if (cmap.find(method) != cmap.end())
00255       {
00256     try
00257     {
00258        std::vector<std::string> body_parts =
00259           s3_conversion::split_string(body, "|");
00260        cmap[method]->call(obj, body_parts);
00261        reply_client(mb, msg, "", false);
00262     }
00263     // specialised rpc caller rejected call - probably due to incorrect
00264     // number of arguments
00265     catch(const std::exception& e)
00266     {
00267        std::stringstream ss;
00268        ss << "s3_rpc_server::handle_message: RPC message error: ["
00269           << e.what() << "]";
00270        reply_client(mb, msg, ss.str(), true);
00271     }
00272       }
00273       // unknown method (i.e. not in callmap)
00274       else
00275       {
00276     std::stringstream ss;
00277     ss << "s3_rpc_server::handle_message: RPC message error: ["
00278        << "Invalid RPC method: '" << method << "']";
00279     reply_client(mb, msg, ss.str(), true);
00280       }
00281    }
00282 }
00283 
00284 //
00285 template<typename T_obj>
00286 std::string s3_rpc_server<T_obj>::remove_first(std::string& str)
00287 {
00288    std::string rv;
00289    
00290    const char delim = '|';
00291    std::string::size_type di = str.find_first_of(delim);
00292    // found delimeter - return first string
00293    if (di != std::string::npos)
00294    {
00295       // extract first part (excluding delimeter) as return value
00296       rv.assign(str, 0, di);
00297       // remove extracted part + delimiter from str
00298       str.erase(0, di+1);
00299    }
00300    else
00301    {
00302       // extract entire string as return value
00303       rv = str;
00304       // clear input string
00305       str = "";
00306    }
00307    return rv;
00308 }
00309 
00310 //
00311 template<typename T_obj>
00312 void s3_rpc_server<T_obj>::reply_client(s3_message_box& mb,
00313                const s3_message& msg,
00314                const std::string& reply,
00315                bool err)
00316 {
00317    std::stringstream ss;
00318    if (err)
00319    {
00320       ss << "ERROR|" << reply;
00321    }
00322    else
00323    {
00324       ss << "OK|" << reply;
00325    }
00326    mb.reply_msg(msg, ss.str());
00327 
00328    S3FC_DBG(
00329       std::cerr << "s3_rpc_server<T_obj>::reply_client: " << std::endl
00330       << "Original: " << std::endl
00331       << " From: " << msg.get_from() << std::endl
00332       << " Body: " << msg.get_body() << std::endl
00333       << "Reply: " << std::endl
00334       << " Body: " << ss.str() << std::endl;
00335       );
00336 }
00337 
00338 //
00339 template<typename T_obj>
00340 s3_rpc_client<T_obj>::s3_rpc_client(const std::string& n_name) :
00341    server_name(s3_rpc_common::rpc_server_name(n_name)),
00342    cmap(s3_rpc_callmap<T_obj>::value())
00343 {
00344    std::string lname = s3_rpc_common::rpc_random_client_name(n_name);
00345    try
00346    {
00347       mb.set_name(lname);
00348       mb.connect();
00349    }
00350    catch(const std::exception& e)
00351    {
00352       std::stringstream ss;
00353       ss << "s3_message_box::connect() threw exception (name: "
00354     << lname << "): [" << e.what() << "]";
00355       // rethrow as RPC exception
00356       throw s3_rpc_exception("s3_rpc_client::s3_rpc_client",
00357               ss.str());
00358    }
00359 }
00360 
00361 //
00362 template<typename T_obj>
00363 s3_rpc_client<T_obj>::~s3_rpc_client()
00364 {
00365    mb.disconnect();
00366    s3_rpc_type_common<T_obj>::clear_callmap(cmap);
00367 }
00368 
00369 //
00370 template<typename T_obj>
00371 std::string s3_rpc_client<T_obj>::call(const std::string& method)
00372 {
00373    assert_method_args("s3_rpc_client<T_obj>::call()",
00374             method, 0);
00375    return call_common(method, "");
00376 }
00377 
00378 //
00379 template<typename T_obj> template<typename T_arg1>
00380 std::string s3_rpc_client<T_obj>::call(const std::string& method,
00381                    const T_arg1& arg1)
00382 {
00383    assert_method_args("s3_rpc_client<T_obj>::call(T1)",
00384             method, 1);
00385 
00386    std::stringstream ss;
00387    ss << s3_conversion::to_string<T_arg1>(arg1) << "|";
00388    return call_common(method, ss.str());
00389 }
00390 
00391 //
00392 template<typename T_obj>  template<typename T_arg1, typename T_arg2>
00393 std::string s3_rpc_client<T_obj>::call(const std::string& method,
00394                    const T_arg1& arg1,
00395                    const T_arg2& arg2)
00396 {
00397    assert_method_args("s3_rpc_client<T_obj>::call(T1,T2)",
00398             method, 2);
00399 
00400    std::stringstream ss;
00401    ss << s3_conversion::to_string<T_arg1>(arg1) << "|";
00402    ss << s3_conversion::to_string<T_arg2>(arg2) << "|";
00403    return call_common(method, ss.str());
00404 }
00405 
00406 //
00407 template<typename T_obj>
00408 std::string s3_rpc_client<T_obj>::call_common(const std::string& method,
00409                    const std::string& args)
00410 {
00411    std::stringstream ss;
00412    ss << s3_rpc_common::cmd_tag_call << "|" << method << "|" << args;
00413    mb.send_msg(server_name, ss.str());
00414    return "";
00415 }
00416 
00417 template<typename T_obj>
00418 void s3_rpc_client<T_obj>::assert_method_args(const std::string& where,
00419                      const std::string& method,
00420                      unsigned int num_params)
00421 {
00422    // invalid method ?
00423    if (cmap.find(method) == cmap.end())
00424    {
00425       std::stringstream ss;
00426       ss << "Invalid method: '" << method << "'";
00427       throw s3_rpc_exception(where, ss.str());
00428    }
00429    if (cmap[method]->get_num_params() != num_params)
00430    {
00431       std::stringstream ss;
00432       ss << "Invalid number of parameters to method '" << method
00433     << "': " << num_params << " (" <<
00434     cmap[method]->get_num_params() << " expected)";
00435       throw s3_rpc_exception(where, ss.str());
00436    }
00437 }
00438 

Send comments to: s3fc@stonethree.com SourceForge Logo