CARLA
 
载入中...
搜索中...
未找到
test_benchmark_streaming.cpp
浏览该文件的文档.
1// 版权所有 (c) 2017 Universitat Autonoma 计算机视觉中心 (CVC)
2// 巴塞罗那 (UAB)。
3//
4// 本作品根据 MIT 许可证的条款进行许可。
5// 有关副本,请参阅 <https://opensource.org/licenses/MIT>。
6
7#include "test.h"
8//包含名为test.h的自定义头文件,可能包含项目特定的定义、函数声明等。
9#include <carla/Buffer.h>
10#include <carla/BufferView.h>
13//包含名为test.h的自定义头文件,可能包含项目特定的定义、函数声明等。
14#include <boost/asio/post.hpp>
15//是包含boost库中的asio模块的post.hpp头文件,boost::asio常用于异步输入/输出操作,这里的post可能与将任务提交到执行队列相关。
16#include <algorithm>
17//包含了许多通用算法,如排序、查找等算法的模板函数声明。
18using namespace carla::streaming;//前者使得可以直接使用carla::streaming命名空间下的类型和函数而无需每次都写完整的命名空间前缀
19using namespace std::chrono_literals;//使得可以直接使用std::chrono库中的字面值(例如1s表示1秒的时间字面值等)。
20
21// 创建特定大小的消息
22static auto make_special_message(size_t size) { // 创建一个包含42的向量,大小为size
23 std::vector<uint32_t> v(size/sizeof(uint32_t), 42u);
24 carla::Buffer msg(v);
25 EXPECT_EQ(msg.size(), size);
27
28 return BufView;
29}
30
31// 基准测试类
32class Benchmark {
33public:
34
35 Benchmark(uint16_t port, size_t message_size, double success_ratio)
36 : _server(port),
37 _client(),
38 _message(make_special_message(message_size)),
41 _success_ratio(success_ratio) {}// 初始化成功率
42
43 // 添加一个流
44 void AddStream() {
45 Stream stream = _server.MakeStream();// 从服务器创建一个流
46
47 // 客户端订阅流,并定义消息处理回调
48 _client.Subscribe(stream.token(), [this](carla::Buffer msg) {
49 carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(msg));
50 DEBUG_ASSERT_EQ(BufView->size(), _message->size());
51 boost::asio::post(_client_callback, [this]() {
52 CARLA_PROFILE_FPS(client, listen_callback);
53 ++_number_of_messages_received;// 增加接收到的消息计数
54 });
55 });
56
57 _streams.push_back(stream);// 将流添加到流列表中
58 }
59
60 // 添加多个流
61 void AddStreams(size_t count) {
62 for (auto i = 0u; i < count; ++i) {
63 AddStream();
64 }
65 }
66
67 // 运行基准测试
68 void Run(size_t number_of_messages) {
69 _threads.CreateThread([this]() { _client_callback.run(); });
70 _server.AsyncRun(_streams.size());
71 _client.AsyncRun(_streams.size());
72
73 std::this_thread::sleep_for(1s); // 等待客户端准备好,以确保接收所有消息
74
75 // 对每个流进行消息发送
76 for (auto &&stream : _streams) {
77 _threads.CreateThread([=]() mutable {
78 for (auto i = 0u; i < number_of_messages; ++i) {
79 std::this_thread::sleep_for(11ms); // 约90帧
80 {
81 CARLA_PROFILE_SCOPE(game, write_to_stream);// 记录写入流的性能
82 stream.Write(_message);// 向流写入消息
83 }
84 }
85 });
86 }
87
88 // 计算预期接收到的消息数量
89 const auto expected_number_of_messages = _streams.size() * number_of_messages;
90 const auto threshold =
91 static_cast<size_t>(_success_ratio * static_cast<double>(expected_number_of_messages));
92
93 // 等待消息接收完成
94 for (auto i = 0u; i < 10; ++i) {
95 std::cout << "received " << _number_of_messages_received
96 << " of " << expected_number_of_messages
97 << " messages,";
98 if (_number_of_messages_received >= expected_number_of_messages) {
99 break;// 如果接收到的消息数量达到预期,则退出
100 }
101 std::cout << " waiting..." << std::endl;
102 std::this_thread::sleep_for(1s);
103 }
104
105 _client_callback.stop();
106 _threads.JoinAll();
107 std::cout << " done." << std::endl;
108
109#ifdef NDEBUG
110 ASSERT_GE(_number_of_messages_received, threshold); // 断言接收到的消息数大于等于阈值
111#else
112
113 // 在调试模式下,如果未达到阈值,记录警告
114 if (_number_of_messages_received < threshold) {
115 carla::log_warning("threshold unmet:", _number_of_messages_received, '/', threshold);
116 }
117#endif // NDEBUG
118 }
119
120private:
121
123
125
127
129
130 boost::asio::io_context _client_callback;
131
132 boost::asio::io_context::work _work_to_do;
133
134 const double _success_ratio;
135
136 std::vector<Stream> _streams;
137
138 std::atomic_size_t _number_of_messages_received{0u};
139};
140
141// 获取最大并发数
142static size_t get_max_concurrency() {
143 size_t concurrency = std::thread::hardware_concurrency() / 2u;
144 return std::max((size_t) 2u, concurrency);// 返回至少为2的最大并发数
145}
146
147// 基准测试图像流
148static void benchmark_image(
149 const size_t dimensions,
150 const size_t number_of_streams = 1u,
151 const double success_ratio = 1.0) {
152 constexpr auto number_of_messages = 100u;// 定义消息数量
153 carla::logging::log("Benchmark:", number_of_streams, "streams at 90FPS.");// 输出基准测试信息
154 Benchmark benchmark(TESTING_PORT, 4u * dimensions, success_ratio);//
155 benchmark.AddStreams(number_of_streams);
156 benchmark.Run(number_of_messages);
157}
158
159TEST(benchmark_streaming, image_200x200) {
160 benchmark_image(200u * 200u);
161}
162
163TEST(benchmark_streaming, image_800x600) {
164 benchmark_image(800u * 600u, 1u, 0.9);
165}
166
167TEST(benchmark_streaming, image_1920x1080) {
168 benchmark_image(1920u * 1080u, 1u, 0.9);
169}
170
171TEST(benchmark_streaming, image_200x200_mt) {
172 benchmark_image(200u * 200u, get_max_concurrency());
173}
174
175TEST(benchmark_streaming, image_800x600_mt) {
176 benchmark_image(800u * 600u, get_max_concurrency(), 0.9);
177}
178
179TEST(benchmark_streaming, image_1920x1080_mt) {
180 benchmark_image(1920u * 1080u, get_max_concurrency(), 0.9);
181}
#define CARLA_PROFILE_SCOPE(context, profiler_name)
Definition Profiler.h:10
Benchmark(uint16_t port, size_t message_size, double success_ratio)
void Run(size_t number_of_messages)
const carla::SharedBufferView _message
boost::asio::io_context _client_callback
std::vector< Stream > _streams
void AddStreams(size_t count)
carla::ThreadGroup _threads
const double _success_ratio
boost::asio::io_context::work _work_to_do
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:60
一块原始数据。 请注意,如果需要更多容量,则会分配一个新的内存块,并 删除旧的内存块。这意味着默认情况下,缓冲区只能增长。要释放内存,使用 clear 或 pop。
size_type size() const noexcept
void Subscribe(const Token &token, Functor &&callback)
A streaming server.
Token token() const
与此流关联的 Token。客户端可使用该 Token 订阅此流。
static void log(Args &&... args)
Definition Logging.h:61
std::shared_ptr< BufferView > SharedBufferView
Definition BufferView.h:163
static void log_warning(Args &&... args)
Definition Logging.h:101
constexpr uint16_t TESTING_PORT
Definition test.h:24
static auto make_special_message(size_t size)
static void benchmark_image(const size_t dimensions, const size_t number_of_streams=1u, const double success_ratio=1.0)
static size_t get_max_concurrency()
TEST(benchmark_streaming, image_200x200)