[SSUDA] Pytorch로 LSTM, Prophet 사용하기 with 태양광 발전량 예측 경진대회
from google.colab import drive
drive.mount('/content/drive')
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings("ignore")
path = '/content/drive/MyDrive/sun/'
energy = pd.read_csv(path + 'energy.csv')
sample_submission = pd.read_csv(path + 'sample_submission.csv')
energy.info()
energy.fillna(energy.mean(),inplace = True)
energy = energy.set_index('time')
energy.head()
결측치를 채우고 시간을 인덱스로 바꿨습니다.
ulsan = energy['ulsan'].values.astype(float)
dangjin_floating = energy['dangjin_floating'].values.astype(float)
dangjin_warehouse = energy['dangjin_warehouse'].values.astype(float)
dangjin = energy['dangjin'].values.astype(float)
데이터를 각각 뽑아냅니다.
learning_rate = 0.0001
sequence_length = 12 # 24 일때가 가장 좋았음. but 코렙 램이 터지는 관계로 12 사용
epochs = 2000
def make_batch(input_data, sl):
train_x = []
train_y = []
L = len(input_data)
for i in range(L-sl):
# sl기간 만큼 있는 데이터에서 다음 시점 맞추기.
train_seq = input_data[i:i+sl]
train_label = input_data[i+sl:(i+sl+1)]
# 리스트 값을 train_x에 어팬드함.
train_x.append(train_seq)
train_y.append(train_label)
return train_x, train_y
class simple_lstm(nn.Module):
def __init__(self):
super().__init__()
self.input_vector = 1 # 입력 벡터 길이
self.sequence_length = 12 # 데이터 묶음 길이(24개 데이터 사용)
self.output_vector = 100 # 은닉층 사이즈
self.num_layers = 4 # 층 개수
self.lstm = nn.LSTM(input_size = self.input_vector, hidden_size = self.output_vector,
num_layers = self.num_layers, batch_first = True)
self.linear = nn.Sequential(
nn.Linear(self.output_vector, 50),
nn.Linear(50, 30),
nn.Linear(30, 10),
nn.Linear(10, 1)
)
def forward(self, x):
output, _ = self.lstm(x)
return self.linear(output[:, -1, :])
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
train_x, train_y = make_batch(dangjin_floating.reshape(-1,1), sequence_length)
# 텐서에 데이터 실기
tensor_x = torch.Tensor(train_x)
tensor_y = torch.Tensor(train_y)
dangjin_floatings = simple_lstm()
# 모델을 디바이스에 실음 (디바이스에는 모델과 데이터를 실어야함)
dangjin_floatings = dangjin_floatings.to(device)
# 아담 옵티마이저 사용 (학습하려는 모델 파라미터, 학습률)
optimizer = torch.optim.Adam(dangjin_floatings.parameters(), lr = learning_rate)
criterion = nn.MSELoss()
for i in range(epochs):
# 모델 학습 모드
dangjin_floatings.train()
tensor_x = tensor_x.to(device)
tensor_y = tensor_y.to(device)
output = dangjin_floatings(tensor_x)
loss = criterion(output, tensor_y.view(-1,1))
# 옵티마이저 초기화 (배치마다 해줘야함)
optimizer.zero_grad()
# 로스함수를 사용해 역전파
loss.backward()
# 옵티마이저를 이용해 가중치 업데이트
optimizer.step()
# 100번째 배치마다 로스 값 출력
if i % 100 == 0:
print('Epoch {}, Loss {:.5f}'.format(i, loss.item()))
x_input = np.array(energy.dangjin_floating[-12:])
x_input = x_input.reshape((1,12,1))
dangjin_floating_pred = []
for i in range(672):
x_input = torch.Tensor(x_input)
x_input = x_input.to(device)
# 모델에 넣고 output값 확인 위해서는 cpu로 돌린 뒤 넘파이로 변환.
predict = dangjin_floatings(x_input).cpu().detach().numpy()
# 예측값 배출
new_input = predict.reshape((1,1,1))
# 예측값을 실제값인 것 처럼 재사용하여 다시 모델에 넣음(나이브한 방식)
x_input = np.concatenate((x_input[:,-11:].cpu(), new_input), axis = 1)
# 예측값은 dangjin_floating_pred 리스트에 계속 저장해둠
dangjin_floating_pred.append(predict[0][0])
# train_x = [길이 12 어레이, 길이 12 어레이, ...]
# train_y = [길이 1 어레이, 길이 1 어레이, ..]
train_x, train_y = make_batch(dangjin_warehouse.reshape(-1,1), sequence_length)
# 데이터 텐서로 실기(리스트도 실기 가능)
tensor_x = torch.Tensor(train_x)
tensor_y = torch.Tensor(train_y)
# 모델 불러와서 디바이스에 실기 (GPU 사용 위해선 모델과 데이터를 실어야함)
dangjin_warehouses = simple_lstm()
dangjin_warehouses = dangjin_warehouses.to(device)
# 옵티마이저 아담(많이 사용됨) 사용
optimizer = torch.optim.Adam(dangjin_warehouses.parameters(), lr = learning_rate)
# 손실함수 MSE 사용
criterion = nn.MSELoss()
for i in range(epochs):
# 학습 모드(가중치 업데이트 됨)로 변환
dangjin_warehouses.train()
# 데이터(텐서 형식) 디바이스에 실기
tensor_x = tensor_x.to(device)
tensor_y = tensor_y.to(device)
# 모델에 x 데이터 넣기
output = dangjin_warehouses(tensor_x)
# 로스 값 구하기
loss = criterion(output, tensor_y.view(-1,1))
# 옵티마이저 초기화(매 배치마다 초기화 해야함)
optimizer.zero_grad()
# 손실함수 이용해 역전파
loss.backward()
# 옵티마이저 사용해 가중치 업데이트
optimizer.step()
# 100번째 배치마다 로스 계산
if i % 100 == 0:
# loss.item 함수 사용하면 로스 값 제출해줌
print('Epoch {}, Loss {:.5f}'.format(i, loss.item()))
# 테스트 데이터 계산 위해 맨 뒤 12개 데이터 사용
x_input = np.array(energy.dangjin_warehouse[-12:])
x_input = x_input.reshape((1,12,1))
dangjin_warehouse_pred = []
for i in range(672):
x_input = torch.Tensor(x_input)
x_input = x_input.to(device)
predict = dangjin_warehouses(x_input).cpu().detach().numpy()
new_input = predict.reshape((1,1,1))
x_input = np.concatenate((x_input[:,-23:].cpu(), new_input), axis = 1)
dangjin_warehouse_pred.append(predict[0][0])
나머지 2개 변수 예측은 추후에 lstm 코드 복습 시 사용. (앞 두 변수 방식과 동일)
sample_submission.iloc[:24*28, 1] = dangjin_floating_pred
sample_submission.iloc[:24*28, 2] = dangjin_warehouse_pred
# sample_submission.iloc[:24*28, 3] = dangjin_pred
# sample_submission.iloc[:24*28, 4] = ulsan_pred
# sample_submission
from datetime import datetime
from neuralprophet import NeuralProphet
energy = pd.read_csv(path + 'energy.csv')
sample_submission = pd.read_csv(path + 'sample_submission.csv')
def convert_time(x):
# 2018-03-01 1:00:00 값 변환하기
Ymd, HMS = x.split(' ')
H, M, S = HMS.split(':')
H = str(int(H) - 1)
# 다시 시간 합치기
HMS = ':'.join([H, M, S])
return ' '.join([Ymd, HMS])
energy['time'] = energy['time'].apply(lambda x:convert_time(x))
column = 'dangjin_floating'
df = pd.DataFrame()
df['ds'] = energy['time']
df['y'] = energy[column]
model = NeuralProphet()
# 훈련
loss = model.fit(df, freq = 'H')
# 미래 예측용 데이터 프레임 만들기
df_pred = model.make_future_dataframe(df, periods = 18000)
# 미레 예측 하기
predict = model.predict(df_pred)
predict_1 = predict.copy()
predict_1 = predict_1.query('ds >= "2021-02-01 00:00:00"')
predict_1 = predict_1.query('ds < "2021-03-01 00:00:00"')
# 2021-06-09 ~ 2021-07-09
predict_2 = predict.copy()
predict_2 = predict_2.query('ds >= "2021-06-09 00:00:00"')
predict_2 = predict_2.query('ds < "2021-07-09 00:00:00"')
# 제출 파일 업데이트
sample_submission[column] = list(predict_1['yhat1']) + list(predict_2['yhat1'])
sample_submission.head()