Skip to content

Commit ae7d590

Browse files
multithreading
1 parent 00e91cd commit ae7d590

File tree

5 files changed

+35
-92
lines changed

5 files changed

+35
-92
lines changed

.github/workflows/build_wheels.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ jobs:
1111
os: [ubuntu-20.04, windows-2019]
1212

1313
steps:
14-
- uses: actions/checkout@v3
14+
- uses: actions/checkout@v4
1515

1616
- name: Build wheels
17-
uses: pypa/cibuildwheel@v2.14.0
17+
uses: pypa/cibuildwheel@v2.16.5
1818
env:
1919
CIBW_SKIP: "*musllinux*"
2020

21-
- uses: actions/upload-artifact@v3
21+
- uses: actions/upload-artifact@v4
2222
with:
23+
name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }}
2324
path: ./wheelhouse/*.whl

cpp/APLRRegressor.h

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#pragma once
22
#include <string>
33
#include <limits>
4-
#include <thread>
54
#include <future>
65
#include <random>
76
#include <vector>
7+
#include <omp.h>
88
#include "../dependencies/eigen-3.4.0/Eigen/Dense"
99
#include "functions.h"
1010
#include "term.h"
@@ -82,6 +82,7 @@ class APLRRegressor
8282
MatrixXi preprocess_cv_observations(const MatrixXi &cv_observations, const VectorXd &y);
8383
void preprocess_prioritized_predictors_and_interaction_constraints(const MatrixXd &X, const std::vector<size_t> &prioritized_predictors_indexes,
8484
const std::vector<std::vector<size_t>> &interaction_constraints);
85+
void initialize_multithreading();
8586
void fit_model_for_cv_fold(const MatrixXd &X, const VectorXd &y, const VectorXd &sample_weight,
8687
const std::vector<std::string> &X_names, const VectorXi &cv_observations_in_fold,
8788
const std::vector<int> &monotonic_constraints, const VectorXi &group, const MatrixXd &other_data,
@@ -96,8 +97,8 @@ class APLRRegressor
9697
VectorXd calculate_neg_gradient_current();
9798
VectorXd calculate_neg_gradient_current_for_group_mse(GroupData &group_residuals_and_count, const VectorXi &group,
9899
const std::set<int> &unique_groups);
99-
void execute_boosting_steps();
100-
void execute_boosting_step(size_t boosting_step);
100+
void execute_boosting_steps(Eigen::Index fold_index);
101+
void execute_boosting_step(size_t boosting_step, Eigen::Index fold_index);
101102
std::vector<size_t> find_terms_eligible_current_indexes_for_a_base_term(size_t base_term);
102103
void estimate_split_point_for_each_term(std::vector<Term> &terms, std::vector<size_t> &terms_indexes);
103104
size_t find_best_term_index(std::vector<Term> &terms, std::vector<size_t> &terms_indexes);
@@ -119,7 +120,7 @@ class APLRRegressor
119120
double calculate_validation_error(const VectorXd &predictions);
120121
double calculate_group_mse_by_prediction_validation_error(const VectorXd &predictions);
121122
void update_term_eligibility();
122-
void print_summary_after_boosting_step(size_t boosting_step);
123+
void print_summary_after_boosting_step(size_t boosting_step, Eigen::Index fold_index);
123124
void print_final_summary();
124125
void find_optimal_m_and_update_model_accordingly();
125126
void merge_similar_terms(const MatrixXd &X);
@@ -316,6 +317,7 @@ void APLRRegressor::fit(const MatrixXd &X, const VectorXd &y, const VectorXd &sa
316317
interaction_constraints, other_data);
317318
MatrixXi cv_observations_used{preprocess_cv_observations(cv_observations, y)};
318319
preprocess_prioritized_predictors_and_interaction_constraints(X, prioritized_predictors_indexes, interaction_constraints);
320+
initialize_multithreading();
319321
cv_fold_models.resize(cv_observations_used.cols());
320322
for (Eigen::Index i = 0; i < cv_observations_used.cols(); ++i)
321323
{
@@ -343,6 +345,17 @@ void APLRRegressor::preprocess_prioritized_predictors_and_interaction_constraint
343345
}
344346
}
345347

348+
void APLRRegressor::initialize_multithreading()
349+
{
350+
size_t available_cores{static_cast<size_t>(std::thread::hardware_concurrency())};
351+
size_t cores_to_use;
352+
if (n_jobs == 0)
353+
cores_to_use = available_cores;
354+
else
355+
cores_to_use = std::min(n_jobs, available_cores);
356+
omp_set_num_threads(cores_to_use);
357+
}
358+
346359
void APLRRegressor::fit_model_for_cv_fold(const MatrixXd &X, const VectorXd &y, const VectorXd &sample_weight,
347360
const std::vector<std::string> &X_names, const VectorXi &cv_observations_in_fold,
348361
const std::vector<int> &monotonic_constraints, const VectorXi &group, const MatrixXd &other_data,
@@ -351,7 +364,7 @@ void APLRRegressor::fit_model_for_cv_fold(const MatrixXd &X, const VectorXd &y,
351364
define_training_and_validation_sets(X, y, sample_weight, cv_observations_in_fold, group, other_data);
352365
scale_response_if_using_log_link_function();
353366
initialize(monotonic_constraints);
354-
execute_boosting_steps();
367+
execute_boosting_steps(fold_index);
355368
print_final_summary();
356369
find_optimal_m_and_update_model_accordingly();
357370
merge_similar_terms(X_train);
@@ -1005,12 +1018,12 @@ VectorXd APLRRegressor::differentiate_predictions_wrt_linear_predictor()
10051018
return VectorXd(0);
10061019
}
10071020

1008-
void APLRRegressor::execute_boosting_steps()
1021+
void APLRRegressor::execute_boosting_steps(Eigen::Index fold_index)
10091022
{
10101023
abort_boosting = false;
10111024
for (size_t boosting_step = 0; boosting_step < m; ++boosting_step)
10121025
{
1013-
execute_boosting_step(boosting_step);
1026+
execute_boosting_step(boosting_step, fold_index);
10141027
if (abort_boosting)
10151028
break;
10161029
if (loss_function == "group_mse_cycle")
@@ -1024,7 +1037,7 @@ void APLRRegressor::execute_boosting_steps()
10241037
}
10251038
}
10261039

1027-
void APLRRegressor::execute_boosting_step(size_t boosting_step)
1040+
void APLRRegressor::execute_boosting_step(size_t boosting_step, Eigen::Index fold_index)
10281041
{
10291042
model_has_changed_in_this_boosting_step = false;
10301043
update_intercept(boosting_step);
@@ -1065,7 +1078,7 @@ void APLRRegressor::execute_boosting_step(size_t boosting_step)
10651078
if (abort_boosting)
10661079
return;
10671080
update_term_eligibility();
1068-
print_summary_after_boosting_step(boosting_step);
1081+
print_summary_after_boosting_step(boosting_step, fold_index);
10691082
}
10701083

10711084
void APLRRegressor::update_intercept(size_t boosting_step)
@@ -1120,35 +1133,10 @@ std::vector<size_t> APLRRegressor::find_terms_eligible_current_indexes_for_a_bas
11201133
void APLRRegressor::estimate_split_point_for_each_term(std::vector<Term> &terms, std::vector<size_t> &terms_indexes)
11211134
{
11221135
bool multithreading{n_jobs != 1 && terms_indexes.size() > 1};
1123-
if (multithreading)
1136+
#pragma omp parallel for schedule(auto) if (multithreading)
1137+
for (size_t i = 0; i < terms_indexes.size(); ++i)
11241138
{
1125-
distributed_terms = distribute_terms_indexes_to_cores(terms_indexes, n_jobs);
1126-
1127-
std::vector<std::thread> threads(distributed_terms.size());
1128-
1129-
auto estimate_split_point_for_distributed_terms_in_one_thread = [this, &terms, &terms_indexes](size_t thread_index)
1130-
{
1131-
for (size_t i = 0; i < distributed_terms[thread_index].size(); ++i)
1132-
{
1133-
terms[terms_indexes[distributed_terms[thread_index][i]]].estimate_split_point(X_train, neg_gradient_current, sample_weight_train, bins, v, min_observations_in_split);
1134-
}
1135-
};
1136-
1137-
for (size_t i = 0; i < threads.size(); ++i)
1138-
{
1139-
threads[i] = std::thread(estimate_split_point_for_distributed_terms_in_one_thread, i);
1140-
}
1141-
for (size_t i = 0; i < threads.size(); ++i)
1142-
{
1143-
threads[i].join();
1144-
}
1145-
}
1146-
else
1147-
{
1148-
for (size_t i = 0; i < terms_indexes.size(); ++i)
1149-
{
1150-
terms[terms_indexes[i]].estimate_split_point(X_train, neg_gradient_current, sample_weight_train, bins, v, min_observations_in_split);
1151-
}
1139+
terms[terms_indexes[i]].estimate_split_point(X_train, neg_gradient_current, sample_weight_train, bins, v, min_observations_in_split);
11521140
}
11531141
}
11541142

@@ -1571,11 +1559,11 @@ void APLRRegressor::update_term_eligibility()
15711559
}
15721560
}
15731561

1574-
void APLRRegressor::print_summary_after_boosting_step(size_t boosting_step)
1562+
void APLRRegressor::print_summary_after_boosting_step(size_t boosting_step, Eigen::Index fold_index)
15751563
{
15761564
if (verbosity >= 2)
15771565
{
1578-
std::cout << "Boosting step: " << boosting_step + 1 << ". Model terms: " << terms.size() << ". Terms eligible: " << number_of_eligible_terms << ". Validation error: " << validation_error_steps.col(0)[boosting_step] << ".\n";
1566+
std::cout << "Fold: " << fold_index << ". Boosting step: " << boosting_step + 1 << ". Model terms: " << terms.size() << ". Terms eligible: " << number_of_eligible_terms << ". Validation error: " << validation_error_steps.col(0)[boosting_step] << ".\n";
15791567
}
15801568
}
15811569

@@ -1700,7 +1688,6 @@ void APLRRegressor::set_term_names(const std::vector<std::string> &X_names)
17001688
terms[i].name.pop_back();
17011689
terms[i].name += "!=0)";
17021690
}
1703-
terms[i].name = "Interaction level: " + std::to_string(terms[i].get_interaction_level()) + ". " + terms[i].name;
17041691
}
17051692

17061693
term_names.resize(terms.size() + 1);

cpp/functions.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
#include <vector>
66
#include <fstream>
77
#include <iostream>
8-
#include <thread>
9-
#include <future>
108
#include <random>
119
#include <set>
1210
#include <map>
@@ -370,12 +368,6 @@ void save_as_csv_file(std::string fileName, MatrixXd matrix)
370368
}
371369
}
372370

373-
struct DistributedIndices
374-
{
375-
std::vector<size_t> index_lowest;
376-
std::vector<size_t> index_highest;
377-
};
378-
379371
template <typename T> // Type must implement a size() method
380372
size_t calculate_max_index_in_vector(T &vector)
381373
{

cpp/term.h

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -793,46 +793,6 @@ double Term::get_estimated_term_importance()
793793
return estimated_term_importance;
794794
}
795795

796-
std::vector<std::vector<size_t>> distribute_terms_indexes_to_cores(std::vector<size_t> &term_indexes, size_t n_jobs)
797-
{
798-
size_t num_eligible_terms{term_indexes.size()};
799-
800-
size_t available_cores{static_cast<size_t>(std::thread::hardware_concurrency())};
801-
if (n_jobs > 1)
802-
available_cores = std::min(n_jobs, available_cores);
803-
size_t units_per_core{std::max(num_eligible_terms / available_cores, static_cast<size_t>(1))};
804-
805-
std::vector<std::vector<size_t>> output(available_cores);
806-
for (size_t i = 0; i < available_cores; ++i)
807-
{
808-
output[i] = std::vector<size_t>();
809-
output[i].reserve(num_eligible_terms);
810-
}
811-
812-
size_t core{0};
813-
size_t count{0};
814-
for (size_t i = 0; i < term_indexes.size(); ++i)
815-
{
816-
output[core].push_back(i);
817-
++count;
818-
if (count >= units_per_core)
819-
{
820-
if (core < available_cores - 1)
821-
++core;
822-
else
823-
core = 0;
824-
count = 0;
825-
}
826-
}
827-
828-
for (size_t i = 0; i < available_cores; ++i)
829-
{
830-
output[i].shrink_to_fit();
831-
}
832-
833-
return output;
834-
}
835-
836796
std::vector<size_t> create_term_indexes(std::vector<Term> &terms)
837797
{
838798
std::vector<size_t> term_indexes;

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
extra_compile_args = []
44
if "win" not in sys.platform:
5+
extra_compile_args.append("-fopenmp")
56
extra_compile_args.append("-std=c++17")
67
extra_compile_args.append("-pthread")
8+
else:
9+
extra_compile_args.append("/openmp:llvm")
710

811
sfc_module = setuptools.Extension(
912
name="aplr_cpp",
@@ -15,7 +18,7 @@
1518

1619
setuptools.setup(
1720
name="aplr",
18-
version="9.1.0",
21+
version="9.2.0",
1922
description="Automatic Piecewise Linear Regression",
2023
ext_modules=[sfc_module],
2124
author="Mathias von Ottenbreit",

0 commit comments

Comments (0)