Python Code for Portfolio Optimization
Chapter 11 – Risk Parity Portfolios
Daniel P. Palomar (2025). Portfolio Optimization: Theory and Application. Cambridge University Press.
Last update: March 27, 2025
Libraries
The following libraries are used in the examples:
# Core numerical computing
import numpy as np
import pandas as pd
from typing import Dict, Tuple, List, Callable
import warnings
warnings.filterwarnings('ignore')
import time
from statistics import median

# For financial data
import yfinance as yf    # Loading financial data
import empyrical as emp  # Performance metrics
# Book data (pip install "git+https://github.com/dppalomar/pob.git#subdirectory=python")
from pob_python import SP500_stocks_2015to2020, SP500_index_2015to2020

# Plotting
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.gridspec as gridspec
import seaborn as sns
sns.set_theme(style="darkgrid")

# Optimization
import riskparityportfolio as rpp
import cvxpy as cp  # interface for convex optimization solvers
from qpsolvers import solve_qp
import quadprog
from scipy import optimize, linalg
Some math definitions for the equations:
$\def\bm#1{\boldsymbol{#1}}$ $\def\textm#1{\textsf{#1}}$ $\def\T{{\mkern-2mu\raise-1mu\mathsf{T}}}$ $\def\R{\mathbb{R}}$ $\def\E{\rm I\kern-.2em E}$ $\def\w{\bm{w}}$ $\def\bmu{\bm{\mu}}$ $\def\bSigma{\bm{\Sigma}}$
From dollar to risk diversification
Portfolio allocation and risk allocation for the $1/N$ portfolio and the risk parity portfolio (based on the riskparityportfolio package, available on PyPI; see the riskparityportfolio documentation):
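For reference, the risk contributions used throughout come from Euler's decomposition of the portfolio volatility, and the risk parity portfolio (RPP) aims to equalize the relative risk contributions (RRC):
$$\sigma(\w) = \sqrt{\w^\T\bSigma\w} = \sum_{i=1}^N w_i\frac{\left(\bSigma\w\right)_i}{\sqrt{\w^\T\bSigma\w}}, \qquad \textm{RRC}_i = \frac{w_i\left(\bSigma\w\right)_i}{\w^\T\bSigma\w} = \frac{1}{N}, \quad i=1,\dots,N.$$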
# Get data
stock_prices = SP500_stocks_2015to2020.loc["2020":"2020-09", ["AAPL", "NFLX", "TSCO", "MGM", "MSFT", "FB", "AMZN", "GOOGL"]]
T = stock_prices.shape[0]
T_trn = round(0.70*T)

# Calculate returns
X = np.diff(np.log(stock_prices.iloc[:T_trn].values), axis=0)
Sigma = np.cov(X.T)
N = X.shape[1]
# Portfolios
w_EWP = np.ones(N) / N
w_RPP = rpp.RiskParityPortfolio(covariance=Sigma).weights
# Helper function to compute risk contribution
def risk_contribution(w, Sigma):
    portfolio_risk = np.sqrt(w.T @ Sigma @ w)
    marginal_risk_contribution = Sigma @ w / portfolio_risk
    risk_contribution = w * marginal_risk_contribution
    return risk_contribution

# Helper function to compute relative risk contribution
def relative_risk_contribution(w, Sigma):
    rc = risk_contribution(w, Sigma)
    return rc / np.sum(rc)
# Helper function to plot barplots of portfolio weights and risk contribution
def barplot_w_and_RCC(weights_dict, Sigma):
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

    # Extract stock names (assuming it's always the first key)
    stocks = weights_dict['stocks']

    # Get all portfolio keys except 'stocks'
    portfolio_keys = [key for key in weights_dict.keys() if key != 'stocks']

    # Extract portfolios and their names
    portfolios = [weights_dict[key] for key in portfolio_keys]
    portfolio_names = portfolio_keys

    x = np.arange(len(stocks))
    width = 0.8 / len(portfolios)  # Adjust width based on number of portfolios

    # Plot weights
    for i, (portfolio, name) in enumerate(zip(portfolios, portfolio_names)):
        position = x + i*width - (len(portfolios)-1)*width/2
        ax1.bar(position, portfolio, width, label=name, edgecolor='black')

    # Plot relative risk contributions
    for i, (portfolio, name) in enumerate(zip(portfolios, portfolio_names)):
        position = x + i*width - (len(portfolios)-1)*width/2
        rrc = relative_risk_contribution(portfolio, Sigma)
        ax2.bar(position, rrc, width, label=name, edgecolor='black')

    # Set labels and titles
    ax1.set_title('Portfolio weights')
    ax1.set_ylabel('Weight')
    ax1.set_xticks(x)
    ax1.set_xticklabels(stocks)
    ax1.legend()

    ax2.set_title('Relative risk contribution')
    ax2.set_ylabel('Risk')
    ax2.set_xticks(x)
    ax2.set_xticklabels(stocks)
    ax2.legend()

    plt.tight_layout()
# Plot weights and risk contributions
barplot_w_and_RCC({
    'stocks': stock_prices.columns,
    '1/N': w_EWP,
    'RPP': w_RPP,
}, Sigma)
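As a quick numerical check using the helpers defined above, the RRCs of the RPP should all be close to $1/N = 0.125$, unlike those of the $1/N$ portfolio:

# Numerical check: the RPP should equalize the relative risk contributions
print(relative_risk_contribution(w_EWP, Sigma))
print(relative_risk_contribution(w_RPP, Sigma))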
Naive diagonal formulation
Portfolio allocation and risk contribution of the $1/N$ portfolio and naive RPP:
# Naive diagonal risk parity portfolio
def naive_risk_parity(Sigma):
    w = 1 / np.sqrt(np.diag(Sigma))
    return w / np.sum(w)

w_naive_RPP = naive_risk_parity(Sigma)
w_naive_RPP_package = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
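As a sanity check (a quick sketch, assuming both implementations use the same sum-to-one normalization), the closed-form solution should match the package's diagonal solution:

# The hand-coded and package diagonal solutions should coincide
print(np.allclose(w_naive_RPP, w_naive_RPP_package))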
# Plot weights and risk contributions
barplot_w_and_RCC({
    'stocks': stock_prices.columns,
    '1/N': w_EWP,
    'Naive diagonal RPP': w_naive_RPP,
    'Naive diagonal RPP (package)': w_naive_RPP_package,
}, Sigma)
Vanilla convex formulations
Portfolio allocation and risk contribution of the vanilla convex RPP compared to benchmarks:
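Recall Spinu's convex reformulation: for risk budgets $\bm{b} = \frac{1}{N}\bm{1}$, solve
$$\underset{\bm{x}\geq\bm{0}}{\textm{minimize}} \quad \frac{1}{2}\bm{x}^\T\bSigma\bm{x} - \bm{b}^\T\log(\bm{x}),$$
and then recover the portfolio by normalization, $\w = \bm{x}/\left(\bm{1}^\T\bm{x}\right)$.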
# Portfolios
w_EWP = np.ones(N) / N
w_naive_RPP = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
w_convex_RPP = rpp.RiskParityPortfolio(Sigma).weights
# Plot weights and risk contributions
barplot_w_and_RCC({
    'stocks': stock_prices.columns,
    '1/N': w_EWP,
    'Naive diagonal RPP': w_naive_RPP,
    'Vanilla convex RPP': w_convex_RPP,
}, Sigma)
Algorithms
General-purpose nonlinear solver
We can solve the convex problem formulation with SciPy's general-purpose nonlinear solver scipy.optimize:
# Initial point
w = np.ones(N) / N
x0 = w / np.sqrt(w.T @ Sigma @ w)

# Function definition
def fn_convex(x, Sigma):
    N = Sigma.shape[0]
    return 0.5 * x.T @ Sigma @ x - (1/N) * np.sum(np.log(x))

# Optimize with general-purpose solver
result = optimize.minimize(
    fn_convex, x0,
    args=(Sigma,),
    method='BFGS'
)
x_general_solver = result.x
w_general_solver_RPP = x_general_solver / np.sum(x_general_solver)

# Sanity check of the solution
b = np.ones(N) / N
residual = Sigma @ x_general_solver - b / x_general_solver
print("Optimality condition residual:", np.linalg.norm(residual))
Optimality condition residual: 1.4157366298264829e-05
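As a further check (a quick sketch, assuming both solvers have converged tightly), the normalized solution should agree with the package's convex RPP weights:

# Cross-check against the riskparityportfolio solution
print(np.allclose(w_general_solver_RPP, rpp.RiskParityPortfolio(Sigma).weights, atol=1e-4))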
Newton's method
Newton's method for Spinu's RPP formulation:
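Each iteration takes a Newton step on the convex objective, using its closed-form gradient and Hessian (with elementwise divisions), exactly as implemented below:
$$\nabla f(\bm{x}) = \bSigma\bm{x} - \bm{b}/\bm{x}, \qquad \nabla^2 f(\bm{x}) = \bSigma + \textm{Diag}\left(\bm{b}/\bm{x}^2\right), \qquad \bm{x}^{k+1} = \bm{x}^k - \left(\nabla^2 f(\bm{x}^k)\right)^{-1}\nabla f(\bm{x}^k).$$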
def newton_method_risk_parity(Sigma, num_iter=5):
    N = Sigma.shape[0]
    b = np.ones(N) / N

    # Initial point
    w = np.ones(N) / N
    x = w / np.sqrt(w.T @ Sigma @ w)

    # For tracking progress
    results = []
    results.append({
        'k': 0,
        'cpu_time_k': 0,
        'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
        'gap': None  # Will be filled later
    })

    for k in range(1, num_iter + 1):
        start_time = time.time()

        # Newton step
        grad = Sigma @ x - b / x
        Hessian = Sigma + np.diag(b / x**2)
        x_new = x - np.linalg.solve(Hessian, grad)

        cpu_time = (time.time() - start_time) * 1000  # in milliseconds
        x = x_new

        results.append({
            'k': k,
            'cpu_time_k': cpu_time,
            'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
            'gap': None  # Will be filled later
        })

    # Calculate optimal value (assuming the last iteration is close to optimal)
    opt_value = results[-1]['obj_value']

    # Fill in the gaps
    for result in results:
        result['gap'] = result['obj_value'] - opt_value

    return pd.DataFrame(results), x
# Run Newton's method
df_newton, x_newton = newton_method_risk_parity(Sigma)
df_newton['CPU time [ms]'] = df_newton['cpu_time_k'].cumsum()
# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

ax1.semilogy(df_newton['k'], df_newton['gap'], 'b-', linewidth=1.5)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, df_newton['k'].max())
ax1.set_xticks(range(0, df_newton['k'].max() + 1))

ax2.semilogy(df_newton['CPU time [ms]'], df_newton['gap'], 'b-', linewidth=1.5)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 0.25)

plt.tight_layout()
Cyclical coordinate descent algorithm
Cyclical coordinate descent algorithm for Spinu's RPP formulation:
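Fixing all coordinates but the $i$-th, the optimality condition becomes a scalar quadratic equation with a unique positive root, which gives the coordinate update implemented below:
$$x_i^{+} = \frac{-s_i + \sqrt{s_i^2 + 4\,\Sigma_{ii}\,b_i}}{2\,\Sigma_{ii}}, \qquad s_i = \sum_{j\neq i}\Sigma_{ij}x_j.$$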
def cyclical_coordinate_descent(Sigma, num_iter=10):
    N = Sigma.shape[0]
    b = np.ones(N) / N

    # Initial point
    w = np.ones(N) / N
    x = w / np.sqrt(w.T @ Sigma @ w)

    # For tracking progress
    results = []
    results.append({
        'k': 0,
        'cpu_time_k': 0,
        'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
        'gap': None  # Will be filled later
    })

    for k in range(1, num_iter + 1):
        start_time = time.time()

        x_new = x.copy()
        for i in range(N):
            # Update each coordinate
            Sigma_xk_i = np.dot(x_new[:i], Sigma[:i, i]) + np.dot(x_new[i+1:], Sigma[i+1:, i])
            x_new[i] = (-Sigma_xk_i + np.sqrt(Sigma_xk_i**2 + 4 * Sigma[i, i] * b[i])) / (2 * Sigma[i, i])

        cpu_time = (time.time() - start_time) * 1000  # in milliseconds
        x = x_new

        results.append({
            'k': k,
            'cpu_time_k': cpu_time,
            'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
            'gap': None  # Will be filled later
        })

    # Calculate optimal value (assuming the last iteration is close to optimal)
    opt_value = results[-1]['obj_value']

    # Fill in the gaps
    for result in results:
        result['gap'] = result['obj_value'] - opt_value

    return pd.DataFrame(results), x
# Run cyclical coordinate descent
df_ccd, x_ccd = cyclical_coordinate_descent(Sigma)
df_ccd['CPU time [ms]'] = df_ccd['cpu_time_k'].cumsum()
# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))

ax1.semilogy(df_ccd['k'], df_ccd['gap'], 'b-', linewidth=1.5)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, df_ccd['k'].max())
ax1.set_xticks(range(0, df_ccd['k'].max() + 1))

ax2.semilogy(df_ccd['CPU time [ms]'], df_ccd['gap'], 'b-', linewidth=1.5)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 50)

plt.tight_layout()
Numerical comparison
Convergence of the Newton, MM, and SCA methods for Spinu's RPP problem, comparing the original versions with the improved ones (i.e., using the correlation matrix instead of the covariance matrix and adding the linear normalization step after each update). See Chapter 11 of the book for details. The improved SCA appears to perform best.
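For reference, the MM update used below majorizes the quadratic term $\frac{1}{2}\bm{x}^\T\bSigma\bm{x}$ via the largest eigenvalue $\lambda_{\max}$ of $\bSigma$, which decouples the problem into elementwise updates:
$$x_i^{k+1} = \frac{-c_i + \sqrt{c_i^2 + 4\,\lambda_{\max}\,b_i}}{2\,\lambda_{\max}}, \qquad \bm{c} = \left(\bSigma - \lambda_{\max}\bm{I}\right)\bm{x}^k.$$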
num_iter = 25
N = Sigma.shape[0]
b = np.ones(N) / N

# Prepare correlation matrix
sigma = np.sqrt(np.diag(Sigma))
C = np.corrcoef(X.T)

# Compute optimal value (using vanilla convex RPP)
w_opt = rpp.RiskParityPortfolio(Sigma).weights
x_opt = w_opt / np.sqrt(w_opt.T @ Sigma @ w_opt)
opt_value = 0.5 * x_opt.T @ Sigma @ x_opt - np.sum(b * np.log(x_opt))

results = []
#
# Newton's method
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
    'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
    'method': 'Newton'
})

for k in range(1, num_iter + 1):
    start_time = time.time()

    # Newton step
    grad = Sigma @ x - b / x
    Hessian = Sigma + np.diag(b / x**2)
    x_new = x - np.linalg.solve(Hessian, grad)

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
        'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
        'method': 'Newton'
    })
#
# Newton's method (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
    'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
    'method': 'Newton - improved'
})

for k in range(1, num_iter + 1):
    start_time = time.time()

    # Newton step with correlation matrix
    grad = C @ x - b / x
    Hessian = C + np.diag(b / x**2)
    x_new = x - np.linalg.solve(Hessian, grad)
    x_new = x_new * np.sqrt(np.sum(b/x_new) / np.sum(rowsum_C * x_new))

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
        'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
        'method': 'Newton - improved'
    })
#
# MM algorithm
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
    'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
    'method': 'MM'
})

lmd_max = np.max(np.linalg.eigvalsh(Sigma))  # eigvalsh: Sigma is symmetric, so eigenvalues are real
Sigma_lmd_max = Sigma - lmd_max * np.eye(N)

for k in range(1, num_iter + 1):
    start_time = time.time()

    # MM update
    Sigma_lmd_max_xk = Sigma_lmd_max @ x
    x_new = (-Sigma_lmd_max_xk + np.sqrt(Sigma_lmd_max_xk**2 + 4 * lmd_max * b)) / (2 * lmd_max)

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
        'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
        'method': 'MM'
    })
#
# MM algorithm (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
    'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
    'method': 'MM - improved'
})

lmd_max = np.max(np.linalg.eigvalsh(C))  # eigvalsh: C is symmetric, so eigenvalues are real
C_lmd_max = C - lmd_max * np.eye(N)

for k in range(1, num_iter + 1):
    start_time = time.time()

    # MM update with correlation matrix
    C_lmd_max_xk = C_lmd_max @ x
    x_new = (-C_lmd_max_xk + np.sqrt(C_lmd_max_xk**2 + 4 * lmd_max * b)) / (2 * lmd_max)
    x_new = x_new * np.sqrt(np.sum(b/x_new) / np.sum(rowsum_C * x_new))

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
        'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
        'method': 'MM - improved'
    })
#
# SCA algorithm
#
x = np.sqrt(b) / np.sqrt(np.sum(Sigma, axis=1))
gamma = 1
eps = 0.1
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
    'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
    'method': 'SCA'
})

Sigma_Diag_Sigma = Sigma - np.diag(np.diag(Sigma))

for k in range(1, num_iter + 1):
    start_time = time.time()

    # SCA update
    Sigma_Diag_Sigma_xk = Sigma_Diag_Sigma @ x
    x_hat = (-Sigma_Diag_Sigma_xk + np.sqrt(Sigma_Diag_Sigma_xk**2 + 4 * np.diag(Sigma) * b)) / (2 * np.diag(Sigma))
    x_new = gamma * x_hat + (1 - gamma) * x

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new
    gamma = gamma * (1 - eps * gamma)

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)),
        'gap': 0.5 * x.T @ Sigma @ x - np.sum(b * np.log(x)) - opt_value,
        'method': 'SCA'
    })
#
# SCA algorithm (improved)
#
x = np.sqrt(b) / np.sqrt(np.sum(C, axis=1))
rowsum_C = np.sum(C, axis=1)
gamma = 1
eps = 0.1
results.append({
    'k': 0,
    'cpu_time_k': 0,
    'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
    'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
    'method': 'SCA - improved'
})

C_minus_I = C - np.eye(N)

for k in range(1, num_iter + 1):
    start_time = time.time()

    # SCA update with correlation matrix
    C_minus_I_xk = C_minus_I @ x
    x_hat = (-C_minus_I_xk + np.sqrt(C_minus_I_xk**2 + 4 * b)) / 2
    x_hat = x_hat * np.sqrt(np.sum(b/x_hat) / np.sum(rowsum_C * x_hat))
    x_new = gamma * x_hat + (1 - gamma) * x

    cpu_time = (time.time() - start_time) * 1000  # in milliseconds
    x = x_new
    gamma = gamma * (1 - eps * gamma)

    results.append({
        'k': k,
        'cpu_time_k': cpu_time,
        'obj_value': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)),
        'gap': 0.5 * (x/sigma).T @ Sigma @ (x/sigma) - np.sum(b * np.log(x/sigma)) - opt_value,
        'method': 'SCA - improved'
    })
# Convert results to DataFrame
df = pd.DataFrame(results)

# Calculate cumulative CPU time
df['CPU time [ms]'] = df.groupby('method')['cpu_time_k'].cumsum()

# Plot convergence
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Plot gap vs iterations
for method in df['method'].unique():
    method_data = df[df['method'] == method]
    ax1.semilogy(method_data['k'], method_data['gap'], linewidth=1.5, label=method)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, num_iter)
ax1.set_xticks(range(0, num_iter+1, 5))
ax1.legend()
ax1.grid(True, which="both", ls="--", alpha=0.3)

# Plot gap vs CPU time
for method in df['method'].unique():
    method_data = df[df['method'] == method]
    ax2.semilogy(method_data['CPU time [ms]'], method_data['gap'], linewidth=1.5, label=method)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_xlim(0, 0.25)
ax2.legend()
ax2.grid(True, which="both", ls="--", alpha=0.3)

plt.tight_layout()
plt.show()
General nonconvex formulations
Portfolio allocation and risk contribution of general nonconvex RPP (with $w_i \leq 0.15$) compared to benchmarks ($1/N$ portfolio, naive diagonal RPP, and vanilla convex RPP):
# Portfolios
w_EWP = np.ones(N) / N
w_naive_RPP = rpp.RiskParityPortfolio(Sigma).get_diag_solution()
w_convex_RPP = rpp.RiskParityPortfolio(Sigma).weights

# General nonconvex formulation with upper bound w_i <= ub, via Cw = c and Dw <= d
ub = 0.14
my_portfolio = rpp.RiskParityPortfolio(Sigma)
my_portfolio.design(Cmat=np.ones((1, N)), cvec=np.array([1.0]),   # sum(w) = 1
                    Dmat=np.vstack([-np.eye(N), np.eye(N)]),
                    dvec=np.concatenate([np.zeros(N), ub*np.ones(N)]))  # -w <= 0 and w <= ub
w_nonconvex_RPP = my_portfolio.weights
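A quick check (a sketch, assuming the solver satisfied the constraints to numerical tolerance) that the upper bound is respected and a count of how many weights sit at it:

# Verify w_i <= ub and count the weights at the bound
print(w_nonconvex_RPP.max() <= ub + 1e-8)
print(np.sum(np.isclose(w_nonconvex_RPP, ub, atol=1e-6)))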
# Plot weights and risk contributions
barplot_w_and_RCC({
    'stocks': stock_prices.columns,
    '1/N': w_EWP,
    'Naive diagonal RPP': w_naive_RPP,
    'Vanilla convex RPP': w_convex_RPP,
    'General nonconvex RPP': w_nonconvex_RPP,
}, Sigma)
Algorithms
Convergence comparison for different algorithms to solve the nonconvex RPP formulation:
$$
\begin{array}{ll}
\underset{\w}{\textm{minimize}} & \begin{aligned}\sum_{i=1}^N \left(\frac{w_i\left(\bSigma\w\right)_i}{\w^\T\bSigma\w} - b_i\right)^2\end{aligned}\\
\textm{subject to} & \w \in \mathcal{W}.
\end{array}
$$
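The explicit SCA implementation below convexifies this objective at the current iterate $\w^k$ by linearizing the vector of risk-concentration terms $\bm{g}(\w)$ inside the squared norm (with Jacobian $\bm{J}$) and adding a proximal term, so each iteration reduces to a QP:
$$\hat{\w}^k = \underset{\w\in\mathcal{W}}{\textm{arg min}}\;\left\|\bm{g}(\w^k) + \bm{J}(\w^k)\left(\w - \w^k\right)\right\|^2 + \frac{\tau}{2}\left\|\w - \w^k\right\|^2, \qquad \w^{k+1} = \w^k + \gamma^k\left(\hat{\w}^k - \w^k\right).$$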
# Prep data
N = 200
stock_prices = SP500_stocks_2015to2020.iloc[:, :N]
X = np.diff(np.log(stock_prices.values), axis=0)  # Calculate returns from stock prices
Sigma = np.cov(X.T)
b = np.ones(N) / N       # Risk budget
w_ub = 0.008             # RPP with upper bound
w_init = np.ones(N) / N
num_iter = 20
num_times = 10           # to compute the CPU time
#
# General nonlinear solver (using scipy.optimize)
#
def R(w):
    Sigma_w = Sigma @ w
    r = w * Sigma_w
    sum_r = np.sum(r)
    r_sumr_b = r/sum_r - b
    return np.sum(np.square(r_sumr_b))

def R_grad(w):
    Sigma_w = Sigma @ w
    r = w * Sigma_w
    sum_r = np.sum(r)
    r_sumr_b = r/sum_r - b
    v = (2/sum_r) * (r_sumr_b - np.sum(r_sumr_b * r)/sum_r)
    return (Sigma @ (w * v) + Sigma_w * v)

def heq(w, *args):
    """Equality constraint: sum(w) = 1"""
    return np.sum(w) - 1

def heq_jac(w, *args):
    """Jacobian of equality constraint"""
    return np.ones((1, len(w)))

def hin(w, *args):
    """Inequality constraints: w >= 0 and w <= w_ub"""
    return np.concatenate([w, w_ub - w])

def hin_jac(w, *args):
    """Jacobian of inequality constraints"""
    N = len(w)
    return np.vstack([np.eye(N), -np.eye(N)])
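For reference, the gradient coded in R_grad follows from the chain rule: with $r_i = w_i\left(\bSigma\w\right)_i$, $S = \bm{1}^\T\bm{r}$, and $u_i = r_i/S - b_i$,
$$\nabla R(\w) = \bSigma\left(\w\odot\bm{v}\right) + \left(\bSigma\w\right)\odot\bm{v}, \qquad v_i = \frac{2}{S}\left(u_i - \frac{1}{S}\sum_{j=1}^N u_j r_j\right),$$
where $\odot$ denotes the elementwise product.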
# Initialize dataframe for results
w = w_init.copy()
df = pd.DataFrame({
    "k": [0],
    "cpu_time_per_iter": [np.nan],
    "CPU time [ms]": [0],
    "obj_value": [R(w)],
    "method": ["nonlinear solver (scipy)"]
})

# Run optimization for different iterations
for k in range(1, num_iter + 1):
    cpu_times = []
    for _ in range(num_times):
        start_time = time.time()

        # Define constraints for scipy.optimize
        constraints = [
            {'type': 'eq', 'fun': heq, 'jac': heq_jac},
            {'type': 'ineq', 'fun': hin, 'jac': hin_jac}
        ]

        # Run optimization with same initial point and with limited iterations
        res = optimize.minimize(
            R, w_init, jac=R_grad,
            constraints=constraints,
            method='SLSQP',
            options={'maxiter': k, 'disp': False}
        )
        w = res.x

        end_time = time.time()
        cpu_times.append((end_time - start_time) * 1e6)  # microseconds
    cpu_time = median(cpu_times)

    # Append results to dataframe
    df = pd.concat([df, pd.DataFrame({
        "k": [k],
        "cpu_time_per_iter": [np.nan],
        "CPU time [ms]": [1e-3 * cpu_time],  # convert microseconds to milliseconds
        "obj_value": [R(w)],
        "method": ["nonlinear solver (scipy)"]
    })], ignore_index=True)
#
# SCA via explicit implementation
#
gamma = 0.9
zeta = 1e-7

# Constraints (A' * b >= b0)
meq = 1

# Create constraint matrices
ones_vec = np.ones((1, N))
neg_eye = -np.eye(N)
pos_eye = np.eye(N)
Amat = np.vstack([ones_vec, neg_eye, pos_eye]).T  # Transpose to match R's t(rbind(...))
bvec = np.concatenate([[1], -np.ones(N) * w_ub, np.zeros(N)])

# Intermediate variables
tau = 0.05 * np.sum(np.diag(Sigma)) / (2*N)
tauI = tau * np.eye(N)

# Define functions for risk concentration
my_portfolio = rpp.RiskParityPortfolio(Sigma)
risk = rpp.RiskContribOverVarianceMinusBudget(my_portfolio)
g = risk.risk_concentration_vector              # gvec
A = risk.jacobian_risk_concentration_vector()   # Amat

#
# Numerical check
#
# g(np.ones(N) / N), A(np.ones(N) / N)
# Initialize
wk = w_init.copy()
df = pd.concat([df, pd.DataFrame({
    "k": [0],
    "cpu_time_per_iter": [0],
    "CPU time [ms]": [np.nan],
    "obj_value": [R(wk)],
    "method": ["SCA"]
})], ignore_index=True)

# Loop
for k in range(1, num_iter + 1):
    cpu_times = []
    for _ in range(num_times):
        start_time = time.time()

        # Auxiliary quantities
        gk = np.asarray(g(wk))
        Jk = np.asarray(A(wk))
        Qk = 2 * Jk.T @ Jk + tauI
        qk = 2 * Jk.T @ gk - Qk @ wk

        # Prepare constraints for QP solver
        # Equality constraint: sum(w) = 1
        A_eq = np.ones((1, N))
        b_eq = np.array([1.0])
        # Inequality constraints: 0 <= w <= w_ub
        G = np.vstack([-np.eye(N), np.eye(N)])  # -w <= 0 and w <= w_ub
        h = np.concatenate([np.zeros(N), np.ones(N) * w_ub])
        # Combine them together
        Cmat = np.vstack([A_eq, -G]).T
        bvec = np.concatenate([b_eq, -h])

        # Solve QP
        #w_hat = solve_qp(P=Qk, q=qk, A=A_eq, b=b_eq, lb=np.zeros(N), ub=np.ones(N) * w_ub, solver="quadprog")
        #w_hat = solve_qp(P=Qk, q=qk, G=G, h=h, A=A_eq, b=b_eq, solver="quadprog")
        w_hat = quadprog.solve_qp(Qk, -qk, C=Cmat, b=bvec, meq=1)[0]

        w_new = wk + gamma * (w_hat - wk)

        end_time = time.time()
        cpu_times.append((end_time - start_time) * 1e6)  # microseconds
    cpu_time_k = median(cpu_times)

    wk = w_new
    gamma = gamma * (1 - zeta*gamma)

    # Append results to dataframe
    df = pd.concat([df, pd.DataFrame({
        "k": [k],
        "cpu_time_per_iter": [1e-3 * cpu_time_k],  # convert microseconds to milliseconds
        "CPU time [ms]": [np.nan],
        "obj_value": [R(wk)],
        "method": ["SCA"]
    })], ignore_index=True)

df.loc[df['method'] == 'SCA', 'CPU time [ms]'] = df.loc[df['method'] == 'SCA', 'cpu_time_per_iter'].cumsum()
# Process dataframe
#df['method'] = pd.Categorical(df['method'], categories=df['method'].unique())

# Add optimality gap using the actual minimum value
min_obj_value = df['obj_value'].min()
df['gap'] = df['obj_value'] - min_obj_value
#
# Plotting
#
fig = plt.figure(figsize=(12, 10))
gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1])

# Plot 1: Gap vs iterations
ax1 = plt.subplot(gs[0])
for method in df['method'].unique():
    method_data = df[df['method'] == method]
    ax1.semilogy(method_data['k'], method_data['gap'], linewidth=1.5, label=method)
ax1.set_xlabel('Iterations')
ax1.set_ylabel('Optimality gap')
ax1.set_title('Optimality gap versus iterations')
ax1.set_xlim(0, num_iter)
ax1.set_xticks(range(0, num_iter+1, 1))
ax1.set_ylim(1e-7, None)
ax1.legend()
ax1.grid(True, which="both", ls="--", alpha=0.3)

# Plot 2: Gap vs CPU time
ax2 = plt.subplot(gs[1])
for method in df['method'].unique():
    method_data = df[df['method'] == method]
    ax2.semilogy(method_data['CPU time [ms]'], method_data['gap'], linewidth=1.5, label=method)
ax2.set_xlabel('CPU time [ms]')
ax2.set_ylabel('Optimality gap')
ax2.set_title('Optimality gap versus CPU time')
ax2.set_ylim(1e-7, None)
ax2.legend()
ax2.grid(True, which="both", ls="--", alpha=0.3)

plt.tight_layout()
plt.show()