时间序列数据建模流程范例
前言
最开始在学习神经网络,PyTorch 的时候,懂的都还不多,虽然也知道 RNN, CNN 这些网络的原理,但真正自己实现起来又是另一回事,代码往往也都是从网上 copy 过来然后再自己魔改的,这也就导致了一系列的问题,代码格式不统一,没弄懂具体实现细节等等。当然,凭这些 copy 过来的代码让模型运行起来还是不难的,你只需要知晓一定的原理。显而易见,这些时间往往最后都是要“还”的。
写这篇文章主要还是记录一下整体的思路,并对网络训练的整个过程进行标准化。当然,这只是我自己在写网络时的总结而已,未必适合每一个人的风格,希望能对你有所启发。
还是从一个例子开始,问题的背景很简单,一维时序数据的预测问题。
假如你对 RNN、LSTM 的原理并不了解同样不影响阅读,说白了,这里探讨的并不是怎么建立网络,重要的是整体的流程。
你也可以 点击这里 了解 RNN、LSTM 的工作原理
准备数据
首先就是准备数据,这部分往往是最花费时间,最会发生问题的地方。这里说的准备数据并不只是丢出来一个数据库或是 csv 文件,它涉及到数据获取,数据清洗,数据标准化,创建数据集等过程,让我们一个一个来讨论。
数据获取
数据获取部分没什么好讲的,根据你的数据来源,可能是格式化的,也可能的非格式化的。
你可以 点击这里 获取本文所使用的数据。
这里我使用的数据是从 2020/08/01 到 2020/08/31 的小时数据,如下图所示。
数据清洗
视你的需求以及原始数据来说,数据清洗可以很简单,也可以很复杂。简单来说,去除空值,去除重复值,去除连续常值,正态分布的 3σ 去除异常值等等,根据你想要的目标,选择不同的数据清洗方式。
下面是一个简单的标准化函数,使用 MinMaxScaler
将数据归一化为 0 - 1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def data_normalized(data): '''标准化数据
Args: data(pd.DataFrame): 待标准化数据
Returns: norm_data(tensor): 标准化后的数据 scaler(MinMaxScaler): 标准化器 ''' __data = np.array(data) __data[__data < 0] = 0 scaler = MinMaxScaler() norm_data = scaler.fit_transform(__data.reshape(-1, 1)) norm_data = torch.tensor(norm_data, dtype=torch.float32) return norm_data, scaler
|
为了简便起见,这里我给出的数据是已经经过了差分,重采样等步骤之后的数据。
1 2 3 4 5 6 7 8 9
| data = pd.read_csv('TIME_SEQ_DATA.csv') data['CreateDate'] = pd.to_datetime(data['CreateDate'])
data.dropna(inplace=True) data.drop_duplicates(inplace=True) data.sort_values(by=['CreateDate'], inplace=True) data.reset_index(drop=True, inplace=True)
norm_data, scaler = data_normalized(data['Value'])
|
上面的处理都是常规操作,还是那句话,根据你的实际需求。
至此,我们完成了简单的数据清洗,获得了标准化的数据。
创建数据集
创建数据集同样也有很多方法,手动对数据划分,或是利用 PyTorch 定义好的 Dataset
进行重写。
网上有许多手动划分的例子,大多数都是类似下面这样的。
1 2 3 4 5 6 7 8 9 10 11
| def create_dataset(data, look_back): dataset_x, dataset_y = [], [] for i in range(len(data) - look_back): dataset_x.append(data[i:(i + look_back)]) dataset_y.append(data[i + look_back]) return np.array(dataset_x), np.array(dataset_y)
...
train_size = int(len(dataset_x) * 0.7) ...
|
这里我使用 Dataset
和 DataLoader
这两个工具类来构建数据
Dataset
定义了数据集的内容,它相当于一个类似列表的数据结构,具有确定的长度,能够用索引获取数据集中的元素。
DataLoader
定义了按 batch 加载数据集的方法,能够控制 batch 的大小,batch 中元素的采样方法,以及将 batch 结果整理成模型所需输入形式的方法,并且能够使用多进程读取数据。
根据 Tensor 创建数据集
现在让我们暂时抛开背景问题,下面这个例子很好的说明了创建鸢尾花数据集的过程:
- 使用
TensorDataset
,将 data 和 target,也就是 x 和 y 分别传入,得到了 TensorDataset
类型的数据,你可以使用 for 循环查看里面的具体形式。
- 使用
random_split
,将整个数据集划分为训练集和预测集,得到 Subset
,你可以加上 torch.manual_seed(0)
来指定随机种子。
- 使用
DataLoader
加载数据集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| from sklearn import datasets
iris = datasets.load_iris() ds_iris = TensorDataset(torch.tensor(iris.data), torch.tensor(iris.target))
n_train = int(len(ds_iris) * 0.8) n_valid = len(ds_iris) - n_train ds_train, ds_valid = random_split(ds_iris, [n_train, n_valid])
print(type(ds_iris))
print(type(ds_train))
dl_train = DataLoader(ds_train, batch_size = 8) dl_valid = DataLoader(ds_valid, batch_size = 8)
for features, labels in dl_train: print(features, labels) break
|
创建自定义数据集
在上面的例子中,我们使用 TensorDataset
直接创建数据集。当你完成了对 x 和 y 的划分之后,对于划分简单的数据可以直接使用这样的方法。对于一些要求复杂的数据集,更优秀的方法是自定义。
我们只需实现 Dataset
的 __len__
方法和 __getitem__
方法,就可以轻松构建自己的数据集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class myDataset(Dataset): def __init__(self, data, look_back) -> None: super().__init__() self.data = data self.look_back = look_back
def __len__(self) -> int: return len(self.data) - self.look_back
def __getitem__(self, index): feature = self.data[index:index + self.look_back] label = self.data[index + self.look_back] return feature, label
|
这里,我们通过 look_back
个数据点,预测下一个数据点。
具体来说,我们对 __len__
方法和 __getitem__
方法进行了重写,具体的代码并不复杂。接下来,我们就可以使用 myDataset
达到和上面提到的 create_dataset
同样的效果。
1 2 3 4 5 6 7 8 9 10
| ds_data = myDataset(norm_data.view(-1).to(DEVICE), look_back=LOOK_BACK)
n_train = int(len(ds_data) * 0.8) n_test = len(ds_data) - n_train ds_train, ds_test = random_split(ds_data, [n_train, n_test])
dl_tarin = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True, drop_last=True) dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)
|
类似的,参考创建鸢尾花数据集的方法,同样将数据集分为训练集和测试集,并使用 DataLoader
加载。
使用 DataLoader 加载数据集
现在让我们回过头来看看 DataLoader
的具体使用。
DataLoader
能够控制 batch 的大小,batch 中元素的采样方法,以及将 batch 结果整理成模型所需输入形式的方法,并且能够使用多进程读取数据。
DataLoader
的函数签名如下:
1 2 3 4 5 6 7 8
| DataLoader( dataset, batch_size=1, shuffle=False, num_workers=0, drop_last=False, ... )
|
一般情况下,我们仅仅会配置 dataset
, batch_size
, shuffle
, num_workers
, drop_last
这五个参数,其他参数使用默认值即可。
关于 shape 的一些问题
准备数据的过程往往是复杂的,后面模型出了问题,或许就是数据处理上出了问题。上面我们着重将了如何创建数据集,但还有隐含在其中的另一个重要的点没有提及,也就是 size,或者说 shape。
最开始学习的时候,相信许多人都有疑问,为什么这里要 reshape()
,为什么那里要 view(-1)
,为什么这里要 flatten()
,为什么那里要 unsqueeze(0)
…
问题的根本原因就是,没有弄清楚经过某个处理之后你的数据的 shape 的变化,再或许就是没搞清上面这些函数的用法。
1 2 3
| output, hidden = self.lstm(input)
|
另外就是 layer 往往需要特定的输入维度,以 LSTM 为例,它需要传入的是三维参数:(seq_len, batch_size, input_size)
,out 的输出维度 (seq_len, batch_size, output_size)
,在我看来,时刻注意 shape 是一个好的习惯,特别是当数据经过那些你不熟悉的函数后。
定义模型
好了,终于到了定义网络的时候了,或许这部分是最简单的。
一般来说,我们使用 nn.Sequential
按层顺序构建模型,或是继承 nn.Module
基类构建自定义模型。
感觉就像这样,你只需要把它当做一个复合的层:
1 2 3 4 5 6
| self.my_seq = nn.Sequential(nn.Linear(input_size, 24), nn.Dropout(0.5), nn.ReLU(True), nn.Linear(24, 10), nn.Dropout(0.5), nn.ReLU(True))
|
这里,我只是简单搭建了一个 LSTM 网络,就像所有其他网络一样,结构并不复杂。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| class LSTM(nn.Module): ''' Args: input_size: feature size hidden_size: number of hidden units output_size: number of output num_layers: layers of LSTM to stack ''' def __init__(self, input_size, hidden_size, output_size=1, num_layers=3): super(LSTM, self).__init__() self.lstm = nn.LSTM(input_size, hidden_size, num_layers) self.linear = nn.Linear(hidden_size, output_size)
def forward(self, input): output, hidden = self.lstm(input) output = self.linear(output[-1]) return output
|
训练模型
如何训练网络因人而异,但大致都是类似的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| def train(dl_train): model = LSTM(LOOK_BACK, 64).to(DEVICE) criterion = nn.MSELoss() optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) train_loss = [] for e in range(EPOCH): __loss = 0 for feature, label in dl_train: out = model(feature.unsqueeze(0)) loss = criterion(out, label.unsqueeze(1)) optimizer.zero_grad() loss.backward() optimizer.step() __loss += loss.item() train_loss.append(__loss) if (e + 1) % 10 == 0: print('Epoch: {}, Loss: {}'.format(e + 1, __loss / len(dl_train)))
return model, train_loss
|
1
| model, train_loss = train(dl_tarin)
|
可视化损失函数在训练集上的迭代情况。
评估模型
在这里,我们直接使用之前创建的测试集进行训练,并计算根均方误差。
1 2 3 4 5 6 7 8 9 10 11 12 13
| model = model.eval() pred, actual = [], [] for feature, label in dl_test: out = model(feature.unsqueeze(0)) pred += out.view(-1).data.cpu().tolist() actual += label.view(-1).data.cpu().tolist() pred, actual = np.array(pred), np.array(actual)
rmse = np.sqrt(mean_squared_error(actual.reshape(-1), pred.reshape(-1))) print("根均方误差(RMSE):" + str(rmse))
|
根均方误差(RMSE):0.12173503830068468
小结
感谢你阅读至此,本文只是简单介绍了一些自己的经验,梳理了一下建模的简单流程。总的来说,我希望我的代码是模块化,标准化的,相信你也如此,希望本文能对你有所帮助。
你可以 点击这里 得到完整代码。
参考资料