Stock Price Prediction System ML Model
This guide takes a comprehensive approach using LSTM (Long Short-Term Memory) networks, which are particularly effective for time series prediction.
1. Setup and Installation
bash
pip install yfinance numpy pandas scikit-learn tensorflow matplotlib seaborn
2. Data Collection and Preprocessing
python
import yfinance as yf
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
# Download stock data
def download_stock_data(ticker, start_date='2019-01-01', end_date='2024-01-01'):
    """Download historical price data for ``ticker`` via ``yf.download``.

    Args:
        ticker: Symbol (or whatever ``yf.download`` accepts) to fetch,
            e.g. ``'AAPL'``.
        start_date: Value forwarded as ``start`` (``'YYYY-MM-DD'``).
        end_date: Value forwarded as ``end`` (``'YYYY-MM-DD'``).

    Returns:
        The DataFrame produced by ``yf.download``. When a single string
        ticker was requested and the result carries multi-level columns,
        the columns are reduced to their first level so that plain lookups
        such as ``df['Close']`` return a Series.
    """
    stock = yf.download(ticker, start=start_date, end=end_date)

    # Newer yfinance releases return two-level columns even for one symbol
    # (presumably (field, ticker) with the default grouping -- verify against
    # the installed version). The code consuming this frame indexes plain
    # field names, so drop the redundant level for single-ticker requests.
    if isinstance(ticker, str) and isinstance(stock.columns, pd.MultiIndex):
        stock.columns = stock.columns.get_level_values(0)

    return stock
# Load data (example: Apple stock)
ticker = 'AAPL'
data = download_stock_data(ticker)

# Explore data
print(data.head())
# DataFrame.info() prints its summary itself and returns None, so wrapping it
# in print() would add a stray "None" line to the output.
data.info()

# Visualize closing price
plt.figure(figsize=(12, 6))
plt.plot(data.index, data['Close'])
plt.title(f'{ticker} Stock Closing Price History')
plt.xlabel('Date')
plt.ylabel('Price (USD)')
plt.show()
3. Feature Engineering
python
# Add technical indicators
def add_technical_indicators(df):
    """Return a copy of ``df`` extended with technical-indicator columns.

    The input must provide the columns ``Open``, ``High``, ``Low``,
    ``Close`` and ``Volume``. The input frame itself is left untouched.

    Added columns:
        MA_7, MA_21, MA_50: rolling means of ``Close``.
        RSI: 100 - 100 / (1 + RS), where RS is the ratio of the 14-row
            rolling mean gain to the 14-row rolling mean loss of ``Close``.
        Volume_SMA: 20-row rolling mean of ``Volume``.
        Volume_Ratio: ``Volume`` divided by ``Volume_SMA``.
        Daily_Return: row-over-row percentage change of ``Close``.
        Price_Change: ``Close`` minus ``Open``.
        High_Low_Ratio: ``High`` divided by ``Low``.

    Rows at the start, where a rolling window is not yet full, hold NaN in
    the corresponding columns.

    Args:
        df: DataFrame with the price/volume columns listed above.

    Returns:
        A new DataFrame with the original columns plus the indicators.
    """
    # Copy first: assigning columns on the argument would silently modify
    # the caller's frame (and can trigger chained-assignment warnings when
    # that frame is itself a slice).
    df = df.copy()
    close = df['Close']

    # Moving averages
    for window in (7, 21, 50):
        df[f'MA_{window}'] = close.rolling(window=window).mean()

    # Relative Strength Index (RSI)
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['RSI'] = 100 - (100 / (1 + rs))

    # Volume indicators
    df['Volume_SMA'] = df['Volume'].rolling(window=20).mean()
    df['Volume_Ratio'] = df['Volume'] / df['Volume_SMA']

    # Price change features
    df['Daily_Return'] = close.pct_change()
    df['Price_Change'] = close - df['Open']
    df['High_Low_Ratio'] = df['High'] / df['Low']

    return df
# Add the indicator columns, then discard every row that still contains a
# NaN (for example the leading rows where a rolling window is not yet full).
data = add_technical_indicators(data).dropna()

