RPC API
此文档主要介绍 TuGraph 的 RPC API 的调用详情。
1.简介
TuGraph 提供丰富的 RPC API,以供开发者通过 RPC 请求远程调用 TuGraph 提供的服务。
RPC(远程过程调用)是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。 相比REST,RPC 面向方法,主要用于函数方法的调用,可以适合更复杂通信需求的场景,且性能更高。 brpc是用c++语言编写的工业级RPC框架,基于brpc,TuGraph 提供了丰富的RPC API,本文档描述 TuGraph 的 RPC API 使用方式。
2.请求
2.1.建立连接
开发者向TuGraph服务 发送RPC请求,首先要建立连接。以C++语言为例,开发者创建指定url的通道(channel), 由通道创建指定的服务存根(LGraphRPCService_Stub),后续即可通过存根像调用本地方法一样向远程 服务器发送请求。
std::shared_ptr<lgraph_rpc::m_channel_options> options = std::make_shared<lgraph_rpc::m_channel_options>();
options->protocol = "baidu_std";
options->connection_type = "";
options->timeout_ms = 60 * 60 * 1000 /*milliseconds*/;
options->max_retry = 3;
std::string load_balancer = "";
std::shared_ptr<lgraph_rpc::m_channel> channel = std::make_shared<lgraph_rpc::m_channel>();
if (channel->Init(url.c_str(), load_balancer, options.get()) != 0)
throw RpcException("Fail to initialize channel");
LGraphRPCService_Stub stub(channel.get());
2.2.请求类型
TuGraph支持10种RPC请求,其中每种请求的功能如下表所示:
请求 | 功能 |
---|---|
GraphApiRequest | 点边索引操作请求 |
CypherRequest | cypher请求 |
PluginRequest | 存储过程请求 |
HARequest | 高可用模式请求 |
ImportRequest | 数据导入请求 |
GraphRequest | 子图操作请求 |
AclRequest | 权限管理请求 |
ConfigRequest | 配置管理请求 |
RestoreRequest | 备份请求 |
SchemaRequest | schema管理请求 |
用户发送请求时,需要传入以下参数:
- client_version: 可选参数,HA模式下可通过对比
client_version
和server_version
防止响应过时的请求 - token: 必要参数,客户端登陆之后获得token,每次请求传入token以 校验用户身份
- is_write_op: 可选参数,标志请求是否是写请求
- user: 可选参数,HA模式下主从之间同步请求时设置user,不需验证token
服务处理完RPC请求之后发回响应,响应消息中除了包含每个请求的单独响应信息之外,还包含以下参数:
- error_code: 必要参数,标志请求处理状态
- redirect: 可选参数,HA模式下向follower发送写请求时处理失败,设置redirect为请求转发地址,即leader地址
- error: 可选参数,请求错误信息
- server_version: 可选参数,HA模式的请求响应中设置
server_version
以避免client读取数据时发生反向时间旅行问题
⚠️ 除CypherRequest、PluginRequest、HARequest和AclRequest外,其余RPC接口将逐步废弃,其功能统一至CypherRequest接口。
3.登录
登录请求信息包含以下参数:
- user: 必要参数,用户名
- pass: 必要参数,密码 以C++为例,用户使用构建好的服务存根发送登录请求:
auto* req = request.mutable_acl_request();
auto* auth = req->mutable_auth_request()->mutable_login();
auth->set_user(user);
auth->set_password(pass);
// send data
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
req->set_client_version(server_version);
req->set_token(token);
LGraphRPCService_Stub stub(channel.get());
LGraphResponse res;
stub.HandleRequest(cntl.get(), req, &resp, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
server_version = std::max(server_version, res.server_version());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
token = res.acl_response().auth_response().token();
登录响应信息包含以下参数:
- token: 必要参数,登录成功会收到带有签名的令牌,即 Json Web Token,客户端储存该令牌,并且用于以后的每次发送请求。 如果登录失败会收到“Authentication failed”错误。
4.查询
用户可以通过Cypher查询和TuGraph进行绝大多数的交互,Cypher请求信息包含以下参数:
- query: 必要参数,Cypher查询语句
- param_names: 可选参数,参数名
- param_values: 可选参数,参数值
- result_in_json_format: 必要参数,查询结果是否以JSON格式返回
- graph: 可选参数,Cypher语句执行的子图名称
- timeout: 可选参数,Cypher语句执行的超时时间
以C++为例,用户发送Cypher请求的方式如下所示:
LGraphResponse res;
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
LGraphRequest req;
req.set_client_version(server_version);
req.set_token(token);
lgraph::CypherRequest* cypher_req = req.mutable_cypher_request();
cypher_req->set_graph(graph);
cypher_req->set_query(query);
cypher_req->set_timeout(timeout);
cypher_req->set_result_in_json_format(true);
LGraphRPCService_Stub stub(channel.get());
stub.HandleRequest(cntl.get(), &req, &res, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
server_version = std::max(server_version, res.server_version());
CypherResponse cypher_res = res.cypher_response();
Cypher请求响应为以下两个参数之一:
- json_result: JSON格式的cypher查询结果
- binary_result: CypherResult格式的cypher查询结果
5.存储过程
为满足用户较为复杂的查询/更新逻辑,TuGraph支持 C 语言和 Python 语言编写的存储过程。 用户可以使用RPC请求对存储过程进行增删改查操作。
5.1.加载存储过程
加载存储过程的请求包含以下参数:
- name: 必要参数,存储过程名称
- read_only: 必要参数,是否只读
- code: 必要参数,存储过程文件读入生成的ByteString
- desc: 可选参数,存储过程描述
- code_type: 可选参数,存储过程代码类型,PY、SO、CPP、ZIP四者之一
以C++为例,用户加载存储过程的方式如下所示:
std::string content;
if (!FieldSpecSerializer::FileReader(source_file, content)) {
std::swap(content, result);
return false;
}
LGraphRequest req;
req.set_is_write_op(true);
lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
pluginRequest->set_graph(graph);
pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
: lgraph::PluginRequest::PYTHON);
pluginRequest->set_version(version);
lgraph::LoadPluginRequest* loadPluginRequest = pluginRequest->mutable_load_plugin_request();
loadPluginRequest->set_code_type([](const std::string& type) {
std::unordered_map<std::string, lgraph::LoadPluginRequest_CodeType> um{
{"SO", lgraph::LoadPluginRequest::SO},
{"PY", lgraph::LoadPluginRequest::PY},
{"ZIP", lgraph::LoadPluginRequest::ZIP},
{"CPP", lgraph::LoadPluginRequest::CPP}};
return um[type];
}(code_type));
loadPluginRequest->set_name(procedure_name);
loadPluginRequest->set_desc(procedure_description);
loadPluginRequest->set_read_only(read_only);
loadPluginRequest->set_code(content);
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
req.set_client_version(server_version);
req.set_token(token);
LGraphRPCService_Stub stub(channel.get());
LGraphResponse res;
stub.HandleRequest(cntl.get(), &req, &res, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
server_version = std::max(server_version, res.server_version());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
加载存储过程的响应不包含参数,如果加载失败则抛出BadInput异常
5.2.调用存储过程
调用存储过程的请求包含以下参数:
- name: 必要参数,存储过程名称
- param: 必要参数,存储过程参数
- result_in_json_format: 可选参数,调用结果是否以JSON格式返回
- in_process: 可选参数,未来支持
- timeout: 可选参数,调用存储过程的超时时间
以C++为例,用户调用存储过程的方式如下所示:
LGraphRequest req;
lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
pluginRequest->set_graph(graph);
pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
: lgraph::PluginRequest::PYTHON);
lgraph::CallPluginRequest *cpRequest = pluginRequest->mutable_call_plugin_request();
cpRequest->set_name(procedure_name);
cpRequest->set_in_process(in_process);
cpRequest->set_param(param);
cpRequest->set_timeout(procedure_time_out);
cpRequest->set_result_in_json_format(json_format);
LGraphResponse res;
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
req.set_client_version(server_version);
req.set_token(token);
LGraphRPCService_Stub stub(channel.get());
stub.HandleRequest(cntl.get(), &req, &res, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
server_version = std::max(server_version, res.server_version());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
if (json_format) {
result = res.mutable_plugin_response()->mutable_call_plugin_response()->json_result();
} else {
result = res.mutable_plugin_response()->mutable_call_plugin_response()->reply();
}
调用存储过程的响应为以下两个参数之一:
- reply: ByteString格式的存储过程调用结果
- json_result: JSON格式的存储过程调用结果
5.3.删除存储过程
删除存储过程的请求包含以下参数:
- name: 必要参数,存储过程名称
以C++为例,用户删除存储过程的方式如下所示:
LGraphRequest req;
req.set_is_write_op(true);
lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
pluginRequest->set_graph(graph);
pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
: lgraph::PluginRequest::PYTHON);
lgraph::DelPluginRequest* dpRequest = pluginRequest->mutable_del_plugin_request();
dpRequest->set_name(procedure_name);
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
req.set_client_version(server_version);
req.set_token(token);
LGraphRPCService_Stub stub(channel.get());
LGraphResponse res;
stub.HandleRequest(cntl.get(), &req, &res, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
server_version = std::max(server_version, res.server_version());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
删除存储过程的响应不包含参数,如果删除失败则抛出BadInput异常
5.4.列举存储过程
列举存储过程请求不需要参数,以C++为例,用户列举存储过程的方式如下所示:
LGraphRequest req;
req.set_is_write_op(false);
lgraph::PluginRequest* pluginRequest = req.mutable_plugin_request();
pluginRequest->set_graph(graph);
pluginRequest->set_type(procedure_type == "CPP" ? lgraph::PluginRequest::CPP
: lgraph::PluginRequest::PYTHON);
pluginRequest->mutable_list_plugin_request();
cntl->Reset();
cntl->request_attachment().append(FLAGS_attachment);
req.set_client_version(server_version);
req.set_token(token);
LGraphRPCService_Stub stub(channel.get());
LGraphResponse res;
stub.HandleRequest(cntl.get(), &req, &res, nullptr);
if (cntl->Failed()) throw RpcConnectionException(cntl->ErrorText());
server_version = std::max(server_version, res.server_version());
if (res.error_code() != LGraphResponse::SUCCESS) throw RpcStatusException(res.error());
result = res.mutable_plugin_response()->mutable_list_plugin_response()->reply();
列举存储过程的响应的参数如下所示:
- reply: JSON格式的procedure列表