当前位置:首页 » 《休闲阅读》 » 正文

[C++][第三方库][brpc]详细讲解

17 人参与  2024年12月13日 12:02  分类 : 《休闲阅读》  评论

点击全文阅读


目录

1.介绍2.安装3.类与接口介绍1.日志输出类与接口2.ProtoBuf类与接口3.服务端类与接口4.客户端类与接口 4.使用0.一般流程1.Server2.客户端 -- 同步调用3.客户端 -- 异步调用


1.介绍

brpc是用C++编写的工业级RPC框架,常用于搜索、存储、机器学习、广告、推荐等高性能系统具体理解:RPC是远程调用框架 以前都是将数据的处理过程直接在本地封装实现,进行调用完成功能RPC框架远程调用的思想不一样,是将数据处理的过程交给服务器来执行 使用场景: 搭建能在一个端口支持多协议的服务,或访问各种服务Server能同步或异步处理请求Client支持同步、异步、半同步,或使用组合channels简化复杂的分库或并发访问通过http界面调试服务,使用cpu, heap, contention profilers获得更好的延时和吞吐把组织中使用的协议快速地加入brpc,或定制各类组件,包括命名服务(dns, zk, etcd),负载均衡(rr, random, consistent hashing) 对比其他RPC框架grpc开发者:Google语言支持:C++、Java、Python、Go序列化:PB文档:较为完善,没有中文文档编译问题:主要是需要下载submodule问题服务端推送:不支持异步调用: 异步客户端需要用户自己创建消费线程,使用起来较为麻烦支持异步服务器 流式传输(大块数据传输):支持,接口简单易用易用性: 简单易用支持跨语言,跨平台 brpc开发者:baidu语言支持:C++序列化:PB文档:较为完善,有中文文档编译问题:基本没有问题服务端推送:支持,但支持的不好,是通过一次长的RPC调用实现的异步调用:通过回调函数支持客户端异步调用流式传输(大块数据传输):支持,接口稍微比较原生易用性: 提供简洁的API和友好的使用文档,易于上手在sofa-prpc基础上加了一些封装,比如资源管理等,更易用 srpc开发者:sogou语言支持:C++序列化:PB、Thift文档:文档略显简略,没有独立网站,都托管在Github编译问题:基本没有问题服务端推送:不支持异步调用: 支持异步客户端通过workflow支持异步服务器 流式传输(大块数据传输):不支持易用性:专注于异步编程,性能较高,用起来更复杂一些 sofa-pbrpc开发者:baidu语言支持:C++、Java序列化:PB文档:官方文档较少编译问题:编译存在问题,依赖的库版本都比较老服务端推送:不支持异步调用:通过回调函数支持客户端异步调用流式传输(大块数据传输):不支持易用性: 轻量化接口简单,容易使用

2.安装

依赖安装
sudo apt-get install -y libssl-dev libprotobuf-dev libprotoc-dev protobuf-compiler libleveldb-dev
安装brpc
git clone https://github.com/apache/brpc.gitcd brpc/mkdir build && cd buildcmake DCMAKE_INSTALL_PREFIX=/usr .. && cmake --build . -j6make && sudo make install

3.类与接口介绍

1.日志输出类与接口

包含头文件#include <butil/logging.h>日志输出这里,本质上用不着brpc的日志输出,因此这里主要介绍如何关闭日志输出
namespace logging { enum LoggingDestination {     LOG_TO_NONE = 0 };struct BUTIL_EXPORT LoggingSettings {     LoggingSettings();     LoggingDestination logging_dest; }; bool InitLogging(const LoggingSettings& settings); }

2.ProtoBuf类与接口

RpcController:上下文管理类,目前主要用于判断RPC请求是否是OK的Closure客户端:主要用于设置异步处理回调服务端:通过Run()宣告请求处理完毕
namespace google { namespace protobuf { class PROTOBUF_EXPORT Closure { public: Closure() {} virtual ~Closure(); virtual void Run() = 0; };inline Closure* NewCallback(void (*function)()); class PROTOBUF_EXPORT RpcController {     bool Failed();     std::string ErrorText() ; } } } 

3.服务端类与接口

