交通流量预测
注意
运行该脚本之前,应先运行generate_traffic.py并确保在town10中成功放置车辆。
main函数中首先定义了argparser和carla模块,其中,argparse模块用于解析命令行参数,包括host(主机IP地址)、port(TCP端口号)、traffic_id(交通灯ID)、color_id(交通灯颜色ID)和color_time(交通灯时间)。
args= argparser.parse_arg()
argparser = argparse.ArgumentParser(description=__doc__)
argparser.add_argument('--host', metavar='H', default='127.0.0.1', help='IP of the host server (default: 127.0.0.1)')
argparser.add_argument('-p', '--port', metavar='P', default=2000, type=int, help='TCP port to listen to (default: 2000)')
argparser.add_argument('--traffic_id', metavar='I', default=1, type=int, help='traffic light id')
argparser.add_argument('--color_id', metavar='C', default=1, type=int, help='traffic light color id')
argparser.add_argument('--color_time', metavar='T', default=20, type=int, help='set traffic light time')
使用指定的主机IP地址和端口号连接到Carla模拟器。同时,设置与模拟器通信的超时时间。最后,代码通过获取Carla模拟器的世界对象,可以在模拟环境中进行交通灯的操作和控制。
client = carla.Client(args.host, args.port)
client.set_timeout(10.0)
world = client.get_world()
使用world.get_settings()方法获取Carla模拟器的当前设置。将Carla模拟器的设置更改为同步模式,并将时间步长设置为0.05秒,以便以固定的时间间隔进行模拟,控制模拟的速度和精确度。
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05 #模拟环境会以0.05秒的时间前进。
world.apply_settings(settings)
获取所有正在行驶的车辆列表
vehicle_list = world.get_actors().filter('vehicle.*')
划定路口范围
xmax = -11
xmin = -77
ymax = 57
ymin = -13
创建多个数据结构,其中vehicle_positions用于存储车辆实时位置,traffic_data用于存储路口实时的车流量数据,timestamps用于存储时间戳,traffic_flow用于统计车流量。
vehicle_positions = {}
traffic_data = []
timestamps = []
traffic_flow = []
统计车流量数据。假设模拟时间步一共为1000。在每个时间步中,遍历所有正在行驶的车辆。对于每辆车辆,获取其位置信息,并判断其是否在目标路口的区域内。如果车辆在区域内,将计数器加1,并将车辆的位置信息存储到字典vehicle_positions中。
for t in range(1000):
world.tick()
count = 0
for vehicle in vehicle_list:
location = vehicle.get_location()
x = location.x
y = location.y
z = location.z
if x <= xmax and x >= xmin and y <= ymax and y >= ymin:
count += 1
vehicle_positions[vehicle.id] = (x, y, z)
对应的时间戳和车流量数据分别存入timestamps、traffic_flow, 而traffic_data包括这两种数据。
traffic_data.append((t, count))
timestamps.append(t)
traffic_flow.append(count)
traffic_data写入csv文件
filename = "traffic_data.csv"
with open(filename, "w", newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(["head", "value"]) # 写入表头
for data in traffic_data:
writer.writerow([data[0], data[1]]) # 写入数据
data = load_data('traffic_data.csv', 200)#读取数据
从交通流量文件中加载数据并进行预处理。首先将数据读取为DataFrame对象,并绘制原始数据的折线图。通过指定的时间步大小对数据进行切片,形成训练样本并返回。
def load_data(filename, time_step):
df = pd.read_csv(filename)
data = df.value
plt.title('original data')
plt.plot(data)
plt.show()
result = []
for index in range(len(data) - time_step):
result.append(data[index:index + time_step + 1])
return np.array(result)
使用MinMaxScaler对数据进行*归一化处理,然后按照7:3的比例将数据集划分为训练集和测试集。
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(data)
train_count = int(0.7 * len(dataset))
x_train_set, x_test_set = dataset[:train_count, :-1], dataset[train_count:, :-1]
y_train_set, y_test_set = dataset[:train_count, -1], dataset[train_count:, -1]
对训练集和测试集的输入和输出数据进行维度调整,其中输入数据x_train_set、x_test_set 为三维,如(样本数量, 时间步数, 特征数量),输出数据y_train_set、y_test_set为二维,如(样本数量, 1)。以满足LSTM模型的输入要求。
x_train_set = x_train_set.reshape(x_train_set.shape[0], 1, x_train_set.shape[1])
x_test_set = x_test_set.reshape(x_test_set.shape[0], 1, x_test_set.shape[1])
y_train_set = y_train_set.reshape(y_train_set.shape[0], 1)
y_test_set = y_test_set.reshape(y_test_set.shape[0], 1)
定义并创建了层次结构为[200,500,500,1]LSTM模型,其中200,500,500,1分别表示模型的三个隐层层的神经元数量,1为输出层的神经元数量。
layer = [200, 500, 500, 1]
model = build_lstm_model(layer)
构建LST模型。该函数创建Sequential模型对象,用于按顺序堆叠神经网络层。添加LSTM层、Dropout层和Dense全连接层,以及设置损失函数和优化器,提供一个简洁的方式来创建和编译LSTM神经网络模型,以便进行后续的训练和预测操作。
def build_lstm_model(layer):
model = Sequential()
model.add(LSTM(
input_shape=(1, layer[0]), units=layer[1], return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(
units=layer[2], return_sequences=False))
model.add(Dropout(0.2))
model.add(Dense(units=layer[3], activation='linear'))
model.compile(loss='mse', optimizer='adam')
return model
训练模型。将准备好的训练集数据x_train_set和对应的目标值y_train_set,对构建好的模型进行训练。其中,batch_size为每次迭代使用的样本数量,epochs为训练的迭代次数 ,validation_split: 验证集的比例。
model.fit(x_train_set, y_train_set, batch_size=100, epochs=25, validation_split=0.2)
预测未来交通流量。使用训练好的模型对测试集数据x_test_set进行预测,得到预测结果y_predicted。
y_predicted = model.predict(x_test_set)
绘制曲线,首先进行了数据的填充和逆标准化处理,将其转换回原始数据的范围,然后调用plot_curve函数进行曲线绘制,以便可视化对比模型的预测结果和真实的原始数据,以便进行结果评估和分析。
temp = np.zeros((len(y_test_set), 200))
origin_temp = np.hstack((temp, y_test_set))
predict_temp = np.hstack((temp, y_predicted))
origin_test = scaler.inverse_transform(origin_temp)
predict_test = scaler.inverse_transform(predict_temp)
predict_test = np.round(predict_test).astype(int)
plot_curve(origin_test[:, -1], predict_test[:, -1])
定义plot_curve函数用于绘制真实数据和预测数据的曲线图。下图基于200秒之前的历史交通流量数据,对未来时间序列的流量预测。
def plot_curve(true_data, predicted_data):
fig, ax = plt.subplots()
ax.plot(true_data, label='True data')
ax.plot(predicted_data, label='Predicted data')
ax.legend()
# 设置y轴的格式化为整数
ax.yaxis.set_major_formatter(ticker.ScalarFormatter(useOffset=False, useMathText=True))
ax.yaxis.set_major_locator(ticker.MaxNLocator(integer=True))
plt.show()
下图基于200秒的历史交通流量数据,对未来时间序列的流量预测。