
Issue with SR3-like class: regularization term #610

Open
Codestriz opened this issue Mar 18, 2025 · 1 comment

Codestriz commented Mar 18, 2025

Hello everyone,

I am trying to use a class similar to SR3, which I attempted to replicate by copying the available code from the GitHub repository. However, I am not obtaining the expected results. Specifically, I noticed that the regularization term differs by a factor of lambda × the number of active terms, and I don't understand why this discrepancy occurs.

Could this be related to a difference in versions? If so, which version should I be using to ensure consistency?
I am attaching the code I used along with the results I obtained. Any insights or guidance would be greatly appreciated!

Thanks in advance!

Beatriz

results (ps.SR3 first, then CustomSR3; note that both runs print identical equations and identical |y - Xw|^2 and |w-u|^2/v columns, and only R(u) differs, by a factor of 20):

Iteration ... |y - Xw|^2 ... |w-u|^2/v ... R(u) ... Total Error: |y-Xw|^2 + |w-u|^2/v + R(u)
0 ... 3.9650e-05 ... 4.0205e-02 ... 3.0814e-02 ... 7.1058e-02
10 ... 1.6021e-02 ... 3.0389e-02 ... 1.8734e-02 ... 6.5143e-02
20 ... 1.6307e-02 ... 2.7490e-02 ... 1.4771e-02 ... 5.8568e-02
30 ... 1.4109e-02 ... 2.5248e-02 ... 1.2624e-02 ... 5.1981e-02
40 ... 1.5152e-02 ... 2.1159e-02 ... 1.0773e-02 ... 4.7084e-02
50 ... 1.3493e-02 ... 1.6753e-02 ... 1.0492e-02 ... 4.0738e-02
(x)' = -2.001 x + 1.001 y^2
(y)' = 1.000 x + -0.350 y
Iteration ... |y - Xw|^2 ... |w-u|^2/v ... R(u) ... Total Error: |y-Xw|^2 + |w-u|^2/v + R(u)
0 ... 3.9650e-05 ... 4.0205e-02 ... 6.1627e-01 ... 6.5652e-01
10 ... 1.6021e-02 ... 3.0389e-02 ... 3.7467e-01 ... 4.2108e-01
20 ... 1.6307e-02 ... 2.7490e-02 ... 2.9542e-01 ... 3.3922e-01
30 ... 1.4109e-02 ... 2.5248e-02 ... 2.5248e-01 ... 2.9184e-01
40 ... 1.5152e-02 ... 2.1159e-02 ... 2.1546e-01 ... 2.5177e-01
50 ... 1.3493e-02 ... 1.6753e-02 ... 2.0984e-01 ... 2.4008e-01
(x)' = -2.001 x + 1.001 y^2
(y)' = 1.000 x + -0.350 y

code:

import warnings
from typing import Union

import numpy as np
from scipy.linalg import cho_factor, cho_solve
from scipy.integrate import solve_ivp
from sklearn.exceptions import ConvergenceWarning

import pysindy as ps
from pysindy.optimizers import BaseOptimizer
from pysindy.optimizers.sr3 import get_regularization, get_prox
from pysindy.utils import capped_simplex_projection

class CustomSR3(BaseOptimizer):
    def __init__(
        self,
        reg_weight_lam=0.005,
        regularizer="L0",
        relax_coeff_nu=1.0,
        tol=1e-5,
        trimming_fraction=0.0,
        trimming_step_size=1.0,
        max_iter=30,
        copy_X=True,
        initial_guess=None,
        normalize_columns=False,
        verbose=False,
        unbias=False,
    ):
        super(CustomSR3, self).__init__(
            max_iter=max_iter,
            initial_guess=initial_guess,
            copy_X=copy_X,
            normalize_columns=normalize_columns,
            #unbias=unbias,
        )
        if relax_coeff_nu <= 0:
            raise ValueError("relax_coeff_nu must be positive")
        if tol <= 0:
            raise ValueError("tol must be positive")
        if (trimming_fraction < 0) or (trimming_fraction > 1):
            raise ValueError("trimming fraction must be between 0 and 1")
        if trimming_fraction > 0 and unbias:
            raise ValueError(
                "Unbiasing counteracts some of the effects of trimming.  Either set"
                " unbias=False or trimming_fraction=0.0"
            )
        if regularizer.lower() not in (
            "l0",
            "l1",
            "l2",
            "weighted_l0",
            "weighted_l1",
            "weighted_l2",
        ):
            raise NotImplementedError(
                "Please use a valid regularizer: l0, l1, l2, "
                "weighted_l0, weighted_l1, or weighted_l2."
            )
        if regularizer[:8].lower() == "weighted" and not isinstance(
            reg_weight_lam, np.ndarray
        ):
            raise ValueError(
                "weighted regularization requires the reg_weight_lam parameter "
                "to be a 2d array"
            )
        if np.any(reg_weight_lam < 0):
            raise ValueError("reg_weight_lam cannot contain negative entries")
        if isinstance(reg_weight_lam, np.ndarray):
            reg_weight_lam = reg_weight_lam.T
        self.reg_weight_lam = reg_weight_lam
        self.relax_coeff_nu = relax_coeff_nu
        self.tol = tol
        self.regularizer = regularizer
        self.reg = get_regularization(regularizer)
        self.prox = get_prox(regularizer)
        if trimming_fraction == 0.0:
            self.use_trimming = False
        else:
            self.use_trimming = True
        self.trimming_fraction = trimming_fraction
        self.trimming_step_size = trimming_step_size
        self.verbose = verbose

    @staticmethod
    def calculate_l0_weight(
        threshold: Union[float, np.ndarray[np.float64]], relax_coeff_nu: float
    ):
        """
        Calculate the L0 regularizer weight that is equivalent to a known L0 threshold

        See Appendix S1 of the following paper for more details.
            Champion, K., Zheng, P., Aravkin, A. Y., Brunton, S. L., & Kutz, J. N.
            (2020). A unified sparse optimization framework to learn parsimonious
            physics-informed models from data. IEEE Access, 8, 169259-169271.
        """
        return (threshold**2) / (2 * relax_coeff_nu)

    def enable_trimming(self, trimming_fraction):
        """
        Enable the trimming of potential outliers.

        Parameters
        ----------
        trimming_fraction: float
            The fraction of samples to be trimmed.
            Must be between 0 and 1.
        """
        self.use_trimming = True
        self.trimming_fraction = trimming_fraction

    def disable_trimming(self):
        """Disable trimming of potential outliers."""
        self.use_trimming = False
        self.trimming_fraction = None

    def _objective(self, x, y, q, coef_full, coef_sparse, trimming_array=None):
        """Objective function"""
        if q != 0:
            print_ind = q % (self.max_iter // 10.0)
        else:
            print_ind = q
        R2 = (y - np.dot(x, coef_full)) ** 2
        D2 = (coef_full - coef_sparse) ** 2
        if self.use_trimming:
            assert trimming_array is not None
            R2 *= trimming_array.reshape(x.shape[0], 1)
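        # NOTE: this is the line at issue. The master-branch code passes
        # reg_weight_lam straight through to the regularizer here; the stable
        # release instead passes self.threshold**2 / self.nu (see the
        # follow-up comment below).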
        regularization = self.reg(coef_full, self.reg_weight_lam)
        if print_ind == 0 and self.verbose:
            row = [
                q,
                np.sum(R2),
                np.sum(D2) / self.relax_coeff_nu,
                regularization,
                # divide D2 by nu so the printed total matches the column header
                np.sum(R2) + np.sum(D2) / self.relax_coeff_nu + regularization,
            ]
            print(
                "{0:10d} ... {1:10.4e} ... {2:10.4e} ... {3:10.4e}"
                " ... {4:10.4e}".format(*row)
            )
        return (
            0.5 * np.sum(R2)
            + 0.5 * regularization
            + 0.5 * np.sum(D2) / self.relax_coeff_nu
        )

    def _update_full_coef(self, cho, x_transpose_y, coef_sparse):
        """Update the unregularized weight vector"""
        b = x_transpose_y + coef_sparse / self.relax_coeff_nu
        coef_full = cho_solve(cho, b)
        self.iters += 1
        return coef_full

    def _update_sparse_coef(self, coef_full):
        """Update the regularized weight vector"""
        coef_sparse = self.prox(coef_full, self.reg_weight_lam * self.relax_coeff_nu)
        return coef_sparse

    def _update_trimming_array(self, coef_full, trimming_array, trimming_grad):
        trimming_array = trimming_array - self.trimming_step_size * trimming_grad
        trimming_array = capped_simplex_projection(
            trimming_array, self.trimming_fraction
        )
        self.history_trimming_.append(trimming_array)
        return trimming_array

    def _convergence_criterion(self):
        """Calculate the convergence criterion for the optimization"""
        this_coef = self.history_[-1]
        if len(self.history_) > 1:
            last_coef = self.history_[-2]
        else:
            last_coef = np.zeros_like(this_coef)
        err_coef = np.sqrt(np.sum((this_coef - last_coef) ** 2)) / self.relax_coeff_nu
        if self.use_trimming:
            this_trimming_array = self.history_trimming_[-1]
            if len(self.history_trimming_) > 1:
                last_trimming_array = self.history_trimming_[-2]
            else:
                last_trimming_array = np.zeros_like(this_trimming_array)
            err_trimming = (
                np.sqrt(np.sum((this_trimming_array - last_trimming_array) ** 2))
                / self.trimming_step_size
            )
            return err_coef + err_trimming
        return err_coef

    def _reduce(self, x, y):
        """
        Perform at most ``self.max_iter`` iterations of the SR3 algorithm.

        Assumes initial guess for coefficients is stored in ``self.coef_``.
        """
        if self.initial_guess is not None:
            self.coef_ = self.initial_guess

        coef_sparse = self.coef_.T
        n_samples, n_features = x.shape

        coef_full = coef_sparse.copy()
        if self.use_trimming:
            trimming_array = np.repeat(1.0 - self.trimming_fraction, n_samples)
            self.history_trimming_ = [trimming_array]
        else:
            trimming_array = None

        # Precompute some objects for the upcoming least-squares solves: the
        # factored matrix is X^T X + (1 / relax_coeff_nu) I.
        # Assumes that self.relax_coeff_nu is fixed throughout the optimization.
        cho = cho_factor(
            np.dot(x.T, x) + np.diag(np.full(x.shape[1], 1.0 / self.relax_coeff_nu))
        )
        x_transpose_y = np.dot(x.T, y)

        # Print initial values for each term in the optimization
        if self.verbose:
            row = [
                "Iteration",
                "|y - Xw|^2",
                "|w-u|^2/v",
                "R(u)",
                "Total Error: |y-Xw|^2 + |w-u|^2/v + R(u)",
            ]
            print(
                "{: >10} ... {: >10} ... {: >10} ... {: >10} ... {: >10}".format(*row)
            )
        objective_history = []

        for k in range(self.max_iter):
            if self.use_trimming:
                x_weighted = x * trimming_array.reshape(n_samples, 1)
                cho = cho_factor(
                    np.dot(x_weighted.T, x)
                    + np.diag(np.full(x.shape[1], 1.0 / self.relax_coeff_nu))
                )
                x_transpose_y = np.dot(x_weighted.T, y)
                trimming_grad = 0.5 * np.sum((y - x.dot(coef_full)) ** 2, axis=1)
            coef_full = self._update_full_coef(cho, x_transpose_y, coef_sparse)
            coef_sparse = self._update_sparse_coef(coef_full)
            self.history_.append(coef_sparse.T)
            if self.use_trimming:
                trimming_array = self._update_trimming_array(
                    coef_full, trimming_array, trimming_grad
                )
            objective_history.append(
                self._objective(x, y, k, coef_full, coef_sparse, trimming_array)
            )
            if self._convergence_criterion() < self.tol:
                # Could not (further) select important features
                break
        else:
            warnings.warn(
                f"SR3 did not converge after {self.max_iter} iterations.",
                ConvergenceWarning,
            )
        self.coef_ = coef_sparse.T
        self.coef_full_ = coef_full.T
        if self.use_trimming:
            self.trimming_array = trimming_array
        self.objective_history = objective_history


# Define the problem
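# True dynamics, which both optimizers recover in the results above:
#   x' = -2 x + y^2
#   y' = x - 0.35 y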
def sistema_acoplado(t, y):
    y1, y2 = y  
    dy1_dt = -2*y1 + y2**2  
    dy2_dt = y1 - 0.35*y2  
    return [dy1_dt, dy2_dt]  


## MAIN CODE
# Initial conditions
y0 = [1.0, 0.0]
t_span = (0, 10)  
time = np.linspace(t_span[0], t_span[1], 500)  # Timesteps

# Solve the system
sol = solve_ivp(sistema_acoplado, t_span, y0, t_eval=time)
xsol = sol.y[0]  
ysol = sol.y[1] 

X = np.column_stack([xsol, ysol])

feature_library = ps.PolynomialLibrary(degree=3, include_bias=False)
names = ['x', 'y']  

# First I apply ps.SR3
soptimizer_std = ps.SR3(
    threshold=0.05,
    thresholder="l1",
    nu=1.0,
    max_iter=100,
    tol=1e-3,
    verbose=True,
)
modelo_sr3 = ps.SINDy(feature_library=feature_library, optimizer=soptimizer_std, feature_names=names)
modelo_sr3.fit(X, t=time)
modelo_sr3.print()

# Second I apply CustomSR3
soptimizer = CustomSR3(
    reg_weight_lam=0.05,
    regularizer="l1",
    relax_coeff_nu=1.0,
    max_iter=100,
    tol=1e-3,
    verbose=True,
)
modelo_custom = ps.SINDy(feature_library=feature_library, optimizer=soptimizer, feature_names=names)
modelo_custom.fit(X, t=time)
modelo_custom.print()
Codestriz (Author) commented:

Hello,

I found the error: the stable version reads
regularization = self.reg(coef_full, self.threshold**2 / self.nu)
while the latest version on GitHub reads
regularization = self.reg(coef_full, self.reg_weight_lam)

Source of the stable version: https://pysindy.readthedocs.io/en/stable/_modules/pysindy/optimizers/sr3.html#SR3
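
For what it's worth, a minimal sketch of the arithmetic, using the values from the script above:

# Stable SR3 evaluates   reg(coef, threshold**2 / nu)   in _objective,
# whereas the GitHub master code evaluates   reg(coef, reg_weight_lam).
threshold = 0.05        # passed to ps.SR3
nu = 1.0
reg_weight_lam = 0.05   # passed to CustomSR3

stable_weight = threshold**2 / nu       # 0.0025
print(reg_weight_lam / stable_weight)   # 20.0 -- the gap in the R(u) columns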

Thanks!
