任务复现
环境配置
conda create -n trafficpy39 python=3.9 #用conda创建python3.9版本的名为trafficpy39的虚拟环境
conda activate trafficpy39 #激活该虚拟环境
conda install tensorflow==2.10.0
# 如果需要独立GPU版本,可改成(需要CUDA 11.2 + cuDNN 8.1,建议参考官方文档)
# conda install -c conda-forge cudatoolkit=11.2 cudnn=8.1
# pip install tensorflow==2.10.0
# 有可能版本冲突,所以写在这里
conda install "numpy>=1.20,<1.24"
conda install "scikit-learn>=0.24,<1.3"
跑模型代码再README.md文件中
结果保存再results文件夹中
模型代码详讲
数据集
Chicago文件夹中存放的是芝加哥的数据集
NYC文件夹中存放的是纽约的数据集
其中.npy文件是NumPy 库的二进制序列化文件格式 ,专门用来存储 NumPy 数组(可保留数据类型、形状、维度),比 txt 更高效(读写速度快、占用空间小),是深度学习中存储数据集的常用格式。
文件夹中各个.npy文件代表的含义:
dict:坐标,每个网格经纬度
label:存储每个时间步、每个网格的交通异常标签(0 = 正常,1 = 拥堵 / 事故等异常)
POI:存储每个网格的 POI(兴趣点)属性(比如是否有商场、学校、医院等,以数值编码表示)
record:存储每个时间步、每个网格的动态交通数据(比如车流量、速度、流量变化率等)
road:存储每个网格的道路属性(比如道路类型、车道数、限速等,以数值编码表示)
threshold:存储每个时间步、每个网格的事件阈值参数(0~1 之间的数值)
data:已经融合好的数据(直接完成了时空编码以及静态嵌入)
重要参数
| 参数名 | 取值 | 业务含义 |
|---|---|---|
| number_region (N) | 27 | 芝加哥交通网格总数(简化版) |
| dr2 | 58 | 基础特征维度(静态/动态各 58 维) |
| len_recent_time | 5 | 模型输入时间步(理论 15min 一个,实际 1h) |
| T_total | 8784 | 数据集总时间步(约 8784/24≈366 天) |
| 特征归一化值域 | 0~1 | 所有特征统一值域,适配激活函数 |
| 标签值 | 0/1 | 0=交通正常,1=交通异常(拥堵/事故) |
详细内容代表的含义:
在.txt文件中1代表有这个东西,0代表没有这个东西;
在.npy文件中,首先代开的代码在README.md文件中
#例如:
conda activate trafficpy39 #激活该虚拟环境
#查看数据集文件:
python test.py nyc/data_nyc.npy --max_print 20
#示范:
(trafficpy39) PS D:\project\trafficcompare> python test.py nyc/data_nyc.npy --max_print 20
type: <class 'numpy.ndarray'>
dtype: float32
shape: (13128, 64, 116)
first 20 elements (flattened):
[0. 0. 0. 0. 1. 0.
0. 0. 1. 0. 0. 0.03508772
0. 0.02702703 0. 0. 0.03125 0.
0.07142857 0. ]
其中shape: (13128, 64, 116):
第一维:时间步,13128代表有13128个时间节点;
第二维:区域数量,64代表在这里面划了64个区域
第三维:特征维数,116代表有116维
底下的[ ]中的代表融合好的特征,“0.”静态特征没有,“0.03125 ”动态特征比如交通流量。
原始数据集文件展示:
1代表有这个东西,0代表没有这个东西,核心内容是它代表的这个东西是什么
| 文件名称 | 格式 | 形状(简化版) | 核心内容(带实际数据片段) |
|---|---|---|---|
| road_ad.txt | .txt | [27, 27] | 列 1 = 主干道,列 2 = 十字路口,列 3 = 单行路…(27 类道路属性) |
| poi_ad.txt | .txt | [27, 27] | 列 1 = 有商场,列 2 = 有学校,列 3 = 有医院…(27 类 POI) |
| record_ad.txt | .txt | [27, 27] | 列 1 = 高峰拥堵区,列 2 = 交通枢纽…(27 类流量属性不是0也不是1是0.几几) |
| dict_xy.npy | .npy | [27, 2] | 网格地理坐标,示例数据:{0: (0, 2), 1: (0, 3)… 26: (5, 4)},映射关系 |
| data_chicago.npy | .npy | (8784, 27, 116) | 融合好的数据,[[0.8, 1.0, 0.0, 0.9, 1.0, 0.625, …] …][…] |
| label.npy | .npy | (8784, 27) | 交通异常标签,示例:[[0, 0, 1, …, 0], [0, 1, 0, …, 0], …](第 1 时间步网格 3 异常) |
| threshold_nc.npy | .npy | (8784, 27, 1) | 8784 个时间步,27 个网格,1 维阈值(控制静态 / 动态特征融合比例),[[[0.7]], [[0.65]], …](网格 1 的阈值 = 0.7)0.7X静态+0.3X一个动态 |
动态演化嵌入
前两部分在融合数据集直接有了
在model.py文件里面class Evolution(Layer):
类的初始化定义
def __init__(self, dr2, use_smooth_gate=True, **kwargs):
# 定义维度,也就是ppt里讲的2D
self.dr2 = dr2
self.use_smooth_gate = bool(use_smooth_gate)#添加的自适应平滑门
# 继承父类的属性和方法
super(Evolution, self).__init__(**kwargs)
build函数
作用:定义权重矩阵和参数,还有自适应平滑门功能
#这就是权重矩阵
def build(self, input_shape):
self.w1 = self.add_weight(name='wl',
shape=(2 * self.dr2, self.dr2),#w1和w2合成一个总的w加快速度
initializer=keras.initializers.RandomNormal(mean=1.0, stddev=0.5, seed=2021),
trainable=True)
# 平滑门控参数:控制「软更新比例」,这也是咱后加的创新点,logit控制偏向旧状态的程度
# 初始值 1.3862944 = ln(4)
# sigmoid(ln(4)) = 4/(1+4) = 0.8
# 初始偏向:80%新信息 + 20%旧信息
self.smooth_gate_logit = self.add_weight(
name='smooth_gate_logit',
shape=(1,),
initializer=keras.initializers.Constant(1.3862944),
trainable=True)
# 平滑门控参数:控制「软更新比例」,平滑缩放因子,控制变化量的敏感度
# smooth_scale > 1.0:放大变化,网络更"灵敏"
# smooth_scale < 1.0:抑制变化,网络更"平滑"
# smooth_scale ≈ 0.0:几乎忽略输入变化
self.smooth_scale = self.add_weight(
name='smooth_scale',
shape=(1,),
initializer=keras.initializers.Constant(1.0),
trainable=True)
super(Evolution, self).build(input_shape)
call函数
作用:有点类似于main函数,把之前写的东西都调用一遍;这边是把动态演化嵌入的三步操作了。
def call(self, all_data_static, threshold_nc, all_data_dynamic_now):
# 1. 初始化动态特征序列:给初始动态特征加时间维度,形状从[N,dr2] → [1, N, dr2](训练开始时,所有网格的动态嵌入初始化为 0 向量)
all_data_dynamic = tf.expand_dims(all_data_dynamic_now, 0)
# 2. 缓存初始动态特征,作为「旧特征」(空白向量会和流量等动态特征相加)
all_data_dynamic_now_prev = all_data_dynamic_now
# 3. 计算【候选更新的动态特征】:核心的Update,decay过程,初始化
all_data_dynamic_now_candidate = tf.sigmoid(
tf.matmul(tf.concat([all_data_dynamic_now, all_data_static[0]], axis=-1), self.w1)
* tf.repeat(threshold_nc[0], self.dr2, axis=-1) + all_data_dynamic_now #它和上一行执行Update操作
* tf.repeat(1 - threshold_nc[0], self.dr2, axis=-1)) * math.exp(-1 / 2) #执行Decay操作
if self.use_smooth_gate: #是否使用平滑门
delta0 = tf.reduce_mean(tf.abs(all_data_dynamic_now_candidate - all_data_dynamic_now_prev), axis=-1, keepdims=True)
smooth_gate0 = tf.sigmoid(self.smooth_gate_logit + self.smooth_scale * delta0)
all_data_dynamic_now = smooth_gate0 * all_data_dynamic_now_prev + (1 - smooth_gate0) * all_data_dynamic_now_candidate
else:
all_data_dynamic_now = all_data_dynamic_now_candidate
all_data_dynamic_diff = [] #以上对第0个时间步进行初始化
# 获取总时间步T
time_steps = threshold_nc.shape[0]
# 遍历从第1步到最后1步,逐时间步更新动态特征(重复上面第0个时间步的操作)
for i in range(1, time_steps):
all_data_dynamic_now_diff = all_data_dynamic_now
all_data_dynamic_now_prev = all_data_dynamic_now
# 【和初始化阶段完全相同的核心更新逻辑】生成候选特征
all_data_dynamic_now_candidate = tf.sigmoid(
tf.matmul(tf.concat([all_data_dynamic_now, all_data_static[i]], axis=-1), self.w1)
* tf.repeat(threshold_nc[i], self.dr2, axis=-1) + all_data_dynamic_now
* tf.repeat(1 - threshold_nc[i], self.dr2, axis=-1)) * math.exp(-1 / 2)
if self.use_smooth_gate:
delta = tf.reduce_mean(tf.abs(all_data_dynamic_now_candidate - all_data_dynamic_now_prev), axis=-1, keepdims=True)
smooth_gate = tf.sigmoid(self.smooth_gate_logit + self.smooth_scale * delta)
all_data_dynamic_now = smooth_gate * all_data_dynamic_now_prev + (1 - smooth_gate) * all_data_dynamic_now_candidate
else:
all_data_dynamic_now = all_data_dynamic_now_candidate
# 计算「特征差分」:当前特征 - 上一步特征 → 捕捉动态特征的变化率(流量变化/风险变化)
all_data_dynamic_now_diff = all_data_dynamic_now - all_data_dynamic_now_diff
all_data_dynamic_diff.append(tf.expand_dims(all_data_dynamic_now_diff, 0))
# 把当前时间步的动态特征,拼接到完整序列中
all_data_dynamic = tf.concat([all_data_dynamic, tf.expand_dims(all_data_dynamic_now, 0)], axis=0)
# 最终输出
all_data_dynamic_diff = tf.concat(all_data_dynamic_diff, axis=0)
return all_data_dynamic, all_data_dynamic_now, all_data_dynamic_diff
空间相互影响学习
选择周围邻居中对我影响最大的一个
在model.py文件里面class Attention(Layer)
单分支的空间注意力机制,简单的说也就是让每个独立的交通网格,自适应学习周边邻居网格的特征
类的初始化定义
def __init__(self, dr2, len_recent_time, number_region, mode='scaled_dot', **kwargs):
# 统一的维度2D
self.dr2 = dr2
# 时间总步长,和动态层统一
self.len_recent_time = len_recent_time
# 城市网格数量
self.number_region = number_region
# 要学习的模式,有两种选择:scaled_dot(点积学习), mean(平均学习),如何把邻居网格的特征整合到当前网格
self.mode = mode
super(Attention, self).__init__(**kwargs)
build函数
这个里面的参数是权重矩阵,一共需要多少个呢?
- 输入不需要权重矩阵
- 注意力权重计算:
- 定义Q需要一个,K、V共享一个权重矩阵-
- 计算注意力权重需要的前面有了
- 邻居特征聚合:$W_{ds}$,看一个空间网格对周围邻居的学习权重是多少
所以一个三个权重矩阵
# 初始化【3 个核心可训练权重】
def build(self, input_shape):
self.wq = self.add_weight(
shape=(self.dr2, self.dr2),
initializer=keras.initializers.RandomNormal(mean=0.01, stddev=0.005, seed=2021),
trainable=True)
self.wk = self.add_weight(
shape=(self.dr2, self.dr2),
initializer=keras.initializers.RandomNormal(mean=0.01, stddev=0.005, seed=2021),
trainable=True)
self.wd_s = self.add_weight(
shape=(self.dr2, self.dr2),
initializer=keras.initializers.RandomNormal(mean=0.01, stddev=0.005, seed=2021),
trainable=True)
super(Attention, self).build(input_shape)
call函数
实现1.输入2.注意力权重计算3.残差连接这三步;3.静态嵌入后面调用两步就可以了
# 输入:data(特征张量), neigh_index(每个网格的邻居索引)
def call(self, data, neigh_index):
# 核心操作:根据邻居索引,找到每个网格的所有邻居特征,实现「网格-邻居」的特征关联
data_neigh = tf.nn.embedding_lookup(tf.transpose(data, (2, 0, 1, 3)),
neigh_index)
# 张量形状变化:[N, len, T, dr2] → [N, K, len, T, dr2] → 每个网格都对应K个邻居的完整特征
data_neigh = tf.transpose(data_neigh, (2, 3, 0, 1, 4))
# 将自身特征重塑:增加「邻居维度」K=1,形状变为 [len, T, N, 1, dr2],和邻居特征形状对齐
data = tf.reshape(data, (-1, data.shape[1], data.shape[2], 1, data.shape[-1]))
# 以上为输入
# 自身特征 → Query向量(q):data * wq
data = tf.matmul(data, self.wq)
# 邻居特征 → Key向量(k):data_neigh * wk (Value=Key,权重共享)
data_neigh = tf.matmul(data_neigh, self.wk)
# 低配版:均值池化,直接对邻居特征取平均,无自适应权重 → 等价于GCN的等权聚合
if self.mode == 'mean':
out = tf.reduce_mean(data_neigh, axis=-2, keepdims=True)
else:
logits = tf.matmul(data, data_neigh, transpose_b=True)
# 核心版:缩放点积注意力(scaled_dot)→ 自适应学习邻居的「重要性权重」
if self.mode == 'scaled_dot':
logits = logits / tf.math.sqrt(tf.cast(self.dr2, logits.dtype))
# softmax归一化:注意力分数→注意力权重,权重和为1,代表「每个邻居对当前网格的贡献度」
out = tf.matmul(tf.nn.softmax(logits, axis=-1), data_neigh)
# 残差连接
out = data + out
out = tf.sigmoid(
tf.matmul(tf.reshape(out, (-1, self.len_recent_time, self.number_region, self.dr2)), self.wd_s))
return out
邻居三个视图中那个视图对我影响比较大
在model.py文件里面class MultiAttention(Layer)
类的初始化定义
def __init__(self, num_sp, dr2, len_recent_time, number_region, attention_mode='scaled_dot', **kwargs):
self.dr2 = dr2 #维度
self.num_sp = num_sp #决定注意力块
self.attention_layers_poi = [Attention(self.dr2, len_recent_time, number_region, mode=attention_mode) for _ in range(self.num_sp)] #POI
self.attention_layers_road = [Attention(self.dr2, len_recent_time, number_region, mode=attention_mode) for _ in range(self.num_sp)] #静态特征
self.attention_layers_record = [Attention(self.dr2, len_recent_time, number_region, mode=attention_mode) for _ in range(self.num_sp)] #动态特征
super(MultiAttention, self).__init__(**kwargs)
build函数
一共$W_{GF}$,$W_{GA}$,$W_{GT}$三个权重矩阵,$W_{ds}$不需要通过前三个就可以看出那个图对我比较重要,重要比例是多少
def build(self, input_shape):
self.w_poi = self.add_weight(
shape=(self.dr2,),
initializer=keras.initializers.Zeros(),
trainable=True)
self.w_road = self.add_weight(
shape=(self.dr2,),
initializer=keras.initializers.Zeros(),
trainable=True)
self.w_record = self.add_weight(
shape=(self.dr2,),
initializer=keras.initializers.Zeros(),
trainable=True)
super(MultiAttention, self).build(input_shape)
call函数
def call(self, all_data, neigh_poi_index, neigh_road_index, neigh_record_index):
# 初始化三个分支的输入特征,都是同一个融合特征
all_data_static_poi = all_data
all_data_static_road = all_data
all_data_static_record = all_data
# 逐轮执行注意力学习:每类特征做num_sp轮空间学习,特征越来越精准
for i in range(self.num_sp):
all_data_static_poi = self.attention_layers_poi[i](all_data_static_poi, neigh_poi_index)
all_data_static_road = self.attention_layers_road[i](all_data_static_road, neigh_road_index)
all_data_static_record = self.attention_layers_record[i](all_data_static_record, neigh_record_index)
# 核心:三类分支特征 加权相加 + Sigmoid激活 → 最终的空间融合特征
out = tf.sigmoid(all_data_static_poi * self.w_poi + all_data_static_road * self.w_road +
all_data_static_record * self.w_record)
return out
实现静态嵌入
在model.py文件里面class MYPLAN(tf.keras.models.Model)函数中
# ========== 第三步:实例化【空间注意力层×2】 ==========
self.multiattention = [
MultiAttention(number_sp, 2 * dr, len_recent_time, number_region, attention_mode=attention_mode)
for _ in range(2)
]
# 核心设计:创建2个完全独立的MultiAttention空间注意力层!
# 分工明确:第0个 → 专门处理【动态特征】的空间学习;第1个 → 专门处理【静态特征】的空间学习
# 为什么分离?静态特征(POI/道路)和动态特征(流量/风险)的「空间依赖规律完全不同」,分开学习更精准,不会互相干扰
模型运转以及损失函数
在model.py文件里面class MYPLAN(tf.keras.models.Model)函数中
定义LSTM和全连接层
# ========== 第四步:实例化【模型核心 - ConvLSTM2D 时空建模层】 ==========,后续还会和普通LSTM作比较
self.convlstm = keras.layers.ConvLSTM2D(1, 1, strides=(1, 1),
padding='valid',
data_format=None,
dilation_rate=(1, 1),
activation='tanh',
recurrent_activation='hard_sigmoid',
use_bias=True,
kernel_initializer='glorot_uniform',
recurrent_initializer='orthogonal',
bias_initializer='zeros',
unit_forget_bias=True,
return_sequences=False,
)
# ========== 第五步:实例化【最终输出层 - Dense全连接层】 ==========
self.final_layer = keras.layers.Dense(number_region, activation='sigmoid', bias_initializer='ones')
call函数
# 调用前面设计的所有环节
def call(self, all_data_static, threshold_nc, all_data_dynamic_now):
all_data_dynamic, all_data_dynamic_now, all_data_dynamic_diff = self.evolution(all_data_static, threshold_nc,
all_data_dynamic_now)#实现动态嵌入
all_data_dynamic = self.multiattention[0](all_data_dynamic, self.neigh_poi_index, self.neigh_road_index,
self.neigh_record_index) #注意力机制
all_data_static = self.multiattention[1](all_data_static, self.neigh_poi_index, self.neigh_road_index,
self.neigh_record_index) #注意力机制
all_data_dynamic = tf.expand_dims(all_data_dynamic, 3) #静态动态维度对齐
all_data_static = tf.expand_dims(all_data_static, 3) #静态动态维度对齐
all_data = tf.concat([all_data_dynamic, all_data_static], axis=-1) #静态动态维度对齐
all_data = self.convlstm(all_data) #输入到lstm模型中
all_data = tf.reshape(all_data, (-1, all_data.shape[1])) #再进行对齐
all_data = self.final_layer(all_data) #输入到前馈神经网络层
# print(all_data.shape)
return all_data, all_data_dynamic_now, all_data_dynamic_diff
Streaming后处理
在train.py中按Ctrl+F搜索use_streaming在第191行
use_streaming = bool(int(args.streaming_postprocess))
if use_streaming:
batch_val = math.ceil(len(val_x) / batch_size)
val_pred = tf.zeros((batch_size, val_y.shape[-1]))
_y_dy = y_dynamic
for i in range(batch_val):
y_pred, _y_dy, _ = model(val_x[i * batch_size:(i + 1) * batch_size],
val_threshold_nc[i * batch_size:(i + 1) * batch_size], _y_dy)
val_pred = tf.concat([val_pred, y_pred], axis=0)
val_pred = val_pred[batch_size:].numpy().reshape((-1, 1))
val_y_np = val_y.numpy().reshape((-1, 1))
offline_state_val = (val_pred > float(threshold_f1)).astype(np.int8)
offline_f1_val = float(f1_score(val_y_np, offline_state_val))
offline_pos_rate = float(np.mean(offline_state_val))
offline_acc_val = float(accuracy_score(val_y_np, offline_state_val))
offline_recall_val = float(recall_score(val_y_np, offline_state_val))
损失函数
如何找到损失函数,在哪里定义?
- 在train.py中Ctrl+F搜索loss,查找到loss_function
- 右键在文件中查找,找到def loss_function
- 在utils.py文件中找到损失函数
def loss_function(pred, y, dy_diff, a_dy=6, lambda_=0.005, epsilon=0.9, alpha=0.25, gamma=2):
zeros = tf.zeros_like(pred, dtype=pred.dtype)
focal_loss = -alpha * (1 - pred) ** gamma * y * tf.math.log(pred) - (1 - alpha) * pred ** gamma * (
1 - y) * tf.math.log(1 - pred)
focal_loss = tf.where(pred * y + (1 - pred) * (1 - y) > epsilon, zeros, focal_loss)
focal_loss = tf.reduce_mean(focal_loss)
dy_loss = tf.keras.losses.mean_absolute_error(dy_diff, 0.)
dy_loss = a_dy - tf.reduce_mean(dy_loss)
dy_loss = tf.reduce_max([0, dy_loss])
loss = focal_loss + lambda_ * dy_loss
return loss, focal_loss, dy_loss
训练代码
在train.py里面写很详细了
其他部分
- utils.py:用到的一些组件,函数的定义
- params.py:超参数的一些设计,但是设计的比较隐晦