// Test fragment: streaming / low_level_sending_strings.
// NOTE(review): this listing omits many lines of the body (the subscription
// callback header, the write call and the closing braces are not visible), and
// the number fused to the start of each line is residue from the listing.
// What is visible: a fixed text is sent number_of_messages times and each
// received message is checked against that text.
54TEST(streaming, low_level_sending_strings) {
65 constexpr auto number_of_messages = 100u;
67 const std::string message_text =
"Hello client!";
// Count of received messages; atomic, presumably because it is updated from a
// receiving callback on another thread -- TODO confirm against the full file.
70 std::atomic_size_t message_count{0u};
// A received message must match the expected text in length and in content.
// `message` is presumably the callback's buffer argument -- its declaration is
// not visible here.
87 ASSERT_EQ(message.size(), message_text.size());
89 const std::string msg = as_string(message);
91 ASSERT_EQ(msg, message_text);
// Buffer built from the characters of message_text; size() is used, so the
// terminating '\0' is not part of the payload.
95 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
// One iteration per message, pausing 2ms in each iteration (the statement that
// actually sends the buffer is not visible in this listing).
97 for (
auto i = 0u; i < number_of_messages; ++i) {
99 std::this_thread::sleep_for(2ms);
// Another 2ms pause, presumably after the loop to let in-flight messages
// arrive -- TODO confirm.
106 std::this_thread::sleep_for(2ms);
// The check tolerates up to three missing messages.
108 ASSERT_GE(message_count, number_of_messages - 3u);
// Test fragment: streaming / low_level_unsubscribing.
// NOTE(review): many lines of the body are missing from this listing (the
// subscribe/unsubscribe calls, the write calls and the closing braces), and
// the number fused to the start of each line is residue from the listing.
114TEST(streaming, low_level_unsubscribing) {
125 constexpr auto number_of_messages = 50u;
127 const std::string message_text =
"Hello client!";
// Ten rounds; the statements below presumably sit inside this loop, but the
// nesting cannot be confirmed from the visible lines.
139 for (
auto n = 0u; n < 10u; ++n) {
// Count of received messages (atomic counter).
143 std::atomic_size_t message_count{0u};
// A received message must match the expected text in length and in content.
// `message` is presumably the callback's buffer argument -- not visible here.
150 ASSERT_EQ(message.size(), message_text.size());
152 const std::string msg = as_string(message);
154 ASSERT_EQ(msg, message_text);
// Buffer built from the characters of message_text (size() excludes the
// terminating '\0').
157 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
// First pass over number_of_messages iterations, 4ms pause per iteration.
161 for (
auto i = 0u; i < number_of_messages; ++i) {
163 std::this_thread::sleep_for(4ms);
171 std::this_thread::sleep_for(4ms);
// Second pass over number_of_messages iterations, 2ms pause per iteration.
// NOTE(review): given the test name this is presumably the phase after
// unsubscribing -- the unsubscribe call itself is not visible; confirm.
176 for (
auto i = 0u; i < number_of_messages; ++i) {
178 std::this_thread::sleep_for(2ms);
// The check tolerates up to three missing messages.
185 ASSERT_GE(message_count, number_of_messages - 3u);
// Test fragment: streaming / low_level_tcp_small_message.
// NOTE(review): lines are missing from this listing (construction of `srv` and
// `stream`, the loop around the write, the body that increments
// message_count, closing braces), and the number fused to the start of each
// line is residue from the listing.
192TEST(streaming, low_level_tcp_small_message) {
196 boost::asio::io_context io_context;
// Flag and counter shared between the test body and the callbacks below.
203 std::atomic_bool done{
false};
205 std::atomic_size_t message_count{0u};
// The payload: a five-character string.
207 const std::string msg =
"Hola!";
// Listen takes two callbacks. The first receives a server session, checks that
// its stream id is 1, builds a buffer from msg and writes `View` to the
// session with a 1ns sleep in between; it prints "done!" at its end.
// `View` is declared on a line not shown here.
210 srv.
Listen([&](std::shared_ptr<tcp::ServerSession> session) {
211 ASSERT_EQ(session->get_stream_id(), 1u);
214 carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
220 std::this_thread::sleep_for(1ns);
223 session->Write(View);
225 std::cout <<
"done!\n";
// The second callback only logs that the session was closed.
226 }, [](std::shared_ptr<tcp::ServerSession>) { std::cout <<
"session closed!\n"; });
// Client constructed with the io_context, the stream's token and a callback
// that validates every received buffer: non-empty, exactly 5 bytes (the length
// of "Hola!") and equal to msg.
231 auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](
carla::Buffer message) {
233 ASSERT_FALSE(message.empty());
234 ASSERT_EQ(message.size(), 5u);
235 const std::string received = util::buffer::as_string(message);
237 ASSERT_EQ(received, msg);
// Arguments of a call whose head is not visible: at least two (or
// hardware_concurrency) and a callable that runs the io_context -- presumably
// the worker-thread count and thread body; TODO confirm.
245 std::max(2u, std::thread::hardware_concurrency()),
246 [&]() { io_context.run(); });
// Let traffic flow for two seconds before inspecting the counter.
248 std::this_thread::sleep_for(2s);
251 std::cout <<
"client received " << message_count <<
" messages\n";
// More than ten messages must have been received in that window.
252 ASSERT_GT(message_count, 10u);
// Test fragment: streaming / stream_outlives_server.
// NOTE(review): lines are missing from this listing (the code that owns the
// sender loop, construction of `srv` and `c`, the increment of
// messages_received, closing braces), and the number fused to the start of
// each line is residue from the listing.
264TEST(streaming, stream_outlives_server) {
267 constexpr size_t iterations = 10u;
268 std::atomic_bool done{
false};
269 const std::string message =
"Hello client, how are you?";
// Shared stream handle; it is read and replaced below through the atomic
// shared_ptr free functions.
270 std::shared_ptr<Stream> stream;
// Buffer built from the characters of message (size() excludes the
// terminating '\0').
276 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
// After a 1ms pause, take a local copy of the current stream pointer with a
// relaxed atomic load. Presumably this runs in a background sender -- the
// enclosing construct is not visible; TODO confirm.
279 std::this_thread::sleep_for(1ms);
280 auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
// Each iteration creates a new Stream from srv.MakeStream() and publishes it
// via a relaxed atomic store.
288 for (
auto i = 0u; i < iterations; ++i) {
292 auto s = std::make_shared<Stream>(srv.
MakeStream());
293 std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
295 std::atomic_size_t messages_received{0u};
// Subscribe to the current stream's token; every received buffer must equal
// message.
301 c.
Subscribe(stream->token(), [&](
auto buffer) {
302 const std::string result = as_string(buffer);
303 ASSERT_EQ(result, message);
// Wait 20ms, then require that at least one message was received.
306 std::this_thread::sleep_for(20ms);
308 ASSERT_GT(messages_received, 0u);
310 std::this_thread::sleep_for(20ms);
// Fragment of a further test body. Its TEST(...) header is not visible in this
// listing; it redeclares `iterations` and `message`, so it is a separate scope
// from the test above. The number fused to the start of each line is residue
// from the listing, and several lines (stream/server setup, the write call,
// the counter increment, closing braces) are missing.
318 constexpr size_t number_of_messages = 100u;
319 constexpr size_t number_of_clients = 6u;
320 constexpr size_t iterations = 10u;
321 const std::string message =
"Hi y'all!";
329 for (
auto i = 0u; i < iterations; ++i) {
// One (counter, client) pair per client; the vector is sized up front to
// number_of_clients elements.
330 std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
// Create each client, start it with AsyncRun(1u) and subscribe it to the
// stream's token; every received buffer must equal message.
333 for (
auto &pair : v) {
336 pair.second = std::make_unique<Client>();
337 pair.second->AsyncRun(1u);
338 pair.second->Subscribe(stream.token(), [&](
auto buffer) {
339 const std::string result = as_string(buffer);
340 ASSERT_EQ(result, message);
// Buffer built from the characters of message (size() excludes the
// terminating '\0').
345 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
// 6ms pause, then one iteration per message with a 6ms pause each (the
// statement that sends the buffer is not visible in this listing).
347 std::this_thread::sleep_for(6ms);
348 for (
auto j = 0u; j < number_of_messages; ++j) {
349 std::this_thread::sleep_for(6ms);
353 std::this_thread::sleep_for(6ms);
// Every client's counter may be short by at most three messages.
355 for (
auto &pair : v) {
357 ASSERT_GE(pair.first, number_of_messages - 3u);
boost::asio::io_context::work _work_to_do
boost::asio::io_context service
carla::ThreadGroup _threads
io_context_running(size_t threads=2u)