CARLA
 
载入中...
搜索中...
未找到
primary.cpp
浏览该文件的文档.
1// Copyright (c) 2022 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// 主服务器
5//
6// This work is licensed under the terms of the MIT license.
7// For a copy, see <https://opensource.org/licenses/MIT>.
8/// @brief 包含CARLA多GPU支持和网络通信相关的头文件,以及Boost.Asio和C++标准库的部分功能。
9#include "carla/multigpu/primary.h"///< 包含CARLA多GPU支持的主头文件,可能定义了多GPU环境下的主要接口或类。
10
11#include "carla/Debug.h"///< 包含CARLA的调试功能,可能包括断言、日志记录级别等。
12#include "carla/Logging.h"///< 包含CARLA的日志记录功能,可能定义了日志记录器、日志级别和日志消息格式等。
13#include "carla/multigpu/incomingMessage.h"///< 包含CARLA多GPU支持中接收消息的相关类和函数。
14#include "carla/multigpu/listener.h"///< 包含CARLA多GPU支持中监听网络通信的相关类和函数。
15/// @brief 包含Boost.Asio库的头文件,用于网络编程和异步I/O操作。
16#include <boost/asio/read.hpp>///< Boost.Asio库中的读操作函数,用于从网络套接字读取数据。
17#include <boost/asio/write.hpp> ///< Boost.Asio库中的写操作函数,用于向网络套接字写入数据。
18#include <boost/asio/bind_executor.hpp>///< Boost.Asio库中的绑定执行器函数,用于将异步操作与特定的执行器(如I/O上下文)关联。
19#include <boost/asio/post.hpp> ///< Boost.Asio库中的post函数,用于将任务异步地发布到执行器上执行。
20/// @brief 包含C++标准库的头文件,用于多线程编程和原子操作。
21#include <atomic>///< C++标准库中的原子操作类,用于实现线程安全的计数器、标志位等。
22#include <thread>///< C++标准库中的线程类和相关函数,用于创建和管理线程。
23
24namespace carla {
25namespace multigpu {
26 /**
27 * @namespace carla::multigpu
28 * @brief CARLA模拟器中处理多GPU通信的命名空间。
29 */
30
31 /**
32 * @brief 用于生成唯一会话ID的静态原子计数器。
33 *
34 * 这是一个线程安全的计数器,用于为每个新的Primary会话实例分配一个唯一的会话ID。
35 */
36 static std::atomic_size_t SESSION_COUNTER{0u};
37 /**
38 * @class Primary
39 * @brief 管理TCP会话的类,用于CARLA的多GPU通信。
40 */
41 /**
42 * @brief Primary类的构造函数。
43 *
44 * @param io_context 引用Boost.Asio的IO上下文,用于异步通信。
45 * @param timeout 会话的超时时间。
46 * @param server 对Listener对象的引用,用于处理传入的连接和消息。
47 *
48 * 构造函数初始化Primary类的成员变量,并设置性能分析器(如果使用了LIBCARLA_INITIALIZE_LIFETIME_PROFILER宏)。
49 * 它还会为当前会话分配一个唯一的会话ID,并创建一个新的套接字、截止时间和执行器绑定。
50 */
52 boost::asio::io_context &io_context,
53 const time_duration timeout,
54 Listener &server)
56 std::string("tcp multigpu server session ") + std::to_string(SESSION_COUNTER)),
57 _server(server),
58 _session_id(SESSION_COUNTER++),
59 _socket(io_context),
60 _timeout(timeout),
61 _deadline(io_context),
62 _strand(io_context),
63 _buffer_pool(std::make_shared<BufferPool>()) {}
64 /**
65 * @brief Primary类的析构函数实现。
66 *
67 * 如果套接字仍然打开,则先调用shutdown方法关闭套接字的读写操作,然后调用close方法关闭套接字。
68 */
70 if (_socket.is_open()) {
71 boost::system::error_code ec;
72 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec);
73 _socket.close();
74 }
75 }
76
77 /// 启动会话并在成功读取流 id 后调用 @a on_opened,会话关闭后调用 @a on_closed。
82 DEBUG_ASSERT(on_opened && on_closed);// 确保回调函数不为空。
83
84 // 这强制不使用 Nagle 算法。将 Linux 上的同步模式速度提高了约 3 倍。
85 const boost::asio::ip::tcp::no_delay option(true);
86 _socket.set_option(option);
87
88 // 保存回调函数的引用。
89 _on_closed = std::move(on_closed);
90 _on_response = std::move(on_response);
91 // 调用`on_opened`回调,传入当前`Primary`对象的共享指针。
92 on_opened(shared_from_this());
93 // 开始读取数据
94 ReadData();
95 }
96
97 /// 将一些数据写入套接字。
98 void Primary::Write(std::shared_ptr<const carla::streaming::detail::tcp::Message> message) {
99 DEBUG_ASSERT(message != nullptr);// 确保消息不为空。
100 DEBUG_ASSERT(!message->empty());// 确保消息内容不为空。
101 // 创建一个当前`Primary`对象的弱引用,以避免循环引用。
102 std::weak_ptr<Primary> weak = shared_from_this();
103 // 在IO上下文的执行器上异步执行任务。
104 boost::asio::post(_strand, [=]() {
105 // 尝试获取当前`Primary`对象的强引用。
106 auto self = weak.lock();
107 if (!self) return;// 如果对象已被销毁,则直接返回。
108 // 检查套接字是否仍然打开。
109 if (!self->_socket.is_open()) {
110 return;// 如果套接字已关闭,则不执行写入操作。
111 }
112 // 定义一个回调函数来处理写入完成后的结果。
113 auto handle_sent = [weak, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
114 // 尝试获取当前`Primary`对象的强引用。
115 auto self = weak.lock();
116 if (!self) return;// 如果对象已被销毁,则直接返回。
117 // 检查是否发生错误。
118 if (ec) {
119 // 记录错误日志,并立即关闭会话。
120 log_error("session ", self->_session_id, ": error sending data: ", ec.message());
121 self->CloseNow(ec);
122 } else {
123 // DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
124 }
125 };
126 // 设置超时时间。
127 self->_deadline.expires_from_now(self->_timeout);
128 // 异步写入数据到套接字,并在写入完成后调用`handle_sent`回调函数。
129 boost::asio::async_write(
130 self->_socket,
131 message->GetBufferSequence(),
132 boost::asio::bind_executor(self->_strand, handle_sent));
133 });
134 }
135 /**
136 * @brief 异步地将文本数据写入套接字。
137 *
138 * 此方法将给定的文本数据异步写入TCP套接字。首先发送数据的大小(以字节为单位),
139 * 然后发送实际的文本数据。如果套接字未打开或对象已被销毁,则不执行任何操作。
140 *
141 * @param text 要写入套接字的文本数据。
142 */
143 void Primary::Write(std::string text) {
144 // 创建一个当前对象的弱引用,以避免循环引用。
145 std::weak_ptr<Primary> weak = shared_from_this();
146 // 在IO上下文的执行器上异步执行任务。
147 boost::asio::post(_strand, [=]() {
148 // 尝试获取当前对象的强引用。
149 auto self = weak.lock();
150 if (!self) return; // 如果对象已被销毁,则直接返回。
151 // 检查套接字是否仍然打开。
152 if (!self->_socket.is_open()) {
153 return;// 如果套接字已关闭,则不执行写入操作。
154 }
155
156 // 发送的第一个大小缓冲区
157 self->_deadline.expires_from_now(self->_timeout);
158 int this_size = text.size();
159 boost::asio::async_write(
160 self->_socket,
161 boost::asio::buffer(&this_size, sizeof(this_size)),
162 // 发送完成后不执行任何操作(占位回调)。
163 boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
164 // 发送实际的文本数据。
165 boost::asio::async_write(
166 self->_socket,
167 boost::asio::buffer(text.c_str(), text.size()),
168 // 发送完成后不执行任何操作(占位回调)。
169 boost::asio::bind_executor(self->_strand, [](const boost::system::error_code &, size_t){ }));
170 });
171 }
172 /**
173 * @brief 异步地读取套接字数据。
174 *
175 * 此方法异步地从TCP套接字读取数据。它首先尝试获取当前对象的强引用,
176 * 如果成功,则分配一个缓冲区来接收数据,并注册一个回调函数来处理读取操作的结果。
177 * 如果读取成功,它将调用`_on_response`回调函数,并递归地调用自己以继续读取数据。
178 * 如果读取失败,则记录错误日志并重新开始读取过程。
179 */
181 // 创建一个当前对象的弱引用,以避免循环引用。
182 std::weak_ptr<Primary> weak = shared_from_this();
183 // 在IO上下文的执行器上异步执行任务。
184 boost::asio::post(_strand, [weak]() {
185 // 尝试获取当前对象的强引用。
186 auto self = weak.lock();
187 if (!self) return;// 如果对象已被销毁,则直接返回。
188 // 分配一个缓冲区来接收数据。
189 auto message = std::make_shared<IncomingMessage>(self->_buffer_pool->Pop());
190 // 定义回调函数来处理读取操作的结果。
191 auto handle_read_data = [weak, message](boost::system::error_code ec, size_t DEBUG_ONLY(bytes)) {
192 // 尝试获取当前对象的强引用。
193 auto self = weak.lock();
194 if (!self) return;// 如果对象已被销毁,则直接返回。
195 // 检查是否读取成功。
196 if (!ec) {
197 // 验证读取的字节数是否与缓冲区大小一致,并且不为0。
198 DEBUG_ASSERT_EQ(bytes, message->size());
199 DEBUG_ASSERT_NE(bytes, 0u);
200 // 将缓冲区中的数据移交给回调函数,并开始读取下一块数据。
201 self->_on_response(self, message->pop());
202 std::cout << "Getting data on listener\n";
203 self->ReadData(); // 递归调用以继续读取数据。
204 } else {
205 // 如果读取失败,则记录错误日志并重新开始读取过程。
206 log_error("primary server: failed to read data: ", ec.message());
207 }
208 };
209 /**
210 * @brief 异步读取套接字头部信息(通常包含后续数据的大小)的回调函数。
211 *
212 * 此回调函数用于处理从TCP套接字异步读取的头部信息。如果成功读取且消息大小大于0,
213 * 则会分配相应大小的缓冲区,并启动异步读取数据的操作。如果读取失败或消息大小为0,
214 * 则会记录错误日志并关闭连接。
215 *
216 * @param ec 读取操作的结果代码。如果为0,表示读取成功;否则表示读取失败。
217 * @param bytes 读取的字节数(仅在调试模式下使用)。
218 */
219 auto handle_read_header = [weak, message, handle_read_data](
220 boost::system::error_code ec,
221 size_t DEBUG_ONLY(bytes)) {
222 auto self = weak.lock();
223 if (!self) return;
224 if (!ec && (message->size() > 0u)) {
225 // 既然已经知道了即将到来的缓冲区的大小,我们就可以分配缓冲区并开始存储数据。
226 boost::asio::async_read(
227 self->_socket,
228 message->buffer(),
229 boost::asio::bind_executor(self->_strand, handle_read_data));
230 } else {
231 if (ec) {
232 log_error("Primary server: failed to read header: ", ec.message());
233 }
234 // Connect(); // 此处可能需要根据实际情况决定是否重连
235 self->Close();
236 }
237 };
238
239 // Read the size of the buffer that is coming.
240 boost::asio::async_read(
241 self->_socket,
242 message->size_as_buffer(),
243 boost::asio::bind_executor(self->_strand, handle_read_header));
244 });
245 }
246 /**
247 * @brief 异步读取套接字头部信息(即数据缓冲区的大小)。
248 *
249 * 此方法会启动一个异步读取操作,从TCP套接字中读取数据缓冲区的大小。
250 * 读取完成后,会调用`handle_read_header`回调函数来处理读取结果。
251 */
252 // 接下来的代码段是前面代码的一部分,为了完整性而保留在此处,但注释已添加到上面的回调函数中。
253 // ...(省略了部分代码,具体为async_read的调用)
254
255 /**
256 * @brief 关闭连接并释放资源。
257 *
258 * 此方法会启动一个异步任务,在该任务中尝试获取当前对象的强引用。
259 * 如果成功获取,则调用`CloseNow`方法来关闭连接并释放资源。
260 */
262 std::weak_ptr<Primary> weak = shared_from_this();
263 boost::asio::post(_strand, [weak]() {
264 auto self = weak.lock();
265 if (!self) return;
266 self->CloseNow();
267 });
268 }
269 /**
270 * @brief 启动定时器以监控连接是否超时。
271 *
272 * 此方法会检查定时器是否已经过期。如果已过期,则记录调试信息并关闭连接。
273 * 如果未过期,则启动一个异步等待操作,等待定时器超时。超时后,会递归调用`StartTimer`
274 * 方法以继续监控,或者在遇到错误时记录错误日志。
275 */
277 if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
278 log_debug("session ", _session_id, " time out");
279 Close();
280 } else {
281 std::weak_ptr<Primary> weak = shared_from_this();
282 _deadline.async_wait([weak](boost::system::error_code ec) {
283 auto self = weak.lock();
284 if (!self) return;
285 if (!ec) {
286 self->StartTimer();
287 } else {
288 log_error("session ", self->_session_id, " timed out error: ", ec.message());
289 }
290 });
291 }
292 }
293 /// \brief 立即关闭连接并处理相关资源。
294///
295/// 此方法用于在接收到关闭指令或错误时,立即取消任何挂起的操作,关闭套接字,
296/// 并通知相关的关闭事件处理函数。
297///
298/// \param ec 错误代码,指示关闭操作是否由于错误而触发。
299 void Primary::CloseNow(boost::system::error_code ec) {
300 /// \details 取消所有挂起的定时器操作。
301 _deadline.cancel();
302 /// \details 如果没有错误代码(即正常关闭),则检查套接字是否仍然打开
303 if (!ec)
304 {
305 /// \details 如果套接字仍然打开,则先进行双向关闭操作,然后关闭套接字
306 if (_socket.is_open()) {
307 boost::system::error_code ec2;// 用于捕获shutdown操作的错误代码
308 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
309 _socket.close();// 关闭套接字
310 }
311 }
312 /// \details 通知关闭事件的处理函数,传递当前对象的共享指针。
313 _on_closed(shared_from_this());
314 /// \details 记录调试信息,表明会话已关闭。
315 log_debug("session", _session_id, "closed");
316 }
317
318} // namespace multigpu
319} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:81
#define DEBUG_ASSERT_NE(lhs, rhs)
Definition Debug.h:82
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
一个缓冲区池。 从这个池中弹出的缓冲区在销毁时会自动返回到池中, 这样分配的内存可以被重用。
Definition BufferPool.h:29
警告:在它的io_context停止之前,这个服务器不能被销毁。
Definition listener.cpp:20
std::function< void(std::shared_ptr< Primary >)> callback_function_type
Definition listener.h:32
std::function< void(std::shared_ptr< Primary >, carla::Buffer)> callback_function_type_response
Definition listener.h:33
const size_t _session_id
Definition primary.h:116
void StartTimer()
启动定时器以监控连接是否超时。
Definition primary.cpp:276
boost::asio::deadline_timer _deadline
Definition primary.h:122
void Open(Listener::callback_function_type on_opened, Listener::callback_function_type on_closed, Listener::callback_function_type_response on_response)
启动会话并在成功读取流 id 后调用 on_opened,会话关闭后调用 on_closed。
Definition primary.cpp:78
~Primary()
Primary类的析构函数实现。
Definition primary.cpp:69
Listener::callback_function_type _on_closed
Definition primary.h:126
Primary(boost::asio::io_context &io_context, time_duration timeout, Listener &server)
Primary类的构造函数。
Definition primary.cpp:51
boost::asio::io_context::strand _strand
Definition primary.h:124
Listener::callback_function_type_response _on_response
Definition primary.h:128
void Write(std::shared_ptr< const carla::streaming::detail::tcp::Message > message)
将一些数据写入套接字。
Definition primary.cpp:98
void CloseNow(boost::system::error_code ec=boost::system::error_code())
立即关闭连接并处理相关资源。
Definition primary.cpp:299
void Close()
发布工作以关闭会话。
Definition primary.cpp:261
void ReadData()
读取数据
Definition primary.cpp:180
Positive time duration up to milliseconds resolution.
Definition Time.h:19
static std::atomic_size_t SESSION_COUNTER
用于生成唯一会话ID的静态原子计数器。
Definition primary.cpp:36
CARLA模拟器的主命名空间。
Definition Carla.cpp:139
static void log_error(Args &&... args)
Definition Logging.h:115
static void log_debug(Args &&... args)
Definition Logging.h:71