27using namespace std::chrono_literals;
// Fragment of TEST(streaming, low_level_sending_strings).
// NOTE(review): broken extraction — the digits fused onto each line are the
// original file's line numbers, and the jumps in them (54, 65, 67, 70, 87, …)
// show interior lines (server setup, the Subscribe call, loop bodies) are
// missing from this view. Code bytes left untouched.
54TEST(streaming, low_level_sending_strings) {
// Total number of messages the server side writes.
65 constexpr auto number_of_messages = 100u;
// Payload sent on every write.
67 const std::string message_text =
"Hello client!";
// Incremented (in the not-visible client callback) once per received message.
70 std::atomic_size_t message_count{0u};
// Client-side checks: every received message must equal the payload exactly.
87 ASSERT_EQ(message.size(), message_text.size());
89 const std::string msg = as_string(message);
91 ASSERT_EQ(msg, message_text);
// Wrap the payload once; presumably reused for each write — TODO confirm
// against the missing loop body.
95 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
// Write the payload number_of_messages times, pacing writes by 2 ms.
97 for (
auto i = 0u; i < number_of_messages; ++i) {
99 std::this_thread::sleep_for(2ms);
// Grace period for in-flight messages before the final check.
106 std::this_thread::sleep_for(2ms);
// Tolerates up to 3 dropped/late messages.
108 ASSERT_GE(message_count, number_of_messages - 3u);
// Fragment of TEST(streaming, low_level_unsubscribing).
// NOTE(review): broken extraction — fused leading digits are original line
// numbers; interior lines (client creation, the Subscribe/UnSubscribe calls)
// are missing. Code bytes left untouched.
114TEST(streaming, low_level_unsubscribing) {
125 constexpr auto number_of_messages = 50u;
127 const std::string message_text =
"Hello client!";
// Repeat the subscribe/send/unsubscribe cycle 10 times.
139 for (
auto n = 0u; n < 10u; ++n) {
// Fresh per-round counter, bumped by the (not visible) client callback.
143 std::atomic_size_t message_count{0u};
// Received messages must equal the payload exactly.
150 ASSERT_EQ(message.size(), message_text.size());
152 const std::string msg = as_string(message);
154 ASSERT_EQ(msg, message_text);
157 carla::Buffer Buf(boost::asio::buffer(message_text.c_str(), message_text.size()));
// First send loop: 50 writes at 4 ms — presumably while subscribed.
161 for (
auto i = 0u; i < number_of_messages; ++i) {
163 std::this_thread::sleep_for(4ms);
171 std::this_thread::sleep_for(4ms);
// Second send loop at 2 ms — presumably after UnSubscribe, so these writes
// should NOT reach the client; the call itself is not visible here — verify.
176 for (
auto i = 0u; i < number_of_messages; ++i) {
178 std::this_thread::sleep_for(2ms);
// Only the first loop's messages count, minus up to 3 tolerated drops.
185 ASSERT_GE(message_count, number_of_messages - 3u);
// Fragment of TEST(streaming, low_level_tcp_small_message).
// Exercises the raw tcp::Server / tcp::Client layer below the streaming API.
// NOTE(review): broken extraction — fused leading digits are original line
// numbers; the server construction, write loop, thread-pool creation and
// `View` definition are among the missing lines. Code bytes left untouched.
192TEST(streaming, low_level_tcp_small_message) {
196 boost::asio::io_context io_context;
// Signals the server-session writer to stop.
203 std::atomic_bool done{
false};
205 std::atomic_size_t message_count{0u};
// 5-byte payload; the client asserts this size below.
207 const std::string msg =
"Hola!";
// On session open: verify the stream id, then repeatedly write the payload
// (the surrounding loop is not visible), pacing by 1 ns per write.
210 srv.
Listen([&](std::shared_ptr<tcp::ServerSession> session) {
211 ASSERT_EQ(session->get_stream_id(), 1u);
214 carla::Buffer Buf(boost::asio::buffer(msg.c_str(), msg.size()));
220 std::this_thread::sleep_for(1ns);
// `View` is presumably a BufferView over Buf — its definition is missing.
223 session->Write(View);
225 std::cout <<
"done!\n";
// Second functor: session-closed notification, log only.
226 }, [](std::shared_ptr<tcp::ServerSession>) { std::cout <<
"session closed!\n"; });
// Client callback: every message must be exactly the 5-byte "Hola!".
231 auto c = std::make_shared<tcp::Client>(io_context, stream.token(), [&](
carla::Buffer message) {
233 ASSERT_FALSE(message.empty());
234 ASSERT_EQ(message.size(), 5u);
235 const std::string received = util::buffer::as_string(message);
237 ASSERT_EQ(received, msg);
// Run the io_context on at least two worker threads (creation call missing).
245 std::max(2u, std::thread::hardware_concurrency()),
246 [&]() { io_context.run(); });
// Let traffic flow for 2 s, then require that plenty of messages arrived.
248 std::this_thread::sleep_for(2s);
251 std::cout <<
"client received " << message_count <<
" messages\n";
252 ASSERT_GT(message_count, 10u);
// Fragment of TEST(streaming, stream_outlives_server).
// Checks that a Stream handle kept alive across server re-creation still
// delivers messages. NOTE(review): broken extraction — fused leading digits
// are original line numbers; the writer thread, server construction and the
// server tear-down inside the loop are among the missing lines.
264TEST(streaming, stream_outlives_server) {
267 constexpr size_t iterations = 10u;
// Stops the (not visible) background writer.
268 std::atomic_bool done{
false};
269 const std::string message =
"Hello client, how are you?";
// Shared handle published to the writer via the std::atomic_load/store
// free-function overloads for shared_ptr (deprecated in C++20 in favour of
// std::atomic<std::shared_ptr>) — flag for eventual migration.
270 std::shared_ptr<Stream> stream;
// Writer side (enclosing loop/thread missing): re-read the current stream
// every 1 ms and presumably write `Buf` to it — confirm against full file.
276 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
279 std::this_thread::sleep_for(1ms);
280 auto s = std::atomic_load_explicit(&stream, std::memory_order_relaxed);
// Each iteration makes a fresh stream on the server and publishes it.
288 for (
auto i = 0u; i < iterations; ++i) {
292 auto s = std::make_shared<Stream>(srv.
MakeStream());
293 std::atomic_store_explicit(&stream, s, std::memory_order_relaxed);
295 std::atomic_size_t messages_received{0u};
// Subscribe and verify each delivered buffer equals the payload.
301 c.
Subscribe(stream->token(), [&](
auto buffer) {
302 const std::string result = as_string(buffer);
303 ASSERT_EQ(result, message);
// 20 ms to receive at least one message this iteration.
306 std::this_thread::sleep_for(20ms);
308 ASSERT_GT(messages_received, 0u);
310 std::this_thread::sleep_for(20ms);
// Fragment of a multi-client streaming test — its TEST(...) header line is
// missing from this extraction (original lines 311–317 absent), so the test
// name cannot be stated here. NOTE(review): fused leading digits are original
// line numbers; server/stream construction and the write inside the send loop
// are among the missing lines. Code bytes left untouched.
318 constexpr size_t number_of_messages = 100u;
319 constexpr size_t number_of_clients = 6u;
320 constexpr size_t iterations = 10u;
321 const std::string message =
"Hi y'all!";
// Each iteration spins up 6 fresh clients subscribed to the same stream.
329 for (
auto i = 0u; i < iterations; ++i) {
// Pair = (per-client received-message counter, the client itself).
330 std::vector<std::pair<std::atomic_size_t, std::unique_ptr<Client>>> v(number_of_clients);
333 for (
auto &pair : v) {
336 pair.second = std::make_unique<Client>();
// One worker thread per client.
337 pair.second->AsyncRun(1u);
// Every delivered buffer must equal the payload; the counter increment is
// presumably in the missing callback tail — confirm against full file.
338 pair.second->Subscribe(stream.token(), [&](
auto buffer) {
339 const std::string result = as_string(buffer);
340 ASSERT_EQ(result, message);
345 carla::Buffer Buf(boost::asio::buffer(message.c_str(), message.size()));
// Warm-up pause, then 100 paced writes, then a drain pause.
347 std::this_thread::sleep_for(6ms);
348 for (
auto j = 0u; j < number_of_messages; ++j) {
349 std::this_thread::sleep_for(6ms);
353 std::this_thread::sleep_for(6ms);
// Every client must have seen all messages, minus up to 3 tolerated drops.
355 for (
auto &pair : v) {
357 ASSERT_GE(pair.first, number_of_messages - 3u);
static std::shared_ptr< BufferView > CreateFrom(Buffer &&buffer)
A piece of raw data. Note that if more capacity is needed, a new block of memory is allocated and the old one is deleted. This means that by default the buffer can only grow. To release the memory, use clear or pop.
void CreateThreads(size_t count, F functor)
void CreateThread(F &&functor)
void AsyncRun(size_t worker_threads)
void Subscribe(const Token &token, Functor &&callback)
void AsyncRun(size_t worker_threads)
Keeps the mapping between streams and sessions.
carla::streaming::Stream MakeStream()
Warning: this server instance must not be destroyed before its io_context is stopped.
void SetTimeout(time_duration timeout)
Sets the session time-out. Only has an effect on newly created sessions; defaults to 10 seconds.
boost::asio::ip::tcp::endpoint endpoint
endpoint GetLocalEndpoint() const
void Listen(FunctorT1 on_session_opened, FunctorT2 on_session_closed)
A client able to subscribe to multiple streams.
void Subscribe(boost::asio::io_context &io_context, token_type token, Functor &&callback)
void UnSubscribe(token_type token)
A low-level streaming server. Each new stream has an associated token; clients can use this token to subscribe to the stream. This server requires an external io_context to run.
void SetTimeout(time_duration timeout)
boost::asio::io_context::work _work_to_do
boost::asio::io_context service
carla::ThreadGroup _threads
io_context_running(size_t threads=2u)
Contains the IP address class definitions from the Boost.Asio library. Contains the TCP protocol class definitions from the Boost.Asio library.
std::shared_ptr< BufferView > SharedBufferView
constexpr uint16_t TESTING_PORT
TEST(streaming, low_level_sending_strings)