Source code for rumboost.linear_trees

import json
import numpy as np
import warnings


[docs] class LinearTree: def __init__( self, x: np.ndarray = None, init_leaf_val: int = 0, monotonic_constraint: int = 0, max_bin: int = 255, learning_rate: float = 0.1, lambda_l1: float = 0, lambda_l2: float = 0, bagging_fraction: float = 1, bagging_freq: int = 1, min_data_in_leaf: int = 20, min_sum_hessian_in_leaf: float = 1e-3, min_gain_to_split: int = 0, min_data_in_bin: int = 3, ): """ Parameters ---------- x : np.ndarray, optional Feature values for the training data. If None, the model is initialized without data. init_leaf_val : int, optional Initial value for the leaves of the tree. Default is 0. monotonic_constraint : int, optional Monotonic constraint for the tree. 0 means no constraint, 1 means positive, -1 means negative. max_bin : int, optional Maximum number of bins for histogram-based feature values. Default is 255. learning_rate : float, optional Learning rate for the model. Default is 0.1. lambda_l1 : float, optional L1 regularisation term. Default is 0. lambda_l2 : float, optional L2 regularisation term. Default is 0. bagging_fraction : float, optional Fraction of data to use for bagging. Default is 1 (no bagging). bagging_freq : int, optional Frequency of bagging. If > 0, bagging is applied every `bagging_freq` iterations. Default is 1. min_data_in_leaf : int, optional Minimum number of data points in a leaf. Default is 20. min_sum_hessian_in_leaf : float, optional Minimum sum of Hessian in a leaf. Default is 1e-3. min_gain_to_split : int, optional Minimum gain required to perform a split. Default is 0. min_data_in_bin : int, optional Minimum number of data points in a bin. Default is 3. """ if x is not None: self.x = x self.bin_edges, self.histograms, self.bin_indices = ( self.build_lightgbm_style_histogram(x, max_bin, min_data_in_bin) ) self.bin_indices_2d = self.bin_indices.reshape(1, -1).repeat( self.bin_edges.shape[0] - 2, axis=0 ) self.upper_bound_left = init_leaf_val * np.ones(self.bin_edges.shape[0] - 2) self.lower_bound_left = init_leaf_val * np.ones(self.bin_edges.shape[0] - 2) self.upper_bound_right = init_leaf_val * np.ones( self.bin_edges.shape[0] - 2 ) self.lower_bound_right = init_leaf_val * np.ones( self.bin_edges.shape[0] - 2 ) self.x_minus_bin_edges = self.x[None, :] - self.bin_edges[1:-1, None] self.split_and_leaf_values = { "splits": self.bin_edges, "leaves": init_leaf_val * np.ones(self.bin_edges.shape[0] - 1), "value_at_splits": init_leaf_val * self.bin_edges, } self.bagging_fraction = bagging_fraction self.bagging_freq = bagging_freq if self.bagging_freq > 0: self.bagged_indices = np.random.choice( np.arange(x.shape[0]), size=int((1 - self.bagging_fraction) * x.shape[0]), replace=False, ) self.monotonic_constraint = monotonic_constraint self.feature_importance_dict = {"gain": np.array([])} self.valid_sets = [] self.name_valid_sets = [] self.bin_indices_valid = [] self.learning_rate = learning_rate self.lambda_l1 = lambda_l1 if self.lambda_l1 > 0: warnings.warn( "L1 regularisation not implemented yet, ignoring `lambda_l1` value." ) self.lambda_l2 = lambda_l2 self.min_data_in_leaf = min_data_in_leaf self.min_sum_hessian_in_leaf = min_sum_hessian_in_leaf self.min_gain_to_split = min_gain_to_split self.min_data_in_bin = min_data_in_bin self.boosting_count = 0
[docs] def build_lightgbm_style_histogram( self, feature_values: np.ndarray, max_bin: int = 255, min_data_in_bin: int = 3 ): """ Build histogram for feature values similar to LightGBM's histogram-based binning. Parameters ---------- feature_values : np.ndarray Feature values to be binned. max_bin : int, optional Maximum number of bins to create. Default is 255. min_data_in_bin : int, optional Minimum number of data points required in each bin. Default is 3. Returns ------- bin_edges : np.ndarray Edges of the bins. histogram : np.ndarray Histogram of the feature values. bin_indices : np.ndarray Indices of the bins for each feature value. """ while max_bin > 1: percentiles = np.linspace(0, 100, max_bin + 1) bin_edges = np.unique(np.percentile(feature_values, percentiles)) bin_indices = np.digitize(feature_values, bins=bin_edges[1:-1], right=True) histogram = np.bincount(bin_indices, minlength=len(bin_edges) - 1) if (histogram < min_data_in_bin).any(): warnings.warn( f"[Warning] Not enough data in bins. Reducing max_bin from {max_bin} to {max_bin // 2}." ) max_bin = int(max_bin / 2) # reduce bins and try again else: break if (histogram < min_data_in_bin).any(): raise ValueError( "[Fatal] Not enough data per bin. Consider reducing `min_data_in_bin`." ) return bin_edges, histogram, bin_indices
[docs] def feature_importance(self, type: str): """ Get the feature importance for the specified type. Parameters ---------- type : str Type of feature importance to retrieve. Currently only "gain" is supported. Returns ------- np.ndarray Feature importance values for the specified type. """ return self.feature_importance_dict[type]
[docs] def update(self, train_set, fobj): """ Update the model with new training data and compute the best split. Parameters ---------- train_set : Dataset Training dataset containing the feature values. fobj : callable Objective function to compute gradients and Hessians. """ # get gradients and Hessians and scale with data grad, hess = fobj(1, 2) if self.bagging_freq > 0: grad[self.bagged_indices] = 0 hess[self.bagged_indices] = 0 grad_x = grad * self.x_minus_bin_edges hess_x = hess * self.x_minus_bin_edges**2 # compute sum of grads and hessians in all bins N = self.bin_indices_2d.max() + 1 id = ( self.bin_indices_2d + (N * np.arange(self.bin_indices_2d.shape[0]))[:, None] ) grad_x_binned = np.bincount(id.ravel(), weights=grad_x.ravel()).reshape(-1, N) hess_x_binned = np.bincount(id.ravel(), weights=hess_x.ravel()).reshape(-1, N) # mask for left and right leaves arange = np.arange(grad_x_binned.shape[1]) edgerange = np.arange(grad_x_binned.shape[1] - 1) + 1 mask = arange[None, :] < edgerange[:, None] # compute gains and leaves sum_hessian_left = (hess_x_binned * mask).sum(axis=1) sum_hessian_right = (hess_x_binned * ~mask).sum(axis=1) # left left_gain = (grad_x_binned * mask).sum(axis=1) ** 2 / ( sum_hessian_left + self.lambda_l2 ) left_leaf = -(grad_x_binned * mask).sum(axis=1) / ( sum_hessian_left + self.lambda_l2 ) left_gain = np.nan_to_num(left_gain, nan=-np.inf) # right right_gain = (grad_x_binned * ~mask).sum(axis=1) ** 2 / ( sum_hessian_right + self.lambda_l2 ) right_leaf = -(grad_x_binned * ~mask).sum(axis=1) / ( sum_hessian_right + self.lambda_l2 ) right_gain = np.nan_to_num(right_gain, nan=-np.inf) # no split gain no_split_gain = grad_x_binned.sum(axis=1) ** 2 / ( hess_x_binned.sum(axis=1) + self.lambda_l2 ) no_split_gain = np.nan_to_num(no_split_gain, nan=-np.inf) # apply monotonic constraints and regularisation if self.monotonic_constraint == 1: left_gain[self.learning_rate * left_leaf < -self.lower_bound_left] = 0 left_leaf[self.learning_rate * left_leaf < -self.lower_bound_left] = 0 right_gain[self.learning_rate * right_leaf < -self.lower_bound_right] = 0 right_leaf[self.learning_rate * right_leaf < -self.lower_bound_right] = 0 if self.monotonic_constraint == -1: left_gain[self.learning_rate * left_leaf > -self.upper_bound_left] = 0 left_leaf[self.learning_rate * left_leaf > -self.upper_bound_left] = 0 right_gain[self.learning_rate * right_leaf > -self.upper_bound_right] = 0 right_leaf[self.learning_rate * right_leaf > -self.upper_bound_right] = 0 if self.min_sum_hessian_in_leaf > 0: left_gain[sum_hessian_left < self.min_sum_hessian_in_leaf] = 0 left_leaf[sum_hessian_left < self.min_sum_hessian_in_leaf] = 0 right_gain[sum_hessian_right < self.min_sum_hessian_in_leaf] = 0 right_leaf[sum_hessian_right < self.min_sum_hessian_in_leaf] = 0 if self.min_data_in_leaf > 0: left_gain[(self.histograms * mask).sum(axis=1) < self.min_data_in_leaf] = 0 left_leaf[(self.histograms * mask).sum(axis=1) < self.min_data_in_leaf] = 0 right_gain[ (self.histograms * ~mask).sum(axis=1) < self.min_data_in_leaf ] = 0 right_leaf[ (self.histograms * ~mask).sum(axis=1) < self.min_data_in_leaf ] = 0 # gain of split gain = left_gain + right_gain - no_split_gain gain = np.nan_to_num(gain, nan=-np.inf) if self.min_gain_to_split > 0: gain[gain < self.min_gain_to_split] = -np.inf if (gain <= 0).all(): warnings.warn("No splits with positive gains. Ignoring boosting round.") self.ignore_round = True else: self.ignore_round = False # find best split self.best_index = np.argmax(gain) self.best_split = self.bin_edges[self.best_index + 1] self.best_left_leaf = self.learning_rate * left_leaf[self.best_index] self.best_right_leaf = self.learning_rate * right_leaf[self.best_index] best_gain = gain[self.best_index] self.feature_importance_dict["gain"] = np.concatenate( [self.feature_importance_dict["gain"], np.array([best_gain])], axis=0 )
[docs] def rollback_one_iter(self): """Rollback the last boosting iteration.""" self.best_split = None self.best_left_leaf = None self.best_right_leaf = None self.feature_importance_dict["gain"] = self.feature_importance_dict["gain"][:-1] self.boosting_count += 1 if self.bagging_freq > 0 and self.boosting_count % self.bagging_freq == 0: self.bagged_indices = np.random.choice( np.arange(self.x.shape[0]), size=int((1 - self.bagging_fraction) * self.x.shape[0]), replace=False, )
def _inner_predict(self, data_idx: int): """ Predict the values for the given data index. Parameters ---------- data_idx : int Index of the data to predict. 0 for training data, >0 for validation data. Returns ------- np.ndarray Predicted values for the specified data index. """ if data_idx == 0: #train set x = self.x indices = self.bin_indices else: # validation set x = self.valid_sets[data_idx - 1] indices = self.bin_indices_valid[data_idx - 1] vas = self.split_and_leaf_values["value_at_splits"][indices] s = self.split_and_leaf_values["splits"][indices] l = self.split_and_leaf_values["leaves"][indices] return vas + (x - s) * l
[docs] def predict(self, x): indices = np.digitize(x, bins=self.bin_edges[1:-1], right=True) vas = self.split_and_leaf_values["value_at_splits"][indices] s = self.split_and_leaf_values["splits"][indices] l = self.split_and_leaf_values["leaves"][indices] return vas + (x - s) * l
def _update_linear_constants(self): """ Update the linear constants of the model based on the best split and leaf values. """ if ( self.best_split is None or self.best_left_leaf is None or self.best_right_leaf is None ): return 0 if self.ignore_round: self.rollback_one_iter() return 0 s = self.best_split l_0 = self.best_left_leaf l_1 = self.best_right_leaf self.split_and_leaf_values["leaves"][: self.best_index + 1] += l_0 self.split_and_leaf_values["leaves"][self.best_index + 1 :] += l_1 distance_to_s = self.split_and_leaf_values["splits"] - s self.split_and_leaf_values["value_at_splits"][: self.best_index + 1] += ( l_0 * distance_to_s[: self.best_index + 1] ) self.split_and_leaf_values["value_at_splits"][self.best_index + 1 :] += ( l_1 * distance_to_s[self.best_index + 1 :] ) # update bounds for monotonic constraints self.update_bounds() self.boosting_count += 1 if self.bagging_freq > 0 and self.boosting_count % self.bagging_freq == 0: self.bagged_indices = np.random.choice( np.arange(self.x.shape[0]), size=int((1 - self.bagging_fraction) * self.x.shape[0]), replace=False, )
[docs] def update_bounds(self): """ Update the bounds for the left and right leaves based on the current split and leaf values. This is necessary for enforcing monotonic constraints. """ arange = np.arange(self.upper_bound_left.shape[0] + 1) edgerange = np.arange(self.upper_bound_left.shape[0]) + 1 mask = arange[None, :] < edgerange[:, None] self.lower_bound_left = np.min( self.split_and_leaf_values["leaves"][None, :].repeat(mask.shape[0], axis=0), where=mask, axis=1, initial=np.inf, ) self.upper_bound_left = np.max( self.split_and_leaf_values["leaves"][None, :].repeat(mask.shape[0], axis=0), where=mask, axis=1, initial=-np.inf, ) self.lower_bound_right = np.min( self.split_and_leaf_values["leaves"][None, :].repeat(mask.shape[0], axis=0), where=~mask, axis=1, initial=np.inf, ) self.upper_bound_right = np.max( self.split_and_leaf_values["leaves"][None, :].repeat(mask.shape[0], axis=0), where=~mask, axis=1, initial=-np.inf, )
[docs] def eval_train(self, feval): return np.zeros(0)
[docs] def eval_valid(self, feval): return np.zeros(0)
[docs] def model_to_string(self, **kwargs) -> dict: """ Serialize the model to a JSON string. """ model_dict = { "bin_edges": self.bin_edges.tolist(), "histograms": self.histograms.tolist(), "bin_indices": self.bin_indices.tolist(), "split_and_leaf_values": { k: v.tolist() for k, v in self.split_and_leaf_values.items() }, "monotonic_constraint": self.monotonic_constraint, "upper_bound_left": self.upper_bound_left.tolist(), "lower_bound_left": self.lower_bound_left.tolist(), "upper_bound_right": self.upper_bound_right.tolist(), "lower_bound_right": self.lower_bound_right.tolist(), "feature_importance_dict": { k: v.tolist() for k, v in self.feature_importance_dict.items() }, "learning_rate": self.learning_rate, "lambda_l1": self.lambda_l1, "lambda_l2": self.lambda_l2, "bagging_fraction": self.bagging_fraction, "bagging_freq": self.bagging_freq, "min_data_in_leaf": self.min_data_in_leaf, "min_sum_hessian_in_leaf": self.min_sum_hessian_in_leaf, "min_gain_to_split": self.min_gain_to_split, "min_data_in_bin": self.min_data_in_bin, "boosting_count": self.boosting_count, } return model_dict
[docs] def model_from_string(self, s: dict): """ Load the model from a dictionary. """ model_dict = s self.bin_edges = np.array(model_dict["bin_edges"]) self.histograms = np.array(model_dict["histograms"]) self.bin_indices = np.array(model_dict["bin_indices"]) self.split_and_leaf_values = { k: np.array(v) for k, v in model_dict["split_and_leaf_values"].items() } self.monotonic_constraint = model_dict["monotonic_constraint"] self.upper_bound_left = np.array(model_dict["upper_bound_left"]) self.lower_bound_left = np.array(model_dict["lower_bound_left"]) self.upper_bound_right = np.array(model_dict["upper_bound_right"]) self.lower_bound_right = np.array(model_dict["lower_bound_right"]) self.feature_importance_dict = { k: np.array(v) for k, v in model_dict["feature_importance_dict"].items() } self.learning_rate = model_dict["learning_rate"] self.lambda_l1 = model_dict["lambda_l1"] self.lambda_l2 = model_dict["lambda_l2"] self.bagging_fraction = model_dict["bagging_fraction"] self.bagging_freq = model_dict["bagging_freq"] self.min_data_in_leaf = model_dict["min_data_in_leaf"] self.min_sum_hessian_in_leaf = model_dict["min_sum_hessian_in_leaf"] self.min_gain_to_split = model_dict["min_gain_to_split"] self.min_data_in_bin = model_dict["min_data_in_bin"] self.boosting_count = model_dict["boosting_count"] self.x = None self.valid_sets = [] self.name_valid_sets = [] self.bin_indices_valid = [] return self
[docs] def dump_model(self, **kwargs) -> dict: """Dump the model to a json string.""" return self.model_to_string()
[docs] def free_dataset( self, ): self.x = None
[docs] def set_train_data_name(self, name: str) -> "LinearTree": """Set the name to the training Dataset. Parameters ---------- name : str Name for the training Dataset. Returns ------- self : Booster Booster with set training Dataset name. """ self._train_data_name = name return self
[docs] def add_valid(self, data, name: str) -> "LinearTree": """Add validation data. Parameters ---------- data : Dataset Validation data. name : str Name of validation data. Returns ------- self : Booster Booster with set validation data. """ if not isinstance(self.valid_sets, list): self.valid_sets = [] if not isinstance(self.name_valid_sets, list): self.name_valid_sets = [] if not isinstance(self.bin_indices_valid, list): self.bin_indices_valid = [] data.construct() valid_data = data.get_data().reshape(-1) self.valid_sets.append(valid_data) self.name_valid_sets.append(name) self.bin_indices_valid.append( np.digitize(valid_data, bins=self.bin_edges[1:-1], right=True) ) return self