28#include <boost/asio/connect.hpp>
31#include <boost/asio/read.hpp>
34#include <boost/asio/write.hpp>
37#include <boost/asio/post.hpp>
40#include <boost/asio/bind_executor.hpp>
49 boost::asio::ip::tcp::endpoint ep,
52 _socket(_pool.io_context()),
54 _strand(_pool.io_context()),
55 _connection_timer(_pool.io_context()),
67 _socket(_pool.io_context()),
68 _strand(_pool.io_context()),
69 _connection_timer(_pool.io_context()),
72 boost::asio::ip::address ip_address = boost::asio::ip::address::from_string(ip);
73 _endpoint = boost::asio::ip::tcp::endpoint(ip_address, port);
86 std::weak_ptr<Secondary> weak = shared_from_this();
87 boost::asio::post(
_strand, [weak]() {
88 auto self = weak.lock();
95 if (self->_socket.is_open()) {
96 self->_socket.close();
99 auto handle_connect = [weak](boost::system::error_code ec) {
100 auto self = weak.lock();
103 log_error(
"secondary server: connection failed:", ec.message());
115 self->_socket.set_option(boost::asio::ip::tcp::no_delay(
true));
117 log_info(
"secondary server: connected to ", self->_endpoint);
122 self->_socket.async_connect(self->_endpoint, boost::asio::bind_executor(self->_strand, handle_connect));
126 void Secondary::Stop() {
127 _connection_timer.cancel();
128 std::weak_ptr<Secondary> weak = shared_from_this();
129 boost::asio::post(_strand, [weak]() {
130 auto self = weak.lock();
133 if (self->_socket.is_open()) {
134 self->_socket.close();
139 void Secondary::Reconnect() {
140 std::weak_ptr<Secondary> weak = shared_from_this();
142 _connection_timer.async_wait([weak](boost::system::error_code ec) {
143 auto self = weak.lock();
151 void Secondary::AsyncRun(
size_t worker_threads) {
152 _pool.AsyncRun(worker_threads);
155 void Secondary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
158 std::weak_ptr<Secondary> weak = shared_from_this();
159 boost::asio::post(_strand, [=]() {
160 auto self = weak.lock();
162 if (!self->_socket.is_open()) {
166 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
167 auto self = weak.lock();
170 log_error(
"error sending data: ", ec.message());
175 boost::asio::async_write(
177 message->GetBufferSequence(),
178 boost::asio::bind_executor(self->_strand, handle_sent));
186 auto message = Secondary::MakeMessage(view_data);
190 std::weak_ptr<Secondary> weak = shared_from_this();
191 boost::asio::post(_strand, [=]() {
192 auto self = weak.lock();
194 if (!self->_socket.is_open()) {
199 auto handle_sent = [weak, message](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
200 auto self = weak.lock();
203 log_error(
"error sending data: ", ec.message());
208 boost::asio::async_write(
210 message->GetBufferSequence(),
211 boost::asio::bind_executor(self->_strand, handle_sent));
215 void Secondary::Write(std::string text) {
216 std::weak_ptr<Secondary> weak = shared_from_this();
217 boost::asio::post(_strand, [=]() {
218 auto self = weak.lock();
220 if (!self->_socket.is_open()) {
225 auto handle_sent = [weak](
const boost::system::error_code &ec,
size_t DEBUG_ONLY(bytes)) {
226 auto self = weak.lock();
229 log_error(
"error sending data: ", ec.message());
235 int this_size = text.size();
236 boost::asio::async_write(
238 boost::asio::buffer(&this_size,
sizeof(this_size)),
239 boost::asio::bind_executor(self->_strand, handle_sent));
242 boost::asio::async_write(
244 boost::asio::buffer(text.c_str(), text.size()),
245 boost::asio::bind_executor(self->_strand, handle_sent));
250 void Secondary::ReadData() {
251 std::weak_ptr<Secondary> weak = shared_from_this();
252 boost::asio::post(_strand, [weak]() {
253 auto self = weak.lock();
259 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
262 auto handle_read_data = [weak, message](boost::system::error_code ec,
size_t DEBUG_ONLY(bytes)) {
263 auto self = weak.lock();
269 self->GetCommander().process_command(message->pop());
273 log_error(
"secondary server: failed to read data: ", ec.message());
279 auto handle_read_header = [weak, message, handle_read_data](
280 boost::system::error_code ec,
282 auto self = weak.lock();
284 if (!ec && (message->size() > 0u)) {
290 boost::asio::async_read(
293 boost::asio::bind_executor(self->_strand, handle_read_data));
294 }
else if (!self->_done) {
295 log_error(
"secondary server: failed to read header: ", ec.message());
303 boost::asio::async_read(
305 message->size_as_buffer(),
306 boost::asio::bind_executor(self->_strand, handle_read_header));
if(!Actor||!Actor->Destroy())
#define DEBUG_ASSERT(predicate)
#define DEBUG_ASSERT_EQ(lhs, rhs)
#define DEBUG_ASSERT_NE(lhs, rhs)
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
std::function< void(MultiGPUCommand, carla::Buffer)> callback_type
void set_secondary(std::shared_ptr< Secondary > secondary)
void set_callback(callback_type callback)
boost::asio::ip::tcp::endpoint _endpoint
Secondary(boost::asio::ip::tcp::endpoint ep, SecondaryCommands::callback_type callback)
boost::asio::io_context::strand _strand
SecondaryCommands _commander
void AsyncRun(size_t worker_threads)
static time_duration seconds(size_t timeout)
uint32_t message_size_type
消息大小的类型定义。
static void log_error(Args &&... args)
static void log_info(Args &&... args)