此处只介绍主要用到的成员与接口Server:服务器类ServerOptions:服务器参数配置类ClosureGuardClosure对象的智能管理类Controller:管理RPC调用的上下文和状态 它提供了一些方法和属性来控制和检查RPC调用的状态、错误信息、超时设置
namespace brpc { struct ServerOptions {     //无数据传输,则指定时间后关闭连接     int idle_timeout_sec; // Default: -1 (disabled)     int num_threads; // Default: #cpu-cores     //.... }enum ServiceOwnership {     //添加服务失败时,服务器将负责删除服务对象     SERVER_OWNS_SERVICE,     //添加服务失败时,服务器也不会删除服务对象     SERVER_DOESNT_OWN_SERVICE };class Server {     int AddService(google::protobuf::Service* service,                    ServiceOwnership ownership);     int Start(int port, const ServerOptions* opt);     int Stop(int closewait_ms/*not used anymore*/);     int Join();     //休眠直到ctrl+c按下,或者stop和join服务器     void RunUntilAskedToQuit(); };class ClosureGuard {     explicit ClosureGuard(google::protobuf::Closure* done);     ~ClosureGuard() { if (_done) _done->Run(); } };class HttpHeader {     void set_content_type(const std::string& type)     const std::string* GetHeader(const std::string& key)     void SetHeader(const std::string& key,          const std::string& value);     const URI& uri() const { return _uri; }     HttpMethod method() const { return _method; }     void set_method(const HttpMethod method)     int status_code()     void set_status_code(int status_code);      };class Controller : public google::protobuf::RpcController {     void set_timeout_ms(int64_t timeout_ms);     void set_max_retry(int max_retry);     google::protobuf::Message* response();     HttpHeader& http_response();     HttpHeader& http_request();     bool Failed();     std::string ErrorText();          using AfterRpcRespFnType = std::function<         void(Controller* cntl,         const google::protobuf::Message* req,         const google::protobuf::Message* res)>;     void set_after_rpc_resp_fn(AfterRpcRespFnType&& fn) };}

4.客户端类与接口

Channel:客户端与服务器网络通信信道类ChannelOptions:信道配置类
namespace brpc { struct ChannelOptions {     //请求连接超时时间     int32_t connect_timeout_ms;// Default: 200 (milliseconds)     //rpc请求超时时间     int32_t timeout_ms;// Default: 500 (milliseconds)     //最大重试次数     int max_retry;// Default: 3     //序列化协议类型  options.protocol = "baidu_std";     AdaptiveProtocolType protocol;     //.... };class Channel : public ChannelBase {     //初始化接口,成功返回0;     int Init(const char* server_addr_and_port,  const ChannelOptions* options); };}

4.使用

0.一般流程

服务端: 创建RPC服务子类继承PB中的EchoService服务类,并实现内部的业务接口逻辑创建RPC服务器对象,搭建服务器向服务器类中添加RPC子服务对象,告诉服务器收到什么请求用哪个接口处理启动服务器 客户端: 创建网络通信信道实例化PB中的EchoService_Stub类对象发起RPC请i去,获取响应进行处理 makefile
all: server client_sync client_asyncserver: server.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldbclient_sync: client_sync.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldbclient_async: client.async.cc main.pb.ccg++ -o $@ $^ -std=c++17 -lbrpc -lgflags -lssl -lcrypto -lprotobuf -lleveldb.PHONY:cleanclean:rm server client_sync  client_async

1.Server

#include <brpc/server.h>#include <butil/logging.h>#include "main.pb.h"// 继承于EchoService, 创建一个子类, 并实现RPC调用的业务功能class EchoServiceImpl : public SnowK::EchoService{public:    EchoServiceImpl()    {}    ~EchoServiceImpl()    {}    void Echo(google::protobuf::RpcController *controller,              const ::SnowK::EchoRequest *request,              ::SnowK::EchoResponse *response,              ::google::protobuf::Closure *done)    {        brpc::ClosureGuard rpc_guard(done);        std::cout << "收到消息: " << request->message() << std::endl;        std::string str = request->message() + "--响应";        response->set_message(str);        // done->Run(); // 已经有了ClosureGuard, 则不再需要    }};int main(int argc, char* argv[]){    // 0.关闭brpc的默认日志输出    logging::LoggingSettings settings;    settings.logging_dest = logging::LoggingDestination::LOG_TO_NONE;    logging::InitLogging(settings);    // 1.构造服务器对象    brpc::Server server;    // 2.向服务器对象中,新增EchoService服务    EchoServiceImpl echo_service;    if (server.AddService(&echo_service, brpc::ServiceOwnership::SERVER_DOESNT_OWN_SERVICE) == -1)    {        std::cout << "添加Rpc服务失败!" << std::endl;        return -1;    }    // 3.启动服务器    brpc::ServerOptions options;    options.idle_timeout_sec = -1; // 连接空闲超时时间, 超时后连接被关闭    options.num_threads = 1; // IO线程数量    if(server.Start(3366, &options) == -1)    {        std::cout << "启动服务器失败" << std::endl;        return -1;    }    server.RunUntilAskedToQuit(); // 修改等待运行结束    return 0;}

2.客户端 – 同步调用

同步调用:客户端会阻塞收到server端的响应或发生错误
#include <brpc/channel.h>#include "main.pb.h"int main(int argc, char* argv[]){    // 1.构造Channel信道, 连接服务器    brpc::ChannelOptions options;    options.connect_timeout_ms = -1; // 连接等待超时时间, -1表示一直等待    options.timeout_ms = -1; // RPC请求等待超时时间, -1表示一直等待    options.max_retry = 3; // 请求重试次数    options.protocol = "baidu_std"; // 序列化协议, 默认使用baidu_std    brpc::Channel channel;    if(channel.Init("127.0.0.1:3366", &options) == -1)    {        std::cout << "初始化信道失败" << std::endl;        return -1;    }    // 2.构建EchoService_Stub对象, 用于进行RPC调用    SnowK::EchoService_Stub stub(&channel);    // 3.进行RPC调用    SnowK::EchoRequest req;    req.set_message("Hello SnowK");    brpc::Controller *cntl = new brpc::Controller();    SnowK::EchoResponse *resp = new SnowK::EchoResponse();    stub.Echo(cntl, &req, resp, nullptr); // nullptr为同步调用    if(cntl->Failed())    {        std::cout << "RPC调用失败" << cntl->ErrorText() << std::endl;        return -1;    }    std::cout << "收到响应: " << resp->message() << std::endl;    delete cntl;    delete resp;    return 0;}

3.客户端 – 异步调用

异步调用:指客户端注册一个响应处理回调函数, 当调用一个RPC接口时立即返回, 不会阻塞等待响应, 当Server端返回响应时会调用传入的回调函数处理响应具体做法: 给CallMethod传递一个额外的回调对象doneCallMethod在发出request后就结束了,而不是在RPC结束后当server端返回response或发生错误(包括超时)时,done->Run()会被调用 对RPC的后续处理应该写在done->Run()里,而不是CallMethod后 由于CallMethod结束不意味着RPC结束,response/controller仍可能被框架及done->Run()使用,它们一般得创建在堆上, 并在done->Run()中删除 如果提前删除了它们,那当done->Run()被调用时,将访问到无效内存
#include <thread>#include <brpc/channel.h>#include "main.pb.h"void Callback(brpc::Controller* cntl, SnowK::EchoResponse* resp){    std::shared_ptr<brpc::Controller> cntl_guard(cntl);    std::shared_ptr<SnowK::EchoResponse> resp_guard(resp);    if (cntl->Failed())    {        std::cout << "RPC调用失败" << cntl->ErrorText() << std::endl;        return;    }    std::cout << "收到响应: " << resp->message() << std::endl;}int main(int argc, char *argv[]){    // 1.构造Channel信道, 连接服务器    brpc::ChannelOptions options;    options.connect_timeout_ms = -1; // 连接等待超时时间, -1表示一直等待    options.timeout_ms = -1;         // RPC请求等待超时时间, -1表示一直等待    options.max_retry = 3;           // 请求重试次数    options.protocol = "baidu_std";  // 序列化协议, 默认使用baidu_std    brpc::Channel channel;    if (channel.Init("127.0.0.1:3366", &options) == -1)    {        std::cout << "初始化信道失败" << std::endl;        return -1;    }    // 2.构建EchoService_Stub对象, 用于进行RPC调用    SnowK::EchoService_Stub stub(&channel);    // 3.进行RPC调用    SnowK::EchoRequest req;    req.set_message("Hello SnowK");    brpc::Controller *cntl = new brpc::Controller();    SnowK::EchoResponse *resp = new SnowK::EchoResponse();    auto closure = google::protobuf::NewCallback(Callback, cntl, resp);        stub.Echo(cntl, &req, resp, closure); // 异步调用    std::cout << "异步调用结束" << std::endl;    std::this_thread::sleep_for(std::chrono::seconds(3));    return 0;}


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/200564.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1