CARLA
 
载入中...
搜索中...
未找到
ServerSession.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
9
10#include "carla/Debug.h"
11#include "carla/Logging.h"
12
13#include <boost/asio/read.hpp>
14#include <boost/asio/write.hpp>
15#include <boost/asio/bind_executor.hpp>
16#include <boost/asio/post.hpp>
17
18#include <atomic>
19#include <thread>
20
21namespace carla {
22namespace streaming {
23namespace detail {
24namespace tcp {
25// 用于统计服务器会话的数量
26 static std::atomic_size_t SESSION_COUNTER{0u};
27// ServerSession类的构造函数
28 // @param io_context boost::asio的I/O上下文对象
29 // @param timeout 会话超时时间
30 // @param server 所属的服务器对象
32 boost::asio::io_context &io_context,
33 const time_duration timeout,
34 Server &server)
36 std::string("tcp server session ") + std::to_string(SESSION_COUNTER)),
37 _server(server),
38 _session_id(SESSION_COUNTER++),
39 _socket(io_context),
40 _timeout(timeout),
41 _deadline(io_context),
42 _strand(io_context) {}
43// 打开会话的函数
44 // @param on_opened 会话打开成功的回调函数
45 // @param on_closed 会话关闭的回调函数
47 callback_function_type on_opened,
48 callback_function_type on_closed) {
49 // 断言回调函数不为空
50 DEBUG_ASSERT(on_opened && on_closed);
51 _on_closed = std::move(on_closed);
52
53 // 强制不使用Nagle(内格尔)算法。
54 // 将Linux上的同步模式速度提高了约3倍。
55 const boost::asio::ip::tcp::no_delay option(true);
56 _socket.set_option(option);
57 // 启动定时器
58 StartTimer();
59 // 获取自身的共享指针,以便在异步操作中保持对象存活
60 auto self = shared_from_this(); // 为了让自己存活下去。
61 boost::asio::post(_strand, [=]() {
62 // 定义处理查询的内部函数
63 auto handle_query = [this, self, callback=std::move(on_opened)](
64 const boost::system::error_code &ec,
65 size_t DEBUG_ONLY(bytes_received)) {
66 if (!ec) {
67 // 断言接收到的字节数等于流ID的大小
68 DEBUG_ASSERT_EQ(bytes_received, sizeof(_stream_id));
69 // 打印调试信息,表示会话已启动
70 log_debug("session", _session_id, "for stream", _stream_id, " started");
71 // 在strand的上下文环境中执行回调函数
72 boost::asio::post(_strand.context(), [=]() { callback(self); });
73 } else {
74 // 打印错误信息,表示获取流ID时出错
75 log_error("session", _session_id, ": error retrieving stream id :", ec.message());
76 // 立即关闭会话
77 CloseNow(ec);
78 }
79 };
80
81 // 读取流id。
82 _deadline.expires_from_now(_timeout);
83 // 异步读取流ID
84 boost::asio::async_read(
85 _socket,
86 boost::asio::buffer(&_stream_id, sizeof(_stream_id)),
87 boost::asio::bind_executor(_strand, handle_query));
88 });
89 }
90// 向客户端写入消息的函数
91 // @param message 要写入的消息指针
92 void ServerSession::Write(std::shared_ptr<const Message> message) {
93 // 断言消息不为空且消息内容不为空
94 DEBUG_ASSERT(message != nullptr);
95 DEBUG_ASSERT(!message->empty());
96 auto self = shared_from_this();
97 if (!_socket.is_open()) {
98 return;
99 }
100 if (_is_writing) {
102 // 等待上一条消息发送完毕
103 while (_is_writing) {
104 std::this_thread::yield();
105 }
106 } else {
107 // 忽略该消息
108 log_debug("session", _session_id, ": connection too slow: message discarded");
109 return;
110 }
111 }
112 _is_writing = true;
113// 定义消息发送完成后的回调函数
114 auto handle_sent = [this, self, message](const boost::system::error_code &ec, size_t DEBUG_ONLY(bytes)) {
115 _is_writing = false;
116 if (ec) {
117 // 如果发送出错,打印错误信息并立即关闭会话
118 log_info("session", _session_id, ": error sending data :", ec.message());
119 CloseNow(ec);
120 } else {
121 // 如果发送成功,打印调试信息(可选)并断言发送的字节数正确
122 DEBUG_ONLY(log_debug("session", _session_id, ": successfully sent", bytes, "bytes"));
123 DEBUG_ASSERT_EQ(bytes, sizeof(message_size_type) + message->size());
124 }
125 };
126// 打印调试信息,表示要发送的消息大小
127 log_debug("session", _session_id, ": sending message of", message->size(), "bytes");
128// 设置消息发送的截止时间
129 _deadline.expires_from_now(_timeout);
130 // 异步写入消息
131 boost::asio::async_write(_socket, message->GetBufferSequence(),
132 boost::asio::bind_executor(_strand, handle_sent));
133 }
134// 关闭会话的函数
136 boost::asio::post(_strand, [self=shared_from_this()]() { self->CloseNow(); });
137 }
138 // 启动定时器的函数,如果定时器已过期则关闭会话,否则设置异步等待定时器到期的回调函数
140 if (_deadline.expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
141 log_debug("session", _session_id, "timed out");
142 Close();
143 } else {
144 _deadline.async_wait([this, self=shared_from_this()](boost::system::error_code ec) {
145 if (!ec) {
146 StartTimer();
147 } else {
148 log_debug("session", _session_id, "timed out error:", ec.message());
149 }
150 });
151 }
152 }
153// 立即关闭会话的函数,取消定时器,关闭套接字并执行关闭回调函数
154 void ServerSession::CloseNow(boost::system::error_code ec) {
155 _deadline.cancel();
156 if (!ec)
157 {
158 if (_socket.is_open()) {
159 boost::system::error_code ec2;
160 _socket.shutdown(boost::asio::socket_base::shutdown_both, ec2);
161 _socket.close();
162 }
163 }
164 _on_closed(shared_from_this());
165 log_debug("session", _session_id, "closed");
166 }
167
168} // namespace tcp
169} // namespace detail
170} // namespace streaming
171} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
#define DEBUG_ASSERT_EQ(lhs, rhs)
Definition Debug.h:81
#define DEBUG_ONLY(code)
Definition Debug.h:55
#define LIBCARLA_INITIALIZE_LIFETIME_PROFILER(display_name)
包含Carla流处理模块中TCP通信相关类的头文件依赖。
boost::asio::deadline_timer _deadline
定时器,用于在会话超时后触发关闭操作。
void CloseNow(boost::system::error_code ec=boost::system::error_code())
立即关闭会话。
void Close()
发布一个关闭会话的任务。
Server & _server
对 Server 对象的引用。
ServerSession(boost::asio::io_context &io_context, time_duration timeout, Server &server)
构造函数。
void Open(callback_function_type on_opened, callback_function_type on_closed)
启动会话。
callback_function_type _on_closed
会话关闭时的回调函数。
boost::asio::io_context::strand _strand
用于保证异步操作顺序的 strand 对象。
socket_type _socket
套接字类型,用于网络通信。
stream_id_type _stream_id
流标识符,用于标识会话中传输的数据流。
const size_t _session_id
会话的唯一标识符。
std::function< void(std::shared_ptr< ServerSession >)> callback_function_type
回调函数类型别名。
time_duration _timeout
会话超时时长,表示会话在多长时间内无活动将被关闭。
bool _is_writing
表示当前是否正在进行写入操作的标志。
void Write(std::shared_ptr< const Message > message)
向套接字写入一些数据。
警告:在io_context停止之前,不能销毁这个服务器实例
Positive time duration up to milliseconds resolution.
Definition Time.h:19
static std::atomic_size_t SESSION_COUNTER
uint32_t message_size_type
消息大小的类型定义。
Definition Types.h:40
CARLA模拟器的主命名空间。
Definition Carla.cpp:139
static void log_error(Args &&... args)
Definition Logging.h:115
static void log_info(Args &&... args)
Definition Logging.h:86
static void log_debug(Args &&... args)
Definition Logging.h:71