CARLA
 
Loading...
Searching...
No matches
test_streaming.cpp
Go to the documentation of this file.
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
// de Barcelona (UAB).
//
// This work is licensed under the terms of the MIT license.
// For a copy, see <https://opensource.org/licenses/MIT>.

#include "test.h"

#include <carla/BufferView.h>
#include <carla/ThreadGroup.h>
#include <carla/streaming/Client.h>
#include <carla/streaming/Server.h>
#include <carla/streaming/detail/Dispatcher.h>
#include <carla/streaming/detail/tcp/Client.h>
#include <carla/streaming/detail/tcp/Server.h>
#include <carla/streaming/low_level/Client.h>
#include <carla/streaming/low_level/Server.h>

#include <atomic>

20using namespace std::chrono_literals;
21
22// This is required for low level to properly stop the threads in case of
23// exception/assert.
25public:
26
27 boost::asio::io_context service;
28
29 explicit io_context_running(size_t threads = 2u)
31 _threads.CreateThreads(threads, [this]() { service.run(); });
32 }
33
35 service.stop();
36 }
37
38private:
39
40 boost::asio::io_context::work _work_to_do;
41
43};
44
45TEST(streaming, low_level_sending_strings) {
46 using namespace util::buffer;
47 using namespace carla::streaming;
48 using namespace carla::streaming::detail;
49 using namespace carla::streaming::low_level;
50
51 constexpr auto number_of_messages = 100u;
52 const std::string message_text = "Hello client!";
53
54 std::atomic_size_t message_count{0u};
55
57
59 srv.SetTimeout(1s);
60
61 auto stream = srv.MakeStream();
62
64 c.Subscribe(io.service, stream.token(), [&](auto message) {
65 ++message_count;
66 ASSERT_EQ(message.size(), message_text.size());
67 const std::string msg = as_string(message);
68 ASSERT_EQ(msg, message_text);
69 });
70
71 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
73 for (auto i = 0u; i < number_of_messages; ++i) {
74 std::this_thread::sleep_for(2ms);
75 carla::SharedBufferView View = BufView;
76 stream.Write(View);
77 }
78
79 std::this_thread::sleep_for(2ms);
80 ASSERT_GE(message_count, number_of_messages - 3u);
81
82 io.service.stop();
83}
84
85TEST(streaming, low_level_unsubscribing) {
86 using namespace util::buffer;
87 using namespace carla::streaming;
88 using namespace carla::streaming::detail;
89 using namespace carla::streaming::low_level;
90
91 constexpr auto number_of_messages = 50u;
92 const std::string message_text = "Hello client!";
93
95
97 srv.SetTimeout(1s);
98
100 for (auto n = 0u; n < 10u; ++n) {
101 auto stream = srv.MakeStream();
102 std::atomic_size_t message_count{0u};
103
104 c.Subscribe(io.service, stream.token(), [&](auto message) {
105 ++message_count;
106 ASSERT_EQ(message.size(), message_text.size());
107 const std::string msg = as_string(message);
108 ASSERT_EQ(msg, message_text);
109 });
110
111 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
113 for (auto i = 0u; i < number_of_messages; ++i) {
114 std::this_thread::sleep_for(4ms);
115 carla::SharedBufferView View = BufView;
116 stream.Write(View);
117 }
118
119 std::this_thread::sleep_for(4ms);
120 c.UnSubscribe(stream.token());
121
122 for (auto i = 0u; i < number_of_messages; ++i) {
123 std::this_thread::sleep_for(2ms);
124 carla::SharedBufferView View = BufView;
125 stream.Write(View);
126 }
127
128 ASSERT_GE(message_count, number_of_messages - 3u);
129 }
130
131 io.service.stop();
132}
133
134TEST(streaming, low_level_tcp_small_message) {
135 using namespace carla::streaming;
136 using namespace carla::streaming::detail;
137
138 boost::asio::io_context io_context;
139 tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
140
141 tcp::Server srv(io_context, ep);
142 srv.SetTimeout(1s);
143 std::atomic_bool done{false};
144 std::atomic_size_t message_count{0u};
145
146 const std::string msg = "Hola!";
147
148 srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
149 ASSERT_EQ(session->get_stream_id(), 1u);
150
151 carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
153 while (!done) {
154 std::this_thread::sleep_for(1ns);
155 carla::SharedBufferView View = BufView;
156 session->Write(View);
157 }
158 std::cout << "done!\n";
159 }, [](std::shared_ptr<tcp::ServerSession>) { std::cout << "session closed!\n"; });
160
161 Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(srv.GetLocalEndpoint())};
162 auto stream = dispatcher.MakeStream();
163 auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](carla::Buffer message) {
164 ++message_count;
165 ASSERT_FALSE(message.empty());
166 ASSERT_EQ(message.size(), 5u);
167 const std::string received = util::buffer::as_string(message);
168 ASSERT_EQ(received, msg);
169 });
170 c->Connect();
171
172 // We need at least two threads because this server loop consumes one.
173 carla::ThreadGroup threads;
174 threads.CreateThreads(
175 std::max(2u, std::thread::hardware_concurrency()),
176 [&]() { io_context.run(); });
177
178 std::this_thread::sleep_for(2s);
179 io_context.stop();
180 done = true;
181 std::cout << "client received " << message_count << " messages\n";
182 ASSERT_GT(message_count, 10u);
183 c->Stop();
184}
185
/// RAII helper: sets the referenced flag to true on scope exit, so a
/// background sender loop terminates even if the enclosing test returns
/// early (e.g. through a failed ASSERT).
struct DoneGuard {
  ~DoneGuard() { done = true; }  // dropped stray ';' after the body
  std::atomic_bool &done;
};
190
191TEST(streaming, stream_outlives_server) {
192 using namespace carla::streaming;
193 using namespace util::buffer;
194 constexpr size_t iterations = 10u;
195 std::atomic_bool done{false};
196 const std::string message = "Hello client, how are you?";
197 std::shared_ptr<Stream> stream;
198
199 carla::ThreadGroup sender;
200 DoneGuard g = {done};
201 sender.CreateThread([&]() {
202
203 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
205 while (!done) {
206 std::this_thread::sleep_for(1ms);
207 auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
208 if (s != nullptr) {
209 carla::SharedBufferView View = BufView;
210 s->Write(View);
211 }
212 }
213 });
214
215 for (auto i = 0u; i < iterations; ++i) {
216 Server srv(TESTING_PORT);
217 srv.AsyncRun(2u);
218 {
219 auto s = std::make_shared<Stream>(srv.MakeStream());
220 std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
221 }
222 std::atomic_size_t messages_received{0u};
223 {
224 Client c;
225 c.AsyncRun(2u);
226 c.Subscribe(stream->token(), [&](auto buffer) {
227 const std::string result = as_string(buffer);
228 ASSERT_EQ(result, message);
229 ++messages_received;
230 });
231 std::this_thread::sleep_for(20ms);
232 } // client dies here.
233 ASSERT_GT(messages_received, 0u);
234 } // server dies here.
235 std::this_thread::sleep_for(20ms);
236 done = true;
237} // stream dies here.
238
239TEST(streaming, multi_stream) {
240 using namespace carla::streaming;
241 using namespace util::buffer;
242 constexpr size_t number_of_messages = 100u;
243 constexpr size_t number_of_clients = 6u;
244 constexpr size_t iterations = 10u;
245 const std::string message = "Hi y'all!";
246
247 Server srv(TESTING_PORT);
248 srv.AsyncRun(number_of_clients);
249 auto stream = srv.MakeStream();
250
251 for (auto i = 0u; i < iterations; ++i) {
252 std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
253
254 for (auto &pair : v) {
255 pair.first = 0u;
256 pair.second = std::make_unique<Client>();
257 pair.second->AsyncRun(1u);
258 pair.second->Subscribe(stream.token(), [&](auto buffer) {
259 const std::string result = as_string(buffer);
260 ASSERT_EQ(result, message);
261 ++pair.first;
262 });
263 }
264
265 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
267 std::this_thread::sleep_for(6ms);
268 for (auto j = 0u; j < number_of_messages; ++j) {
269 std::this_thread::sleep_for(6ms);
270 carla::SharedBufferView View = BufView;
271 stream.Write(View);
272 }
273 std::this_thread::sleep_for(6ms);
274
275 for (auto &pair : v) {
276 ASSERT_GE(pair.first, number_of_messages - 3u);
277 }
278 }
279}
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
Definition BufferView.h:56
A piece of raw data.
void CreateThreads(size_t count, F functor)
Definition ThreadGroup.h:32
void CreateThread(F &&functor)
Definition ThreadGroup.h:27
A client able to subscribe to multiple streams.
void AsyncRun(size_t worker_threads)
void Subscribe(const Token &token, Functor &&callback)
A streaming server.
void AsyncRun(size_t worker_threads)
Keeps the mapping between streams and sessions.
Definition Dispatcher.h:27
carla::streaming::Stream MakeStream()
void SetTimeout(time_duration timeout)
Set session time-out.
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed)
Start listening for connections.
A client able to subscribe to multiple streams.
void Subscribe(boost::asio::io_context &io_context, token_type token, Functor &&callback)
boost::asio::io_context::work _work_to_do
boost::asio::io_context service
carla::ThreadGroup _threads
io_context_running(size_t threads=2u)
std::shared_ptr< BufferView > SharedBufferView
Definition BufferView.h:151
std::atomic_bool & done
constexpr uint16_t TESTING_PORT
Definition test.h:24
TEST(streaming, low_level_sending_strings)