44 _endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(
"0.0.0.0"), port);
60 std::weak_ptr<Router> weak = shared_from_this();
63 auto self = weak.lock();
65 self->ConnectSession(session);
69 auto self = weak.lock();
71 self->DisconnectSession(session);
75 [=](std::shared_ptr<carla::multigpu::Primary> session,
carla::Buffer buffer) {
76 auto self = weak.lock();
78 std::lock_guard<std::mutex> lock(self->_mutex);
79 auto prom =self->
_promises.find(session.get());
80 if (prom!= self->_promises.end()) {
81 log_info(
"Got data from secondary (with promise): ", buffer.size());
82 prom->second->set_value({session, std::move(buffer)});
83 self->_promises.erase(prom);
85 log_info(
"Got data from secondary (without promise): ", buffer.size());
91 _listener->Listen(on_open, on_close, on_response);
105 _pool.AsyncRun(worker_threads);
120 std::lock_guard<std::mutex> lock(
_mutex);
121 _sessions.emplace_back(std::move(session));
122 log_info(
"Connected secondary servers:",
_sessions.size());
135 std::lock_guard<std::mutex> lock(
_mutex);
140 log_info(
"Connected secondary servers:",
_sessions.size());
146 std::lock_guard<std::mutex> lock(
_mutex);
148 log_info(
"Disconnecting all secondary servers");
163 CommandHeader header;
165 header.size = buffer.size();
166 Buffer buf_header((uint8_t *) &header,
sizeof(header));
170 auto message = Primary::MakeMessage(view_header, view_data);
173 std::lock_guard<std::mutex> lock(
_mutex);
195 CommandHeader header;
197 header.size = buffer.size();
198 Buffer buf_header((uint8_t *) &header,
sizeof(header));
202 auto message = Primary::MakeMessage(view_header, view_data);
205 auto response = std::make_shared<std::promise<SessionInfo>>();
208 std::lock_guard<std::mutex> lock(
_mutex);
217 std::cout <<
"Updated promise into map: " <<
_promises.size() << std::endl;
222 return response->get_future();
235std::future<SessionInfo>
Router::WriteToOne(std::weak_ptr<Primary> server, MultiGPUCommand
id, Buffer &&buffer) {
237 CommandHeader header;
239 header.size = buffer.size();
240 Buffer buf_header((uint8_t *) &header,
sizeof(header));
244 auto message = Primary::MakeMessage(view_header, view_data);
248 auto response = std::make_shared<std::promise<SessionInfo>>();
252 std::lock_guard<std::mutex> lock(
_mutex);
253 auto s = server.lock();
258 return response->get_future();
265 std::lock_guard<std::mutex> lock(
_mutex);
272 return std::weak_ptr<Primary>();
#define DEBUG_ASSERT(predicate)
std::unordered_map< Primary *, std::shared_ptr< std::promise< SessionInfo > > > _promises
void Write(MultiGPUCommand id, Buffer &&buffer)
void AsyncRun(size_t worker_threads)
boost::asio::ip::tcp::endpoint _endpoint
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const
PrimaryCommands _commander
std::function< void(void)> _callback
void DisconnectSession(std::shared_ptr< Primary > session)
std::future< SessionInfo > WriteToOne(std::weak_ptr< Primary > server, MultiGPUCommand id, Buffer &&buffer)
std::shared_ptr< Listener > _listener
void ConnectSession(std::shared_ptr< Primary > session)
std::vector< std::shared_ptr< Primary > > _sessions
std::future< SessionInfo > WriteToNext(MultiGPUCommand id, Buffer &&buffer)
void SetNewConnectionCallback(std::function< void(void)>)
std::weak_ptr< Primary > GetNextServer()
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
A piece of raw data. Note that if more capacity is needed, a new block of memory is allocated and the old one is deleted. This means that by default the buffer can only grow. To release the memory, use clear or pop.
std::function< void(std::shared_ptr< Primary >)> callback_function_type
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response