Appendix 2. Fitting Long Short-Term Memory (LSTM) models
Key points:
- Create the custom class for hyperparameter tuning
- Initialize the tuner (Hyperband or RandomSearch) and fit the models
- Save the hyperparameters and model paths to the database
In [2]:
import tensorflow as tf
gpus = tf.config.list_physical_devices('GPU')
if gpus:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
In [3]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import r2_score
In [3]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping, TerminateOnNaN, ModelCheckpoint
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import TimeDistributed, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import BatchNormalization
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import LayerNormalization
In [4]:
import keras_tuner as kt
In [4]:
import json
import shutil
import os
import re
import glob
import time
import ta
from dbManager import DatabaseManager
import tempfile
import shutil
In [5]:
os.makedirs('plots', exist_ok=True)
# Get data
df = pd.read_csv("clean_data_Nasdaq_Helsinki_new_new.csv", index_col=0)
df.index = pd.to_datetime(df.index)
db_manager = DatabaseManager("thesis_ESG_csc.db")
df_sql = pd.read_sql('SELECT * FROM stock_data', db_manager.con)
Successfully connected to the database: thesis_ESG_csc.db
Custom class for hyperparameter tuning
In [7]:
class WindowTuner(kt.HyperModel):
# data and index dicts to the constructor
def __init__(self, data_scaled, lstm_inx_dict, test_inx_dict, create_sequences_fn):
self.data_scaled = data_scaled
self.lstm_inx_dict = lstm_inx_dict
self.test_inx_dict = test_inx_dict
self.create_sequences_fn = create_sequences_fn #custom data function
def build(self, hp):
window_size = hp.Int("window_size", 10, 50, step=5)
n_features = self.data_scaled.shape[1]
model = Sequential()
n_lstm = hp.Int("n_lstm_layers", 1, 2)
for i in range(n_lstm):
units = hp.Int(f"lstm_units_{i}", 64, 512, step=32)
dropout = hp.Float(f"lstm_dropout_{i}", 0.1, 0.3, step=0.1)
return_seq = i < n_lstm - 1
if i == 0:
model.add(
LSTM(
units,
return_sequences=return_seq,
dropout=dropout,
kernel_regularizer=l2(1e-4),
input_shape=(window_size, n_features)
)
)
else:
model.add(
LSTM(
units,
return_sequences=return_seq,
dropout=dropout,
kernel_regularizer=l2(1e-4)
)
)
model.add(LayerNormalization())
if hp.Boolean("add_dense_layer"):
dense_units = hp.Choice("dense_units", [32, 64, 128])
model.add(Dense(dense_units, activation="relu"))
model.add(Dropout(0.1))
model.add(Dense(1))
lr = hp.Float("lr", 1e-4, 3e-3, sampling="log")
model.compile(
optimizer=Adam(lr),
loss=tf.keras.losses.Huber(),
metrics=["mae"]
)
return model
def fit(self, hp, model, **kwargs):
current_window_size = hp.get("window_size")
X_train, y_train = self.create_sequences_fn(self.data_scaled, self.lstm_inx_dict, current_window_size)
X_val, y_val = self.create_sequences_fn(self.data_scaled, self.test_inx_dict, current_window_size)
epochs = hp.Int('epochs', min_value=50, max_value=300, step=10)
# Save checkpoint inside the trial folder
project_dir = os.path.join(
"my_lstm_window_tuning",
f"deep_lstm_window_tune_{target_col}"
)
# List all subdirectories
trial_dirs = [d for d in glob.glob(os.path.join(project_dir, "trial_*")) if os.path.isdir(d)]
# Extract the numeric part and find the one with the max number
def trial_number(folder_name):
match = re.search(r"trial_(\d+)", folder_name)
return int(match.group(1)) if match else -1
# Get folder with the maximum trial number
if trial_dirs:
trial_dir = max(trial_dirs, key=trial_number)
else:
raise RuntimeError("No trial directories found.")
if not trial_dirs:
raise RuntimeError("No trial directory found — tuner not creating trial folders?")
weights_path = os.path.join(trial_dir, "checkpoint.weights.h5")
callbacks = [
EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=False),
ModelCheckpoint(
filepath=weights_path,
save_weights_only=True,
save_best_only=True,
monitor='val_loss',
verbose=0
)
]
history = model.fit(
X_train,
y_train,
epochs=epochs,
batch_size=256,
validation_split=0.1,
callbacks=callbacks,
verbose=0,
shuffle=False
)
# Final safety save
model.save_weights(weights_path)
tf.keras.backend.clear_session()
return history
In [8]:
# Help function to remove unnecessary features
df_correlation = pd.read_csv('feature_correlation.csv')
df_correlation = df_correlation.set_index('symbol')
df_correlation=df_correlation.drop(columns=['previous_price'])
df_correlation.head()
def get_unnecessary_features_by_symbol(symbol_name, df, threshold=0.3):
"""
Returns a list of features for a specific symbol where
the absolute correlation value is less than the threshold.
"""
# Check if the symbol exists in the index to avoid crashes
if symbol_name not in df.index:
print(f"Warning: {symbol_name} not found in the correlation dataframe.")
return []
# Extract the row for the specific symbol
symbol_row = df.loc[symbol_name]
# Filter features where the absolute value is below the threshold
unnecessary = symbol_row[symbol_row.abs() < threshold].index.tolist()
necessary = symbol_row[symbol_row.abs() >= threshold].index.tolist()
return unnecessary, necessary
Model fitting
In [7]:
results={}
m=0
for col in df.columns:
target_col = col
df_selected = df[[target_col]]
# get model parametesr + hyperparameters
output_r2 = db_manager.get_r2_lstm_model(col.replace(".HE",""))
output_da = db_manager.get_directional_accuracy_lstm_model(col.replace(".HE",""))
output_hyperparameters = db_manager.get_hyperparameters_lstm_model(col.replace(".HE",""))
# break the for if directional_accuracy is well (optionally)
'''
if output_da > 70:
continue
'''
print(target_col)
# Resample weekly on Wednesdays
df_weekly = df_selected[df_selected.index.weekday == 2].dropna().sort_index().copy()
df_weekly[target_col] = df_weekly[target_col].rolling(window=3).mean()
df_weekly['RSI'] = ta.momentum.RSIIndicator(close=df_weekly[target_col], window=14).rsi()
macd = ta.trend.MACD(close=df_weekly[target_col])
df_weekly['MACD'] = macd.macd()
df_weekly['MACD_signal'] = macd.macd_signal()
boll = ta.volatility.BollingerBands(close=df_weekly[target_col], window=20, window_dev=2)
df_weekly['BB_high'] = boll.bollinger_hband()
df_weekly['BB_low'] = boll.bollinger_lband()
df_weekly['BB_width'] = df_weekly['BB_high'] - df_weekly['BB_low']
# Update target timeseries
df_original=pd.DataFrame(df_weekly[target_col].copy())
df_weekly[target_col] = df_weekly[target_col].diff()
df_weekly=df_weekly.dropna().sort_index()
total_len = len(df_weekly)
# drop unnecessary features
unnecessary_features, necessary_features = get_unnecessary_features_by_symbol(target_col, df_correlation, threshold=0.3)
df_weekly=df_weekly.drop(columns=unnecessary_features)
# Convert % to indices
p = lambda x: int(total_len * x)
# Define splits
lstm_inx_dict = {0: [p(0.0), p(0.5)], 1: [p(0.7), p(0.8)]}
rl_inx_dict = {0: [p(0.5), p(0.7)], 1: [p(0.8), p(0.9)]}
test_inx_dict = {0: [p(0.9), total_len]}
# Create index arrays
def merge_index_ranges(index_dict):
return np.concatenate([np.arange(start, end) for start, end in index_dict.values()])
lstm_indices = merge_index_ranges(lstm_inx_dict)
rl_indices = merge_index_ranges(rl_inx_dict)
test_indices = merge_index_ranges(test_inx_dict)
dates = df_weekly.index
# Make sure data_scaled and df_weekly have same length
assert len(dates) == total_len
# Print date ranges for each set
split_main_dict = {"LSTM Train": [lstm_inx_dict, lstm_indices], "RL Train": [rl_inx_dict, rl_indices], "Test": [test_inx_dict, test_indices]}
def create_sequences_from_ranges(data, index_dict, window_size):
X, y = [], []
for start, end in index_dict.values():
for i in range(start, end - window_size):
X.append(data[i:i + window_size])
y.append(data[i + window_size][0]) # target is first column
return np.array(X), np.array(y).reshape(-1, 1)
# Standardize
scalers = {}
data_scaled = pd.DataFrame(index=df_weekly.index)
for col in df_weekly.columns:
scalers[col] = StandardScaler()
# Use consistent type (NumPy arrays) for fit and transform
train_values = df_weekly.iloc[lstm_indices][[col]].values # NumPy
scalers[col].fit(train_values)
# Transform full column, also as NumPy
transformed = scalers[col].transform(df_weekly[[col]].values)
data_scaled[col] = transformed
data_scaled = data_scaled.to_numpy()
# Clear any previous models
K.clear_session()
lstm_hypermodel = WindowTuner(
data_scaled=data_scaled,
lstm_inx_dict=lstm_inx_dict,
test_inx_dict=test_inx_dict,
create_sequences_fn=create_sequences_from_ranges
)
# Initialize the Tuner (Hyperband or RandomSearch)
'''
tuner = kt.Hyperband(
lstm_hypermodel,
objective='val_loss',
max_epochs=50,
factor=4,
directory='my_lstm_window_tuning',
project_name=f'deep_lstm_window_tune_{target_col}',
#overwrite=True,
max_retries_per_trial=0
)
'''
tuner = kt.RandomSearch(
hypermodel=lstm_hypermodel,
objective='val_loss',
max_trials=10,
executions_per_trial=1,
directory='my_lstm_window_tuning',
project_name=f'deep_lstm_window_tune_{target_col}',
overwrite=False
)
# Start the Search
# The data is generated inside the tuner's fit method for each trial.
tuner.search()
# get best model
best_model = tuner.get_best_models(num_models=1)[0]
# For most recent KerasTuner versions
best_hps_list = tuner.get_best_hyperparameters() # returns a list
best_hps = best_hps_list[0]
hyperparameters = best_hps.values
hyperparameters['necessary_features'] = necessary_features
window_size = hyperparameters['window_size']
hyperparameters_json_string = json.dumps(hyperparameters)
print(hyperparameters_json_string)
X_train, y_train = create_sequences_from_ranges(data_scaled, lstm_inx_dict, window_size)
X_test, y_test = create_sequences_from_ranges(data_scaled, test_inx_dict, window_size)
# Evaluate on test using R²
y_pred = best_model.predict(X_test)
y_train_pred = best_model.predict(X_train)
target_index = df_weekly.index[window_size:] # These match y_all
train_index = target_index[:len(y_train)]
test_index = target_index[len(y_train):]
y_test_inv = pd.DataFrame(columns=[target_col])
y_pred_inv = pd.DataFrame(columns=[target_col])
y_train_inv = pd.DataFrame(columns=[target_col])
y_train_pred_inv = pd.DataFrame(columns=[target_col])
# Inverse transform predictions and actuals
y_test_inv[target_col] = scalers[target_col].inverse_transform(y_test).flatten()
y_pred_inv[target_col] = scalers[target_col].inverse_transform(y_pred).flatten()
y_train_inv[target_col] = scalers[target_col].inverse_transform(y_train).flatten()
y_train_pred_inv[target_col] = scalers[target_col].inverse_transform(y_train_pred).flatten()
# Calculate direction accuracy
actual_direction = np.sign(y_test)
# Get the direction (sign) of the predicted change
predicted_direction = np.sign(y_pred)
# 3. Check for a match
matches = (actual_direction == predicted_direction)
# 4. Calculate Directional Accuracy
directional_accuracy = round(matches.mean() * 100,2)
results[target_col]={"Directional Accuracy": directional_accuracy, "R2": r2_score(y_test_inv, y_pred_inv)}
plt.figure(figsize=(24, 10))
plt.plot(y_test_inv, label='Actual', linewidth=2, marker='o')
plt.plot(y_pred_inv, label='Predicted', linewidth=2, marker='o')
plt.title(f'Prediction vs Actual - TEST for {target_col}')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(f'plots/{target_col}_prediction.png') # save per target
plt.show()
plt.close()
# CALCULATE ORIGINAL TIME SERIES
# Get last original item
last_original_item = df_original.iloc[-len(y_test_inv)-1][target_col]
original_list = [last_original_item]
prediction_list = [last_original_item]
for index, row in y_test_inv.iterrows():
original_list.append(original_list[-1]+row[target_col])
prediction_list.append(original_list[-2]+y_pred_inv.loc[index,target_col])
r2 = r2_score(original_list, prediction_list)
plt.figure(figsize=(24, 10))
plt.plot(original_list, label='Actual', linewidth=2, marker='o')
plt.plot(prediction_list, label='Predicted', linewidth=2, marker='o')
plt.title(f'ORIGINAL TIMESERIES: Prediction vs Actual - TEST for {target_col}')
plt.xlabel('Time Step')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
plt.close()
# Save model
if output_da is None or output_da < directional_accuracy:
# Save the best model
model_path = f"lstm_models_upd/{target_col}_model.keras"
best_model.save(model_path)
# Save to db
db_manager.update_lstm_model(target_col.replace(".HE",""), model_path, r2, window_size, directional_accuracy, hyperparameters_json_string)
print(m, target_col)
print(f"Directional Accuracy on Differences: {directional_accuracy:.2f}%")
print(f"R²: {r2_score(y_test_inv, y_pred_inv):.4f}")
print(f"R² (in original df): {r2_score(original_list, prediction_list):.4f}")
# remove folder
base_directory = 'my_lstm_window_tuning'
project_name_folder = f'deep_lstm_window_tune_{target_col}'
full_path_to_delete = os.path.join(base_directory, project_name_folder)
shutil.rmtree(full_path_to_delete)
m+=1
In [ ]: