45TEST(streaming, low_level_sending_strings) {
51 constexpr auto number_of_messages = 100u;
52 const std::string message_text =
"Hello client!";
54 std::atomic_size_t message_count{0u};
66 ASSERT_EQ(message.size(), message_text.size());
67 const std::string msg = as_string(message);
68 ASSERT_EQ(msg, message_text);
71 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
73 for (
auto i = 0u; i < number_of_messages; ++i) {
74 std::this_thread::sleep_for(2ms);
79 std::this_thread::sleep_for(2ms);
80 ASSERT_GE(message_count, number_of_messages - 3u);
85TEST(streaming, low_level_unsubscribing) {
91 constexpr auto number_of_messages = 50u;
92 const std::string message_text =
"Hello client!";
100 for (
auto n = 0u; n < 10u; ++n) {
102 std::atomic_size_t message_count{0u};
106 ASSERT_EQ(message.size(), message_text.size());
107 const std::string msg = as_string(message);
108 ASSERT_EQ(msg, message_text);
111 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
113 for (
auto i = 0u; i < number_of_messages; ++i) {
114 std::this_thread::sleep_for(4ms);
119 std::this_thread::sleep_for(4ms);
122 for (
auto i = 0u; i < number_of_messages; ++i) {
123 std::this_thread::sleep_for(2ms);
128 ASSERT_GE(message_count, number_of_messages - 3u);
134TEST(streaming, low_level_tcp_small_message) {
138 boost::asio::io_context io_context;
143 std::atomic_bool done{
false};
144 std::atomic_size_t message_count{0u};
146 const std::string msg =
"Hola!";
148 srv.
Listen([&](std::shared_ptr<tcp::ServerSession> session) {
149 ASSERT_EQ(session->get_stream_id(), 1u);
151 carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
154 std::this_thread::sleep_for(1ns);
156 session->Write(View);
158 std::cout <<
"done!\n";
159 }, [](std::shared_ptr<tcp::ServerSession>) { std::cout <<
"session closed!\n"; });
163 auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](
carla::Buffer message) {
165 ASSERT_FALSE(message.empty());
166 ASSERT_EQ(message.size(), 5u);
167 const std::string received = util::buffer::as_string(message);
168 ASSERT_EQ(received, msg);
175 std::max(2u, std::thread::hardware_concurrency()),
176 [&]() { io_context.run(); });
178 std::this_thread::sleep_for(2s);
181 std::cout <<
"client received " << message_count <<
" messages\n";
182 ASSERT_GT(message_count, 10u);
191TEST(streaming, stream_outlives_server) {
194 constexpr size_t iterations = 10u;
195 std::atomic_bool done{
false};
196 const std::string message =
"Hello client, how are you?";
197 std::shared_ptr<Stream> stream;
203 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
206 std::this_thread::sleep_for(1ms);
207 auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
215 for (
auto i = 0u; i < iterations; ++i) {
219 auto s = std::make_shared<Stream>(srv.
MakeStream());
220 std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
222 std::atomic_size_t messages_received{0u};
226 c.
Subscribe(stream->token(), [&](
auto buffer) {
227 const std::string result = as_string(buffer);
228 ASSERT_EQ(result, message);
231 std::this_thread::sleep_for(20ms);
233 ASSERT_GT(messages_received, 0u);
235 std::this_thread::sleep_for(20ms);
242 constexpr size_t number_of_messages = 100u;
243 constexpr size_t number_of_clients = 6u;
244 constexpr size_t iterations = 10u;
245 const std::string message =
"Hi y'all!";
251 for (
auto i = 0u; i < iterations; ++i) {
252 std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
254 for (
auto &pair : v) {
256 pair.second = std::make_unique<Client>();
257 pair.second->AsyncRun(1u);
258 pair.second->Subscribe(stream.token(), [&](
auto buffer) {
259 const std::string result = as_string(buffer);
260 ASSERT_EQ(result, message);
265 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
267 std::this_thread::sleep_for(6ms);
268 for (
auto j = 0u; j < number_of_messages; ++j) {
269 std::this_thread::sleep_for(6ms);
273 std::this_thread::sleep_for(6ms);
275 for (
auto &pair : v) {
276 ASSERT_GE(pair.first, number_of_messages - 3u);
boost::asio::io_context::work _work_to_do
boost::asio::io_context service
carla::ThreadGroup _threads
io_context_running(size_t threads=2u)