CARLA
 
test_benchmark_streaming.cpp
Go to the documentation of this file.
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.

#include "test.h"

#include <carla/Buffer.h>
#include <carla/BufferView.h>
#include <carla/ThreadGroup.h>
#include <carla/streaming/Client.h>
#include <carla/streaming/Server.h>

#include <boost/asio/post.hpp>

#include <algorithm>

using namespace carla::streaming;
using namespace std::chrono_literals;

static auto make_special_message(size_t size) {
  std::vector<uint32_t> v(size/sizeof(uint32_t), 42u);
  carla::Buffer msg(v);
  EXPECT_EQ(msg.size(), size);
  carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(msg));

  return BufView;
}

class Benchmark {
public:

  Benchmark(uint16_t port, size_t message_size, double success_ratio)
    : _server(port),
      _client(),
      _message(make_special_message(message_size)),
      _client_callback(),
      _work_to_do(_client_callback),
      _success_ratio(success_ratio) {}

  void AddStream() {
    Stream stream = _server.MakeStream();

    _client.Subscribe(stream.token(), [this](carla::Buffer msg) {
      carla::SharedBufferView BufView = carla::BufferView::CreateFrom(std::move(msg));
      DEBUG_ASSERT_EQ(BufView->size(), _message->size());
      boost::asio::post(_client_callback, [this]() {
        CARLA_PROFILE_FPS(client, listen_callback);
        ++_number_of_messages_received;
      });
    });

    _streams.push_back(stream);
  }

  void AddStreams(size_t count) {
    for (auto i = 0u; i < count; ++i) {
      AddStream();
    }
  }

  void Run(size_t number_of_messages) {
    _threads.CreateThread([this]() { _client_callback.run(); });
    _server.AsyncRun(_streams.size());
    _client.AsyncRun(_streams.size());

    std::this_thread::sleep_for(1s); // the client needs to be ready so we make
                                     // sure we get all the messages.

    for (auto &&stream : _streams) {
      _threads.CreateThread([=]() mutable {
        for (auto i = 0u; i < number_of_messages; ++i) {
          std::this_thread::sleep_for(11ms); // ~90FPS.
          {
            CARLA_PROFILE_SCOPE(game, write_to_stream);
            stream.Write(_message);
          }
        }
      });
    }

    const auto expected_number_of_messages = _streams.size() * number_of_messages;
    const auto threshold =
        static_cast<size_t>(_success_ratio * static_cast<double>(expected_number_of_messages));

    for (auto i = 0u; i < 10; ++i) {
      std::cout << "received " << _number_of_messages_received
                << " of " << expected_number_of_messages
                << " messages,";
      if (_number_of_messages_received >= expected_number_of_messages) {
        break;
      }
      std::cout << " waiting..." << std::endl;
      std::this_thread::sleep_for(1s);
    }

    _client_callback.stop();
    _threads.JoinAll();
    std::cout << " done." << std::endl;

#ifdef NDEBUG
    ASSERT_GE(_number_of_messages_received, threshold);
#else
    if (_number_of_messages_received < threshold) {
      carla::log_warning("threshold unmet:", _number_of_messages_received, '/', threshold);
    }
#endif // NDEBUG
  }

private:

  carla::ThreadGroup _threads;

  Server _server;

  Client _client;

  const carla::SharedBufferView _message;

  boost::asio::io_context _client_callback;

  boost::asio::io_context::work _work_to_do;

  const double _success_ratio;

  std::vector<Stream> _streams;

  std::atomic_size_t _number_of_messages_received{0u};
};

static size_t get_max_concurrency() {
  size_t concurrency = std::thread::hardware_concurrency() / 2u;
  return std::max((size_t) 2u, concurrency);
}

static void benchmark_image(
    const size_t dimensions,
    const size_t number_of_streams = 1u,
    const double success_ratio = 1.0) {
  constexpr auto number_of_messages = 100u;
  carla::logging::log("Benchmark:", number_of_streams, "streams at 90FPS.");
  Benchmark benchmark(TESTING_PORT, 4u * dimensions, success_ratio);
  benchmark.AddStreams(number_of_streams);
  benchmark.Run(number_of_messages);
}

TEST(benchmark_streaming, image_200x200) {
  benchmark_image(200u * 200u);
}

TEST(benchmark_streaming, image_800x600) {
  benchmark_image(800u * 600u, 1u, 0.9);
}

TEST(benchmark_streaming, image_1920x1080) {
  benchmark_image(1920u * 1080u, 1u, 0.9);
}

TEST(benchmark_streaming, image_200x200_mt) {
  benchmark_image(200u * 200u, get_max_concurrency());
}

TEST(benchmark_streaming, image_800x600_mt) {
  benchmark_image(800u * 600u, get_max_concurrency(), 0.9);
}

TEST(benchmark_streaming, image_1920x1080_mt) {
  benchmark_image(1920u * 1080u, get_max_concurrency(), 0.9);
}
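
A note on the pass criterion (not part of the source file above): Benchmark::Run() accepts a run when at least success_ratio of the expected messages arrive across all streams. The standalone sketch below only reproduces that arithmetic for the image_1920x1080 case, using the values visible in the listing (one stream, 100 messages per stream, a 0.9 success ratio, and a 4-byte-per-pixel payload); it is an illustration, not part of the test suite.

// Illustrative sketch: the pass/fail arithmetic of Benchmark::Run()
// for the image_1920x1080 test, with values taken from the listing above.
#include <cstddef>
#include <iostream>

int main() {
  const std::size_t number_of_streams = 1u;             // single-stream test
  const std::size_t number_of_messages = 100u;          // per stream, see benchmark_image()
  const std::size_t message_size = 4u * 1920u * 1080u;  // bytes per message (~8.3 MB)
  const double success_ratio = 0.9;                     // 90% delivery required

  const std::size_t expected = number_of_streams * number_of_messages;
  const auto threshold =
      static_cast<std::size_t>(success_ratio * static_cast<double>(expected));

  std::cout << "message size: " << message_size << " bytes\n"
            << "expected messages: " << expected << "\n"
            << "pass threshold: " << threshold << " messages\n";
  return 0;
}

Running this prints a threshold of 90 messages, which is the value the ASSERT_GE in Run() enforces in NDEBUG builds (in debug builds the test only logs a warning when the threshold is unmet).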