CARLA
 
载入中...
搜索中...
未找到
router.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
10
11namespace carla {
12namespace multigpu {
13
14// Router类的默认构造函数,初始化成员变量_next为0,可能用于后续标识下一个要处理的相关元素(比如连接等情况)
16 _next(0) { }
17
18// Router类的析构函数,用于在对象销毁时进行资源清理等操作
19// 调用Stop函数来停止相关的监听、释放资源等操作
21 Stop();
22}
23
24// 停止路由器(Router)相关操作的函数,主要执行以下清理和停止操作:
25// 1. 调用ClearSessions函数清除所有活动的会话。
26// 2. 调用_listener的Stop函数停止监听器,防止接受新连接。
27// 3. 通过_reset释放_listener对象的内存,避免内存泄漏等问题。
28// 4. 调用_pool的Stop函数停止相关的线程池以释放其所占用的资源。
30 ClearSessions(); // 清除所有活动的会话。
31 _listener->Stop(); // 停止监听器,防止接受新连接
32 _listener.reset(); // 释放监听器对象的内存。
33 _pool.Stop(); // 停止相关的线程池以释放资源。
34}
35
36// Router类的构造函数,接受一个端口号(port)作为参数,用于初始化监听的端口相关设置等
37// 参数port: 要监听的网络端口号,用于接收外部连接
38// 首先初始化成员变量_next为0,然后创建一个TCP端点(_endpoint),使其监听所有网络接口(0.0.0.0)上的指定端口,
39// 接着初始化_listener为指向Listener对象的共享指针,该Listener对象用于处理传入的连接,传入线程池的I/O上下文和创建好的端点信息。
40Router::Router(uint16_t port) :
41 _next(0) {
42
43 // 创建一个TCP端点,监听所有网络接口(0.0.0.0)上的指定端口
44 _endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string("0.0.0.0"), port);
45 // 初始化_listener为指向Listener对象的共享指针,用于处理传入连接。
46 _listener = std::make_shared<carla::multigpu::Listener>(_pool.io_context(), _endpoint);
47}
48
49// 设置路由器(Router)回调函数的函数,主要用于设置与连接相关的回调逻辑,包括连接打开、关闭以及收到响应时的处理逻辑等
50// 具体操作如下:
51// 1. 首先将当前Router对象的弱指针(weak)保存起来,方便在后续的回调函数中判断对象是否仍然有效(避免悬空指针等问题)。
52// 2. 设置连接打开(on_open)回调函数,当有新的连接建立时(对应的是与Primary类型的会话连接),会调用ConnectSession函数来处理该新连接。
53// 3. 设置连接关闭(on_close)回调函数,当会话连接关闭时,会调用DisconnectSession函数进行相应的清理等操作。
54// 4. 设置收到响应(on_response)回调函数,当从辅助服务器收到数据时,会根据是否有对应的承诺(promise)来进行不同的处理,
55// 如果有承诺,则将收到的数据设置到对应的承诺中(表示异步操作完成并返回结果),然后从承诺列表中移除该承诺;如果没有承诺,则仅记录收到数据的日志信息。
56// 5. 将自身(Router对象)设置到_commander对象中(可能是用于后续的命令相关操作,通过共享指针共享自身给_commander使用)。
57// 6. 调用_listener的Listen函数,传入上述设置好的回调函数,开始监听连接相关的事件,并记录开始监听的日志信息(包含监听的端点信息)。
59 // 准备服务器
60 std::weak_ptr<Router> weak = shared_from_this();
61
62 carla::multigpu::Listener::callback_function_type on_open = [=](std::shared_ptr<carla::multigpu::Primary> session) {
63 auto self = weak.lock();
64 if (!self) return;
65 self->ConnectSession(session);
66 };
67
68 carla::multigpu::Listener::callback_function_type on_close = [=](std::shared_ptr<carla::multigpu::Primary> session) {
69 auto self = weak.lock();
70 if (!self) return;
71 self->DisconnectSession(session);
72 };
73
75 [=](std::shared_ptr<carla::multigpu::Primary> session, carla::Buffer buffer) {
76 auto self = weak.lock();
77 if (!self) return;
78 std::lock_guard<std::mutex> lock(self->_mutex);
79 auto prom =self-> _promises.find(session.get());
80 if (prom!= self->_promises.end()) {
81 log_info("Got data from secondary (with promise): ", buffer.size());
82 prom->second->set_value({session, std::move(buffer)});
83 self->_promises.erase(prom);
84 } else {
85 log_info("Got data from secondary (without promise): ", buffer.size());
86 }
87 };
88
89 _commander.set_router(shared_from_this());
90
91 _listener->Listen(on_open, on_close, on_response);
92 log_info("Listening at ", _endpoint);
93}
94
95// 设置新连接回调函数的函数,外部可以传入一个函数对象(std::function<void(void)>类型),该函数会在有新连接建立时被调用
96// 参数func: 外部传入的函数对象,无参数,无返回值,用于定义新连接建立时的自定义操作逻辑
97void Router::SetNewConnectionCallback(std::function<void(void)> func)
98{
99 _callback = func;
100}
101
102// 异步运行路由器(Router)相关操作的函数,启动线程池以异步处理相关任务
103// 参数worker_threads: 指定线程池中的工作线程数量,用于控制并发处理能力
104void Router::AsyncRun(size_t worker_threads) {
105 _pool.AsyncRun(worker_threads);
106}
107
108// 获取路由器(Router)本地监听端点信息的函数,返回其监听的TCP端点对象(包含IP地址和端口等信息)
109boost::asio::ip::tcp::endpoint Router::GetLocalEndpoint() const {
110 return _endpoint;
111}
112
113// 处理新连接建立的函数,将新的会话(Primary类型的共享指针)添加到活动会话列表(_sessions)中,并记录相关日志信息
114// 参数session: 指向新建立的Primary类型会话的共享指针,表示新连接的会话对象
115// 首先通过DEBUG_ASSERT断言传入的会话指针不为空,然后使用互斥锁(_mutex)保护共享资源(_sessions列表),
116// 将传入的会话添加到_sessions列表末尾,记录当前连接的辅助服务器数量的日志信息,
117// 最后如果设置了新连接的外部回调函数(_callback),则调用该回调函数执行自定义的新连接操作逻辑。
118void Router::ConnectSession(std::shared_ptr<Primary> session) {
119 DEBUG_ASSERT(session!= nullptr);
120 std::lock_guard<std::mutex> lock(_mutex);
121 _sessions.emplace_back(std::move(session));
122 log_info("Connected secondary servers:", _sessions.size());
123 // 对新连接运行外部回调
124 if (_callback)
125 _callback();
126}
127
128// 处理会话连接断开的函数,从活动会话列表(_sessions)中移除指定的会话,并记录相关日志信息
129// 参数session: 指向要断开的Primary类型会话的共享指针,表示要移除的会话对象
130// 同样先通过DEBUG_ASSERT断言传入的会话指针不为空,然后使用互斥锁(_mutex)保护共享资源(_sessions列表),
131// 判断如果活动会话列表不为空,则通过std::remove和erase结合的方式从列表中移除指定的会话对象,
132// 最后记录当前连接的辅助服务器数量的日志信息。
133void Router::DisconnectSession(std::shared_ptr<Primary> session) {
134 DEBUG_ASSERT(session!= nullptr);
135 std::lock_guard<std::mutex> lock(_mutex);
136 if (_sessions.size() == 0) return;
137 _sessions.erase(
138 std::remove(_sessions.begin(), _sessions.end(), session),
139 _sessions.end());
140 log_info("Connected secondary servers:", _sessions.size());
141}
142
143// 清除所有活动会话的函数,通过互斥锁(_mutex)保护共享资源(_sessions列表),直接清空_sessions列表,
144// 并记录断开所有辅助服务器连接的日志信息,表示所有活动会话都已被清理。
146 std::lock_guard<std::mutex> lock(_mutex);
147 _sessions.clear();
148 log_info("Disconnecting all secondary servers");
149}
150
151// 向所有活动会话(辅助服务器)广播写入消息的函数,消息包含命令头和数据缓冲区两部分内容
152// 参数id: 表示要发送的MultiGPUCommand类型的命令ID,用于标识消息的类型或用途。
153// 参数buffer: 要发送的数据缓冲区(使用右值引用,避免不必要的拷贝),包含实际要发送的数据内容。
154// 具体操作如下:
155// 1. 首先创建一个命令头(CommandHeader)结构体对象header,设置其命令ID(id)和数据大小(buffer.size()),
156// 然后将该命令头结构体转换为Buffer类型(buf_header),方便后续处理。
157// 2. 通过BufferView将命令头和数据缓冲区分别创建视图(view_header和view_data),
158// 再利用Primary的MakeMessage函数将这两个视图组合成一个完整的消息(message)。
159// 3. 使用互斥锁(_mutex)保护共享资源(_sessions列表),遍历所有活动会话(_sessions),
160// 对于每个不为空的会话对象,调用其Write函数将消息发送出去,实现向所有辅助服务器广播消息的功能。
161void Router::Write(MultiGPUCommand id, Buffer &&buffer) {
162 // 定义命令头
163 CommandHeader header;
164 header.id = id;
165 header.size = buffer.size();
166 Buffer buf_header((uint8_t *) &header, sizeof(header));
167
168 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
169 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
170 auto message = Primary::MakeMessage(view_header, view_data);
171
172 // 写入多个服务器
173 std::lock_guard<std::mutex> lock(_mutex);
174 for (auto &s : _sessions) {
175 if (s!= nullptr) {
176 s->Write(message);
177 }
178 }
179}
180
181// 向特定的下一个活动会话(辅助服务器)写入消息,并返回一个表示异步操作结果的未来对象(std::future),用于获取后续的响应信息
182// 参数id: 表示要发送的MultiGPUCommand类型的命令ID,用于标识消息的类型或用途。
183// 参数buffer: 要发送的数据缓冲区(使用右值引用,避免不必要的拷贝),包含实际要发送的数据内容。
184// 具体操作如下:
185// 1. 类似Write函数,先创建命令头结构体对象header,设置相关属性后转换为Buffer类型(buf_header),
186// 再通过BufferView创建命令头和数据的视图(view_header和view_data),并组合成完整消息(message)。
187// 2. 创建一个共享指针指向的std::promise对象(response),用于后续异步操作完成时设置返回值(实现类似回调的功能)。
188// 3. 使用互斥锁(_mutex)保护共享资源(_sessions列表),通过_next变量确定要发送的下一个会话索引,
189// 如果索引超出会话列表大小,则重置为0;如果索引合法,则获取对应的会话对象(s),
190// 如果会话对象不为空,则将当前创建的承诺(response)添加到_promises映射中(以会话对象的指针为键,方便后续根据会话查找对应的承诺),
191// 然后调用该会话对象的Write函数发送消息,并递增_next变量指向下一个可能的会话索引。
192// 4. 最后返回response的未来对象(response->get_future()),外部可以通过该未来对象获取异步操作的最终结果(比如等待响应并获取返回的数据等)。
193std::future<SessionInfo> Router::WriteToNext(MultiGPUCommand id, Buffer &&buffer) {
194 // 定义命令头
195 CommandHeader header;
196 header.id = id;
197 header.size = buffer.size();
198 Buffer buf_header((uint8_t *) &header, sizeof(header));
199
200 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
201 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
202 auto message = Primary::MakeMessage(view_header, view_data);
203
204 // 为可能的答案创建承诺自动响应
205 auto response = std::make_shared<std::promise<SessionInfo>>();
206
207 // 只写特定服务器
208 std::lock_guard<std::mutex> lock(_mutex);
209 if (_next >= _sessions.size()) {
210 _next = 0;
211 }
212 if (_next < _sessions.size()) {
213 // std::cout << "Sending to session " << _next << std::endl;
214 auto s = _sessions[_next];
215 if (s!= nullptr) {
216 _promises[s.get()] = response;
217 std::cout << "Updated promise into map: " << _promises.size() << std::endl;
218 s->Write(message);
219 }
220 }
221 ++_next;
222 return response->get_future();
223}
224
225// 向指定的活动会话(辅助服务器)写入消息,并返回一个表示异步操作结果的未来对象(std::future),用于获取后续的响应信息
226// 参数server: 指向要发送消息的Primary类型会话的弱指针(std::weak_ptr),通过lock操作可获取对应的强指针来访问会话对象。
227// 参数id: 表示要发送的MultiGPUCommand类型的命令ID,用于标识消息的类型或用途。
228// 参数buffer: 要发送的数据缓冲区(使用右值引用,避免不必要的拷贝),包含实际要发送的数据内容。
229// 具体操作如下:
230// 1. 同样先创建命令头相关内容并组合成消息(与前面类似的步骤)。
231// 2. 创建一个共享指针指向的std::promise对象(response)用于后续设置返回值。
232// 3. 使用互斥锁(_mutex)保护共享资源,通过server弱指针获取对应的强指针(s),如果获取成功(表示会话对象有效),
233// 则将当前创建的承诺(response)添加到_promises映射中,并调用该会话对象的Write函数发送消息。
234// 4. 最后返回response的未来对象,供外部获取异步操作的结果。
235std::future<SessionInfo> Router::WriteToOne(std::weak_ptr<Primary> server, MultiGPUCommand id, Buffer &&buffer) {
236 // 定义命令头
237 CommandHeader header;
238 header.id = id;
239 header.size = buffer.size();
240 Buffer buf_header((uint8_t *) &header, sizeof(header));
241
242 auto view_header = carla::BufferView::CreateFrom(std::move(buf_header));
243 auto view_data = carla::BufferView::CreateFrom(std::move(buffer));
244 auto message = Primary::MakeMessage(view_header, view_data);
245
246 // 为可能的答案创建承诺自动响应
247
248 auto response = std::make_shared<std::promise<SessionInfo>>();
249
250 // 只写特定服务器
251
252 std::lock_guard<std::mutex> lock(_mutex);
253 auto s = server.lock();
254 if (s) {
255 _promises[s.get()] = response;
256 s->Write(message);
257 }
258 return response->get_future();
259}
260
261// 获取下一个活动会话(辅助服务器)的弱指针(std::weak_ptr)的函数,通过_next变量来确定下一个要返回的会话索引,
262// 如果索引超出会话列表大小,则重置为0;如果索引合法,则返回对应会话的弱指针,方便外部以弱引用的方式访问会话对象,
263// 如果索引不合法(会话列表为空等情况),则返回一个空的弱指针。
264std::weak_ptr<Primary> Router::GetNextServer() {
265 std::lock_guard<std::mutex> lock(_mutex);
266 if (_next >= _sessions.size()) {
267 _next = 0;
268 }
269 if (_next < _sessions.size()) {
270 return std::weak_ptr<Primary>(_sessions[_next]);
271 } else {
272 return std::weak_ptr<Primary>();
273 }
274}
275
276} // 名称空间 multigpu
277} // 名称空间 carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
std::unordered_map< Primary *, std::shared_ptr< std::promise< SessionInfo > > > _promises
Definition router.h:90
void Write(MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:161
void AsyncRun(size_t worker_threads)
Definition router.cpp:104
ThreadPool _pool
Definition router.h:85
~Router()
Definition router.cpp:20
void SetCallbacks()
Definition router.cpp:58
boost::asio::ip::tcp::endpoint _endpoint
Definition router.h:86
boost::asio::ip::tcp::endpoint GetLocalEndpoint() const
Definition router.cpp:109
PrimaryCommands _commander
Definition router.h:91
std::function< void(void)> _callback
Definition router.h:92
void DisconnectSession(std::shared_ptr< Primary > session)
Definition router.cpp:133
std::future< SessionInfo > WriteToOne(std::weak_ptr< Primary > server, MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:235
Router(void)
Definition router.cpp:15
std::mutex _mutex
Definition router.h:84
void ClearSessions()
Definition router.cpp:145
void Stop()
Definition router.cpp:29
std::shared_ptr< Listener > _listener
Definition router.h:88
void ConnectSession(std::shared_ptr< Primary > session)
Definition router.cpp:118
std::vector< std::shared_ptr< Primary > > _sessions
Definition router.h:87
uint32_t _next
Definition router.h:89
std::future< SessionInfo > WriteToNext(MultiGPUCommand id, Buffer &&buffer)
Definition router.cpp:193
void SetNewConnectionCallback(std::function< void(void)>)
Definition router.cpp:97
std::weak_ptr< Primary > GetNextServer()
Definition router.cpp:264
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:60
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition listener.h:32
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition listener.h:33
CARLA模拟器的主命名空间。
Definition Carla.cpp:139