00001
00009 #include <s3fc/s3_rpc.h>
00010 #include <s3fc/s3_conversion.h>
00011
00012
00013 template<typename T_obj>
00014 void s3_rpc_type_common<T_obj>::clear_callmap(callmap_t& cmap)
00015 {
00016
00017 while (! cmap.empty())
00018 {
00019
00020 delete cmap.begin()->second;
00021 cmap.erase(cmap.begin());
00022 }
00023 }
00024
00025
00026 template<typename T_obj>
00027 s3_rpc_exception s3_rpc_caller_m<T_obj>::err_invalid_args(
00028 const std::string& where,
00029 const std::vector<std::string>& args)
00030 {
00031 std::stringstream ss;
00032 ss << "Invalid arguments: '";
00033 if (args.size() > 0)
00034 {
00035 ss << args[0];
00036 for (unsigned int i = 1; i < args.size(); ++i)
00037 {
00038 ss << "|" << args[1];
00039 }
00040 }
00041 ss << "'";
00042 return s3_rpc_exception(where, ss.str());
00043 }
00044
00045
00046 template<typename T_obj>
00047 s3_rpc_caller_m_v_v<T_obj>::s3_rpc_caller_m_v_v(
00048 methodptr_t n_method) :
00049 method(n_method)
00050 {
00051 }
00052
00053
00054 template<typename T_obj>
00055 void s3_rpc_caller_m_v_v<T_obj>::call(
00056 T_obj* obj,
00057 const std::vector<std::string>& args)
00058 {
00059 S3FC_DBG(
00060 std::cerr << "s3_rpc_caller_m_v<T_obj>::call()"
00061 << " obj: " << obj
00062 << " method: " << method
00063 << std::endl << std::flush;
00064 );
00065
00066 if (args.size() != 0)
00067 {
00068 throw err_invalid_args(
00069 "s3_rpc_caller_m_v_t1::call()",
00070 args);
00071 }
00072 (obj->*method)();
00073
00074 S3FC_DBG(
00075 std::cerr << "s3_rpc_caller_m_v<T_obj>::call() done"
00076 << std::endl << std::flush;
00077 );
00078 }
00079
00080
00081 template<typename T_obj>
00082 unsigned int s3_rpc_caller_m_v_v<T_obj>::get_num_params()
00083 {
00084 return 0;
00085 }
00086
00087
00088 template<typename T_obj, typename T_arg1>
00089 s3_rpc_caller_m_v_t1<T_obj, T_arg1>::s3_rpc_caller_m_v_t1(
00090 methodptr_t n_method) :
00091 method(n_method)
00092 {
00093 }
00094
00095
00096 template<typename T_obj, typename T_arg1>
00097 void s3_rpc_caller_m_v_t1<T_obj, T_arg1>::call(
00098 T_obj* obj,
00099 const std::vector<std::string>& args)
00100 {
00101 if (args.size() != 1)
00102 {
00103 throw err_invalid_args(
00104 "s3_rpc_caller_m_v_t1::call()",
00105 args);
00106 }
00107 (obj->*method)(s3_conversion::from_string<T_arg1>(args[0]));
00108 }
00109
00110
00111 template<typename T_obj, typename T_arg1>
00112 unsigned int s3_rpc_caller_m_v_t1<T_obj, T_arg1>::get_num_params()
00113 {
00114 return 1;
00115 }
00116
00117
00118 template<typename T_obj, typename T_arg1, typename T_arg2>
00119 s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::s3_rpc_caller_m_v_t2(
00120 methodptr_t n_method) :
00121 method(n_method)
00122 {
00123 }
00124
00125
00126 template<typename T_obj, typename T_arg1, typename T_arg2>
00127 void s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::call(
00128 T_obj* obj,
00129 const std::vector<std::string>& args)
00130 {
00131 if (args.size() != 2)
00132 {
00133 throw err_invalid_args(
00134 "s3_rpc_caller_m_v_t2::call()",
00135 args);
00136 }
00137 (obj->*method)(s3_conversion::from_string<T_arg1>(args[0]),
00138 s3_conversion::from_string<T_arg2>(args[1]));
00139 }
00140
00141
00142 template<typename T_obj, typename T_arg1, typename T_arg2>
00143 unsigned int s3_rpc_caller_m_v_t2<T_obj, T_arg1, T_arg2>::get_num_params()
00144 {
00145 return 2;
00146 }
00147
00148
00149 template<typename T_obj>
00150 s3_rpc_server<T_obj>::s3_rpc_server(const std::string& n_name,
00151 T_obj& n_obj) :
00152 obj(&n_obj),
00153 cmap(s3_rpc_callmap<T_obj>::value())
00154 {
00155 std::string lname = s3_rpc_common::rpc_server_name(n_name);
00156 try
00157 {
00158
00159 mb.set_name(lname);
00160 mb.connect();
00161
00162 s3_thread_base::start();
00163 started.wait();
00164 }
00165 catch(const std::exception& e)
00166 {
00167 std::stringstream ss;
00168 ss << "s3_message_box::connect() threw exception (name: "
00169 << lname << "): [" << e.what() << "]";
00170
00171 throw s3_rpc_exception("s3_rpc_server::s3_rpc_server",
00172 ss.str());
00173 }
00174 }
00175
00176
00177 template<typename T_obj>
00178 s3_rpc_server<T_obj>::~s3_rpc_server()
00179 {
00180
00181 s3_thread_base::request_terminate();
00182 s3_thread_base::wait_on_exit();
00183 mb.disconnect();
00184
00185 s3_rpc_type_common<T_obj>::clear_callmap(cmap);
00186 }
00187
00188
00189 template<typename T_obj>
00190 void s3_rpc_server<T_obj>::main_loop()
00191 {
00192 started.post();
00193
00194 mb.subscribe_arrival(state_changed);
00195 while(! test_terminate())
00196 {
00197 wait_on_state_changed();
00198 S3FC_DBG(
00199 std::cerr << "s3_rpc_server<T_obj>::main_loop(): state_changed"
00200 << std::endl << std::flush;
00201 );
00202 handle_message(mb.get_msg());
00203 }
00204 }
00205
00206
00207
00208 template<typename T_obj>
00209 void s3_rpc_server<T_obj>::handle_message(const s3_message& msg)
00210 {
00211 if (! msg.is_empty())
00212 {
00213 std::string body = msg.get_body();
00214
00215 if (body.length() > 0)
00216 {
00217 std::string cmd = remove_first(body);
00218
00219 if (cmd == s3_rpc_common::cmd_tag_call)
00220 {
00221 exec_cmd_call(msg, body);
00222 }
00223
00224 else
00225 {
00226 std::stringstream ss;
00227 ss << "s3_rpc_server::handle_message: RPC message error: "
00228 << "[Invalid RPC command: '" << cmd << "']";
00229 reply_client(mb, msg, ss.str(), true);
00230 }
00231 }
00232
00233 else
00234 {
00235 reply_client(mb, msg,
00236 "s3_rpc_server::handle_message: RPC message error: "
00237 "[No RPC command]",
00238 true);
00239 }
00240 }
00241 }
00242
00243
00244 template<typename T_obj>
00245 void s3_rpc_server<T_obj>::exec_cmd_call(
00246 const s3_message& msg,
00247 std::string body)
00248 {
00249
00250 if (body.length() > 0)
00251 {
00252 std::string method = remove_first(body);
00253
00254 if (cmap.find(method) != cmap.end())
00255 {
00256 try
00257 {
00258 std::vector<std::string> body_parts =
00259 s3_conversion::split_string(body, "|");
00260 cmap[method]->call(obj, body_parts);
00261 reply_client(mb, msg, "", false);
00262 }
00263
00264
00265 catch(const std::exception& e)
00266 {
00267 std::stringstream ss;
00268 ss << "s3_rpc_server::handle_message: RPC message error: ["
00269 << e.what() << "]";
00270 reply_client(mb, msg, ss.str(), true);
00271 }
00272 }
00273
00274 else
00275 {
00276 std::stringstream ss;
00277 ss << "s3_rpc_server::handle_message: RPC message error: ["
00278 << "Invalid RPC method: '" << method << "']";
00279 reply_client(mb, msg, ss.str(), true);
00280 }
00281 }
00282 }
00283
00284
00285 template<typename T_obj>
00286 std::string s3_rpc_server<T_obj>::remove_first(std::string& str)
00287 {
00288 std::string rv;
00289
00290 const char delim = '|';
00291 std::string::size_type di = str.find_first_of(delim);
00292
00293 if (di != std::string::npos)
00294 {
00295
00296 rv.assign(str, 0, di);
00297
00298 str.erase(0, di+1);
00299 }
00300 else
00301 {
00302
00303 rv = str;
00304
00305 str = "";
00306 }
00307 return rv;
00308 }
00309
00310
00311 template<typename T_obj>
00312 void s3_rpc_server<T_obj>::reply_client(s3_message_box& mb,
00313 const s3_message& msg,
00314 const std::string& reply,
00315 bool err)
00316 {
00317 std::stringstream ss;
00318 if (err)
00319 {
00320 ss << "ERROR|" << reply;
00321 }
00322 else
00323 {
00324 ss << "OK|" << reply;
00325 }
00326 mb.reply_msg(msg, ss.str());
00327
00328 S3FC_DBG(
00329 std::cerr << "s3_rpc_server<T_obj>::reply_client: " << std::endl
00330 << "Original: " << std::endl
00331 << " From: " << msg.get_from() << std::endl
00332 << " Body: " << msg.get_body() << std::endl
00333 << "Reply: " << std::endl
00334 << " Body: " << ss.str() << std::endl;
00335 );
00336 }
00337
00338
00339 template<typename T_obj>
00340 s3_rpc_client<T_obj>::s3_rpc_client(const std::string& n_name) :
00341 server_name(s3_rpc_common::rpc_server_name(n_name)),
00342 cmap(s3_rpc_callmap<T_obj>::value())
00343 {
00344 std::string lname = s3_rpc_common::rpc_random_client_name(n_name);
00345 try
00346 {
00347 mb.set_name(lname);
00348 mb.connect();
00349 }
00350 catch(const std::exception& e)
00351 {
00352 std::stringstream ss;
00353 ss << "s3_message_box::connect() threw exception (name: "
00354 << lname << "): [" << e.what() << "]";
00355
00356 throw s3_rpc_exception("s3_rpc_client::s3_rpc_client",
00357 ss.str());
00358 }
00359 }
00360
00361
00362 template<typename T_obj>
00363 s3_rpc_client<T_obj>::~s3_rpc_client()
00364 {
00365 mb.disconnect();
00366 s3_rpc_type_common<T_obj>::clear_callmap(cmap);
00367 }
00368
00369
00370 template<typename T_obj>
00371 std::string s3_rpc_client<T_obj>::call(const std::string& method)
00372 {
00373 assert_method_args("s3_rpc_client<T_obj>::call()",
00374 method, 0);
00375 return call_common(method, "");
00376 }
00377
00378
00379 template<typename T_obj> template<typename T_arg1>
00380 std::string s3_rpc_client<T_obj>::call(const std::string& method,
00381 const T_arg1& arg1)
00382 {
00383 assert_method_args("s3_rpc_client<T_obj>::call(T1)",
00384 method, 1);
00385
00386 std::stringstream ss;
00387 ss << s3_conversion::to_string<T_arg1>(arg1) << "|";
00388 return call_common(method, ss.str());
00389 }
00390
00391
00392 template<typename T_obj> template<typename T_arg1, typename T_arg2>
00393 std::string s3_rpc_client<T_obj>::call(const std::string& method,
00394 const T_arg1& arg1,
00395 const T_arg2& arg2)
00396 {
00397 assert_method_args("s3_rpc_client<T_obj>::call(T1,T2)",
00398 method, 2);
00399
00400 std::stringstream ss;
00401 ss << s3_conversion::to_string<T_arg1>(arg1) << "|";
00402 ss << s3_conversion::to_string<T_arg2>(arg2) << "|";
00403 return call_common(method, ss.str());
00404 }
00405
00406
00407 template<typename T_obj>
00408 std::string s3_rpc_client<T_obj>::call_common(const std::string& method,
00409 const std::string& args)
00410 {
00411 std::stringstream ss;
00412 ss << s3_rpc_common::cmd_tag_call << "|" << method << "|" << args;
00413 mb.send_msg(server_name, ss.str());
00414 return "";
00415 }
00416
00417 template<typename T_obj>
00418 void s3_rpc_client<T_obj>::assert_method_args(const std::string& where,
00419 const std::string& method,
00420 unsigned int num_params)
00421 {
00422
00423 if (cmap.find(method) == cmap.end())
00424 {
00425 std::stringstream ss;
00426 ss << "Invalid method: '" << method << "'";
00427 throw s3_rpc_exception(where, ss.str());
00428 }
00429 if (cmap[method]->get_num_params() != num_params)
00430 {
00431 std::stringstream ss;
00432 ss << "Invalid number of parameters to method '" << method
00433 << "': " << num_params << " (" <<
00434 cmap[method]->get_num_params() << " expected)";
00435 throw s3_rpc_exception(where, ss.str());
00436 }
00437 }
00438