CARLA
 
载入中...
搜索中...
未找到
Dispatcher.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
8
9#include "carla/Exception.h"
10#include "carla/Logging.h"
12
13#include <exception>
14
15namespace carla {
16namespace streaming {
17namespace detail {
18
19 // Dispatcher 析构函数
20// 作用:在对象销毁时断开所有会话与其流的连接,确保此时 io_context 已停止
22 // 遍历存储流状态的映射表
23 // 断开所有会话与其流的连接,确保在此时 io_context 已停止
24 for (auto &pair : _stream_map) {
25#ifndef LIBCARLA_NO_EXCEPTIONS // 如果没有定义 LIBCARLA_NO_EXCEPTIONS,即允许异常
26 try {
27#endif // LIBCARLA_NO_EXCEPTIONS
28 auto stream_state = pair.second;// 获取流状态
29 stream_state->ClearSessions();// 清除会话
30#ifndef LIBCARLA_NO_EXCEPTIONS
31 } catch (const std::exception &e) {
32 // 如果在清除会话时发生异常,记录错误信息
33 log_error("failed to clear sessions:", e.what());
34 }
35#endif // LIBCARLA_NO_EXCEPTIONS
36 }
37 }
38
39 // Dispatcher 类成员函数
40 // 创建一个新的流或重用现有流
41// 返回值:carla::streaming::Stream 对象,代表创建的或重用的流
43 std::lock_guard<std::mutex> lock(_mutex);// 确保线程安全
44 ++_cached_token._token.stream_id; // 增加流ID,防止溢出
46 std::shared_ptr<MultiStreamState> ptr;// 声明流状态的智能指针
47 auto search = _stream_map.find(_cached_token.get_stream_id());// 查找现有流
48 if (search == _stream_map.end()) {
49 // 如果没有找到,创建新的流
50 ptr = std::make_shared<MultiStreamState>(_cached_token);
51
52 // 尝试将新创建的流状态对象插入到流映射表中
53 auto result = _stream_map.emplace(std::make_pair(_cached_token.get_stream_id(), ptr));
54 if (!result.second) {
55 // 如果插入失败(理论上不应该发生,除非有并发问题),抛出异常
56 throw_exception(std::runtime_error("failed to create stream!"));
57 }
58 // 记录新流的创建
59 log_debug("Stream created");
60 // 返回代表新创建的流的 Stream 对象
61 return carla::streaming::Stream(ptr);
62 } else {
63 // 将新流插入流映射表中
64 log_debug("Stream reused");// 日志记录流创建信息
65 ptr = search->second;// 获取现有流状态
66 return carla::streaming::Stream(ptr);// 返回现有流
67 }
68 }
69
70 // 关闭指定ID的流
72 // 使用互斥锁保护共享资源_stream_map,避免多线程同时访问导致数据竞争
73 std::lock_guard<std::mutex> lock(_mutex);
74 // 打印日志,记录关闭流的请求,包括流的ID
75 log_debug("Calling CloseStream for ", id);
76 // 在_stream_map中查找指定ID的流
77 auto search = _stream_map.find(id);
78 // 如果找到了指定ID的流
79 if (search != _stream_map.end()) {
80 // 获取流的状态(可能是一个指向流状态对象的智能指针或其他容器)
81 auto stream_state = search->second;
82 // 如果流状态有效(非空)
83 if (stream_state) {
84 // 打印日志,记录断开所有会话的操作,包括流的ID
85 log_debug("Disconnecting all sessions (stream ", id, ")");
86 // 断开与该流关联的所有会话
87 stream_state->ClearSessions();
88 }
89 // 从_stream_map中删除该流的状态信息
90 _stream_map.erase(search);
91 }
92 }
93
94 // 注册会话到指定流
95 bool Dispatcher::RegisterSession(std::shared_ptr<Session> session) {
96 // 确保传入的会话指针不为空
97 DEBUG_ASSERT(session != nullptr);
98 // 使用互斥锁保护共享资源_stream_map,避免多线程同时访问导致数据竞争
99 std::lock_guard<std::mutex> lock(_mutex);
100 // 在_stream_map中查找与会话关联的流的ID
101 auto search = _stream_map.find(session->get_stream_id());
102 // 如果找到了对应的流
103 if (search != _stream_map.end()) {
104 // 获取流的状态(可能是一个指向流状态对象的智能指针或其他容器)
105 auto stream_state = search->second;
106 // 如果流状态有效(非空)
107 if (stream_state) {
108 // 打印日志,记录连接会话的操作,包括流的ID
109 log_debug("Connecting session (stream ", session->get_stream_id(), ")");
110 // 将会话连接到该流
111 stream_state->ConnectSession(std::move(session));
112 // 打印日志,记录当前_stream_map中的流数量
113 log_debug("Current streams: ", _stream_map.size());
114 return true;
115 }
116 }
117 // 如果流不存在或无效,打印错误日志
118 log_error("Invalid session: no stream available with id", session->get_stream_id());
119 return false;
120 // 返回false,表示会话注册失败
121 }
122
123 // 从流中注销会话
124// 参数:session - 要注销的会话的shared_ptr
125 void Dispatcher::DeregisterSession(std::shared_ptr<Session> session) {
126 // 断言session不为空
127 DEBUG_ASSERT(session != nullptr);
128 // 使用互斥锁进行线程安全的操作
129 std::lock_guard<std::mutex> lock(_mutex);
130 // 记录日志,表示正在调用DeregisterSession函数
131 log_debug("Calling DeregisterSession for ", session->get_stream_id());
132 // 在_stream_map中查找给定会话的流ID
133 auto search = _stream_map.find(session->get_stream_id());
134 if (search != _stream_map.end()) { // 如果找到了对应的流
135 auto stream_state = search->second;// 获取流状态
136 if (stream_state) { // 如果流状态有效
137 // 记录日志,表示正在断开会话
138 log_debug("Disconnecting session (stream ", session->get_stream_id(), ")");
139 // 从流状态中断开该会话
140 stream_state->DisconnectSession(session);
141 // 记录日志,显示当前流的数量
142 log_debug("Current streams: ", _stream_map.size());
143 }
144 }
145 }
146 // 根据传感器ID获取令牌的函数
147 // 参数:sensor_id - 要获取令牌的传感器ID
148 // 返回值:对应的令牌
150 // 使用互斥锁进行线程安全的操作
151 std::lock_guard<std::mutex> lock(_mutex);
152 // 记录日志,表示正在搜索传感器ID
153 log_debug("Searching sensor id: ", sensor_id);
154 // 在_stream_map中查找给定的传感器ID
155 auto search = _stream_map.find(sensor_id);
156 if (search != _stream_map.end()) { // 如果找到了对应的传感器流
157 // 记录日志,表示找到了传感器ID
158 log_debug("Found sensor id: ", sensor_id);
159 auto stream_state = search->second;// 获取流状态
160 // 强制流状态为活动状态
161 stream_state->ForceActive();
162 // 记录日志,表示正在从指定流获取令牌
163 log_debug("Getting token from stream ", sensor_id, " on port ", stream_state->token().get_port());
164 // 返回流状态中的令牌
165 return stream_state->token();
166 } else {
167 // 如果没有找到对应的传感器流
168 // 记录日志,表示没有找到传感器ID,需要创建新的传感器流
169 log_debug("Not Found sensor id, creating sensor stream: ", sensor_id);
170 // 使用缓存的令牌创建一个临时令牌,并设置其流ID
171 token_type temp_token(_cached_token);
172 temp_token.set_stream_id(sensor_id);
173 // 创建一个新的MultiStreamState实例,并尝试将其插入到_stream_map中
174 auto ptr = std::make_shared<MultiStreamState>(temp_token);
175 auto result = _stream_map.emplace(std::make_pair(temp_token.get_stream_id(), ptr));
176 // 强制新流状态为活动状态
177 ptr->ForceActive();
178 if (!result.second) {// 如果插入失败(理论上不应该发生,因为使用了唯一的sensor_id)
179 // 记录日志,表示创建多流失败
180 log_debug("Failed to create multistream for stream ", sensor_id, " on port ", temp_token.get_port());
181 }
182 // 记录日志,表示已创建令牌
183 log_debug("Created token from stream ", sensor_id, " on port ", temp_token.get_port());
184 // 返回创建的临时令牌
185 return temp_token;
186 }
187 return token_type();// 如果未找到流,返回默认令牌
188 }
189
190} // namespace detail
191} // namespace streaming
192} // namespace carla
#define DEBUG_ASSERT(predicate)
Definition Debug.h:68
void DeregisterSession(std::shared_ptr< Session > session)
carla::streaming::Stream MakeStream()
bool RegisterSession(std::shared_ptr< Session > session)
void CloseStream(carla::streaming::detail::stream_id_type id)
token_type GetToken(stream_id_type sensor_id)
静态断言,用于确保token_data结构体的大小与Token::data的大小相同。
const auto & get_stream_id() const
获取流ID的引用。
auto get_port() const
获取端口号。
token_data _token
存储令牌数据的成员变量。
void set_stream_id(stream_id_type id)
设置流ID。
uint32_t stream_id_type
流ID的类型定义。
Definition Types.h:33
detail::Stream< detail::MultiStreamState > Stream
A stream represents an unidirectional channel for sending data from server to client.
Definition Stream.h:25
CARLA模拟器的主命名空间。
Definition Carla.cpp:139
void throw_exception(const std::exception &e)
Definition Carla.cpp:142
static void log_error(Args &&... args)
Definition Logging.h:115
static void log_debug(Args &&... args)
Definition Logging.h:71
stream_id_type stream_id
流ID,用于唯一标识一个流。