preview_columns = ['Close', 'MA_7', 'MA_21', 'RSI', 'Volume_Ratio']
print(data[preview_columns].tail())
4. LSTM Model for Time Series Prediction
python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
def create_lstm_model(input_shape, units=100, dropout_rate=0.2):
    """Build and compile a three-layer stacked LSTM regressor.

    Architecture: three LSTM layers of ``units`` cells, each followed by
    dropout, then Dense(50) -> Dense(25) -> Dense(1). The model is compiled
    with the Adam optimizer, mean-squared-error loss and an MAE metric.

    Args:
        input_shape: Shape of one sample, ``(time_steps, n_features)``.
        units: Number of cells in each LSTM layer (default 100, the value
            previously hard-coded).
        dropout_rate: Dropout fraction applied after each LSTM layer
            (default 0.2, the value previously hard-coded).

    Returns:
        The compiled ``Sequential`` model.
    """
    # Imported locally so this function does not depend on the file-level
    # import list being extended.
    from tensorflow.keras.layers import Input

    model = Sequential([
        # An explicit Input layer replaces the `input_shape=` layer kwarg,
        # which recent Keras versions warn about.
        Input(shape=input_shape),
        LSTM(units, return_sequences=True),
        Dropout(dropout_rate),
        LSTM(units, return_sequences=True),
        Dropout(dropout_rate),
        # Last recurrent layer returns only its final state for the head.
        LSTM(units),
        Dropout(dropout_rate),
        Dense(50),
        Dense(25),
        Dense(1),
    ])
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model
def prepare_lstm_data(data, look_back=60):
    """Turn the ``Close`` column into scaled sliding-window samples.

    The ``Close`` values are min-max scaled, then every run of
    ``look_back`` consecutive scaled values becomes one input sample whose
    target is the scaled value immediately following that run.

    Note: the scaler is fitted on every row of ``data`` passed in. If the
    samples are split into train/test afterwards, the scaling range was
    derived from the test period as well.

    Args:
        data: DataFrame with a ``Close`` column.
        look_back: Number of consecutive values in each input window.

    Returns:
        Tuple ``(X, y, scaler)`` where ``X`` has shape
        ``(n_samples, look_back, 1)``, ``y`` has shape ``(n_samples,)`` and
        ``scaler`` is the fitted ``MinMaxScaler``.

    Raises:
        ValueError: If ``look_back`` is smaller than 1 or ``data`` has no
            more than ``look_back`` rows, so no sample could be built.
    """
    n_samples = len(data) - look_back
    if look_back < 1 or n_samples < 1:
        # Without this check the reshape below fails with an opaque
        # IndexError on an empty array.
        raise ValueError(
            f"Need more than look_back={look_back} rows (and look_back >= 1) "
            f"to build samples; got {len(data)} rows."
        )

    # Normalize data
    scaler = MinMaxScaler()
    scaled_close = scaler.fit_transform(data[['Close']])[:, 0]

    # Window k covers rows [k, k + look_back); its target is row k + look_back.
    X = np.stack([scaled_close[k:k + look_back] for k in range(n_samples)])
    y = scaled_close[look_back:].copy()

    # Trailing axis of size 1 = one feature per time step.
    return X.reshape(n_samples, look_back, 1), y, scaler
# Build the windowed samples for the LSTM
look_back = 60
X, y, scaler = prepare_lstm_data(data, look_back)

# Chronological split: the first 80% of the samples train the model, the
# remainder is held back for testing.
split_ratio = 0.8
split_idx = int(len(X) * split_ratio)
X_train, y_train = X[:split_idx], y[:split_idx]
X_test, y_test = X[split_idx:], y[split_idx:]

# Create and train model
lstm_model = create_lstm_model((look_back, 1))

# Stop once val_loss has not improved for 10 epochs and keep the best weights.
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

# validation_split reserves 10% of the training samples for the val_loss
# that early stopping monitors.
fit_options = dict(
    epochs=100,
    batch_size=32,
    validation_split=0.1,
    callbacks=[early_stop],
    verbose=1,
)
history = lstm_model.fit(X_train, y_train, **fit_options)

# Predict, then map both predictions and targets back to the price scale
y_pred = lstm_model.predict(X_test)
to_price = scaler.inverse_transform
y_pred_actual = to_price(y_pred.reshape(-1, 1))
y_test_actual = to_price(y_test.reshape(-1, 1))
5. Random Forest Model (Alternative)
python
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
def prepare_ml_data(data, target_col='Close', look_back=10, feature_cols=None):
    """Build flattened, time-lagged feature vectors for a tabular regressor.

    The selected feature columns are min-max scaled. Each sample is the
    flattened block of the ``look_back`` rows preceding row ``i``, and its
    target is the unscaled ``target_col`` value at row ``i``.

    Args:
        data: DataFrame containing ``target_col`` and the feature columns.
        target_col: Column to predict.
        look_back: Number of preceding rows packed into each sample
            (default 10, the value previously hard-coded).
        feature_cols: Feature column names. Defaults to Open, High, Low,
            Volume, MA_7, MA_21, RSI, Volume_Ratio and Daily_Return.

    Returns:
        Tuple ``(X_lagged, y_lagged, scaler_X)``: the sample matrix (one row
        of ``look_back * n_features`` values per sample), the target array,
        and the fitted feature ``MinMaxScaler``.
    """
    # Select features
    if feature_cols is None:
        feature_cols = ['Open', 'High', 'Low', 'Volume', 'MA_7', 'MA_21',
                        'RSI', 'Volume_Ratio', 'Daily_Return']
    X = data[list(feature_cols)].values
    y = data[target_col].values

    # Normalize features (the scaler sees every row passed in)
    scaler_X = MinMaxScaler()
    X_scaled = scaler_X.fit_transform(X)

    # Create time-lagged features: rows [i - look_back, i) describe row i,
    # so the target row itself never appears among its own features.
    sample_rows = range(look_back, len(X_scaled))
    X_lagged = [X_scaled[i - look_back:i].flatten() for i in sample_rows]
    y_lagged = [y[i] for i in sample_rows]

    return np.array(X_lagged), np.array(y_lagged), scaler_X
