CARLA
 
载入中...
搜索中...
未找到
test_streaming.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
7#include "test.h"
8// 包含Carla线程组相关的头文件,用于管理和操作多个线程,例如创建线程、等待线程结束等功能
9#include <carla/ThreadGroup.h>
10// 包含Carla流媒体客户端相关的头文件,用于实现与流媒体服务器进行通信的客户端功能
12// 包含Carla流媒体服务器相关的头文件,用于实现流媒体服务端的相关功能,比如接收客户端连接、发送数据等
14// 包含Carla流媒体细节相关的调度器头文件,可能涉及到对流媒体数据分发、处理等底层逻辑的实现
16// 包含Carla流媒体基于TCP协议客户端相关的详细实现头文件,提供了具体的TCP客户端功能实现细节
18// 包含Carla流媒体基于TCP协议服务器相关的详细实现头文件,提供了具体的TCP服务器功能实现细节
20// 包含Carla流媒体底层客户端相关的头文件,涉及更底层的客户端功能实现,可能与协议交互等基础操作有关
22// 包含Carla流媒体底层服务器相关的头文件,涉及更底层的服务器功能实现,同样可能侧重于基础的协议处理等方面
24
25#include <atomic>
26// 使用 std::chrono_literals 命名空间,这样可以方便地使用时间字面量
27using namespace std::chrono_literals;
28
29// 这里需要级别低一些来在异常或者断言的情形下正确的停止线程
31public:
32 // 定义一个 boost::asio 库的 io_context 对象,用于异步 I/O 操作,例如网络通信中的异步读写等,它是整个异步操作的核心上下文环境
33 boost::asio::io_context service;
34 // 显式的构造函数,接收一个可选的参数threads,用于指定要创建的工作线程数量,默认值为2
35 explicit io_context_running(size_t threads = 2u)
36 // 创建一个 io_context::work 对象,它用于保持 io_context 处于运行状态,防止 io_context.run() 立即返回,只要这个对象存在且关联到 io_context,io_context 就会持续运行其事件循环
38 // 启动一个线程,在线程里面执行service.run
39 _threads.CreateThreads(threads, [this]() { service.run(); });
40 }
41
43 // 函数结束时,也顺便停止服务
44 service.stop();
45 }
46
47private:
48 // 定义一个 io_context::work 类型的对象,用于维持 io_context 的运行状态,如前面构造函数中所述
49 boost::asio::io_context::work _work_to_do;
50 // 定义一个 carla::ThreadGroup 类型的对象,用于管理创建的线程,比如创建线程、等待线程结束等操作
52};
53
54TEST(streaming, low_level_sending_strings) {
55 // 使用 util::buffer 命名空间,可能其中包含了与缓冲区操作相关的函数、类型等定义,具体取决于该命名空间的实际内容
56 using namespace util::buffer;
57 // 使用 carla::streaming 命名空间,包含了流媒体相关的通用功能、类型等定义,便于在测试函数中直接访问相关内容
58 using namespace carla::streaming;
59 // 使用 carla::streaming::detail 命名空间,涉及流媒体更详细、底层的实现相关的定义,这里使用可能是为了访问一些内部的辅助功能等
60 using namespace carla::streaming::detail;
61 // 使用 carla::streaming::low_level 命名空间,对应于流媒体底层相关的功能、类型等定义,针对更基础的操作进行测试
62 using namespace carla::streaming::low_level;
63
64 // 定义一个常量表达式,表示要发送的消息数量,这里设置为100条消息
65 constexpr auto number_of_messages = 100u;
66 // 定义一个字符串常量,表示要发送的消息内容,这里是"Hello client!"
67 const std::string message_text = "Hello client!";
68
69 // 定义一个原子类型的变量,用于统计接收到的消息数量,初始值为0,原子类型保证在多线程环境下对其的操作是原子性的,避免数据竞争导致计数错误
70 std::atomic_size_t message_count{0u};
71
72 // 创建一个 io_context_running 类型的对象,用于管理异步 I/O 上下文以及相关线程,默认会创建2个工作线程来运行 io_context 的事件循环
74
76 // 为服务器设置超时时间为1秒,意味着在某些操作(比如等待客户端连接、接收数据等)如果超过1秒没有响应,可能会触发相应的超时处理逻辑,具体取决于服务器的实现
77 srv.SetTimeout(1s);
78
79 // 构造一个流对象
80 auto stream = srv.MakeStream();
81
83 c.Subscribe(io.service, stream.token(), [&](auto message) {
84 // 每接收到一条消息,消息计数变量加1,通过原子操作保证计数的正确性
85 ++message_count;
86 // 断言接收到的消息大小与发送的消息文本大小相等,用于验证数据完整性,确保接收的数据长度正确
87 ASSERT_EQ(message.size(), message_text.size());
88 // 将接收到的消息(可能是某种缓冲区类型,取决于具体实现)转换为字符串类型,假设 as_string 函数是在 util::buffer 命名空间中定义用于这种转换的函数
89 const std::string msg = as_string(message);
90 // 断言接收到的消息内容与发送的消息内容一致,进一步验证数据的准确性
91 ASSERT_EQ(msg, message_text);
92 });
93
94 // 创建一个基于TCP的底层流媒体客户端对象,用于连接到服务器并接收服务器发送的数据
95 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
97 for (auto i = 0u; i < number_of_messages; ++i) {
98 // 让当前线程暂停执行一小段时间(2毫秒),可能是为了模拟一定的发送间隔,避免过于频繁地发送数据,或者是为了让接收端有时间处理之前接收到的数据等原因
99 std::this_thread::sleep_for(2ms);
100 // 将共享缓冲区视图对象赋值给一个临时变量 View,这里的赋值操作可能涉及到一些引用计数等共享资源管理的逻辑,具体取决于 SharedBufferView 类型的实现
101 carla::SharedBufferView View = BufView;
102 // 通过流对象将缓冲区视图数据写入流中,这样服务器就会将这些数据发送给已订阅该流的客户端,具体的发送机制取决于流对象和服务器的底层实现
103 stream.Write(View);
104 }
105 // 再让当前线程暂停执行一小段时间(2毫秒),可能是为了确保最后一批数据有足够时间被发送和接收处理
106 std::this_thread::sleep_for(2ms);
107 // 断言message_count个数一定至少比message_count大3
108 ASSERT_GE(message_count, number_of_messages - 3u);
109
110 // 停止 io_context 的运行,结束服务器和客户端相关的异步操作,释放相关资源
111 io.service.stop();
112}
113// 定义一个测试用例,测试名称为"streaming",测试子项名称为"low_level_unsubscribing",用于测试底层流媒体客户端取消订阅的相关功能
114TEST(streaming, low_level_unsubscribing) {
115 // 使用 util::buffer 命名空间,可能其中包含了与缓冲区操作相关的函数、类型等定义,具体取决于该命名空间的实际内容
116 using namespace util::buffer;
117 // 使用 carla::streaming 命名空间,包含了流媒体相关的通用功能、类型等定义,便于在测试函数中直接访问相关内容
118 using namespace carla::streaming;
119 // 使用 carla::streaming::detail 命名空间,涉及流媒体更详细、底层的实现相关的定义,这里使用可能是为了访问一些内部的辅助功能等
120 using namespace carla::streaming::detail;
121 // 使用 carla::streaming::low_level 命名空间,对应于流媒体底层相关的功能、类型等定义,针对更基础的操作进行测试
122 using namespace carla::streaming::low_level;
123
124 // 定义一个常量表达式,表示要发送的消息数量,这里设置为50条消息
125 constexpr auto number_of_messages = 50u;
126 // 定义一个字符串常量,表示要发送的消息内容,这里是"Hello client!"
127 const std::string message_text = "Hello client!";
128
129 // 创建一个 io_context_running 类型的对象,用于管理异步 I/O 上下文以及相关线程,默认会创建2个工作线程来运行 io_context 的事件循环
131
133 // 为服务器设置超时时间为1秒,意味着在某些操作(比如等待客户端连接、接收数据等)如果超过1秒没有响应,可能会触发相应的超时处理逻辑,具体取决于服务器的实现
134 srv.SetTimeout(1s);
135
136 // 创建一个基于TCP的底层流媒体客户端对象,用于连接到服务器并接收服务器发送的数据
138 // 循环10次,每次创建一个流、订阅、发送数据、取消订阅,模拟多次不同流的订阅和取消操作场景
139 for (auto n = 0u; n < 10u; ++n) {
140 // 通过服务器对象创建一个流媒体流对象,这个流对象可以用于后续向客户端发送数据等操作,具体流的实现细节取决于服务器内部的逻辑
141 auto stream = srv.MakeStream();
142 // 定义一个原子类型的变量,用于统计接收到的消息数量,初始值为0,原子类型保证在多线程环境下对其的操作是原子性的,避免数据竞争导致计数错误
143 std::atomic_size_t message_count{0u};
144
145
146 c.Subscribe(io.service, stream.token(), [&](auto message) {
147 // 每接收到一条消息,消息计数变量加1,通过原子操作保证计数的正确性
148 ++message_count;
149 // 断言接收到的消息大小与发送的消息文本大小相等,用于验证数据完整性,确保接收的数据长度正确
150 ASSERT_EQ(message.size(), message_text.size());
151 // 将接收到的消息(可能是某种缓冲区类型,取决于具体实现)转换为字符串类型,假设 as_string 函数是在 util::buffer 命名空间中定义用于这种转换的函数
152 const std::string msg = as_string(message);
153 // 断言接收到的消息内容与发送的消息内容一致,进一步验证数据的准确性
154 ASSERT_EQ(msg, message_text);
155 });
156 // 创建一个 carla::Buffer 类型的对象,用于存储要发送的消息数据,通过 boost::asio::buffer 函数将消息文本的字符数组及其大小包装成一个缓冲区对象,方便后续对流媒体流进行写入操作
157 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
158
160 // 循环发送指定数量的消息
161 for (auto i = 0u; i < number_of_messages; ++i) {
162 // 让当前线程暂停执行一小段时间(4毫秒),可能是为了模拟一定的发送间隔,避免过于频繁地发送数据,或者是为了让接收端有时间处理之前接收到的数据等原因
163 std::this_thread::sleep_for(4ms);
164 // 将共享缓冲区视图对象赋值给一个临时变量 View,这里的赋值操作可能涉及到一些引用计数等共享资源管理的逻辑,具体取决于 SharedBufferView 类型的实现
165 carla::SharedBufferView View = BufView;
166 // 通过流对象将缓冲区视图数据写入流中,这样服务器就会将这些数据发送给已订阅该流的客户端,具体的发送机制取决于流对象和服务器的底层实现
167 stream.Write(View);
168 }
169
170 // 让当前线程暂停执行一小段时间(4毫秒),可能是为了确保最后一批数据有足够时间被发送和接收处理
171 std::this_thread::sleep_for(4ms);
172 // 客户端调用 UnSubscribe 函数取消对指定流的订阅,这样后续服务器通过该流发送的数据客户端将不再接收,用于测试取消订阅功能是否正常
173 c.UnSubscribe(stream.token());
174
175 // 继续循环发送相同数量的消息,模拟取消订阅后服务器仍在发送数据的情况,验证客户端是否确实不再接收这些数据
176 for (auto i = 0u; i < number_of_messages; ++i) {
177 // 让当前线程暂停执行一小段时间(2毫秒),可能是为了模拟一定的发送间隔,避免过于频繁地发送数据,或者是为了让接收端有时间处理之前接收到的数据等原因
178 std::this_thread::sleep_for(2ms);
179 // 将共享缓冲区视图对象赋值给一个临时变量 View,这里的赋值操作可能涉及到一些引用计数等共享资源管理的逻辑,具体取决于 SharedBufferView 类型的实现
180 carla::SharedBufferView View = BufView;
181 // 通过流对象将缓冲区视图数据写入流中,服务器会尝试发送这些数据,但客户端已取消订阅,不应再接收
182 stream.Write(View);
183 }
184
185 ASSERT_GE(message_count, number_of_messages - 3u);
186 }
187
188 io.service.stop();
189}
190
191// 这是一个测试用例,测试低级别TCP小消息流
192TEST(streaming, low_level_tcp_small_message) {
193 using namespace carla::streaming;
194 using namespace carla::streaming::detail;
195
196 boost::asio::io_context io_context;
197 tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
198
199 tcp::Server srv(io_context, ep);
200 / 设置服务器超时时间
201 srv.SetTimeout(1s);
202 // 初始化一个原子布尔变量,表示任务是否完成
203 std::atomic_bool done{false};
204 // 初始化一个原子整数变量,表示接收到的消息数量
205 std::atomic_size_t message_count{0u};
206
207 const std::string msg = "Hola!";
208
209 // 开始监听服务器会话
210 srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
211 ASSERT_EQ(session->get_stream_id(), 1u);
212
213 // 创建一个缓冲区对象,传入消息内容和大小
214 carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
215 // 创建一个共享缓冲区视图对象
217 // 循环直到完成标志被设置
218 while (!done) {
219 // 线程休眠一段时间
220 std::this_thread::sleep_for(1ns);
221 // 写入共享缓冲区视图到会话
222 carla::SharedBufferView View = BufView;
223 session->Write(View);
224 }
225 std::cout << "done!\n";
226 }, [](std::shared_ptr<tcp::ServerSession>) { std::cout << "session closed!\n"; });
227
228 // 创建一个调度器对象,传入本地端点
229 Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(srv.GetLocalEndpoint())};
230 auto stream = dispatcher.MakeStream();
231 auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](carla::Buffer message) {
232 ++message_count;
233 ASSERT_FALSE(message.empty());
234 ASSERT_EQ(message.size(), 5u);
235 const std::string received = util::buffer::as_string(message);
236 // 断言接收到的消息与原始消息相同
237 ASSERT_EQ(received, msg);
238 });
239 // 连接客户端
240 c->Connect();
241
242 // 需要至少两个线程,因为这个服务循环要使用其中一个
243 carla::ThreadGroup threads;
244 threads.CreateThreads(
245 std::max(2u, std::thread::hardware_concurrency()),
246 [&]() { io_context.run(); });
247
248 std::this_thread::sleep_for(2s);
249 io_context.stop();
250 done = true;
251 std::cout << "client received " << message_count << " messages\n";
252 ASSERT_GT(message_count, 10u);
253 c->Stop();
254}
255
256struct DoneGuard {
257 ~DoneGuard() { done = true; };
258 std::atomic_bool &done;
259};
260
261// 测试流是否可以在服务器停止后继续存在。
262// 这个测试用例主要用于验证流的生命周期相关特性,
263// 即在服务器停止的情况下,流是否依然能够维持一定的功能或者存在状态。
264TEST(streaming, stream_outlives_server) {
265 using namespace carla::streaming;// 使用carla流命名空间。
266 using namespace util::buffer;// 引入util::buffer命名空间,应该是用于操作缓冲区相关的功能
267 constexpr size_t iterations = 10u;// 定义迭代次数的常量,这里设定为10次,用于控制循环执行的轮数等逻辑
268 std::atomic_bool done{false};// 定义一个原子布尔类型的变量,用于标记某个操作是否完成,初始化为false
269 const std::string message = "Hello client, how are you?";// 定义一个指向流对象的智能指针,用于后续对流的操作,初始时为空指针
270 std::shared_ptr<Stream> stream;
271
272 carla::ThreadGroup sender;
273 DoneGuard g = {done};
274 sender.CreateThread([&]() {
275
276 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
278 while (!done) {
279 std::this_thread::sleep_for(1ms);
280 auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
281 if (s != nullptr) {
282 carla::SharedBufferView View = BufView;
283 s->Write(View);
284 }
285 }
286 });
287
288 for (auto i = 0u; i < iterations; ++i) {
289 Server srv(TESTING_PORT);// 创建一个Server实例,使用TESTING_PORT作为端口号,具体端口值应该在别处定义
290 srv.AsyncRun(2u);// 以异步方式启动服务器,参数2u可能用于配置相关运行参数,比如线程数量等(取决于Server类具体实现)
291 {
292 auto s = std::make_shared<Stream>(srv.MakeStream());// 通过服务器对象srv创建一个Stream智能指针s,这里的MakeStream函数应该是用于创建Stream实例
293 std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);// 使用原子操作将创建的Stream智能指针s存储到外部定义的stream变量中,采用宽松内存顺序
294 }
295 std::atomic_size_t messages_received{0u};
296 {// 创建一个Client实例
297 Client c;
298 // 以异步方式启动客户端,参数2u同样可能涉及相关运行配置(和服务器端类似,取决于Client类实现)
299 c.AsyncRun(2u);
300 // 客户端进行订阅操作,传入从stream智能指针获取的token(标识)以及一个lambda表达式作为回调函数
301 c.Subscribe(stream->token(), [&](auto buffer) {
302 const std::string result = as_string(buffer);// 将接收到的buffer转换为字符串类型,as_string函数应该是自定义的转换函数
303 ASSERT_EQ(result, message);// 使用断言验证转换后的结果是否和期望的message相等,message应该是外部定义的期望消息内容
304 ++messages_received;// 如果消息验证通过,将统计接收消息数量的变量messages_received自增1
305 });
306 std::this_thread::sleep_for(20ms);
307 } // client dies here.
308 ASSERT_GT(messages_received, 0u);
309 } // server dies here.
310 std::this_thread::sleep_for(20ms);
311 done = true;
312} // stream dies here.
313
314// 测试多个客户端订阅同一个流的情况
315TEST(streaming, multi_stream) {
316 using namespace carla::streaming;// 使用carla流命名空间。
317 using namespace util::buffer;// 使用缓冲区工具命名空间。
318 constexpr size_t number_of_messages = 100u;// 消息数量。
319 constexpr size_t number_of_clients = 6u;// 客户端数量。
320 constexpr size_t iterations = 10u; // 迭代次数。
321 const std::string message = "Hi y'all!";
322
323 Server srv(TESTING_PORT);
324 // 创建服务器。
325 srv.AsyncRun(number_of_clients);// 异步运行服务器。
326 auto stream = srv.MakeStream();
327// 创建流。
328
329 for (auto i = 0u; i < iterations; ++i) {
330 std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
331 // 创建客户端向量。
332
333 for (auto &pair : v) {
334 // 遍历客户端向量。
335 pair.first = 0u; // 初始化接收到的消息数。
336 pair.second = std::make_unique<Client>();
337 pair.second->AsyncRun(1u);// 异步运行客户端。
338 pair.second->Subscribe(stream.token(), [&](auto buffer) {
339 const std::string result = as_string(buffer);
340 ASSERT_EQ(result, message);// 断言结果等于发送的消息。
341 ++pair.first;// 增加接收到的消息数。
342 });
343 }
344
345 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));// 创建缓冲区。
346 carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(Buf));// 创建缓冲区视图
347 std::this_thread::sleep_for(6ms);
348 for (auto j = 0u; j < number_of_messages; ++j) {
349 std::this_thread::sleep_for(6ms);
350 carla::SharedBufferView View = BufView;// 创建缓冲区视图。
351 stream.Write(View);
352 }
353 std::this_thread::sleep_for(6ms);
354
355 for (auto &pair : v) {
356 // 遍历客户端向量。
357 ASSERT_GE(pair.first, number_of_messages - 3u);
358 // 断言接收到的消息数至少为发送消息数减3。
359 }
360 }
361}
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:60
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
void CreateThreads(size_t count, F functor)
Definition ThreadGroup.h:36
void CreateThread(F &&functor)
Definition ThreadGroup.h:31
void AsyncRun(size_t worker_threads)
void Subscribe(const Token &token, Functor &&callback)
A streaming server.
void AsyncRun(size_t worker_threads)
Keeps the mapping between streams and sessions.
Definition Dispatcher.h:30
carla::streaming::Stream MakeStream()
警告:在io_context停止之前,不能销毁这个服务器实例
void SetTimeout(time_duration timeout)
设置会话超时时间,仅对新创建的会话有效,默认为10秒
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed)
A client able to subscribe to multiple streams.
void Subscribe(boost::asio::io_context &io_context, token_type token, Functor &&callback)
一个低级的流媒体服务器。每个新流都有一个关联的令牌, 客户端可以使用这个令牌来订阅流。此服务器需要外部 io_context 运行。
boost::asio::io_context::work _work_to_do
boost::asio::io_context service
carla::ThreadGroup _threads
io_context_running(size_t threads=2u)
包含Boost.Asio库中的IP地址类定义。 包含Boost.Asio库中的TCP协议类定义。
std::shared_ptr< BufferView > SharedBufferView
Definition BufferView.h:163
本文件包含了网络通信相关类所需的头文件。
std::atomic_bool & done
constexpr uint16_t TESTING_PORT
Definition test.h:24
TEST(streaming, low_level_sending_strings)