Appendix 2. Fitting Long Short-Term Memory (LSTM) models

Key points:

  • Create the custom class for hyperparameter tuning
  • Initialize the Tuner (Hyperband or RandomSearch) and fit the models
  • Save the hyperparameters and model paths to the database
In [2]:
import tensorflow as tf

# Enable memory growth on every visible GPU so TensorFlow allocates VRAM
# on demand instead of grabbing the whole card up front.
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
In [3]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.metrics import r2_score
In [3]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import EarlyStopping, TerminateOnNaN, ModelCheckpoint
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import TimeDistributed, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import BatchNormalization
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import LayerNormalization
In [4]:
import keras_tuner as kt
In [4]:
import json
import shutil
import os
import re
import glob
import time
import ta
from dbManager import DatabaseManager
import tempfile
import shutil
In [5]:
# Ensure the output directory for prediction plots exists.
os.makedirs('plots', exist_ok=True)

# Get data
# Cleaned Nasdaq Helsinki series; first CSV column holds the datetime index.
df = pd.read_csv("clean_data_Nasdaq_Helsinki_new_new.csv", index_col=0)
df.index = pd.to_datetime(df.index)

# Project-local SQLite wrapper; prints a confirmation line on connect.
db_manager = DatabaseManager("thesis_ESG_csc.db")

# Full stock_data table — NOTE(review): loaded but not referenced later in
# this notebook; confirm it is still needed.
df_sql = pd.read_sql('SELECT * FROM stock_data', db_manager.con)
Successfully connected to the database: thesis_ESG_csc.db

Custom class for hyperparameter tuning

