CARLA
 
载入中...
搜索中...
未找到
CarlaEngine.cpp
浏览该文件的文档.
1// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
2// de Barcelona (UAB).
3//
4// This work is licensed under the terms of the MIT license.
5// For a copy, see <https://opensource.org/licenses/MIT>.
6
7#include "Carla.h"
9
17
18#include "Runtime/Core/Public/Misc/App.h"
19#include "PhysicsEngine/PhysicsSettings.h"
21
23#include <carla/Logging.h>
28#include <carla/ros2/ROS2.h>
32
33#include <thread>
34
35// =============================================================================
36// -- Static local methods -----------------------------------------------------
37// =============================================================================
38
39// init static variables
41
43{
44 return std::max(std::thread::hardware_concurrency(), 4u) - 2u;
45}
46
47static TOptional<double> FCarlaEngine_GetFixedDeltaSeconds()
48{
49 return FApp::IsBenchmarking() ? FApp::GetFixedDeltaTime() : TOptional<double>{};
50}
51
52static void FCarlaEngine_SetFixedDeltaSeconds(TOptional<double> FixedDeltaSeconds)
53{
54 FApp::SetBenchmarking(FixedDeltaSeconds.IsSet());
55 FApp::SetFixedDeltaTime(FixedDeltaSeconds.Get(0.0));
56}
57
58// =============================================================================
59// -- FCarlaEngine -------------------------------------------------------------
60// =============================================================================
61
63{
64 if (bIsRunning)
65 {
66 #if defined(WITH_ROS2)
68 if (ROS2->IsEnabled())
69 ROS2->Shutdown();
70 #endif
71 FWorldDelegates::OnWorldTickStart.Remove(OnPreTickHandle);
72 FWorldDelegates::OnWorldPostActorTick.Remove(OnPostTickHandle);
74 }
75}
76
78{
79 TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
80 if (!bIsRunning)
81 {
82 const auto StreamingPort = Settings.StreamingPort;
83 const auto SecondaryPort = Settings.SecondaryPort;
84 const auto PrimaryIP = Settings.PrimaryIP;
85 const auto PrimaryPort = Settings.PrimaryPort;
86
87 auto BroadcastStream = Server.Start(Settings.RPCPort, StreamingPort, SecondaryPort);
89
90 WorldObserver.SetStream(BroadcastStream);
91
92 OnPreTickHandle = FWorldDelegates::OnWorldTickStart.AddRaw(
93 this,
95 OnPostTickHandle = FWorldDelegates::OnWorldPostActorTick.AddRaw(
96 this,
99 this,
101
102 bIsRunning = true;
103
104 // check to convert this as secondary server
105 if (!PrimaryIP.empty())
106 {
107 // we are secondary server, connecting to primary server
108 bIsPrimaryServer = false;
109
110 // define the commands executor (when a command comes from the primary server)
111 auto CommandExecutor = [=](carla::multigpu::MultiGPUCommand Id, carla::Buffer Data) {
112 struct CarlaStreamBuffer : public std::streambuf
113 {
114 CarlaStreamBuffer(char *buf, std::size_t size) { setg(buf, buf, buf + size); }
115 };
116 switch (Id) {
118 {
120 {
121 TRACE_CPUPROFILER_EVENT_SCOPE_STR("MultiGPUCommand::SEND_FRAME");
122 // convert frame data from buffer to istream
123 CarlaStreamBuffer TempStream((char *) Data.data(), Data.size());
124 std::istream InStream(&TempStream);
125 GetCurrentEpisode()->GetFrameData().Read(InStream);
126 {
127 TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.emplace_back");
128 std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
129 FramesToProcess.emplace_back(GetCurrentEpisode()->GetFrameData());
130 }
131 }
132 // forces a tick
133 Server.Tick();
134 break;
135 }
137 {
138 FString FinalPath((char *) Data.data());
140 {
141 UGameplayStatics::OpenLevel(GetCurrentEpisode()->GetWorld(), *FinalPath, true);
142 }
143
144 break;
145 }
147 {
148 // get the sensor id
149 auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
150 // query dispatcher
152 carla::Buffer buf(reinterpret_cast<unsigned char *>(&token), (size_t) sizeof(token));
153 carla::log_info("responding with a token for port ", token.get_port());
154 Secondary->Write(std::move(buf));
155 break;
156 }
158 {
159 std::string msg("Yes, I'm alive");
160 carla::Buffer buf((unsigned char *) msg.c_str(), (size_t) msg.size());
161 carla::log_info("responding is alive command");
162 Secondary->Write(std::move(buf));
163 break;
164 }
166 {
167 // get the sensor id
168 auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
169 // query dispatcher
171 // return a 'true'
172 bool res = true;
173 carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
174 carla::log_info("responding ENABLE_ROS with a true");
175 Secondary->Write(std::move(buf));
176 break;
177 }
179 {
180 // get the sensor id
181 auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
182 // query dispatcher
184 // return a 'true'
185 bool res = true;
186 carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
187 carla::log_info("responding DISABLE_ROS with a true");
188 Secondary->Write(std::move(buf));
189 break;
190 }
192 {
193 // get the sensor id
194 auto sensor_id = *(reinterpret_cast<carla::streaming::detail::stream_id_type *>(Data.data()));
195 // query dispatcher
196 bool res = Server.GetStreamingServer().IsEnabledForROS(sensor_id);
197 carla::Buffer buf(reinterpret_cast<unsigned char *>(&res), (size_t) sizeof(bool));
198 carla::log_info("responding IS_ENABLED_ROS with: ", res);
199 Secondary->Write(std::move(buf));
200 break;
201 }
202 }
203 };
204
205 Secondary = std::make_shared<carla::multigpu::Secondary>(PrimaryIP, PrimaryPort, CommandExecutor);
206 Secondary->Connect();
207 // set this server in synchronous mode
208 bSynchronousMode = true;
209 }
210 else
211 {
212 // we are primary server, starting server
213 bIsPrimaryServer = true;
215 SecondaryServer->SetNewConnectionCallback([this]()
216 {
217 this->bNewConnection = true;
218 UE_LOG(LogCarla, Log, TEXT("New secondary connection detected"));
219 });
220 }
221 }
222
223 // create ROS2 manager
224 #if defined(WITH_ROS2)
225 if (Settings.ROS2)
226 {
227 auto ROS2 = carla::ros2::ROS2::GetInstance();
228 ROS2->Enable(true);
229 }
230 #endif
231
232 bMapChanged = true;
233}
234
236{
237 TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
239 CurrentEpisode = &Episode;
240
241 // Reset map settings
242 UWorld* World = CurrentEpisode->GetWorld();
243 ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
244 if (LargeMapManager)
245 {
248 }
249
250 if (!bIsPrimaryServer)
251 {
252 // set this secondary server with no-rendering mode
254 }
255
257
258 ResetFrameCounter(GFrameNumber);
259
260 // make connection between Episode and Recorder
261 if (Recorder)
262 {
263 Recorder->SetEpisode(&Episode);
264 Episode.SetRecorder(Recorder);
266 }
267
268 Server.NotifyBeginEpisode(Episode);
269
271}
272
278
279void FCarlaEngine::OnPreTick(UWorld *, ELevelTick TickType, float DeltaSeconds)
280{
281 TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
282 if (TickType == ELevelTick::LEVELTICK_All)
283 {
284
286 {
287 if (CurrentEpisode && !bSynchronousMode && SecondaryServer->HasClientsConnected())
288 {
289 // set synchronous mode
294 }
295
296 // process RPC commands
297 do
298 {
299 Server.RunSome(1u);
300 }
302 }
303 else
304 {
305 // process frame data
306 do
307 {
308 Server.RunSome(1u);
309 }
310 while (!FramesToProcess.size());
311 }
312
313 // update frame counter
315
316 if (CurrentEpisode)
317 {
318 CurrentEpisode->TickTimers(DeltaSeconds);
319
320 if (!bIsPrimaryServer)
321 {
322 if (FramesToProcess.size())
323 {
324 TRACE_CPUPROFILER_EVENT_SCOPE_STR("FramesToProcess.PlayFrameData");
325 std::lock_guard<std::mutex> Lock(FrameToProcessMutex);
326 FramesToProcess.front().PlayFrameData(CurrentEpisode, MappedId);
327 FramesToProcess.erase(FramesToProcess.begin()); // remove first element
328 }
329 }
330 }
331 }
332}
333
334
335void FCarlaEngine::OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
336{
337 TRACE_CPUPROFILER_EVENT_SCOPE_STR(__FUNCTION__);
338 // tick the recorder/replayer system
339 if (GetCurrentEpisode())
340 {
342 {
343 if (SecondaryServer->HasClientsConnected()) {
345 bNewConnection = false;
346 std::ostringstream OutStream;
347 GetCurrentEpisode()->GetFrameData().Write(OutStream);
348
349 // send frame data to secondary
350 std::string Tmp(OutStream.str());
351 SecondaryServer->GetCommander().SendFrameData(carla::Buffer(std::move((unsigned char *) Tmp.c_str()), (size_t) Tmp.size()));
352
354 }
355 }
356
357 auto* EpisodeRecorder = GetCurrentEpisode()->GetRecorder();
358 if (EpisodeRecorder)
359 {
360 EpisodeRecorder->Ticking(DeltaSeconds);
361 }
362 }
363
364 if ((TickType == ELevelTick::LEVELTICK_All) && (CurrentEpisode != nullptr))
365 {
366 // Look for lightsubsystem
367 bool LightUpdatePending = false;
368 if (World)
369 {
370 UCarlaLightSubsystem* CarlaLightSubsystem = World->GetSubsystem<UCarlaLightSubsystem>();
371 if (CarlaLightSubsystem)
372 {
373 LightUpdatePending = CarlaLightSubsystem->IsUpdatePending();
374 }
375 }
376
377 // send the worldsnapshot
378 WorldObserver.BroadcastTick(*CurrentEpisode, DeltaSeconds, bMapChanged, LightUpdatePending);
379 CurrentEpisode->GetSensorManager().PostPhysTick(World, TickType, DeltaSeconds);
381 }
382}
383
385{
387
389
390 if (GEngine && GEngine->GameViewport)
391 {
392 GEngine->GameViewport->bDisableWorldRendering = Settings.bNoRenderingMode;
393 }
394
396
397 // Setting parameters for physics substepping
398 UPhysicsSettings* PhysSett = UPhysicsSettings::Get();
399 PhysSett->bSubstepping = Settings.bSubstepping;
400 PhysSett->MaxSubstepDeltaTime = Settings.MaxSubstepDeltaTime;
401 PhysSett->MaxSubsteps = Settings.MaxSubsteps;
402
403 UWorld* World = CurrentEpisode->GetWorld();
404 ALargeMapManager* LargeMapManager = UCarlaStatics::GetLargeMapManager(World);
405 if (LargeMapManager)
406 {
407 LargeMapManager->SetLayerStreamingDistance(Settings.TileStreamingDistance);
408 LargeMapManager->SetActorStreamingDistance(Settings.ActorActiveDistance);
409 }
410}
411
static void FCarlaEngine_SetFixedDeltaSeconds(TOptional< double > FixedDeltaSeconds)
static uint32 FCarlaEngine_GetNumberOfThreadsForRPCServer()
static TOptional< double > FCarlaEngine_GetFixedDeltaSeconds()
void Ticking(float DeltaSeconds)
CarlaReplayer * GetReplayer(void)
void SetEpisode(UCarlaEpisode *ThisEpisode)
float GetLayerStreamingDistance() const
void SetActorStreamingDistance(float Distance)
void SetLayerStreamingDistance(float Distance)
float GetActorStreamingDistance() const
void CheckPlayAfterMapLoaded(void)
void OnEpisodeSettingsChanged(const FEpisodeSettings &Settings)
FDelegateHandle OnEpisodeSettingsChangeHandle
std::shared_ptr< carla::multigpu::Router > SecondaryServer
std::mutex FrameToProcessMutex
void OnPreTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
static uint64_t FrameCounter
Definition CarlaEngine.h:35
void ResetSimulationState()
FWorldObserver WorldObserver
FCarlaServer Server
static void ResetFrameCounter(uint64_t Value=0)
Definition CarlaEngine.h:81
void NotifyEndEpisode()
std::unordered_map< uint32_t, uint32_t > MappedId
FEpisodeSettings CurrentSettings
bool bNewConnection
static uint64_t UpdateFrameCounter()
Definition CarlaEngine.h:70
UCarlaEpisode * CurrentEpisode
std::vector< FFrameData > FramesToProcess
void NotifyBeginEpisode(UCarlaEpisode &Episode)
void NotifyInitGame(const UCarlaSettings &Settings)
ACarlaRecorder * Recorder
bool bIsPrimaryServer
bool bSynchronousMode
FDelegateHandle OnPreTickHandle
FDelegateHandle OnPostTickHandle
std::shared_ptr< carla::multigpu::Secondary > Secondary
UCarlaEpisode * GetCurrentEpisode()
Definition CarlaEngine.h:55
void OnPostTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
bool TickCueReceived()
std::shared_ptr< carla::multigpu::Router > GetSecondaryServer()
void RunSome(uint32 Milliseconds)
carla::streaming::Server & GetStreamingServer()
void NotifyEndEpisode()
void NotifyBeginEpisode(UCarlaEpisode &Episode)
FDataMultiStream Start(uint16_t RPCPort, uint16_t StreamingPort, uint16_t SecondaryPort)
void AsyncRun(uint32 NumberOfWorkerThreads)
static FOnEpisodeSettingsChange OnEpisodeSettingsChange
void GetFrameData(UCarlaEpisode *ThisEpisode, bool bAdditionalData=false, bool bIncludeActorsAgain=false)
Definition FrameData.cpp:23
void Read(std::istream &InStream)
void Write(std::ostream &OutStream)
void Clear()
void PostPhysTick(UWorld *World, ELevelTick TickType, float DeltaSeconds)
void SetStream(FDataMultiStream InStream)
Replace the Stream associated with this sensor.
void BroadcastTick(const UCarlaEpisode &Episode, float DeltaSeconds, bool MapChange, bool PendingLightUpdate)
Send a message to every connected client with the info about the given Episode.
A simulation episode.
FFrameData & GetFrameData()
FEpisodeSettings EpisodeSettings
void ApplySettings(const FEpisodeSettings &Settings)
void SetRecorder(ACarlaRecorder *Rec)
void TickTimers(float DeltaSeconds)
ACarlaRecorder * GetRecorder() const
FSensorManager & GetSensorManager()
Global settings for CARLA.
uint32 StreamingPort
setting for the streaming port.
std::string PrimaryIP
setting for the IP and Port of the primary server to connect.
uint32 RPCPort
World port to listen for client connections.
uint32 SecondaryPort
setting for the secondary servers port.
static ALargeMapManager * GetLargeMapManager(const UObject *WorldContextObject)
A piece of raw data.
static std::shared_ptr< ROS2 > GetInstance()
Definition ROS2.h:51
token_type GetToken(stream_id sensor_id)
bool IsEnabledForROS(stream_id sensor_id)
void EnableForROS(stream_id sensor_id)
void DisableForROS(stream_id sensor_id)
Serializes a stream endpoint.
uint32_t stream_id_type
Definition Types.h:18
static void log_info(Args &&... args)
Definition Logging.h:82