# Build the lagged feature matrix; the fitted feature scaler is not needed here
X_ml, y_ml, _ = prepare_ml_data(data)

# Hold out the last 20% of the samples. shuffle=False keeps the original
# order, so the test rows come after the training rows.
X_train_ml, X_test_ml, y_train_ml, y_test_ml = train_test_split(
    X_ml, y_ml, test_size=0.2, shuffle=False
)

# Train Random Forest
rf_params = {
    'n_estimators': 100,
    'max_depth': 15,
    'random_state': 42,
    'n_jobs': -1,
}
rf_model = RandomForestRegressor(**rf_params)
rf_model.fit(X_train_ml, y_train_ml)

# Predict on the held-out samples
rf_predictions = rf_model.predict(X_test_ml)
6. Model Evaluation
python
def evaluate_model(y_true, y_pred, model_name):
    """Print and return regression metrics for one model.

    Args:
        y_true: Observed values; any array-like, flattened to 1-D.
        y_pred: Predicted values; any array-like, flattened to 1-D.
        model_name: Label used in the printed report header.

    Returns:
        Dict with the keys ``'MSE'``, ``'RMSE'``, ``'MAE'``, ``'R2'`` and
        ``'MAPE'`` (percent). MAPE is computed only over entries whose true
        value is non-zero and is NaN when there are none.
    """
    # Flatten both inputs: mixing an (n, 1) array with an (n,) array would
    # otherwise broadcast to (n, n) in the MAPE expression and silently
    # produce a meaningless value.
    y_true = np.asarray(y_true, dtype=float).ravel()
    y_pred = np.asarray(y_pred, dtype=float).ravel()

    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_true, y_pred)

    # Percentage error is undefined where the true value is zero; skip those
    # entries instead of dividing by zero.
    nonzero = y_true != 0
    if nonzero.any():
        relative_error = (y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero]
        mape = np.mean(np.abs(relative_error)) * 100
    else:
        mape = float('nan')

    print(f"{model_name} Performance:")
    print(f"MSE: {mse:.2f}")
    print(f"RMSE: {rmse:.2f}")
    print(f"MAE: {mae:.2f}")
    print(f"R² Score: {r2:.4f}")
    print(f"MAPE: {mape:.2f}%")
    print("-" * 40)

    return {
        'MSE': mse, 'RMSE': rmse, 'MAE': mae,
        'R2': r2, 'MAPE': mape
    }
# Score the LSTM on the price-scale test targets
lstm_results = evaluate_model(y_test_actual, y_pred_actual, "LSTM")

# Score the Random Forest against the trailing targets that line up with
# its predictions
rf_actual = y_test_ml[-len(rf_predictions):]
rf_results = evaluate_model(rf_actual, rf_predictions, "Random Forest")
7. Visualization
python
def plot_predictions(y_test, predictions, title):
    """Show a 2x2 diagnostic figure comparing actual and predicted values.

    Panels: actual vs. predicted series (top-left), histogram of the errors
    (top-right), actual-vs-predicted scatter with the identity line
    (bottom-left), and the errors in sample order (bottom-right).

    Args:
        y_test: NumPy array of actual values.
        predictions: NumPy array of predicted values.
        title: Prefix for the first panel's title.
    """
    # Error = actual - predicted, on flattened copies of both arrays.
    residuals = y_test.flatten() - predictions.flatten()

    fig, axes = plt.subplots(2, 2, figsize=(15, 8))
    (ax_series, ax_hist), (ax_scatter, ax_resid) = axes

    # Top-left: both series over the sample index
    ax_series.plot(y_test, label='Actual')
    ax_series.plot(predictions, label='Predicted', alpha=0.7)
    ax_series.set_title(f'{title} - Price Prediction')
    ax_series.set_xlabel('Time')
    ax_series.set_ylabel('Price (USD)')
    ax_series.legend()

    # Top-right: distribution of the errors
    ax_hist.hist(residuals, bins=30, edgecolor='black')
    ax_hist.set_title('Prediction Error Distribution')
    ax_hist.set_xlabel('Error')
    ax_hist.set_ylabel('Frequency')

    # Bottom-left: scatter with the y = x reference line
    lowest, highest = y_test.min(), y_test.max()
    ax_scatter.scatter(y_test, predictions, alpha=0.5)
    ax_scatter.plot([lowest, highest], [lowest, highest], 'r--', lw=2)
    ax_scatter.set_title('Actual vs Predicted')
    ax_scatter.set_xlabel('Actual Price')
    ax_scatter.set_ylabel('Predicted Price')

    # Bottom-right: errors in order, with a zero reference line
    ax_resid.plot(residuals)
    ax_resid.axhline(y=0, color='r', linestyle='--')
    ax_resid.set_title('Prediction Errors Over Time')
    ax_resid.set_xlabel('Time')
    ax_resid.set_ylabel('Error')

    for ax in (ax_series, ax_hist, ax_scatter, ax_resid):
        ax.grid(True, alpha=0.3)

    fig.tight_layout()
    plt.show()