In [7]:
class WindowTuner(kt.HyperModel):
    """KerasTuner HyperModel that tunes the input window size together with
    the LSTM architecture.

    Because ``window_size`` changes the shape of the training arrays, the
    sequences are rebuilt inside :meth:`fit` for every trial instead of being
    passed to ``tuner.search()``.
    """

    def __init__(self, data_scaled, lstm_inx_dict, test_inx_dict, create_sequences_fn, project_dir=None):
        # Scaled feature matrix; the target is expected in column 0
        # (see create_sequences_fn).
        self.data_scaled = data_scaled
        self.lstm_inx_dict = lstm_inx_dict  # {k: [start, end)} training ranges
        self.test_inx_dict = test_inx_dict  # {k: [start, end)} held-out ranges
        self.create_sequences_fn = create_sequences_fn  # custom data function
        # Optional explicit tuner project directory. When None, fit() falls
        # back to the module-level `target_col` (original behavior).
        self.project_dir = project_dir

    def build(self, hp):
        """Build a 1-2 layer LSTM regressor; window_size is itself a tunable hp."""
        window_size = hp.Int("window_size", 10, 50, step=5)
        n_features = self.data_scaled.shape[1]

        model = Sequential()
        n_lstm = hp.Int("n_lstm_layers", 1, 2)

        for i in range(n_lstm):
            units = hp.Int(f"lstm_units_{i}", 64, 512, step=32)
            dropout = hp.Float(f"lstm_dropout_{i}", 0.1, 0.3, step=0.1)
            # Only the final LSTM layer collapses the sequence dimension.
            return_seq = i < n_lstm - 1

            if i == 0:
                # First layer fixes the input shape (window_size, n_features).
                model.add(
                    LSTM(
                        units,
                        return_sequences=return_seq,
                        dropout=dropout,
                        kernel_regularizer=l2(1e-4),
                        input_shape=(window_size, n_features)
                    )
                )
            else:
                model.add(
                    LSTM(
                        units,
                        return_sequences=return_seq,
                        dropout=dropout,
                        kernel_regularizer=l2(1e-4)
                    )
                )

            model.add(LayerNormalization())

        if hp.Boolean("add_dense_layer"):
            dense_units = hp.Choice("dense_units", [32, 64, 128])
            model.add(Dense(dense_units, activation="relu"))
            model.add(Dropout(0.1))

        model.add(Dense(1))

        lr = hp.Float("lr", 1e-4, 3e-3, sampling="log")
        model.compile(
            optimizer=Adam(lr),
            loss=tf.keras.losses.Huber(),
            metrics=["mae"]
        )

        return model

    def fit(self, hp, model, **kwargs):
        """Rebuild sequences for this trial's window size, train, and keep the
        best weights checkpointed inside the current trial folder.

        Returns the Keras ``History`` object so the tuner can read val_loss.
        """
        current_window_size = hp.get("window_size")
        X_train, y_train = self.create_sequences_fn(self.data_scaled, self.lstm_inx_dict, current_window_size)
        # NOTE(review): X_val/y_val are built from test_inx_dict but never
        # passed to model.fit (validation_split is used instead). Kept for
        # backward compatibility — confirm whether this was intended.
        X_val, y_val = self.create_sequences_fn(self.data_scaled, self.test_inx_dict, current_window_size)

        epochs = hp.Int('epochs', min_value=50, max_value=300, step=10)

        # Resolve the tuner project folder; fall back to the module-level
        # `target_col` when no explicit project_dir was supplied.
        project_dir = self.project_dir or os.path.join(
            "my_lstm_window_tuning",
            f"deep_lstm_window_tune_{target_col}"
        )

        # The tuner has already created trial_<n> folders; the one with the
        # highest number is the trial currently running.
        trial_dirs = [d for d in glob.glob(os.path.join(project_dir, "trial_*")) if os.path.isdir(d)]
        if not trial_dirs:
            raise RuntimeError("No trial directories found.")

        def trial_number(folder_name):
            """Numeric suffix of a trial_<n> folder name (-1 if not present)."""
            match = re.search(r"trial_(\d+)", folder_name)
            return int(match.group(1)) if match else -1

        trial_dir = max(trial_dirs, key=trial_number)

        weights_path = os.path.join(trial_dir, "checkpoint.weights.h5")

        callbacks = [
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=False),
            # save_best_only keeps the lowest-val_loss weights for this trial.
            ModelCheckpoint(
                filepath=weights_path,
                save_weights_only=True,
                save_best_only=True,
                monitor='val_loss',
                verbose=0
            )
        ]

        history = model.fit(
            X_train,
            y_train,
            epochs=epochs,
            batch_size=256,
            validation_split=0.1,
            callbacks=callbacks,
            verbose=0,
            shuffle=False  # time-series data: preserve temporal order
        )

        # Safety net: write final weights only if ModelCheckpoint never fired
        # (e.g. val_loss was NaN on every epoch). Saving unconditionally would
        # clobber the best checkpoint with the last epoch's weights.
        if not os.path.exists(weights_path):
            model.save_weights(weights_path)

        # Release graph/session memory between trials.
        tf.keras.backend.clear_session()

        return history
In [8]:
# Help function to remove unnecessary features
df_correlation = pd.read_csv('feature_correlation.csv')
df_correlation = df_correlation.set_index('symbol')
df_correlation=df_correlation.drop(columns=['previous_price'])
df_correlation.head()

def get_unnecessary_features_by_symbol(symbol_name, df, threshold=0.3):
    """
    Split a symbol's features by correlation strength.

    Parameters
    ----------
    symbol_name : str
        Row label to look up in ``df``.
    df : pd.DataFrame
        Correlation table indexed by symbol (one column per feature).
    threshold : float
        Minimum absolute correlation for a feature to count as necessary.

    Returns
    -------
    tuple[list, list]
        ``(unnecessary, necessary)`` — feature names with ``|corr| < threshold``
        and with ``|corr| >= threshold`` respectively. Always a 2-tuple, even
        when the symbol is missing.
    """
    # Check if the symbol exists in the index to avoid crashes
    if symbol_name not in df.index:
        print(f"Warning: {symbol_name} not found in the correlation dataframe.")
        # Bug fix: the original returned a single [], which crashed callers
        # that unpack two values. Flag nothing for removal when unknown.
        return [], []

    # Extract the row for the specific symbol
    symbol_row = df.loc[symbol_name]

    # Filter features where the absolute value is below the threshold
    abs_corr = symbol_row.abs()
    unnecessary = abs_corr[abs_corr < threshold].index.tolist()
    necessary = abs_corr[abs_corr >= threshold].index.tolist()

    return unnecessary, necessary

