CARLA
 
载入中...
搜索中...
未找到
router.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
11
12namespace carla {
13namespace multigpu {
14
16 _next(0) { }
17
19 Stop();
20}
21
24 _listener->Stop();
25 _listener.reset();
26 _pool.Stop();
27}
28
29Router::Router(uint16_t port) :
30 _next(0) {
31
32 _endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("0.0.0.0"), port);
33 _listener = std::make_shared<carla::multigpu::Listener>(_pool.io_context(), _endpoint);
34}
35
37 // prepare server
38 std::weak_ptr<Router> weak = shared_from_this();
39
40 carla::multigpu::Listener::callback_function_type on_open = [=](std::shared_ptr<carla::multigpu::Primary> session) {
41 auto self = weak.lock();
42 if (!self) return;
43 self->ConnectSession(session);
44 };
45
46 carla::multigpu::Listener::callback_function_type on_close = [=](std::shared_ptr<carla::multigpu::Primary> session) {
47 auto self = weak.lock();
48 if (!self) return;
49 self->DisconnectSession(session);
50 };
51
53 [=](std::shared_ptr<carla::multigpu::Primary> session, carla::Buffer buffer) {
54 auto self = weak.lock();
55 if (!self) return;
56 std::lock_guard<std::mutex> lock(self->_mutex);
57 auto prom =self-> _promises.find(session.get());
58 if (prom != self->_promises.end()) {
59 log_info("Got data from secondary (with promise): ", buffer.size());
60 prom->second->set_value({session, std::move(buffer)});
61 self->_promises.erase(prom);
62 } else {
63 log_info("Got data from secondary (without promise): ", buffer.size());
64 }
65 };
66
67 _commander.set_router(shared_from_this());
68
69 _listener->Listen(on_open, on_close, on_response);
70 log_info("Listening at ", _endpoint);
71}
72
73void Router::SetNewConnectionCallback(std::function<void(void)> func)
74{
75 _callback = func;
76}
77
78void Router::AsyncRun(size_t worker_threads) {
79 _pool.AsyncRun(worker_threads);
80}
81
82boost::asio::ip::tcp::endpoint Router::GetLocalEndpoint() const {
83 return _endpoint;
84}
85
86void Router::ConnectSession(std::shared_ptr<Primary> session) {
87 DEBUG_ASSERT(session != nullptr);
88 std::lock_guard<std::mutex> lock(_mutex);
89 _sessions.emplace_back(std::move(session));
90 log_info("Connected secondary servers:", _sessions.size());
91 // run external callback for new connections
92 if (_callback)
93 _callback();
94}
95
96void Router::DisconnectSession(std::shared_ptr<Primary> session) {
97 DEBUG_ASSERT(session != nullptr);
98 std::lock_guard<std::mutex> lock(_mutex);
99 if (_sessions.size() == 0) return;
100 _sessions.erase(
101 std::remove(_sessions.begin(), _sessions.end(), session),
102 _sessions.end());
103 log_info("Connected secondary servers:", _sessions.size());
104}
105
107 std::lock_guard<std::mutex> lock(_mutex);
108 _sessions.clear();
109 log_info("Disconnecting all secondary servers");
110}
111
113 // define the command header
114 CommandHeader header;
115 header.id = id;
116 header.size = buffer.size();
117 Buffer buf_header((uint8_t *) &header, sizeof(header));
118
119 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
120 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
121 auto message = Primary::MakeMessage(view_header, view_data);
122
123 // write to multiple servers
124 std::lock_guard<std::mutex> lock(_mutex);
125 for (auto &s : _sessions) {
126 if (s != nullptr) {
127 s->Write(message);
128 }
129 }
130}
131
132std::future<SessionInfo> Router::WriteToNext(MultiGPUCommand id, Buffer &&buffer) {
133 // define the command header
134 CommandHeader header;
135 header.id = id;
136 header.size = buffer.size();
137 Buffer buf_header((uint8_t *) &header, sizeof(header));
138
139 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
140 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
141 auto message = Primary::MakeMessage(view_header, view_data);
142
143 // create the promise for the posible answer
144 auto response = std::make_shared<std::promise<SessionInfo>>();
145
146 // write to the next server only
147 std::lock_guard<std::mutex> lock(_mutex);
148 if (_next >= _sessions.size()) {
149 _next = 0;
150 }
151 if (_next < _sessions.size()) {
152 // std::cout << "Sending to session " << _next << std::endl;
153 auto s = _sessions[_next];
154 if (s != nullptr) {
155 _promises[s.get()] = response;
156 std::cout << "Updated promise into map: " << _promises.size() << std::endl;
157 s->Write(message);
158 }
159 }
160 ++_next;
161 return response->get_future();
162}
163
164std::future<SessionInfo> Router::WriteToOne(std::weak_ptr<Primary> server, MultiGPUCommand id, Buffer &&buffer) {
165 // define the command header
166 CommandHeader header;
167 header.id = id;
168 header.size = buffer.size();
169 Buffer buf_header((uint8_t *) &header, sizeof(header));
170
171 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
172 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
173 auto message = Primary::MakeMessage(view_header, view_data);
174
175 // create the promise for the posible answer
176 auto response = std::make_shared<std::promise<SessionInfo>>();
177
178 // write to the specific server only
179 std::lock_guard<std::mutex> lock(_mutex);
180 auto s = server.lock();
181 if (s) {
182 _promises[s.get()] = response;
183 s->Write(message);
184 }
185 return response->get_future();
186}
187
188std::weak_ptr<Primary> Router::GetNextServer() {
189 std::lock_guard<std::mutex> lock(_mutex);
190 if (_next >= _sessions.size()) {
191 _next = 0;
192 }
193 if (_next < _sessions.size()) {
194 return std::weak_ptr<Primary>(_sessions[_next]);
195 } else {
196 return std::weak_ptr<Primary>();
197 }
198}
199
200} // namespace multigpu
201} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:66
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:56
A piece of raw data.
void Stop()
Stop the ThreadPool and join all its threads.
Definition ThreadPool.h:76
void AsyncRun(size_t worker_threads)
Launch threads to run tasks asynchronously.
Definition ThreadPool.h:51
auto & io_context()
Return the underlying io_context.
Definition ThreadPool.h:35
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition listener.h:32
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition listener.h:33
void set_router(std::shared_ptr< Router > router)
static auto MakeMessage(Buffers... buffers)
Definition primary.h:54
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const
Definition router.cpp:82
PrimaryCommands _commander
Definition router.h:77
void Write(MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:112
std::vector< std::shared_ptr< Primary > > _sessions
Definition router.h:73
std::future< SessionInfo > WriteToNext(MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:132
std::function< void(void)> _callback
Definition router.h:78
std::unordered_map< Primary *, std::shared_ptr< std::promise< SessionInfo > > > _promises
Definition router.h:76
std::mutex _mutex
Definition router.h:70
std::shared_ptr< Listener > _listener
Definition router.h:74
void ConnectSession(std::shared_ptr< Primary > session)
Definition router.cpp:86
std::future< SessionInfo > WriteToOne(std::weak_ptr< Primary > server, MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:164
boost::asio::ip::tcp::endpoint _endpoint
Definition router.h:72
void DisconnectSession(std::shared_ptr< Primary > session)
Definition router.cpp:96
void AsyncRun(size_t worker_threads)
Definition router.cpp:78
void SetNewConnectionCallback(std::function< void(void)>)
Definition router.cpp:73
std::weak_ptr< Primary > GetNextServer()
Definition router.cpp:188
This file contains definitions of common data structures used in traffic manager.
Definition Carla.cpp:133
static void log_info(Args &&... args)
Definition Logging.h:82