import numpy as np
import pandas as pd
import random
import lightgbm as lgb
from collections import Counter, defaultdict
from scipy.interpolate import interp1d
from scipy.special import softmax
[docs]
def process_parent(parent, pairs):
'''
Dig into the biogeme expression to retrieve name of variable and beta parameter. Work only with simple utility specification (beta * variable).
'''
# final expression to be stored
if parent.getClassName() == 'Times':
pairs.append(get_pair(parent))
else: #if not final
try: #dig into the expression
left = parent.left
right = parent.right
except: #if no left and right children
return pairs
else: #dig further left and right
process_parent(left, pairs)
process_parent(right, pairs)
return pairs
[docs]
def get_pair(parent):
'''
Return beta and variable names on a tupple from a parent expression.
'''
left = parent.left
right = parent.right
beta = None
variable = None
for exp in [left, right]:
if exp.getClassName() == 'Beta':
beta = exp.name
elif exp.getClassName() == 'Variable':
variable = exp.name
if beta and variable:
return (beta, variable)
else:
raise ValueError("Parent does not contain beta and variable")
[docs]
def bio_to_rumboost(model, all_columns = False, monotonic_constraints = True, interaction_contraints = True, fct_effect_variables = []):
'''
Converts a biogeme model to a rumboost dict.
Parameters
----------
model : a BIOGEME object
The model used to create the rumboost structure dictionary.
all_columns : bool, optional (default = False)
If True, do not consider alternative-specific features.
monotonic_constraints : bool, optional (default = True)
If False, do not consider monotonic constraints.
interaction_contraints : bool, optional (default = True)
If False, do not consider feature interactions constraints.
fct_effect_variables : list, optional (default = [])
The list of variables in the functional effect part of the model
Returns
-------
rum_structure : dict
A dictionary specifying the structure of a RUMBoost object.
'''
utilities = model.loglike.util #biogeme expression
rum_structure = []
#for all utilities
for k, v in utilities.items():
rum_structure.append({'columns': [], 'monotone_constraints': [], 'interaction_constraints': [], 'betas': [], 'categorical_feature': []})
if len(fct_effect_variables) > 0:
rum_structure_re = {'columns': [], 'monotone_constraints': [], 'interaction_constraints': [], 'betas': [], 'categorical_feature': []}
for i, pair in enumerate(process_parent(v, [])): # get all the pairs of the utility
if pair[1] in fct_effect_variables:
rum_structure_re['columns'].append(pair[1]) #append variable name
rum_structure_re['betas'].append(pair[0]) #append beta name
if interaction_contraints:
rum_structure_re['interaction_constraints'].append(len(rum_structure_re['interaction_constraints'])) #no interaction between features
if monotonic_constraints:
bounds = model.getBoundsOnBeta(pair[0]) #get bounds on beta parameter for monotonic constraint
if (bounds[0] is not None) and (bounds[1] is not None):
raise ValueError("Only one bound can be not None")
if bounds[0] is not None:
if bounds[0] >= 0:
rum_structure_re['monotone_constraints'].append(1) #register positive monotonic constraint
elif bounds[1] is not None:
if bounds[1] <= 0:
rum_structure_re['monotone_constraints'].append(-1) #register negative monotonic constraint
else:
rum_structure_re['monotone_constraints'].append(0) #none
else:
rum_structure[-1]['columns'].append(pair[1]) #append variable name
rum_structure[-1]['betas'].append(pair[0]) #append beta name
if interaction_contraints:
if len(fct_effect_variables) > 0:
rum_structure[-1]['interaction_constraints'].append([len(rum_structure[-1]['interaction_constraints'])]) #no interaction between features
else:
rum_structure[-1]['interaction_constraints'].append([i]) #no interaction between features
if monotonic_constraints:
bounds = model.getBoundsOnBeta(pair[0]) #get bounds on beta parameter for monotonic constraint
if (bounds[0] is not None) and (bounds[1] is not None):
raise ValueError("Only one bound can be not None")
if bounds[0] is not None:
if bounds[0] >= 0:
rum_structure[-1]['monotone_constraints'].append(1) #register positive monotonic constraint
elif bounds[1] is not None:
if bounds[1] <= 0:
rum_structure[-1]['monotone_constraints'].append(-1) #register negative monotonic constraint
else:
rum_structure[k]['monotone_constraints'].append(0) #none
if all_columns:
rum_structure[-1]['columns'] = [col for col in model.database.data.drop(['choice'], axis=1).columns.values.tolist()]
if len(fct_effect_variables) > 0:
rum_structure.append(rum_structure_re)
return rum_structure
[docs]
def get_mid_pos(data, split_points, end='data'):
'''
Return the mid point in-between two split points for a specific feature (used in pw linear predict).
Parameters
----------
data: pandas Series
The column of the dataframe associated with the feature.
split_points : list
The list of split points for that feature.
end : str
How to compute the mid position of the first and last point, it can be:
-'data': add min and max values of data
-'split point': add first and last split points
-'mean_data': add the mean of data before the first split point, and after the last split point
Returns
-------
mid_pos : list
A list of points in the middle of every consecutive split points.
'''
#getting position in the middle of splitting points intervals
if len(split_points) > 1:
mid_pos = [(sp2 + sp1)/2 for sp2, sp1 in zip(split_points[:-1], split_points[1:])]
else:
mid_pos = []
if end == 'data':
mid_pos.insert(0, min(data)) #adding first point
mid_pos.append(max(data)) #adding last point
elif end == 'split point':
mid_pos.insert(0, min(split_points)) #adding first point
mid_pos.append(max(split_points)) #adding last point
elif end == 'mean_data':
mid_pos.insert(0, data[data<split_points[0]].mean()) #adding first point
mid_pos.append(data[data>split_points[-1]].mean()) #adding last point
return mid_pos
[docs]
def get_mean_pos(data, split_points):
'''
Return the mean point in-between two split points for a specific feature (used in smoothing).
At end points, it is the mean of data before the first split point, and after the last split point.
Parameters
----------
data : pandas.Series
The column of the dataframe associated with the feature.
split_points : list
The list of split points for that feature.
Returns
-------
mean_data : list
A list of points in the mean of every consecutive split points.
'''
#getting the mean of data of splitting points intervals
mean_data = [np.mean(data[(data < s_ii) & (data > s_i)]) for s_i, s_ii in zip(split_points[:-1], split_points[1:])]
mean_data.insert(0, np.mean(data[data<split_points[0]])) #adding first point
mean_data.append(np.mean(data[data>split_points[-1]])) #adding last point
return mean_data
[docs]
def data_leaf_value(data, weights_feature, technique='data_weighted'):
'''
Computes the utility values of given data, according to the prespecified technique.
Parameters
----------
data : pandas.Series
The column of the dataframe associated with the feature.
weight_feature : dict
The dictionary corresponding to the feature leaf values.
technique : str, optional (default = weight_data)
The technique used to compute data values. It can be:
data_weighted : feature data and its utility values.
mid_point : the mid point in between all splitting points.
mean_data : the mean of data in between all splitting points.
mid_point_weighted : the mid points in between all splitting points, weighted by the number of data points in the interval.
mean_data_weighted : the mean of data in between all splitting points, weighted by the number of data points in the interval.
Returns
-------
data_ordered : numpy array
X coordinates of the data, or feature data point values.
data_values : numpy array
Y coordinates of the data, or utility values
'''
if technique == 'data_weighted':
data_ordered = np.sort(data)
idx = np.searchsorted(np.array(weights_feature['Splitting points']), data_ordered)
data_values = np.array(weights_feature['Histogram values'])[idx]
return np.array(data_ordered), data_values
if technique == 'mid_point':
mid_points = np.array(get_mid_pos(data, weights_feature['Splitting points']))
return mid_points, np.array(weights_feature['Histogram values'])
elif technique == 'mean_data':
mean_data = np.array(get_mean_pos(data, weights_feature['Splitting points']))
return mean_data, np.array(weights_feature['Histogram values'])
data_ordered = data.copy().sort_values()
data_values = [weights_feature['Histogram values'][0]]*sum(data_ordered < weights_feature['Splitting points'][0])
if technique == 'mid_point_weighted':
mid_points = get_mid_pos(data, weights_feature['Splitting points'])
mid_points_weighted = [mid_points[0]]*sum(data_ordered < weights_feature['Splitting points'][0])
elif technique == 'mean_data_weighted':
mean_data = get_mean_pos(data, weights_feature['Splitting points'])
mean_data_weighted = [mean_data[0]]*sum(data_ordered < weights_feature['Splitting points'][0])
for i, (s_i, s_ii) in enumerate(zip(weights_feature['Splitting points'][:-1], weights_feature['Splitting points'][1:])):
data_values += [weights_feature['Histogram values'][i+1]]*sum((data_ordered < s_ii) & (data_ordered > s_i))
if technique == 'mid_point_weighted':
mid_points_weighted += [mid_points[i+1]]*sum((data_ordered < s_ii) & (data_ordered > s_i))
elif technique == 'mean_data_weighted':
mean_data_weighted += [mean_data[i+1]]*sum((data_ordered < s_ii) & (data_ordered > s_i))
data_values += [weights_feature['Histogram values'][-1]]*sum(data_ordered > weights_feature['Splitting points'][-1])
if technique == 'mid_point_weighted':
mid_points_weighted += [mid_points[-1]]*sum(data_ordered > weights_feature['Splitting points'][-1])
return np.array(mid_points_weighted), np.array(data_values)
elif technique == 'mean_data_weighted':
mean_data_weighted += [mean_data[-1]]*sum(data_ordered > weights_feature['Splitting points'][-1])
return np.array(mean_data_weighted), np.array(data_values)
return np.array(data_ordered), np.array(data_values)
[docs]
def get_grad(x, y, technique='slope', sample_points=30, normalise = False):
'''
Computes the arc gradient according to the prespecified technique.
Parameters
----------
x : numpy array
X coordinates of the point to compute the gradient.
y : numpy array
Y coordinates of the point to compute the gradient.
technique : str, optional (default = slope)
The technique used to compute data values. It can be:
slope : compute the slope as gradient between each point.
sample_data : compute the slope between uniformly distributed sampled data.
Returns
-------
grad : numpy array
A vector with gradient values at each given points.
x_sample : numpy array
The x coordinates of the sampled points if the technique is sample_data.
y_sample : numpy array
The y coordinates of the sampled points if the technique is sample_data.
'''
if len(y) <= 1:
return 0
x_values = x
y_values = y
if normalise:
x_values = (x - np.min(x))/(np.max(x) - np.min(x))
y_values = (y - np.min(y))/(np.max(y) - np.min(y))
if technique == 'slope' :
grad = [(y_values[i+1]-y_values[i])/(x_values[i+1]-x_values[i]) for i in range(0, len(x_values)-1)]
#grad.insert(0, 0) #adding first slope
grad.append(0) #adding last slope
elif technique == 'sample_data':
x_sample = np.linspace(np.min(x_values), np.max(x_values), sample_points)
f = interp1d(x_values, y_values, kind='previous')
y_sample = f(x_sample)
grad = [(y_sample[i+1]-y_sample[i])/(x_sample[i+1]-x_sample[i]) for i in range(0, len(x_sample)-1)]
#grad.insert(0, 0) #adding first slope
grad.append(0) #adding last slope
if normalise:
x_sample = x_sample*(np.max(x) - np.min(x)) + np.min(x)
y_sample = y_sample*(np.max(y) - np.min(y)) + np.min(y)
return grad, x_sample, y_sample
return grad
[docs]
def get_angle_diff(x_values, y_values):
'''
Computes the angle between three given points.
Parameters
----------
x_values : numpy array
X coordinates of the point to compute the angle.
y_values : numpy array
Y coordinates of the point to compute the angle.
Returns
-------
diff_angle : list
A list containing all vectors for each subsequent three points.
'''
slope = get_grad(x_values, y_values, normalise = True)
angle = np.arctan(slope)
diff_angle = [np.pi - np.abs(angle[0])]
diff_angle += [np.pi - np.abs(a_1-a_0) for (a_1, a_0) in zip(angle[1:], angle[:-1])]
return diff_angle
[docs]
def find_disc(x_values, grad):
'''
Find discontinuities for a given feature values. The angle must be smaller than 0.2 radian and the slope bigger than 5. Values are normalised.
Parameters
----------
x_values : numpy array
X coordinates of the point to find discontinuities.
grad : numpy array
A vector with gradient values at each given points.
Returns
-------
disc : numpy array
The coordinates of discontinuities.
disc_idx : numpy array
The index of discontinuities.
num_disc : int
The number of discontinuities.
'''
diff_angle = get_angle_diff(x_values, grad)
is_disc = [True if (angle < 0.2) and (np.abs(g) > 5) else False for angle, g in zip(diff_angle, grad)]
disc = x_values[is_disc]
disc_idx = np.nonzero(is_disc)[0]
num_disc = np.sum(is_disc)
return disc, disc_idx, num_disc
[docs]
def utility_ranking(weights, spline_utilities):
"""
Rank attributes utility importance by their utility range. The first rank is the attribute having the largest
max(V(x)) - min(V(x)).
Parameters
----------
weights : dict
A dictionary containing all the split points and leaf values for all attributes, for all utilities.
spline_utilities : dict
A dictionary containing attributes where splines are applied. Must be in the form ]
{utility_indx: [attributes1, attributes2, ...], ...}.
Returns
-------
util_ranks_ascend : list of tupple
A list of tupple where the first tupple is the one having the largest utility range. Tupples are composed of
their utility and the name of their attributes.
"""
util_ranks = []
util_ranges = []
for u in spline_utilities:
for f in spline_utilities[u]:
#compute range
util_ranges.append(np.max(weights[u][f]['Histogram values']) - np.min(weights[u][f]['Histogram values']))
util_ranks.append((u, f))
sort_idx = np.argsort(util_ranges)
util_ranks = np.array(util_ranks)
util_ranks_ascend = util_ranks[np.flip(sort_idx)]
return util_ranks_ascend
[docs]
def map_x_knots(x_knots, num_splines_range, x_first = None, x_last = None):
'''
Map the 1d array of x_knots into a dictionary with utility and attributes as keys.
Parameters
----------
x_knots : 1d np.array
The positions of knots in a 1d array, following this structure:
np.array([x_att1_1, x_att1_2, ... x_att1_m, x_att2_1, ... x_attn_m]) where m is the number of knots
and n the number of attributes that are interpolated with splines.
num_splines_range: dict
A dictionary of the same format than weights of features names for each utility that are interpolated with monotonic splines.
The key is a spline interpolated feature name, and the value is the number of splines used for interpolation as an int.
There should be a key for all features where splines are used.
x_first : list, optional (default=None)
A list of all first knots in the order of the attributes from spline_utilities and num_splines_range.
x_last : list, optional (default=None)
A list of all last knots in the order of the attributes from spline_utilities and num_splines_range.
Returns
-------
x_knots_dict : dict
A dictionary in the form of {utility: {attribute: x_knots}} where x_knots are the spline knots for the corresponding
utility and attributes
'''
x_knots_dict = {}
starter = 0
i=0
for u in num_splines_range:
x_knots_dict[u]={}
for f in num_splines_range[u]:
if x_first is not None:
x_knots_dict[u][f] = [x_first[i]]
x_knots_dict[u][f].extend(x_knots[starter:starter+num_splines_range[u][f]-1])
x_knots_dict[u][f].append(x_last[i])
x_knots_dict[u][f] = np.array(x_knots_dict[u][f])
starter += num_splines_range[u][f]-1
i +=1
else:
x_knots_dict[u][f] = x_knots[starter:starter+num_splines_range[u][f]+1]
starter += num_splines_range[u][f]+1
return x_knots_dict
[docs]
def compute_VoT(util_collection, u, f1, f2):
'''
The function compute the Value of Time of the attributes specified in attribute_VoT.
Parameters
----------
util_collection : dict
A dictionary containing the type of utility to use for all features in all utilities.
u : str
The utility number, as a str (e.g. '0', '1', ...).
f1 : str
The time related attribtue name.
f2 : str
The cost related attribtue name.
Return
------
VoT : lamda function
The function calculating value of time for attribute1 and attribute2.
'''
VoT = lambda x1, x2, u1 = util_collection[u][f1], u2 = util_collection[u][f2]: u1.derivative()(x1) / u2.derivative()(x2)
return VoT
[docs]
def accuracy(preds, labels):
"""
Compute accuracy of the model.
Parameters
----------
preds : numpy array
Predictions for all data points and each classes from a softmax function. preds[i, j] correspond
to the prediction of data point i to belong to class j.
labels : numpy array
The labels of the original dataset, as int.
Returns
-------
Accuracy: float
The computed accuracy, as a float.
"""
return np.mean(np.argmax(preds, axis=1) == labels)
[docs]
def cross_entropy(preds, labels):
"""
Compute negative cross entropy for given predictions and data.
Parameters
----------
preds: numpy array
Predictions for all data points and each classes from a softmax function. preds[i, j] correspond
to the prediction of data point i to belong to class j.
labels: numpy array
The labels of the original dataset, as int.
Returns
-------
Cross entropy : float
The negative cross-entropy, as float.
"""
num_data = len(labels)
data_idx = np.arange(num_data)
return - np.mean(np.log(preds[data_idx, labels]))
[docs]
def nest_probs(raw_preds, mu, nests):
"""compute nested predictions.
Parameters
----------
raw_preds :
The raw predictions from the booster
mu :
The list of mu values for each nest.
The first value correspond to the first nest and so on.
nests :
The dictionary keys are alternatives number and their values are their nest number.
By example, {0:0, 1:1, 2:0} means that alt 0 and 2 are in nest 0 and alt 1 is in nest 1.
Returns
-------
preds.T :
The nested predictions
pred_i_m :
The prediction of choosing alt i knowing nest m
pred_m :
The prediction of choosing nest m
"""
#initialisation
n_obs = np.size(raw_preds, 0)
data_idx = np.arange(n_obs)
n_alt = np.size(raw_preds, 1)
pred_i_m = np.array(np.zeros((n_obs, n_alt)))
V_tilde_m = np.array(np.zeros((n_obs, len(mu))))
#for each alternative and their nest
for alt, nest in nests.items():
#compute the list of alternative in nests
nest_alt = [a for a, n in nests.items() if n == nest]
#pred of choosing i knowing m. Softmax within the nest (raw_preds[data_idx, :][:, nest_alt]) to get prediction of alternatives within the nest)
pred_i_m[:, alt] = np.exp(mu[nest] * raw_preds[data_idx, alt]) / np.sum(np.exp(mu[nest] * raw_preds[data_idx, :][:, nest_alt]), axis=1)
#maximum expectation of utility within nest m
V_tilde_m[:, nest] = 1/mu[nest] * np.log(np.sum(np.exp(mu[nest] * raw_preds[data_idx, :][:, nest_alt]), axis=1))
#pred of choosing nest m
pred_m = softmax(V_tilde_m, axis=1)
#final predictions for choosing i
preds = np.array([pred_i_m[:, i] * pred_m[:, nests[i]] for i in nests.keys()])
return preds.T, pred_i_m, pred_m
[docs]
def cross_nested_probs(raw_preds, mu, alphas):
"""compute nested predictions.
Parameters
----------
raw_preds :
The raw predictions from the booster
mu :
The list of mu values for each nest.
The first value correspond to the first nest and so on.
alphas :
An array of J (alternatives) by M (nests).
alpha_jn represents the degree of membership of alternative j to nest n
By example, alpha_12 = 0.5 means that alternative one belongs 50% to nest 2.
Returns
-------
raw_preds :
The cross nested predictions
pred_i_m :
The prediction of choosing alt i knowing nest m
pred_m :
The prediction of choosing nest m
"""
#initialisation
n_obs = np.size(raw_preds, 0)
data_idx = np.arange(n_obs)
n_alt = np.size(raw_preds, 1)
pred_i_m = np.array(np.zeros((n_obs, n_alt, len(mu))))
pred_m = np.array(np.zeros((n_obs, n_alt, len(mu))))
sum_of_nest = []
#for each nest
for m, mu_m in enumerate(mu):
#pred of choosing i knowing m.
pred_i_m[:, :, m] = (alphas[:, m] ** mu_m * np.exp(mu_m * raw_preds)) / np.sum(alphas[:, m] ** mu_m * np.exp(mu_m * raw_preds), axis=1, keepdims=True)
#storing sum of nest for computing pred of choosing m easily
sum_of_nest.append([np.sum(alphas[:, m] ** mu_m * np.exp(mu_m * raw_preds), axis=1) ** (1/mu_m)]*n_alt)
sum_of_nest = np.array(sum_of_nest).T
#pred of choosing m
pred_m[:, :, :] = sum_of_nest / np.sum(sum_of_nest, axis=2, keepdims=True)
#final predictions for choosing i
preds = np.sum(pred_i_m * pred_m, axis=2)
return preds, pred_i_m, pred_m
[docs]
def create_name(features):
"""Create new feature names from a list of feature names"""
new_name = features[0]
for f_name in features[1:]:
new_name += '-'+f_name
return new_name
[docs]
def get_child(model, weights, weights_2d, weights_market, tree, split_points, features, feature_names, i, market_segm, direction = None):
"""Dig into the tree to get splitting points, features, left and right leaves values"""
min_r = 0
max_r = 10000
if feature_names[tree['split_feature']] not in features:
features.append(feature_names[tree['split_feature']])
split_points.append(tree['threshold'])
if 'leaf_value' in tree['left_child'] and 'leaf_value' in tree['right_child']:
if direction is None:
weights.append([feature_names[tree['split_feature']], tree['threshold'], tree['left_child']['leaf_value'], tree['right_child']['leaf_value'], i])
elif direction == 'left':
if len(features) == 1:
weights.append([feature_names[tree['split_feature']], tree['threshold'], tree['left_child']['leaf_value'], tree['right_child']['leaf_value'], i])
weights.append([feature_names[tree['split_feature']], split_points[0], 0, -tree['right_child']['leaf_value'], i])
elif market_segm:
feature_name = create_name(features)
if features[0] in model.rum_structure[i]['categorical_feature']:
weights_market.append([features[-1]+'-0', tree['threshold'], tree['left_child']['leaf_value'], tree['right_child']['leaf_value'], i])
else:
weights_market.append([features[0]+'-0', split_points[0], tree['left_child']['leaf_value'], 0, i])
weights_market.append([features[0]+'-1', split_points[0], tree['right_child']['leaf_value'], 0, i])
else:
feature_name = create_name(features)
weights_2d.append([feature_name, (min_r, split_points[0]), (min_r, tree['threshold']), tree['left_child']['leaf_value'], i])
weights_2d.append([feature_name, (min_r, split_points[0]), (tree['threshold'], max_r), tree['right_child']['leaf_value'], i])
if len(features) > 1:
features.pop(-1)
split_points.pop(-1)
elif direction == 'right':
if len(features) == 1:
weights.append([feature_names[tree['split_feature']], tree['threshold'], tree['left_child']['leaf_value'], tree['right_child']['leaf_value'], i])
weights.append([feature_names[tree['split_feature']], split_points[0], -tree['left_child']['leaf_value'], 0, i])
elif market_segm:
feature_name = create_name(features)
if features[0] in model.rum_structure[i]['categorical_feature']:
weights_market.append([features[-1]+'-1', tree['threshold'], tree['left_child']['leaf_value'], tree['right_child']['leaf_value'], i])
else:
weights_market.append([features[0]+'-0', split_points[0], 0, tree['left_child']['leaf_value'], i])
weights_market.append([features[0]+'-1', split_points[0], 0, tree['right_child']['leaf_value'], i])
else:
feature_name = create_name(features)
weights_2d.append([feature_name, (split_points[0], max_r), (min_r, tree['threshold']), tree['left_child']['leaf_value'], i])
weights_2d.append([feature_name, (split_points[0], max_r), (tree['threshold'], max_r), tree['right_child']['leaf_value'], i])
elif 'leaf_value' in tree['left_child']:
weights.append([feature_names[tree['split_feature']], tree['threshold'], tree['left_child']['leaf_value'], 0, i])
get_child(model, weights, weights_2d, weights_market, tree['right_child'], split_points, features, feature_names, i, market_segm, direction='right')
elif 'leaf_value' in tree['right_child']:
weights.append([feature_names[tree['split_feature']], tree['threshold'], 0, tree['right_child']['leaf_value'], i])
get_child(model, weights, weights_2d, weights_market, tree['left_child'], split_points, features, feature_names, i, market_segm, direction='left')
else:
get_child(model, weights, weights_2d, weights_market, tree['left_child'], split_points, features, feature_names, i, market_segm, direction='left')
get_child(model, weights, weights_2d, weights_market, tree['right_child'], split_points, features, feature_names, i, market_segm, direction='right')
[docs]
def get_weights(model):
"""
Get leaf values from a RUMBoost model.
Parameters
----------
model : RUMBoost
A trained RUMBoost object.
Returns
-------
weights_df : pandas DataFrame
DataFrame containing all split points and their corresponding left and right leaves value,
for all features.
weights_2d_df : pandas DataFrame
Dataframe with weights arranged for a 2d plot, used in the case of 2d feature interaction.
weights_market : pandas DataFrame
Dataframe with weights arranged for market segmentation, used in the case of market segmentation.
"""
#using self object or a given model
model_json = model.dump_model()
weights = []
weights_2d = []
weights_market = []
for i, b in enumerate(model_json):
feature_names = b['feature_names']
for trees in b['tree_info']:
features = []
split_points = []
market_segm = False
get_child(model, weights, weights_2d, weights_market, trees['tree_structure'], split_points, features, feature_names, i, market_segm)
weights_df = pd.DataFrame(weights, columns= ['Feature', 'Split point', 'Left leaf value', 'Right leaf value', 'Utility'])
weights_2d_df = pd.DataFrame(weights_2d, columns=['Feature', 'higher_lvl_range', 'lower_lvl_range', 'area_value', 'Utility'])
weights_market_df = pd.DataFrame(weights_market, columns= ['Feature', 'Cat value', 'Split point', 'Left leaf value', 'Right leaf value', 'Utility'])
return weights_df, weights_2d_df, weights_market_df
[docs]
def weights_to_plot_v2(model, market_segm=False):
"""
Arrange weights by ascending splitting points and cumulative sum of weights.
Parameters
----------
model : RUMBoost
A trained RUMBoost object.
Returns
-------
weights_for_plot : dict
Dictionary containing splitting points and corresponding cumulative weights value for all features.
"""
#get raw weights
if market_segm:
_, _, weights= get_weights(model)
else:
weights, _, _ = get_weights(model)
weights_for_plot = {}
#for all features
for i in weights.Utility.unique():
weights_for_plot[str(i)] = {}
for f in weights[weights.Utility == i].Feature.unique():
split_points = []
function_value = [0]
#getting values related to the corresponding utility
weights_util = weights[weights.Utility == i]
#sort by ascending order
feature_data = weights_util[weights_util.Feature == f]
ordered_data = feature_data.sort_values(by = ['Split point'], ignore_index = True)
for j, s in enumerate(ordered_data['Split point']):
#new split point
if s not in split_points:
split_points.append(s)
#add a new right leaf value to the current right side value
function_value.append(function_value[-1] + float(ordered_data.loc[j, 'Right leaf value']))
#add left leaf value to all other current left leaf values
function_value[:-1] = [h + float(ordered_data.loc[j, 'Left leaf value']) for h in function_value[:-1]]
else:
#add right leaf value to the current right side value
function_value[-1] += float(ordered_data.loc[j, 'Right leaf value'])
#add left leaf value to all other current left leaf values
function_value[:-1] = [h + float(ordered_data.loc[j, 'Left leaf value']) for h in function_value[:-1]]
weights_for_plot[str(i)][f] = {'Splitting points': split_points,
'Histogram values': function_value}
return weights_for_plot
[docs]
def non_lin_function(weights_ordered, x_min, x_max, num_points):
"""
Create the nonlinear function for parameters, from weights ordered by ascending splitting points.
Parameters
----------
weights_ordered : dict
Dictionary containing splitting points and corresponding cumulative weights value for a specific
feature's parameter.
x_min : float, int
Minimum x value for which the nonlinear function is computed.
x_max : float, int
Maximum x value for which the nonlinear function is computed.
num_points : int
Number of points used to draw the nonlinear function line.
Returns
-------
x_values : list
X values for which the function will be plotted.
nonlin_function : list
Values of the function at the corresponding x points.
"""
#create x points
x_values = np.linspace(x_min, x_max, num_points)
nonlin_function = []
i = 0
max_i = len(weights_ordered['Splitting points']) #all splitting points
#handling no split points
if max_i == 0:
return x_values, float(weights_ordered['Histogram values'][i])*x_values
for x in x_values:
#compute the value of the function at x according to the weights value in between splitting points
if x < float(weights_ordered['Splitting points'][i]):
nonlin_function += [float(weights_ordered['Histogram values'][i])]
else:
nonlin_function += [float(weights_ordered['Histogram values'][i+1])]
#go to next splitting points
if i < max_i-1:
i+=1
return x_values, nonlin_function
[docs]
def get_asc(weights, alt_to_normalise = 'Driving', alternatives = {'Walking':'0', 'Cycling':'1', 'Public Transport':'2', 'Driving':'3'}):
'''Retrieve ASCs from a dictionary of all values from a dictionary of leaves values per alternative per feature'''
ASCs = []
for k, alt in alternatives.items():
asc_temp = 0
for feat in weights[alt]:
asc_temp += weights[alt][feat]['Histogram values'][0]
ASCs.append(asc_temp)
return [a - ASCs[int(alternatives[alt_to_normalise])] for a in ASCs]
[docs]
def function_2d(weights_2d, x_vect, y_vect):
"""
Create the nonlinear contour plot for parameters, from weights gathered in getweights_v2
Parameters
----------
weights_2d : dict
Pandas DataFrame containing all possible rectangles with their corresponding area values, for the given feature and utility.
x_vect : numpy array
Vector of higher level feature.
y_vect : numpy array
Vector of lower level feature.
Returns
-------
contour_plot_values : numpy array
Array with values at (x,y) points.
"""
contour_plot_values = np.zeros(shape=(len(x_vect), len(y_vect)))
for k in range(len(weights_2d.index)):
if (weights_2d['lower_lvl_range'].iloc[k][1] == 10000) and (weights_2d['higher_lvl_range'].iloc[k][1] == 10000):
i_x = np.searchsorted(x_vect, weights_2d['higher_lvl_range'].iloc[k][0])
i_y = np.searchsorted(y_vect, weights_2d['lower_lvl_range'].iloc[k][0])
contour_plot_values[i_x:, i_y:] += weights_2d['area_value'].iloc[k]
elif (weights_2d['lower_lvl_range'].iloc[k][1] == 10000):
i_x = np.searchsorted(x_vect, weights_2d['higher_lvl_range'].iloc[k][1])
i_y = np.searchsorted(y_vect, weights_2d['lower_lvl_range'].iloc[k][0])
contour_plot_values[:i_x, i_y:] += weights_2d['area_value'].iloc[k]
elif (weights_2d['higher_lvl_range'].iloc[k][1] == 10000):
i_x = np.searchsorted(x_vect, weights_2d['higher_lvl_range'].iloc[k][0])
i_y = np.searchsorted(y_vect, weights_2d['lower_lvl_range'].iloc[k][1])
contour_plot_values[i_x:, :i_y] += weights_2d['area_value'].iloc[k]
else:
i_x = np.searchsorted(x_vect, weights_2d['higher_lvl_range'].iloc[k][1])
i_y = np.searchsorted(y_vect, weights_2d['lower_lvl_range'].iloc[k][1])
contour_plot_values[:i_x, :i_y] += weights_2d['area_value'].iloc[k]
return contour_plot_values
# Sample a dataset grouped by `groups` and stratified by `y`
# Source: https://www.kaggle.com/jakubwasikowski/stratified-group-k-fold-cross-validation
[docs]
def stratified_group_k_fold(X, y, groups, k, seed=None):
labels_num = np.max(y) + 1
y_counts_per_group = defaultdict(lambda: np.zeros(labels_num))
y_distr = Counter()
for label, g in zip(y, groups):
y_counts_per_group[g][label] += 1
y_distr[label] += 1
y_counts_per_fold = defaultdict(lambda: np.zeros(labels_num))
groups_per_fold = defaultdict(set)
def eval_y_counts_per_fold(y_counts, fold):
y_counts_per_fold[fold] += y_counts
std_per_label = []
for label in range(labels_num):
label_std = np.std([y_counts_per_fold[i][label] / y_distr[label] for i in range(k)])
std_per_label.append(label_std)
y_counts_per_fold[fold] -= y_counts
return np.mean(std_per_label)
groups_and_y_counts = list(y_counts_per_group.items())
random.Random(seed).shuffle(groups_and_y_counts)
for g, y_counts in sorted(groups_and_y_counts, key=lambda x: -np.std(x[1])):
best_fold = None
min_eval = None
for i in range(k):
fold_eval = eval_y_counts_per_fold(y_counts, i)
if min_eval is None or fold_eval < min_eval:
min_eval = fold_eval
best_fold = i
y_counts_per_fold[best_fold] += y_counts
groups_per_fold[best_fold].add(g)
all_groups = set(groups)
for i in range(k):
train_groups = all_groups - groups_per_fold[i]
test_groups = groups_per_fold[i]
train_indices = [i for i, g in enumerate(groups) if g in train_groups]
test_indices = [i for i, g in enumerate(groups) if g in test_groups]
yield train_indices, test_indices