Model fitting

In [7]:
# Per-ticker results: {target_col: {"Directional Accuracy": ..., "R2": ...}}
results={}
m=0  # counter of processed tickers
for col in df.columns:
    target_col = col
    df_selected = df[[target_col]]

    # get stored model metrics + hyperparameters for this ticker from the db
    output_r2 = db_manager.get_r2_lstm_model(col.replace(".HE",""))
    output_da = db_manager.get_directional_accuracy_lstm_model(col.replace(".HE",""))
    output_hyperparameters = db_manager.get_hyperparameters_lstm_model(col.replace(".HE",""))

    # break the for if directional_accuracy is well (optionally)
    # (disabled guard kept as a no-op string literal)
    '''
    if output_da > 70:
        continue
    '''

    print(target_col)

    # Resample weekly on Wednesdays (weekday == 2), then smooth the target
    # with a 3-week rolling mean before computing technical indicators.
    df_weekly = df_selected[df_selected.index.weekday == 2].dropna().sort_index().copy()
    df_weekly[target_col] = df_weekly[target_col].rolling(window=3).mean()
    df_weekly['RSI'] = ta.momentum.RSIIndicator(close=df_weekly[target_col], window=14).rsi()

    macd = ta.trend.MACD(close=df_weekly[target_col])
    df_weekly['MACD'] = macd.macd()
    df_weekly['MACD_signal'] = macd.macd_signal()

    boll = ta.volatility.BollingerBands(close=df_weekly[target_col], window=20, window_dev=2)
    df_weekly['BB_high'] = boll.bollinger_hband()
    df_weekly['BB_low'] = boll.bollinger_lband()
    df_weekly['BB_width'] = df_weekly['BB_high'] - df_weekly['BB_low']


    # Update target timeseries
    # Keep the undifferenced (level) series so predictions can be
    # re-integrated back to price levels after evaluation.
    df_original=pd.DataFrame(df_weekly[target_col].copy())
    # Model the first difference of the smoothed target.
    df_weekly[target_col] = df_weekly[target_col].diff()
    df_weekly=df_weekly.dropna().sort_index()

    total_len = len(df_weekly)

    # drop unnecessary features (|corr| < 0.3 with the target, per the
    # precomputed correlation table)
    unnecessary_features, necessary_features = get_unnecessary_features_by_symbol(target_col, df_correlation, threshold=0.3)
    df_weekly=df_weekly.drop(columns=unnecessary_features)
    
    # Convert % to indices
    p = lambda x: int(total_len * x)

    # Define splits: LSTM train uses 0-50% and 70-80%; RL train uses the gaps;
    # the last 10% is held out for testing.
    lstm_inx_dict = {0: [p(0.0), p(0.5)], 1: [p(0.7), p(0.8)]}
    rl_inx_dict = {0: [p(0.5), p(0.7)], 1: [p(0.8), p(0.9)]}
    test_inx_dict = {0: [p(0.9), total_len]}

    # Create index arrays
    def merge_index_ranges(index_dict):
        """Concatenate the [start, end) ranges of an index dict into one flat array."""
        return np.concatenate([np.arange(start, end) for start, end in index_dict.values()])

    lstm_indices = merge_index_ranges(lstm_inx_dict)
    rl_indices = merge_index_ranges(rl_inx_dict)
    test_indices = merge_index_ranges(test_inx_dict)

    dates = df_weekly.index

    # Make sure data_scaled and df_weekly have same length
    assert len(dates) == total_len

    # Print date ranges for each set
    # NOTE(review): built but never printed/used below — confirm still needed.
    split_main_dict = {"LSTM Train": [lstm_inx_dict, lstm_indices], "RL Train": [rl_inx_dict, rl_indices], "Test": [test_inx_dict, test_indices]}

    def create_sequences_from_ranges(data, index_dict, window_size):
        """Build sliding-window (X, y) pairs within each [start, end) range.

        Windows never cross a range boundary. The label for a window ending
        at row i+window_size-1 is column 0 (the target) of the next row.
        """
        X, y = [], []

        for start, end in index_dict.values():
            for i in range(start, end - window_size):
                X.append(data[i:i + window_size])
                y.append(data[i + window_size][0])  # target is first column

        return np.array(X), np.array(y).reshape(-1, 1)
    
    # Standardize: fit each scaler on the LSTM-train rows only, then transform
    # the full column — avoids leaking test-set statistics into the scaling.
    # Column order of data_scaled follows df_weekly, so the target stays in
    # column 0 as create_sequences_from_ranges expects.
    scalers = {}
    data_scaled = pd.DataFrame(index=df_weekly.index)

    # Use a dedicated loop name: the original reused `col`, shadowing the
    # outer `for col in df.columns` loop variable.
    for feature in df_weekly.columns:
        scalers[feature] = StandardScaler()

        # Use consistent type (NumPy arrays) for fit and transform
        train_values = df_weekly.iloc[lstm_indices][[feature]].values  # NumPy
        scalers[feature].fit(train_values)

        # Transform full column, also as NumPy
        transformed = scalers[feature].transform(df_weekly[[feature]].values)

        data_scaled[feature] = transformed

    data_scaled = data_scaled.to_numpy()

    # Clear any previous models
    K.clear_session()

    # Hypermodel rebuilds its own (X, y) per trial, so search() takes no data.
    lstm_hypermodel = WindowTuner(
        data_scaled=data_scaled, 
        lstm_inx_dict=lstm_inx_dict, 
        test_inx_dict=test_inx_dict,
        create_sequences_fn=create_sequences_from_ranges
    )

    # Initialize the Tuner (Hyperband or RandomSearch)
    # (Hyperband alternative kept disabled as a no-op string literal.)
    '''
    tuner = kt.Hyperband(
        lstm_hypermodel, 
        objective='val_loss',
        max_epochs=50,
        factor=4,
        directory='my_lstm_window_tuning',
        project_name=f'deep_lstm_window_tune_{target_col}',
        #overwrite=True,
        max_retries_per_trial=0
    )
    '''
    tuner = kt.RandomSearch(
        hypermodel=lstm_hypermodel,
        objective='val_loss',
        max_trials=10,
        executions_per_trial=1,
        directory='my_lstm_window_tuning',
        project_name=f'deep_lstm_window_tune_{target_col}',
        overwrite=False  # resume an interrupted search if the folder exists
    )


    # Start the Search
    # The data is generated inside the tuner's fit method for each trial.
    tuner.search()

    # get best model (lowest val_loss across trials)
    best_model = tuner.get_best_models(num_models=1)[0]

    # For most recent KerasTuner versions
    best_hps_list = tuner.get_best_hyperparameters()  # returns a list
    best_hps = best_hps_list[0]
    hyperparameters = best_hps.values
    # Persist the surviving feature list alongside the tuned hps.
    hyperparameters['necessary_features'] = necessary_features
    window_size = hyperparameters['window_size']
    hyperparameters_json_string = json.dumps(hyperparameters)
    print(hyperparameters_json_string)

    # Rebuild train/test sequences with the winning window size.
    X_train, y_train = create_sequences_from_ranges(data_scaled, lstm_inx_dict, window_size)
    X_test, y_test = create_sequences_from_ranges(data_scaled, test_inx_dict, window_size)

    # Evaluate on test using R²
    y_pred = best_model.predict(X_test)
    y_train_pred = best_model.predict(X_train)

    target_index = df_weekly.index[window_size:]  # These match y_all

    # NOTE(review): this assumes train windows precede test windows
    # contiguously; the LSTM-train ranges are not contiguous — confirm.
    train_index = target_index[:len(y_train)]
    test_index = target_index[len(y_train):]

    y_test_inv = pd.DataFrame(columns=[target_col])
    y_pred_inv = pd.DataFrame(columns=[target_col])

    y_train_inv = pd.DataFrame(columns=[target_col])
    y_train_pred_inv = pd.DataFrame(columns=[target_col])

    # Inverse transform predictions and actuals (back to unscaled differences)
    y_test_inv[target_col] = scalers[target_col].inverse_transform(y_test).flatten()
    y_pred_inv[target_col] = scalers[target_col].inverse_transform(y_pred).flatten()

    y_train_inv[target_col] = scalers[target_col].inverse_transform(y_train).flatten()
    y_train_pred_inv[target_col] = scalers[target_col].inverse_transform(y_train_pred).flatten()

    # Calculate direction accuracy (sign of change; computed on scaled values,
    # which preserves sign relative to the scaler's mean shift — TODO confirm
    # this matches the intended definition)
    actual_direction = np.sign(y_test)

    # Get the direction (sign) of the predicted change
    predicted_direction = np.sign(y_pred)

    # 3. Check for a match
    matches = (actual_direction == predicted_direction)

    # 4. Calculate Directional Accuracy
    directional_accuracy = round(matches.mean() * 100,2)

    results[target_col]={"Directional Accuracy": directional_accuracy, "R2": r2_score(y_test_inv, y_pred_inv)}

    plt.figure(figsize=(24, 10))
    plt.plot(y_test_inv, label='Actual', linewidth=2, marker='o')
    plt.plot(y_pred_inv, label='Predicted', linewidth=2, marker='o')
    plt.title(f'Prediction vs Actual - TEST for {target_col}')
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'plots/{target_col}_prediction.png')  # save per target
    plt.show()
    plt.close()

    # CALCULATE ORIGINAL TIME SERIES
    # Re-integrate the differenced series back to levels, seeded from the
    # last level value just before the test window.
    # Get last original item
    last_original_item = df_original.iloc[-len(y_test_inv)-1][target_col]

    original_list = [last_original_item]
    prediction_list = [last_original_item]
    for index, row in y_test_inv.iterrows():
        original_list.append(original_list[-1]+row[target_col])
        # One-step-ahead: each predicted level is anchored on the previous
        # ACTUAL level (original_list[-2]), not the previous prediction.
        prediction_list.append(original_list[-2]+y_pred_inv.loc[index,target_col])

    # R² on reconstructed levels (what gets stored in the db below).
    r2 = r2_score(original_list, prediction_list)

    plt.figure(figsize=(24, 10))
    plt.plot(original_list, label='Actual', linewidth=2, marker='o')
    plt.plot(prediction_list, label='Predicted', linewidth=2, marker='o')
    plt.title(f'ORIGINAL TIMESERIES: Prediction vs Actual - TEST for {target_col}')
    plt.xlabel('Time Step')
    plt.ylabel('Value')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()
    plt.close()

    # Save model only if directional accuracy improved on the stored model.
    if output_da is None or output_da < directional_accuracy:
        # Save the best model
        # NOTE(review): assumes lstm_models_upd/ already exists — no makedirs.
        model_path = f"lstm_models_upd/{target_col}_model.keras"
        best_model.save(model_path)
        # Save to db
        db_manager.update_lstm_model(target_col.replace(".HE",""), model_path, r2, window_size, directional_accuracy, hyperparameters_json_string)

    print(m, target_col)
    print(f"Directional Accuracy on Differences: {directional_accuracy:.2f}%")
    print(f"R²: {r2_score(y_test_inv, y_pred_inv):.4f}")
    print(f"R² (in original df): {r2_score(original_list, prediction_list):.4f}")

    # remove folder so the next ticker's tuner starts from a clean project
    base_directory = 'my_lstm_window_tuning' 
    project_name_folder = f'deep_lstm_window_tune_{target_col}'
    full_path_to_delete = os.path.join(base_directory, project_name_folder)
    shutil.rmtree(full_path_to_delete)

    m+=1
In [ ]: