{{py: """ Template file for easily generate fused types consistent code using Tempita (https://github.com/cython/cython/blob/master/Cython/Tempita/_tempita.py). Generated file: sag_fast.pyx Each class is duplicated for all dtypes (float and double). The keywords between double braces are substituted in setup.py. Authors: Danny Sullivan Tom Dupre la Tour Arthur Mensch Joan Massich License: BSD 3 clause """ # name_suffix, c_type, np_type dtypes = [('64', 'double', 'np.float64'), ('32', 'float', 'np.float32')] }} """SAG and SAGA implementation""" import numpy as np from libc.math cimport exp, fabs, isfinite, log from libc.time cimport time, time_t from ._sgd_fast cimport LossFunction from ._sgd_fast cimport Log, SquaredLoss from ..utils._seq_dataset cimport SequentialDataset32, SequentialDataset64 from libc.stdio cimport printf {{for name_suffix, c_type, np_type in dtypes}} cdef inline {{c_type}} fmax{{name_suffix}}({{c_type}} x, {{c_type}} y) noexcept nogil: if x > y: return x return y {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef {{c_type}} _logsumexp{{name_suffix}}({{c_type}}* arr, int n_classes) noexcept nogil: """Computes the sum of arr assuming arr is in the log domain. Returns log(sum(exp(arr))) while minimizing the possibility of over/underflow. """ # Use the max to normalize, as with the log this is what accumulates # the less errors cdef {{c_type}} vmax = arr[0] cdef {{c_type}} out = 0.0 cdef int i for i in range(1, n_classes): if vmax < arr[i]: vmax = arr[i] for i in range(n_classes): out += exp(arr[i] - vmax) return log(out) + vmax {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef class MultinomialLogLoss{{name_suffix}}: cdef {{c_type}} _loss(self, {{c_type}} y, {{c_type}}* prediction, int n_classes, {{c_type}} sample_weight) noexcept nogil: r"""Multinomial Logistic regression loss. The multinomial logistic loss for one sample is: loss = - sw \sum_c \delta_{y,c} (prediction[c] - logsumexp(prediction)) = sw (logsumexp(prediction) - prediction[y]) where: prediction = dot(x_sample, weights) + intercept \delta_{y,c} = 1 if (y == c) else 0 sw = sample_weight Parameters ---------- y : {{c_type}}, between 0 and n_classes - 1 Indice of the correct class for current sample (i.e. label encoded). prediction : pointer to a np.ndarray[{{c_type}}] of shape (n_classes,) Prediction of the multinomial classifier, for current sample. n_classes : integer Total number of classes. sample_weight : {{c_type}} Weight of current sample. Returns ------- loss : {{c_type}} Multinomial loss for current sample. Reference --------- Bishop, C. M. (2006). Pattern recognition and machine learning. Springer. (Chapter 4.3.4) """ cdef {{c_type}} logsumexp_prediction = _logsumexp{{name_suffix}}(prediction, n_classes) cdef {{c_type}} loss # y is the indice of the correct class of current sample. loss = (logsumexp_prediction - prediction[int(y)]) * sample_weight return loss cdef void dloss(self, {{c_type}} y, {{c_type}}* prediction, int n_classes, {{c_type}} sample_weight, {{c_type}}* gradient_ptr) noexcept nogil: r"""Multinomial Logistic regression gradient of the loss. The gradient of the multinomial logistic loss with respect to a class c, and for one sample is: grad_c = - sw * (p[c] - \delta_{y,c}) where: p[c] = exp(logsumexp(prediction) - prediction[c]) prediction = dot(sample, weights) + intercept \delta_{y,c} = 1 if (y == c) else 0 sw = sample_weight Note that to obtain the true gradient, this value has to be multiplied by the sample vector x. Parameters ---------- prediction : pointer to a np.ndarray[{{c_type}}] of shape (n_classes,) Prediction of the multinomial classifier, for current sample. y : {{c_type}}, between 0 and n_classes - 1 Indice of the correct class for current sample (i.e. label encoded) n_classes : integer Total number of classes. sample_weight : {{c_type}} Weight of current sample. gradient_ptr : pointer to a np.ndarray[{{c_type}}] of shape (n_classes,) Gradient vector to be filled. Reference --------- Bishop, C. M. (2006). Pattern recognition and machine learning. Springer. (Chapter 4.3.4) """ cdef {{c_type}} logsumexp_prediction = _logsumexp{{name_suffix}}(prediction, n_classes) cdef int class_ind for class_ind in range(n_classes): gradient_ptr[class_ind] = exp(prediction[class_ind] - logsumexp_prediction) # y is the indice of the correct class of current sample. if class_ind == y: gradient_ptr[class_ind] -= 1.0 gradient_ptr[class_ind] *= sample_weight def __reduce__(self): return MultinomialLogLoss{{name_suffix}}, () {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef inline {{c_type}} _soft_thresholding{{name_suffix}}({{c_type}} x, {{c_type}} shrinkage) noexcept nogil: return fmax{{name_suffix}}(x - shrinkage, 0) - fmax{{name_suffix}}(- x - shrinkage, 0) {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} def sag{{name_suffix}}( SequentialDataset{{name_suffix}} dataset, {{c_type}}[:, ::1] weights_array, {{c_type}}[::1] intercept_array, int n_samples, int n_features, int n_classes, double tol, int max_iter, str loss_function, double step_size, double alpha, double beta, {{c_type}}[:, ::1] sum_gradient_init, {{c_type}}[:, ::1] gradient_memory_init, bint[::1] seen_init, int num_seen, bint fit_intercept, {{c_type}}[::1] intercept_sum_gradient_init, double intercept_decay, bint saga, bint verbose ): """Stochastic Average Gradient (SAG) and SAGA solvers. Used in Ridge and LogisticRegression. Some implementation details: - Just-in-time (JIT) update: In SAG(A), the average-gradient update is collinear with the drawn sample X_i. Therefore, if the data is sparse, the random sample X_i will change the average gradient only on features j where X_ij != 0. In some cases, the average gradient on feature j might change only after k random samples with no change. In these cases, instead of applying k times the same gradient step on feature j, we apply the gradient step only once, scaled by k. This is called the "just-in-time update", and it is performed in `lagged_update{{name_suffix}}`. This function also applies the proximal operator after the gradient step (if L1 regularization is used in SAGA). - Weight scale: In SAG(A), the weights are scaled down at each iteration due to the L2 regularization. To avoid updating all the weights at each iteration, the weight scale is factored out in a separate variable `wscale` which is only used in the JIT update. When this variable is too small, it is reset for numerical stability using the function `scale_weights{{name_suffix}}`. This reset requires applying all remaining JIT updates. This reset is also performed every `n_samples` iterations before each convergence check, so when the algorithm stops, we are sure that there is no remaining JIT updates. Reference --------- Schmidt, M., Roux, N. L., & Bach, F. (2013). Minimizing finite sums with the stochastic average gradient https://hal.inria.fr/hal-00860051/document (section 4.3) :arxiv:`Defazio, A., Bach F. & Lacoste-Julien S. (2014). "SAGA: A Fast Incremental Gradient Method With Support for Non-Strongly Convex Composite Objectives" <1407.0202>` """ # the data pointer for x, the current sample cdef {{c_type}} *x_data_ptr = NULL # the index pointer for the column of the data cdef int *x_ind_ptr = NULL # the number of non-zero features for current sample cdef int xnnz = -1 # the label value for current sample # the label value for current sample cdef {{c_type}} y # the sample weight cdef {{c_type}} sample_weight # helper variable for indexes cdef int f_idx, s_idx, feature_ind, class_ind, j # the number of pass through all samples cdef int n_iter = 0 # helper to track iterations through samples cdef int sample_itr # the index (row number) of the current sample cdef int sample_ind # the maximum change in weights, used to compute stopping criteria cdef {{c_type}} max_change # a holder variable for the max weight, used to compute stopping criteria cdef {{c_type}} max_weight # the start time of the fit cdef time_t start_time # the end time of the fit cdef time_t end_time # precomputation since the step size does not change in this implementation cdef {{c_type}} wscale_update = 1.0 - step_size * alpha # helper for cumulative sum cdef {{c_type}} cum_sum # the pointer to the coef_ or weights cdef {{c_type}}* weights = &weights_array[0, 0] # the sum of gradients for each feature cdef {{c_type}}* sum_gradient = &sum_gradient_init[0, 0] # the previously seen gradient for each sample cdef {{c_type}}* gradient_memory = &gradient_memory_init[0, 0] # the cumulative sums needed for JIT params cdef {{c_type}}[::1] cumulative_sums = np.empty(n_samples, dtype={{np_type}}, order="c") # the index for the last time this feature was updated cdef int[::1] feature_hist = np.zeros(n_features, dtype=np.int32, order="c") # the previous weights to use to compute stopping criteria cdef {{c_type}}[:, ::1] previous_weights_array = np.zeros((n_features, n_classes), dtype={{np_type}}, order="c") cdef {{c_type}}* previous_weights = &previous_weights_array[0, 0] cdef {{c_type}}[::1] prediction = np.zeros(n_classes, dtype={{np_type}}, order="c") cdef {{c_type}}[::1] gradient = np.zeros(n_classes, dtype={{np_type}}, order="c") # Intermediate variable that need declaration since cython cannot infer when templating cdef {{c_type}} val # Bias correction term in saga cdef {{c_type}} gradient_correction # the scalar used for multiplying z cdef {{c_type}} wscale = 1.0 # return value (-1 if an error occurred, 0 otherwise) cdef int status = 0 # the cumulative sums for each iteration for the sparse implementation cumulative_sums[0] = 0.0 # the multipliative scale needed for JIT params cdef {{c_type}}[::1] cumulative_sums_prox cdef {{c_type}}* cumulative_sums_prox_ptr cdef bint prox = beta > 0 and saga # Loss function to optimize cdef LossFunction loss # Whether the loss function is multinomial cdef bint multinomial = False # Multinomial loss function cdef MultinomialLogLoss{{name_suffix}} multiloss if loss_function == "multinomial": multinomial = True multiloss = MultinomialLogLoss{{name_suffix}}() elif loss_function == "log": loss = Log() elif loss_function == "squared": loss = SquaredLoss() else: raise ValueError("Invalid loss parameter: got %s instead of " "one of ('log', 'squared', 'multinomial')" % loss_function) if prox: cumulative_sums_prox = np.empty(n_samples, dtype={{np_type}}, order="c") cumulative_sums_prox_ptr = &cumulative_sums_prox[0] else: cumulative_sums_prox = None cumulative_sums_prox_ptr = NULL with nogil: start_time = time(NULL) for n_iter in range(max_iter): for sample_itr in range(n_samples): # extract a random sample sample_ind = dataset.random(&x_data_ptr, &x_ind_ptr, &xnnz, &y, &sample_weight) # cached index for gradient_memory s_idx = sample_ind * n_classes # update the number of samples seen and the seen array if seen_init[sample_ind] == 0: num_seen += 1 seen_init[sample_ind] = 1 # make the weight updates (just-in-time gradient step, and prox operator) if sample_itr > 0: status = lagged_update{{name_suffix}}( weights=weights, wscale=wscale, xnnz=xnnz, n_samples=n_samples, n_classes=n_classes, sample_itr=sample_itr, cumulative_sums=&cumulative_sums[0], cumulative_sums_prox=cumulative_sums_prox_ptr, feature_hist=&feature_hist[0], prox=prox, sum_gradient=sum_gradient, x_ind_ptr=x_ind_ptr, reset=False, n_iter=n_iter ) if status == -1: break # find the current prediction predict_sample{{name_suffix}}( x_data_ptr=x_data_ptr, x_ind_ptr=x_ind_ptr, xnnz=xnnz, w_data_ptr=weights, wscale=wscale, intercept=&intercept_array[0], prediction=&prediction[0], n_classes=n_classes ) # compute the gradient for this sample, given the prediction if multinomial: multiloss.dloss(y, &prediction[0], n_classes, sample_weight, &gradient[0]) else: gradient[0] = loss.dloss(y, prediction[0]) * sample_weight # L2 regularization by simply rescaling the weights wscale *= wscale_update # make the updates to the sum of gradients for j in range(xnnz): feature_ind = x_ind_ptr[j] val = x_data_ptr[j] f_idx = feature_ind * n_classes for class_ind in range(n_classes): gradient_correction = \ val * (gradient[class_ind] - gradient_memory[s_idx + class_ind]) if saga: # Note that this is not the main gradient step, # which is performed just-in-time in lagged_update. # This part is done outside the JIT update # as it does not depend on the average gradient. # The prox operator is applied after the JIT update weights[f_idx + class_ind] -= \ (gradient_correction * step_size * (1 - 1. / num_seen) / wscale) sum_gradient[f_idx + class_ind] += gradient_correction # fit the intercept if fit_intercept: for class_ind in range(n_classes): gradient_correction = (gradient[class_ind] - gradient_memory[s_idx + class_ind]) intercept_sum_gradient_init[class_ind] += gradient_correction gradient_correction *= step_size * (1. - 1. / num_seen) if saga: intercept_array[class_ind] -= \ (step_size * intercept_sum_gradient_init[class_ind] / num_seen * intercept_decay) + gradient_correction else: intercept_array[class_ind] -= \ (step_size * intercept_sum_gradient_init[class_ind] / num_seen * intercept_decay) # check to see that the intercept is not inf or NaN if not isfinite(intercept_array[class_ind]): status = -1 break # Break from the n_samples outer loop if an error happened # in the fit_intercept n_classes inner loop if status == -1: break # update the gradient memory for this sample for class_ind in range(n_classes): gradient_memory[s_idx + class_ind] = gradient[class_ind] if sample_itr == 0: cumulative_sums[0] = step_size / (wscale * num_seen) if prox: cumulative_sums_prox[0] = step_size * beta / wscale else: cumulative_sums[sample_itr] = \ (cumulative_sums[sample_itr - 1] + step_size / (wscale * num_seen)) if prox: cumulative_sums_prox[sample_itr] = \ (cumulative_sums_prox[sample_itr - 1] + step_size * beta / wscale) # If wscale gets too small, we need to reset the scale. # This also resets the just-in-time update system. if wscale < 1e-9: if verbose: with gil: print("rescaling...") status = scale_weights{{name_suffix}}( weights=weights, wscale=&wscale, n_features=n_features, n_samples=n_samples, n_classes=n_classes, sample_itr=sample_itr, cumulative_sums=&cumulative_sums[0], cumulative_sums_prox=cumulative_sums_prox_ptr, feature_hist=&feature_hist[0], prox=prox, sum_gradient=sum_gradient, n_iter=n_iter ) if status == -1: break # Break from the n_iter outer loop if an error happened in the # n_samples inner loop if status == -1: break # We scale the weights every n_samples iterations and reset the # just-in-time update system for numerical stability. # Because this reset is done before every convergence check, we are # sure there is no remaining lagged update when the algorithm stops. status = scale_weights{{name_suffix}}( weights=weights, wscale=&wscale, n_features=n_features, n_samples=n_samples, n_classes=n_classes, sample_itr=n_samples - 1, cumulative_sums=&cumulative_sums[0], cumulative_sums_prox=cumulative_sums_prox_ptr, feature_hist=&feature_hist[0], prox=prox, sum_gradient=sum_gradient, n_iter=n_iter ) if status == -1: break # check if the stopping criteria is reached max_change = 0.0 max_weight = 0.0 for idx in range(n_features * n_classes): max_weight = fmax{{name_suffix}}(max_weight, fabs(weights[idx])) max_change = fmax{{name_suffix}}(max_change, fabs(weights[idx] - previous_weights[idx])) previous_weights[idx] = weights[idx] if ((max_weight != 0 and max_change / max_weight <= tol) or max_weight == 0 and max_change == 0): if verbose: end_time = time(NULL) with gil: print("convergence after %d epochs took %d seconds" % (n_iter + 1, end_time - start_time)) break elif verbose: printf('Epoch %d, change: %.8f\n', n_iter + 1, max_change / max_weight) n_iter += 1 # We do the error treatment here based on error code in status to avoid # re-acquiring the GIL within the cython code, which slows the computation # when the sag/saga solver is used concurrently in multiple Python threads. if status == -1: raise ValueError(("Floating-point under-/overflow occurred at epoch" " #%d. Scaling input data with StandardScaler or" " MinMaxScaler might help.") % n_iter) if verbose and n_iter >= max_iter: end_time = time(NULL) print(("max_iter reached after %d seconds") % (end_time - start_time)) return num_seen, n_iter {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef int scale_weights{{name_suffix}}( {{c_type}}* weights, {{c_type}}* wscale, int n_features, int n_samples, int n_classes, int sample_itr, {{c_type}}* cumulative_sums, {{c_type}}* cumulative_sums_prox, int* feature_hist, bint prox, {{c_type}}* sum_gradient, int n_iter ) noexcept nogil: """Scale the weights and reset wscale to 1.0 for numerical stability, and reset the just-in-time (JIT) update system. See `sag{{name_suffix}}`'s docstring about the JIT update system. wscale = (1 - step_size * alpha) ** (n_iter * n_samples + sample_itr) can become very small, so we reset it every n_samples iterations to 1.0 for numerical stability. To be able to scale, we first need to update every coefficients and reset the just-in-time update system. This also limits the size of `cumulative_sums`. """ cdef int status status = lagged_update{{name_suffix}}( weights, wscale[0], n_features, n_samples, n_classes, sample_itr + 1, cumulative_sums, cumulative_sums_prox, feature_hist, prox, sum_gradient, NULL, True, n_iter ) # if lagged update succeeded, reset wscale to 1.0 if status == 0: wscale[0] = 1.0 return status {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef int lagged_update{{name_suffix}}( {{c_type}}* weights, {{c_type}} wscale, int xnnz, int n_samples, int n_classes, int sample_itr, {{c_type}}* cumulative_sums, {{c_type}}* cumulative_sums_prox, int* feature_hist, bint prox, {{c_type}}* sum_gradient, int* x_ind_ptr, bint reset, int n_iter ) noexcept nogil: """Hard perform the JIT updates for non-zero features of present sample. See `sag{{name_suffix}}`'s docstring about the JIT update system. The updates that awaits are kept in memory using cumulative_sums, cumulative_sums_prox, wscale and feature_hist. See original SAGA paper (Defazio et al. 2014) for details. If reset=True, we also reset wscale to 1 (this is done at the end of each epoch). """ cdef int feature_ind, class_ind, idx, f_idx, lagged_ind, last_update_ind cdef {{c_type}} cum_sum, grad_step, prox_step, cum_sum_prox for feature_ind in range(xnnz): if not reset: feature_ind = x_ind_ptr[feature_ind] f_idx = feature_ind * n_classes cum_sum = cumulative_sums[sample_itr - 1] if prox: cum_sum_prox = cumulative_sums_prox[sample_itr - 1] if feature_hist[feature_ind] != 0: cum_sum -= cumulative_sums[feature_hist[feature_ind] - 1] if prox: cum_sum_prox -= cumulative_sums_prox[feature_hist[feature_ind] - 1] if not prox: for class_ind in range(n_classes): idx = f_idx + class_ind weights[idx] -= cum_sum * sum_gradient[idx] if reset: weights[idx] *= wscale if not isfinite(weights[idx]): # returning here does not require the gil as the return # type is a C integer return -1 else: for class_ind in range(n_classes): idx = f_idx + class_ind if fabs(sum_gradient[idx] * cum_sum) < cum_sum_prox: # In this case, we can perform all the gradient steps and # all the proximal steps in this order, which is more # efficient than unrolling all the lagged updates. # Idea taken from scikit-learn-contrib/lightning. weights[idx] -= cum_sum * sum_gradient[idx] weights[idx] = _soft_thresholding{{name_suffix}}(weights[idx], cum_sum_prox) else: last_update_ind = feature_hist[feature_ind] if last_update_ind == -1: last_update_ind = sample_itr - 1 for lagged_ind in range(sample_itr - 1, last_update_ind - 1, -1): if lagged_ind > 0: grad_step = (cumulative_sums[lagged_ind] - cumulative_sums[lagged_ind - 1]) prox_step = (cumulative_sums_prox[lagged_ind] - cumulative_sums_prox[lagged_ind - 1]) else: grad_step = cumulative_sums[lagged_ind] prox_step = cumulative_sums_prox[lagged_ind] weights[idx] -= sum_gradient[idx] * grad_step weights[idx] = _soft_thresholding{{name_suffix}}(weights[idx], prox_step) if reset: weights[idx] *= wscale # check to see that the weight is not inf or NaN if not isfinite(weights[idx]): return -1 if reset: feature_hist[feature_ind] = sample_itr % n_samples else: feature_hist[feature_ind] = sample_itr if reset: cumulative_sums[sample_itr - 1] = 0.0 if prox: cumulative_sums_prox[sample_itr - 1] = 0.0 return 0 {{endfor}} {{for name_suffix, c_type, np_type in dtypes}} cdef void predict_sample{{name_suffix}}( {{c_type}}* x_data_ptr, int* x_ind_ptr, int xnnz, {{c_type}}* w_data_ptr, {{c_type}} wscale, {{c_type}}* intercept, {{c_type}}* prediction, int n_classes ) noexcept nogil: """Compute the prediction given sparse sample x and dense weight w. Parameters ---------- x_data_ptr : pointer Pointer to the data of the sample x x_ind_ptr : pointer Pointer to the indices of the sample x xnnz : int Number of non-zero element in the sample x w_data_ptr : pointer Pointer to the data of the weights w wscale : {{c_type}} Scale of the weights w intercept : pointer Pointer to the intercept prediction : pointer Pointer to store the resulting prediction n_classes : int Number of classes in multinomial case. Equals 1 in binary case. """ cdef int feature_ind, class_ind, j cdef {{c_type}} innerprod for class_ind in range(n_classes): innerprod = 0.0 # Compute the dot product only on non-zero elements of x for j in range(xnnz): feature_ind = x_ind_ptr[j] innerprod += (w_data_ptr[feature_ind * n_classes + class_ind] * x_data_ptr[j]) prediction[class_ind] = wscale * innerprod + intercept[class_ind] {{endfor}} def _multinomial_grad_loss_all_samples( SequentialDataset64 dataset, double[:, ::1] weights_array, double[::1] intercept_array, int n_samples, int n_features, int n_classes ): """Compute multinomial gradient and loss across all samples. Used for testing purpose only. """ cdef double *x_data_ptr = NULL cdef int *x_ind_ptr = NULL cdef int xnnz = -1 cdef double y cdef double sample_weight cdef double wscale = 1.0 cdef int i, j, class_ind, feature_ind cdef double val cdef double sum_loss = 0.0 cdef MultinomialLogLoss64 multiloss = MultinomialLogLoss64() cdef double[:, ::1] sum_gradient_array = np.zeros((n_features, n_classes), dtype=np.double, order="c") cdef double* sum_gradient = &sum_gradient_array[0, 0] cdef double[::1] prediction = np.zeros(n_classes, dtype=np.double, order="c") cdef double[::1] gradient = np.zeros(n_classes, dtype=np.double, order="c") with nogil: for i in range(n_samples): # get next sample on the dataset dataset.next( &x_data_ptr, &x_ind_ptr, &xnnz, &y, &sample_weight ) # prediction of the multinomial classifier for the sample predict_sample64( x_data_ptr, x_ind_ptr, xnnz, &weights_array[0, 0], wscale, &intercept_array[0], &prediction[0], n_classes ) # compute the gradient for this sample, given the prediction multiloss.dloss(y, &prediction[0], n_classes, sample_weight, &gradient[0]) # compute the loss for this sample, given the prediction sum_loss += multiloss._loss(y, &prediction[0], n_classes, sample_weight) # update the sum of the gradient for j in range(xnnz): feature_ind = x_ind_ptr[j] val = x_data_ptr[j] for class_ind in range(n_classes): sum_gradient[feature_ind * n_classes + class_ind] += gradient[class_ind] * val return sum_loss, sum_gradient_array