# Draw the diagnostic panels for the LSTM's test-set predictions
plot_predictions(
    y_test=y_test_actual,
    predictions=y_pred_actual,
    title="LSTM",
)
8. Future Price Prediction
python
def predict_future_days(model, last_sequence, scaler, days=30):
    """Forecast ``days`` steps ahead by feeding predictions back in.

    Each step predicts one value from the current window, then the window
    is shifted by one position and the new prediction becomes its last
    element. All values stay in the scaler's scale until the end.

    Args:
        model: Object with ``predict(x, verbose=0)`` accepting an array of
            shape ``(1, window_length, 1)`` and returning an array whose
            first element is the predicted value.
        last_sequence: Array-like of the most recent scaled values; it is
            flattened and copied, never modified.
        scaler: Object whose ``inverse_transform`` maps an ``(n, 1)`` array
            back to the original scale.
        days: Number of steps to forecast.

    Returns:
        Array of shape ``(days, 1)`` with the inverse-transformed forecasts,
        or an empty ``(0, 1)`` array when ``days`` is not positive.
    """
    if days <= 0:
        # Nothing to predict; avoid handing an empty array to the scaler.
        return np.empty((0, 1))

    # Private float copy so the caller's array is left untouched.
    window = np.array(last_sequence, dtype=float).ravel()
    predictions = []

    for _ in range(days):
        # Predict next day
        next_pred = model.predict(window.reshape(1, window.shape[0], 1), verbose=0)
        # Extract a plain scalar: storing the (1, 1) result array directly
        # into one element relies on array-to-scalar conversion, which NumPy
        # has deprecated.
        next_value = float(np.asarray(next_pred).ravel()[0])
        predictions.append(next_value)

        # Update sequence: drop the oldest value, append the prediction
        window = np.roll(window, -1)
        window[-1] = next_value

    # Inverse transform predictions
    return scaler.inverse_transform(np.array(predictions).reshape(-1, 1))
# Predict next 30 days
# X_test[-1] is the window whose target is y_test[-1], so it stops one step
# short of the final observation. Slide it forward by one (drop its oldest
# value, append y_test[-1]) so that "Day 1" is the first step after the last
# known price rather than a re-prediction of it.
last_60_days = np.append(X_test[-1, 1:, 0], y_test[-1])
future_predictions = predict_future_days(lstm_model, last_60_days, scaler, days=30)

print("Future 30-day price predictions:")
for i, price in enumerate(future_predictions, 1):
    print(f"Day {i}: ${price[0]:.2f}")

# Visualize future predictions
# Plot at most the last 100 actual prices and derive both x-ranges from the
# arrays actually drawn; using len(y_test_actual) for x while slicing y to
# 100 points makes the lengths differ and plt.plot raise.
recent_actual = y_test_actual[-100:]
history_days = range(len(recent_actual))
future_days = range(len(recent_actual), len(recent_actual) + len(future_predictions))

plt.figure(figsize=(12, 6))
plt.plot(history_days, recent_actual, label='Historical (Actual)')
plt.plot(future_days, future_predictions, 'r--', label='Future Prediction')
plt.title('Stock Price Prediction - Next 30 Days')
plt.xlabel('Days')
plt.ylabel('Price (USD)')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
9. Save and Load Model
python
import joblib
from tensorflow.keras.models import load_model

# File names shared by the save and load steps
MODEL_PATH = 'stock_price_lstm_model.h5'
SCALER_PATH = 'scaler.pkl'

# Save the trained network and the scaler needed to undo its scaling
lstm_model.save(MODEL_PATH)
joblib.dump(scaler, SCALER_PATH)

# Load both back from disk
loaded_model = load_model(MODEL_PATH)
loaded_scaler = joblib.load(SCALER_PATH)