diff --git a/experiments/docker-compose.yml b/experiments/docker-compose.yml index a306d1f..26e7adf 100644 --- a/experiments/docker-compose.yml +++ b/experiments/docker-compose.yml @@ -7,7 +7,7 @@ services: stdin_open: true volumes: - ${PWD}:/pod - - neo4j_data:/neo4j_data/data + - /data/elastic-notebook/data:/pod/nbdata working_dir: /pod command: ["/bin/bash"] @@ -30,21 +30,6 @@ services: - redis_data:/data command: ["redis-server", "--port", "6379"] - podneo4j: - # image: neo4j:5.15.0-community-bullseye - image: neo4j:5.15.0-enterprise-bullseye - hostname: podneo4j - tty: true - stdin_open: true - volumes: - - neo4j_data:/data - environment: - NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes" # Using Neo4j Developer License (https://neo4j.com/licensing/) - NEO4J_AUTH: neo4j/podneo4jPassword - NEO4J_server_memory_pagecache_size: 2G - NEO4J_server_memory_heap_max__size: 2G - command: ["neo4j"] - podmongo: image: mongo:7.0.5-jammy hostname: podmongo @@ -53,4 +38,3 @@ services: volumes: redis_data: - neo4j_data: diff --git a/experiments/pod.Dockerfile b/experiments/pod.Dockerfile index fcfe5c0..f6cfff7 100644 --- a/experiments/pod.Dockerfile +++ b/experiments/pod.Dockerfile @@ -36,6 +36,7 @@ RUN python -m pip install -r /pod/requirements.txt COPY ./pod /pod/pod COPY ./setup.py /pod/setup.py COPY ./README.md /pod/README.md + RUN python -m pip install -e /pod/ WORKDIR / diff --git a/experiments/train.sh b/experiments/train.sh new file mode 100644 index 0000000..b548c41 --- /dev/null +++ b/experiments/train.sh @@ -0,0 +1,12 @@ +for alpha in 0.1; do + for gamma in 0.7; do + echo "alpha $alpha gamma $gamma" + stdout_file="eval_logs/stdout_alpha${alpha}_gamma${gamma}.log" + stderr_file="eval_logs/stderr_alpha${alpha}_gamma${gamma}.log" + # Ensure the eval_logs directory exists + mkdir -p eval_logs + # Run training in the container, teeing stdout and stderr to their log files + ( docker exec experiments-pod-1 python pod/train.py --gamma $gamma --alpha $alpha 2> >(tee "$stderr_file" >&2) | tee "$stdout_file" ) & + done +done +echo "done with script" diff --git a/pod/find_bench_size.py b/pod/find_bench_size.py new file mode 100644 index 0000000..1471aac --- /dev/null +++ b/pod/find_bench_size.py @@ -0,0 +1,126 @@ +import contextlib +import gc +import io +import multiprocessing as mp +import os +import shutil +import sys +import time +from dataclasses import dataclass +from functools import partial +from multiprocessing import Process, Queue +from pathlib import Path +from typing import List +import json + +import numpy as np +from loguru import logger +from model import QLearningPoddingModel +from tqdm import tqdm + +from pod.bench import BenchArgs, NotebookExecutor, Notebooks +from pod.common import PodId +from pod.feature import __FEATURE__ +from pod.pickling import ManualPodding, SnapshotPodPickling, StaticPodPickling +from pod.stats import ExpStat +from pod.storage import FilePodStorage + + +@dataclass +class TrainArgs: + gamma: float + alpha: float + + +def run_iter(nb_path, update_q: Queue): + # print(nb_path) + args = BenchArgs(expname="", nb=nb_path, sut="snapshot") + # Load notebook.
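+ # Each worker runs one notebook end-to-end under the snapshot SUT; the + # per-cell sizes and times it reports become the baseline denominators + # that pod/train.py reads back from benchdata.json to scale its rewards.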
+ logger.info(f"PID {os.getpid()}, {nb_path}") + save_file_str = nb_path + nb_cells = Notebooks.nb(args=args) + nb_exec = NotebookExecutor(nb_cells) + + pod_storage_path = Path(f"tmp/pod{save_file_str}") + if pod_storage_path.exists(): + shutil.rmtree(pod_storage_path) + + # Initialize sut: plain snapshot pickling serves as the baseline. + sut = SnapshotPodPickling(Path(f"tmp/pod{save_file_str}")) + + sizes = [] + times = [] + last_storage_size = 0 + # expstat = ExpStat() + pids: List[PodId] = [] + for nth, (cell, the_globals, the_locals) in enumerate(nb_exec.iter()): + # Dump current state. + dump_start_ts = time.time() + pid = sut.dump(the_locals) + dump_stop_ts = time.time() + + # Record measurements. + cur_size = sut.estimate_size() + dump_time = dump_stop_ts - dump_start_ts + times.append(dump_time) + pids.append(pid) + size = cur_size - last_storage_size + last_storage_size = cur_size + sizes.append(size) + # Reset environment to reduce noise. + gc.collect() + + update_q.put({"nb": nb_path, "sizes": sizes, "times": times, "final_size": cur_size}) + logger.info(f"Done with {nb_path}") + return + + +def find_bench_size(nbs): + """Measure per-cell snapshot sizes and times for each notebook (the training baseline).""" + procs: List[Process] = [] + update_q = Queue() + for nb_path in nbs: + p = Process(target=run_iter, args=(nb_path, update_q)) + procs.append(p) + try: + logger.info(f"Starting process for {nb_path}") + p.start() + except Exception: + logger.info("ERROR STARTING PROCESS") + return + + global_data = {} + popped = 0 + while popped < len(nbs): + logger.info("Waiting for a worker result") + data = update_q.get() + popped += 1 + global_data[data["nb"]] = {"sizes": data["sizes"], "times": data["times"], "final_size": data["final_size"]} + for p in procs: + try: + p.join() + except Exception: + logger.info("ERROR JOINING") + return global_data + + +if __name__ == "__main__": + # logger.info(f"Arguments {sys.argv}") + bench_data = find_bench_size( [ "notebooks/it-s-that-time-of-the-year-again.ipynb", "notebooks/better-xgb-baseline.ipynb", "notebooks/fast-fourier-transform-denoising.ipynb", "notebooks/cv19w3-2-v2-play-2-v3fix-sub-last6dayopt.ipynb", # "notebooks/amex-dataset.ipynb", "notebooks/denoising-with-direct-wavelet-transform.ipynb", "notebooks/04_training_linear_models.ipynb", ] ) + json_object = json.dumps(bench_data, indent=4) + with open("benchdata.json", "w") as f: + logger.info("Writing benchdata.json") + f.write(json_object) diff --git a/pod/inspect_qt.py b/pod/inspect_qt.py new file mode 100644 index 0000000..25ec3de --- /dev/null +++ b/pod/inspect_qt.py @@ -0,0 +1,65 @@ +import numpy as np +import matplotlib.pyplot as plt +from itertools import product +from model import QLearningPoddingModel + + +def idx_to_state(idx): + # The lists from which the Cartesian product was created + lists = [ [True, False], QLearningPoddingModel.SIZES, QLearningPoddingModel.SIZES, QLearningPoddingModel.PROBABILITIES, QLearningPoddingModel.PROBABILITIES, QLearningPoddingModel.TYPES, QLearningPoddingModel.ACTION_CHOICES, ] + + total_combinations = [len(lst) for lst in lists] + parameters = [] + for lst_len in reversed(total_combinations): + value_idx = idx % lst_len + parameters.append(lists[len(lists) - len(parameters) - 1][value_idx]) + idx //= lst_len + parameters.reverse() + return tuple(parameters) + + +def sort_by_max(arr): + max_values = np.max(arr, axis=1) + sorted_indices = np.argsort(max_values)[::-1] + sorted_arr = arr[sorted_indices] + return sorted_indices, sorted_arr + + +def inspect(qt_path): + qt = np.load(qt_path) + fresh_qt = np.load("qtables/EVAL.npy") + #
print(f"MAX {np.max(qt)}") + # print(f"MIN {np.min(qt)}") + # plt.hist(qt, bins=10, edgecolor='black') + # plt.title('Histogram of Data') + # plt.xlabel('Value') + # plt.ylabel('Frequency') + # plt.savefig("qt_vals.png") + # plt.show() + used_values = np.where((qt == 10) | (qt == 20) | (qt == 30), -10000, qt) + sorted_used_idx, sorted_used_qt = sort_by_max(used_values) + sorted_fresh_idx, sorted_fresh_qt = sort_by_max(fresh_qt) + with open("differences.txt", "w") as diff_file: + for idx in sorted_used_idx: + if np.max(qt[idx] < 1e-8): + continue + if np.argmax(qt[idx]) != np.argmax(fresh_qt[idx]): + diff_file.write(f"STATE {idx_to_state(idx)} VALUE IN USED {qt[idx]} VALUE IN FRESH {fresh_qt[idx]}\n") + # relevant_max_id = sorted_indices[-28650:-28500] + # print(f"Max modified idices {relevant_max_id}, values {qt.flatten()[relevant_max_id]}") + # print(f"Min idices {sorted_indices[:20]}, values {qt.flatten()[sorted_indices[:20]]}") + # for i in relevant_max_id: + # print(index_to_state(i)) + + + +if __name__ == "__main__": + inspect("qtables/0-6&0-1.npy") \ No newline at end of file diff --git a/pod/model.py b/pod/model.py index 7043401..00c9773 100644 --- a/pod/model.py +++ b/pod/model.py @@ -1,20 +1,338 @@ import random +from itertools import product from pathlib import Path +from types import CodeType, FunctionType, ModuleType, NoneType from typing import Dict, List, Optional +import matplotlib.figure +import matplotlib.pyplot as plt +import numpy as np import pandas as pd from loguru import logger from pod.common import Object, PodId from pod.feature import __FEATURE__ from pod.pickling import BasePickler, PodAction, PoddingFunction +from pod.xgb_predictor import XGBPredictor class PoddingModel: + def podding_fn(self, obj: Object) -> PodAction: raise NotImplementedError("Abstract method") +class QLearningPoddingModel(PoddingModel): + SIMPLE_TYPES = [t.__name__ for t in [ + float, + int, + complex, + bool, + tuple, + NoneType, + type, + ]] + + SIMPLE_TYPE = "SIMPLE_TYPE" + OTHER = "other" + TYPES = [SIMPLE_TYPE] + [t.__name__ for t in [ + list, + dict, + str, + bytes, + np.ndarray, + pd.DataFrame, + matplotlib.figure.Figure, + FunctionType, + ModuleType, + CodeType, + ]] + [OTHER] + + SPLIT_TYPES = [t.__name__ for t in [ + # Builtin types. + str, + bytes, + list, + dict, + # Numerical types. + np.ndarray, + pd.DataFrame, + matplotlib.figure.Figure, + # Nested types. + FunctionType, + CodeType, + ]] + FINAL_TYPES = [t.__name__ for t in [ + # Builtin types. + str, + bytes, + # Numerical types. 
+ np.ndarray, + pd.DataFrame, + matplotlib.figure.Figure, + ModuleType, + ]] + + SIZES = [1_000_000_000, 1_000_000, 1_000, 100, 1, -2] + PROBABILITIES = [1.0, 0.8, 0.6, 0.4, 0.2, 0.0] + ACTION_CHOICES = list(range(len(list(PodAction)))) + [-1] + actions = list(PodAction) + def __init__(self, predictor: XGBPredictor, qt_path=None, train=False, alpha=0.2, gamma=0.8): + self.state_to_qt_idx = {} + self.action_history = {} + idx = 0 + self.actions = list(PodAction) + self.change_predictor = predictor + num_rows_qt = ( + 2 + * len(QLearningPoddingModel.SIZES) + * len(QLearningPoddingModel.SIZES) + * len(QLearningPoddingModel.PROBABILITIES) + * len(QLearningPoddingModel.PROBABILITIES) + * len(QLearningPoddingModel.TYPES) + * len(QLearningPoddingModel.ACTION_CHOICES) + ) + + if qt_path is not None: + with open(qt_path, "rb") as qtable_path: + self.q_table = np.load(qtable_path) + else: + self.q_table = np.zeros((num_rows_qt, len(self.actions))) + + self.pod_action_to_action = {self.actions[i]: i for i in range(len(self.actions))} + + for idx, (has_changed, obj_s, pod_s, pod_max_change_prob, oid_change_prob, obj_type, past_action) in enumerate( + product( + [True, False], + QLearningPoddingModel.SIZES, + QLearningPoddingModel.SIZES, + QLearningPoddingModel.PROBABILITIES, + QLearningPoddingModel.PROBABILITIES, + QLearningPoddingModel.TYPES, + QLearningPoddingModel.ACTION_CHOICES, + ) + ): + self.state_to_qt_idx[(has_changed, obj_s, pod_s, pod_max_change_prob, oid_change_prob, obj_type, past_action)] = idx + if qt_path is None: + self._set_inductive_bias(obj_type, idx, has_changed, past_action) + + self.train = train + self.features = { + "oid": [], + "pod_size": [], + "pod_max_change_prob": [], + "oid_count": [], + "oid_change_count": [], + "oid_change_prob": [], + "obj_type": [], + "obj_len": [], + "has_changed": [], + "past_action": [] + } + + if self.train: + self.epsilon = 1 + self.history = [] + self.alpha = alpha + self.gamma = gamma + self.reward_history = [] + self.size_history = [] + self.dump_time_history = [] + + def _set_inductive_bias(self, obj_type, idx, has_changed, past_action): + if obj_type == QLearningPoddingModel.SIMPLE_TYPE: + self.q_table[idx, self.pod_action_to_action[PodAction.bundle]] += 10 + elif obj_type in QLearningPoddingModel.FINAL_TYPES: + self.q_table[idx, self.pod_action_to_action[PodAction.split_final]] += 10 + elif obj_type in QLearningPoddingModel.SPLIT_TYPES: + self.q_table[idx, self.pod_action_to_action[PodAction.split]] += 10 + if not has_changed: + if past_action != -1: + self.q_table[idx, past_action] += 20 + + def get_features(self, obj, pickler): + self.features["oid"].append(id(obj)) + self.features["pod_size"].append(self._get_pod_size(pickler)) + self.features["pod_max_change_prob"].append(__FEATURE__.pod_max_change_prob(pickler)) + self.features["oid_count"].append(__FEATURE__.oid_count(id(obj))) + self.features["oid_change_count"].append(__FEATURE__.oid_change_count(id(obj))) + self.features["oid_change_prob"].append(__FEATURE__.oid_change_prob(id(obj))) + self.features["obj_type"].append(type(obj).__name__) + self.features["obj_len"].append(self._get_obj_len(obj)) + self.features["past_action"].append(self.action_history.get(id(obj), -1)) + return self.features + + def _get_obj_len(self, obj: Object) -> Optional[int]: + try: + len_fn = getattr(obj, "__len__", None) + if len_fn is None: + return None + return len_fn() + except Exception: + return None + + def _get_pod_size(self, pickler: BasePickler) -> Optional[int]: + file = getattr(pickler, "file", None) +
if file is None: + logger.warning("Unexpectedly missing pickler's field: file.") + return None + return len(file.getvalue()) + + def _get_pod_id(self, pickler: BasePickler) -> Optional[PodId]: + root_pid = getattr(pickler, "root_pid", None) + if root_pid is None: + logger.warning("Unexpectedly missing pickler's field: root_pid.") + return None + return root_pid + + def _get_pod_max_change_prob(self, pickler: BasePickler) -> Optional[float]: + root_pid = self._get_pod_id(pickler) + if root_pid is None: + return None + return __FEATURE__.pod_max_change_prob(root_pid) + + def podding_fn(self, obj: Object, pickler: BasePickler, history_list: Optional[list] = None) -> PodAction: + features = self.get_features(obj, pickler) + q_table_idx = self._get_q_table_idx(features) + action_idx = np.argmax(self.q_table[q_table_idx]) + if self.train and random.random() < self.epsilon: + action_idx = random.randint(0, len(self.actions) - 1) # Both ends of randint's range are inclusive + if self.train: + # self.history.append((q_table_idx, action_idx)) + if history_list is not None: + history_list.append((q_table_idx, action_idx)) + self.action_history[id(obj)] = action_idx + return self.actions[action_idx] + + + def post_podding_fn(self) -> None: + for oid in self.features["oid"][len(self.features["has_changed"]) :]: + self.features["has_changed"].append(__FEATURE__.has_changed(oid)) + self.save_features(Path("features.csv")) + + def set_epsilon(self, eps): + self.epsilon = eps + + def _get_q_table_idx(self, features): + has_changed = features["has_changed"][-1] if len(features["has_changed"]) > 0 else False + obj_size, pod_size = self._get_size_values(features) + pod_max_change_prob, oid_change_prob = self._get_prob_values(features) + obj_type = features["obj_type"][-1] + past_action = features["past_action"][-1] + if obj_type in QLearningPoddingModel.SIMPLE_TYPES: + obj_type = QLearningPoddingModel.SIMPLE_TYPE + elif obj_type not in QLearningPoddingModel.TYPES: + obj_type = QLearningPoddingModel.OTHER + state = (has_changed, obj_size, pod_size, pod_max_change_prob, oid_change_prob, obj_type, past_action) + return self.state_to_qt_idx[state] + + def _get_size_values(self, features): + obj_size = -1 + pod_size = -1 + feature_obj_len = features["obj_len"][-1] + feature_pod_size = features["pod_size"][-1] + if feature_obj_len is None: + feature_obj_len = -2 + if feature_pod_size is None: + feature_pod_size = -2 + for s in self.SIZES: + if obj_size == -1 and feature_obj_len >= s: + obj_size = s + if pod_size == -1 and feature_pod_size >= s: + pod_size = s + return obj_size, pod_size + + def _get_prob_values(self, features): + pod_max_change_prob = -1.0 + oid_change_prob = -1.0 + for i in range(len(self.PROBABILITIES) - 2): + hi_p, lo_p = self.PROBABILITIES[i], self.PROBABILITIES[i + 1] + feature_pod_max_change = features["pod_max_change_prob"][-1] + if feature_pod_max_change is None: + feature_pod_max_change = 0.0 + if feature_pod_max_change >= lo_p and pod_max_change_prob < 0: + if (hi_p - feature_pod_max_change) <= (feature_pod_max_change - lo_p): + pod_max_change_prob = hi_p + else: + pod_max_change_prob = lo_p + + feature_oid_change_prob = features["oid_change_prob"][-1] + if feature_oid_change_prob is None: + feature_oid_change_prob = 0.0 + if feature_oid_change_prob >= lo_p and oid_change_prob < 0: + if (hi_p - feature_oid_change_prob) <= (feature_oid_change_prob - lo_p): + oid_change_prob = hi_p + else: + oid_change_prob = lo_p + return max(pod_max_change_prob, 0.0), max(oid_change_prob, 0.0) + + def clear_action_history(self): +
self.action_history = {} + + def save_features(self, save_path: Path) -> None: + save_path.parent.mkdir(parents=True, exist_ok=True) + pd.DataFrame(self.features).to_csv(save_path) + + def batch_update_q(self, reward): + # logger.info(f"UPDATING ON {len(self.history)} ITEMS") + self.reward_history.append(reward) + self.history.reverse() + final_state_idx, final_action_idx = self.history[0] + self.q_table[final_state_idx, final_action_idx] = reward + for i in range(1, len(self.history)): + state_idx, action_idx = self.history[i] + next_state_idx, _ = self.history[i - 1] + next_max = np.max(self.q_table[next_state_idx]) + updated_q_val = (1 - self.alpha) * self.q_table[state_idx, action_idx] + self.alpha * (self.gamma * next_max) + self.q_table[state_idx, action_idx] = updated_q_val + self.history = [] + + def batch_update_q_parallel(self, reward_hist_list): + for reward, history in reward_hist_list: + if len(history) > 0: + history.reverse() + final_state_idx, final_action_idx = history[0] + self.q_table[final_state_idx, final_action_idx] = reward + for i in range(1, len(history)): + state_idx, action_idx = history[i] + next_state_idx, _ = history[i - 1] + next_max = np.max(self.q_table[next_state_idx]) + updated_q_val = (1 - self.alpha) * self.q_table[state_idx, action_idx] + self.alpha * ( + self.gamma * next_max + ) + self.q_table[state_idx, action_idx] = updated_q_val + + def save_q_table(self, save_path): + with open(save_path, "wb") as qt_file: + np.save(qt_file, self.q_table) + + def plot_stats(self, name=""): + plt.cla() + plt.clf() + plt.figure() + plt.plot(self.reward_history) + plt.xlabel("Epoch") + plt.ylabel("Avg. Reward") + plt.savefig(f"reward_plot_{name}.png") + + plt.cla() + plt.clf() + plt.figure() + plt.plot(self.size_history) + plt.xlabel("Epoch") + plt.ylabel("Avg. Storage Size") + plt.savefig(f"size_{name}.png") + plt.close("all") + + plt.cla() + plt.clf() + plt.figure() + plt.plot(self.dump_time_history) + plt.xlabel("Epoch") + plt.ylabel("Avg. Dump Time") + plt.savefig(f"dump_time_{name}.png") + + class RandomPoddingModel: def __init__(self, weights: Optional[List[float]] = None) -> None: self.weights = weights if weights is not None else [1.0 / 2, 1.0 / 4, 1.0 / 4] @@ -56,7 +374,7 @@ def podding_fn(self, obj: Object, pickler: BasePickler) -> PodAction: def post_podding_fn(self) -> None: for oid in self.features["oid"][len(self.features["has_changed"]) :]: self.features["has_changed"].append(__FEATURE__.has_changed(oid)) - self.save(save_path=self._save_path) + self.save(save_path=self._save_path) def save(self, save_path: Path) -> None: save_path.parent.mkdir(parents=True, exist_ok=True) diff --git a/pod/train.py b/pod/train.py new file mode 100644 index 0000000..37e88d4 --- /dev/null +++ b/pod/train.py @@ -0,0 +1,381 @@ +import gc +import os +import shutil +import sys +import time +from dataclasses import dataclass +from functools import partial +from multiprocessing import Process, Queue +from pathlib import Path +from typing import List +import json + +import simple_parsing +from loguru import logger +from model import QLearningPoddingModel +from tqdm import tqdm + +from pod.bench import BenchArgs, NotebookExecutor, Notebooks +from pod.common import PodId +from pod.feature import __FEATURE__ +from pod.pickling import ManualPodding, StaticPodPickling +from pod.storage import FilePodStorage +from pod.xgb_predictor import XGBPredictor + +with open("benchdata.json", "r") as bench_file: + BENCH_DATA = json.load(bench_file) + + +@dataclass +class
TrainArgs: + gamma: float + alpha: float + + +def train_notebook_iter(nb_path, update_q: Queue, train_args: TrainArgs, model: QLearningPoddingModel, p_id: int = 0): + # print(nb_path) + args = BenchArgs(expname="", nb=nb_path, sut="pod_file") + # Load notebook. + logger.info(f"PID {os.getpid()}, {nb_path}") + save_file_str = (str(train_args.gamma) + "&" + str(train_args.alpha)).replace(".", "-") + str(p_id) + nb_cells = Notebooks.nb(args=args) + nb_exec = NotebookExecutor(nb_cells) + + bench_sizes = BENCH_DATA[nb_path]["sizes"] + bench_times = BENCH_DATA[nb_path]["times"] + + pod_storage_path = Path(f"tmp/pod{save_file_str}") + if pod_storage_path.exists(): + shutil.rmtree(pod_storage_path) + + hist_list = [] + reward_list = [] + + podding_wrapper = partial(model.podding_fn, history_list=hist_list) + # Initialize sut + sut = StaticPodPickling( + FilePodStorage(Path(f"tmp/pod{save_file_str}")), podding_fn=podding_wrapper, post_podding_fn=model.post_podding_fn + ) + + last_storage_size = 0 + pids: List[PodId] = [] + reward_sum = 0 + times = [] + sz = [] + for nth, (cell, the_globals, the_locals) in enumerate(nb_exec.iter()): + # Dump current state. + dump_start_ts = time.time() + pid = sut.dump(the_locals) + dump_stop_ts = time.time() + + # Record measurements. + cur_size = sut.estimate_size() + pids.append(pid) + + size = cur_size - last_storage_size + dump_time = dump_stop_ts - dump_start_ts + times.append(dump_time) + bench_size = bench_sizes[nth] + bench_time = bench_times[nth] + reward = (0.01 * (bench_time - dump_time) / dump_time) + (100 * (bench_size - size) / bench_size) + sz.append((bench_size - size) / bench_size) + + reward_sum += reward + last_storage_size = cur_size + + # Reset environment to reduce noise. + gc.collect() + reward_list.append(reward) + hist_list.append("|") + + # Generating output to be placed on queue + + final_hist_list = [] + curr_hist_list = [] + for item in hist_list: + if item != "|": + curr_hist_list.append(item) + else: + final_hist_list.append(curr_hist_list) + curr_hist_list = [] + reward_hist_list = [(reward_list[i], final_hist_list[i]) for i in range(len(final_hist_list))] + avg_time = sum(times)/len(times) + update_q.put((reward_hist_list, cur_size, avg_time, reward_sum)) + avg_size = sum(sz)/len(sz) + logger.info(f"{nb_path} AVG TIME {avg_time} AVG SCALED SIZE {avg_size}") + # print("DONE") + return + + +def manual_notebook_iter(nb_path, update_q: Queue, train_args: TrainArgs, p_id: int = 0): + # print(nb_path) + args = BenchArgs(expname="", nb=nb_path, sut="pod_file") + # Load notebook. + logger.info(f"PID {os.getpid()}, {nb_path}") + save_file_str = (str(train_args.gamma) + "&" + str(train_args.alpha)).replace(".", "-") + str(p_id) + nb_cells = Notebooks.nb(args=args) + nb_exec = NotebookExecutor(nb_cells) + + bench_sizes = BENCH_DATA[nb_path]["sizes"] + bench_times = BENCH_DATA[nb_path]["times"] + + pod_storage_path = Path(f"tmp/pod{save_file_str}") + if pod_storage_path.exists(): + shutil.rmtree(pod_storage_path) + + hist_list = [] + reward_list = [] + + # Initialize sut + sut = StaticPodPickling( + FilePodStorage(Path(f"tmp/pod{save_file_str}")), podding_fn=ManualPodding.podding_fn, post_podding_fn=None + ) + + last_storage_size = 0 + # expstat = ExpStat() + pids: List[PodId] = [] + reward_sum = 0 + times = [] + sz = [] + for nth, (cell, the_globals, the_locals) in enumerate(nb_exec.iter()): + # Dump current state. + dump_start_ts = time.time() + pid = sut.dump(the_locals) + dump_stop_ts = time.time() + + # Record measurements. 
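+ # estimate_size() below reports cumulative storage, so the per-cell cost is + # the delta from last_storage_size; the reward then weighs relative time + # savings at 0.01 and relative storage savings at 100 against the snapshot + # baseline, so size reduction dominates the learning signal.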
+ cur_size = sut.estimate_size() + pids.append(pid) + + size = cur_size - last_storage_size + dump_time = dump_stop_ts - dump_start_ts + times.append(dump_time) + bench_size = bench_sizes[nth] + bench_time = bench_times[nth] + reward = 0.01 * ((bench_time - dump_time) / dump_time) + 100 * ((bench_size - size) / bench_size) + sz.append((bench_size - size) / bench_size) + + reward_sum += reward + last_storage_size = cur_size + + # Reset environment to reduce noise. + gc.collect() + reward_list.append(reward) + hist_list.append("|") + + # Generating output to be placed on queue + + final_hist_list = [] + curr_hist_list = [] + for item in hist_list: + if item != "|": + curr_hist_list.append(item) + else: + final_hist_list.append(curr_hist_list) + curr_hist_list = [] + avg_time = sum(times) / len(times) + reward_hist_list = [(reward_list[i], final_hist_list[i]) for i in range(len(final_hist_list))] + update_q.put((reward_hist_list, cur_size, avg_time, reward_sum)) + logger.info(f"{nb_path} AVG TIME {avg_time} AVG SCALED SIZE {sum(sz)/len(sz)}") + # print("DONE") + return + + +def train(n_epochs, nbs, args: TrainArgs): + """Trains the model for n_epochs epochs on nbs""" + eps = 0.4 + eps_decay_factor = 0.945 + try: + os.mkdir("qtables") + except FileExistsError: + pass + with __FEATURE__: + save_file_str = (str(args.gamma) + "&" + str(args.alpha)).replace(".", "-") + prob_predictor = XGBPredictor(data_dir="podding_features/") + prob_predictor.train() + model = QLearningPoddingModel(train=True, gamma=args.gamma, alpha=args.alpha, predictor=prob_predictor) + for n in tqdm(range(n_epochs)): + model.set_epsilon(eps) + logger.info(f"EPOCH = {n}, GAMMA = {args.gamma}, ALPHA = {args.alpha}, EPSILON = {eps}") + eps *= eps_decay_factor + procs: List[Process] = [] + update_q = Queue() + p_id = 0 + for nb_path in nbs: + p = Process(target=train_notebook_iter, args=(nb_path, update_q, args, model, p_id)) + procs.append(p) + try: + p.start() + p_id += 1 + except Exception: + logger.info("ERROR STARTING PROCESS") + return + + sizes = [] + rewards = [] + times = [] + popped = 0 + while popped < len(nbs): + update_val, size, avg_time, reward_sum = update_q.get() + popped += 1 + model.batch_update_q_parallel(update_val) + sizes.append(size) + times.append(avg_time) + rewards.append(reward_sum) + + for p in procs: + try: + p.join() + except Exception: + logger.info("ERROR JOINING") + + model.size_history.append(sum(sizes) / len(sizes)) + model.dump_time_history.append(sum(times) / len(times)) + avg_reward = sum(rewards) / len(rewards) + model.reward_history.append(avg_reward) + logger.info(f"EPOCH {n}, AVG SUM OF REWARDS {avg_reward}") + model.save_features(Path("podding_features/data.csv")) + + if n % 10 == 0: + model.save_q_table(f"qtables/{save_file_str}-{n}.npy") + model.clear_action_history() + + logger.info("PLOTTING") + model.plot_stats(name=save_file_str) + logger.info("DONE PLOTTING") + model.save_q_table(f"qtables/{save_file_str}.npy") + logger.info(f"SAVED TO qtables/{save_file_str}.npy") + + +def eval(qt_path, nbs): + try: + os.mkdir("qtables") + except FileExistsError: + pass + with __FEATURE__: + model = QLearningPoddingModel(predictor=None, train=False, gamma=0, alpha=0, qt_path=qt_path) # the change predictor is not needed once a trained Q-table is loaded + procs: List[Process] = [] + update_q = Queue() + p_id = 0 + for nb_path in nbs: + p = Process(target=train_notebook_iter, args=(nb_path, update_q, args, model, p_id)) + procs.append(p) + try: + p.start() + p_id += 1 + except Exception: + logger.info("ERROR STARTING PROCESS") + return + + sizes = [] + rewards = [] + popped = 0 + while popped < len(nbs): +
update_val, size, avg_time, reward_sum = update_q.get() + popped += 1 + sizes.append(size) + rewards.append(reward_sum) + + for p in procs: + try: + p.join() + except Exception: + logger.info("ERROR JOINING") + logger.info("Done with eval iterations") + + logger.info(f"AVG SIZE {sum(sizes)/len(sizes)}") + avg_reward = sum(rewards) / len(rewards) + logger.info(f"AVG SUM OF REWARDS {avg_reward}") + # model.save_q_table(f"qtables/EVAL.npy") + + +def bench(nbs): + with __FEATURE__: + procs: List[Process] = [] + update_q = Queue() + p_id = 0 + for nb_path in nbs: + p = Process(target=manual_notebook_iter, args=(nb_path, update_q, args, p_id)) + procs.append(p) + try: + p.start() + p_id += 1 + except Exception: + logger.info("ERROR STARTING PROCESS") + return + + sizes = [] + rewards = [] + popped = 0 + while popped < len(nbs): + update_val, size, avg_time, reward_sum = update_q.get() + popped += 1 + sizes.append(size) + rewards.append(reward_sum) + + for p in procs: + try: + p.join() + except Exception: + logger.info("ERROR JOINING") + + logger.info(f"AVG SIZE {sum(sizes)/len(sizes)}") + avg_reward = sum(rewards) / len(rewards) + logger.info(f"AVG SUM OF REWARDS {avg_reward}") + + +if __name__ == "__main__": + # logger.info(f"Arguments {sys.argv}") + if len(sys.argv) != 5: + logger.error("Usage: python pod/train.py --gamma <gamma> --alpha <alpha>") + sys.exit(2) + args = simple_parsing.parse(TrainArgs, args=sys.argv[1:]) + train(75, [ + "notebooks/it-s-that-time-of-the-year-again.ipynb", + "notebooks/better-xgb-baseline.ipynb", + "notebooks/fast-fourier-transform-denoising.ipynb", + "notebooks/cv19w3-2-v2-play-2-v3fix-sub-last6dayopt.ipynb", + # "notebooks/amex-dataset.ipynb", + "notebooks/denoising-with-direct-wavelet-transform.ipynb", + "notebooks/04_training_linear_models.ipynb" + ], args) + + # eval("qtables/EVAL.npy", [ + # "notebooks/it-s-that-time-of-the-year-again.ipynb", + # "notebooks/better-xgb-baseline.ipynb", + # "notebooks/fast-fourier-transform-denoising.ipynb", + # "notebooks/cv19w3-2-v2-play-2-v3fix-sub-last6dayopt.ipynb", + # # "notebooks/amex-dataset.ipynb", + # "notebooks/denoising-with-direct-wavelet-transform.ipynb", + # "notebooks/04_training_linear_models.ipynb" + # ]) + + # bench( + # [ + # "notebooks/it-s-that-time-of-the-year-again.ipynb", + # "notebooks/better-xgb-baseline.ipynb", + # "notebooks/fast-fourier-transform-denoising.ipynb", + # "notebooks/cv19w3-2-v2-play-2-v3fix-sub-last6dayopt.ipynb", + # # "notebooks/amex-dataset.ipynb", + # "notebooks/denoising-with-direct-wavelet-transform.ipynb", + # "notebooks/04_training_linear_models.ipynb", + + # ] + # ) + + # train(1, ["notebooks/simple.ipynb", "notebooks/04_training_linear_models.ipynb"], args) - good, < 5 min + # train(1, ["notebooks/twitter_networks.ipynb"], args) - 11 min + # train(1, ["notebooks/amex-dataset.ipynb"], args) < 5 min + # train(1, ["notebooks/better-xgb-baseline.ipynb"], args) good 5 min + # train(1, ["notebooks/it-s-that-time-of-the-year-again.ipynb"], args) < 5 min, very similar to 2nd place ncaaw + # "notebooks/2nd-place-ncaaw-2021.ipynb" good < 5 min + # "notebooks/fast-fourier-transform-denoising.ipynb" good < 5 min + # "notebooks/denoising-with-direct-wavelet-transform.ipynb" good < 5 min + # "notebooks/ncaa-starter-the-simpler-the-better.ipynb" good < 5 min + # bayesian needs ffmpeg notebooks/introduction-to-bayesian-inference.ipynb good < 5 min + # notebooks/cv19w3-2-v2-play-2-v3fix-sub-last6dayopt.ipynb good < 5 min + # notebooks/numpy.ipynb good < 5 min diff --git a/pod/xgb_predictor.py b/pod/xgb_predictor.py new file mode 100644 index 0000000..2d49d37 --- /dev/null +++
b/pod/xgb_predictor.py @@ -0,0 +1,58 @@ +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +import seaborn as sns +import xgboost as xgb +from xgboost import XGBClassifier +import os +from sklearn.model_selection import train_test_split +import pickle + + + +class XGBPredictor: + def __init__(self, data_dir): + self.data_dir = data_dir + + def train(self): + feature_paths = [] + for f in os.listdir(self.data_dir): + feature_paths.append(os.path.join(self.data_dir, f)) + df = pd.concat([pd.read_csv(feature_path) for feature_path in feature_paths], ignore_index=True) + X_df = df[[ + # "oid", + "pod_size", + "pod_max_change_prob", + "oid_count", + "oid_change_count", + "oid_change_prob", + # "obj_type", + "obj_len", + ]] + # X_df.loc[:,"obj_type"] = X_df["obj_type"].astype("category") + ys = df["has_changed"].astype("int").values + + X_train, X_test, y_train, y_test = train_test_split(X_df, ys, test_size=0.2) + pos_fraction = y_train.sum() / len(y_train) + neg_fraction = 1 - pos_fraction + neg_pos_ratio = neg_fraction / pos_fraction + est = XGBClassifier(enable_categorical=True, n_estimators=3, scale_pos_weight=neg_pos_ratio) + est.fit(X_train, y_train) + y_test_pred = est.predict(X_test) + acc = (y_test == y_test_pred).mean() + print(f"XGBPredictor held-out accuracy: {acc:.3f}") + self.est = est + + + def predict(self, data): + probs = self.est.predict_proba(data) + return probs + + def save(self, save_path): + with open(save_path, "wb") as model_file: + pickle.dump(self.est, model_file) + + def load(self, load_path): + with open(load_path, "rb") as model_file: + self.est = pickle.load(model_file) + return self.est \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index fc7d989..94011d1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,237 +1,239 @@ -absl-py==2.0.0 -annotated-types==0.6.0 -anyio==4.2.0 -appnope==0.1.3 -argon2-cffi==23.1.0 -argon2-cffi-bindings==21.2.0 -arrow==1.3.0 -asttokens==2.4.1 -astunparse==1.6.3 -async-lru==2.0.4 -attrs==23.2.0 -Babel==2.14.0 -beautifulsoup4==4.12.2 -black==23.12.1 -bleach==6.1.0 -blis==0.7.11 -bokeh==3.3.2 -cachetools==5.3.2 -catalogue==2.0.10 -catboost==1.2.2 -certifi==2023.11.17 -cffi==1.16.0 -charset-normalizer==3.3.2 -click==8.1.7 -cloudpathlib==0.16.0 -cloudpickle==3.0.0 -cmdstanpy==1.2.0 -comm==0.2.0 -confection==0.1.4 -contourpy==1.2.0 -coverage==7.4.0 -cycler==0.12.1 -cymem==2.0.8 -dataclasses==0.6 -dataclasses-json==0.6.3 -debugpy==1.8.0 -decorator==5.1.1 -defusedxml==0.7.1 -dill==0.3.7 -dm-tree==0.1.8 -docstring-parser==0.15 -eli5==0.13.0 -executing==2.0.1 -fastjsonschema==2.19.1 -filelock==3.13.1 -flake8==6.1.0 -flatbuffers==23.5.26 -fonttools==4.47.0 -fqdn==1.5.1 -fsspec==2023.12.2 -gast==0.5.4 -ghp-import==2.1.0 -gitchangelog==3.0.4 -google-auth==2.26.1 -google-auth-oauthlib==1.2.0 -google-pasta==0.2.0 -graphviz==0.20.1 -grpcio==1.60.0 -h5py==3.10.0 -holidays==0.40 -huggingface-hub==0.20.1 -idna==3.6 -imbalanced-learn==0.11.0 -imblearn==0.0 -importlib-resources==6.1.1 -iniconfig==2.0.0 -ipykernel==6.28.0 -ipython==8.19.0 -ipywidgets==8.1.1 -isoduration==20.11.0 -isort==5.13.2 -jedi==0.19.1 -Jinja2==3.1.2 -joblib==1.3.2 -json5==0.9.14 -jsonpointer==2.4 -jsonschema==4.20.0 -jsonschema-specifications==2023.12.1 -jupyter==1.0.0 -jupyter-console==6.6.3 -jupyter-events==0.9.0 -jupyter-lsp==2.2.1 -jupyter_client==8.6.0 -jupyter_core==5.5.1 -jupyter_server==2.12.1 -jupyter_server_terminals==0.5.1 -jupyterlab==4.0.10 -jupyterlab-widgets==3.0.9 -jupyterlab_pygments==0.3.0 -jupyterlab_server==2.25.2 -keras==2.15.0 -kiwisolver==1.4.5 -langcodes==3.3.0
-libclang==16.0.6 -lightgbm==4.2.0 -llvmlite==0.41.1 -loguru==0.7.2 -Markdown==3.5.1 -markdown-it-py==3.0.0 -MarkupSafe==2.1.3 -marshmallow==3.20.1 -matplotlib==3.8.2 -matplotlib-inline==0.1.6 -mccabe==0.7.0 -mdurl==0.1.2 -mergedeep==1.3.4 -mistune==3.0.2 -mkdocs==1.5.3 -ml-dtypes==0.2.0 -mpmath==1.3.0 -murmurhash==1.0.10 -mypy==1.8.0 -mypy-extensions==1.0.0 -namex==0.0.7 -nbclient==0.9.0 -nbconvert==7.13.1 -nbformat==5.9.2 -neo4j==5.16.0 -nest-asyncio==1.5.8 -networkx==3.2.1 -nltk==3.8.1 -notebook==7.0.6 -notebook_shim==0.2.3 -numba==0.58.1 -numpy==1.26.2 -oauthlib==3.2.2 -opt-einsum==3.3.0 -overrides==7.4.0 -packaging==23.2 -pandas==2.1.4 -pandocfilters==1.5.0 -parso==0.8.3 -pathspec==0.12.1 -patsy==0.5.5 -pexpect==4.9.0 -Pillow==10.1.0 -platformdirs==4.1.0 -plotly==5.18.0 -pluggy==1.3.0 -preshed==3.0.9 -prometheus-client==0.19.0 -prompt-toolkit==3.0.43 -prophet==1.1.5 -protobuf==4.23.4 -psutil==5.9.7 -psycopg2-binary==2.9.9 -ptyprocess==0.7.0 -pure-eval==0.2.2 -pyasn1==0.5.1 -pyasn1-modules==0.3.0 -pycodestyle==2.11.1 -pycparser==2.21 -pydantic==2.5.3 -pydantic_core==2.14.6 -pyflakes==3.1.0 -Pygments==2.17.2 -pymongo==4.6.1 -pyparsing==3.1.1 -pytest==7.4.4 -pytest-cov==4.1.0 -python-dateutil==2.8.2 -python-json-logger==2.0.7 -pytz==2023.3.post1 -PyYAML==6.0.1 -pyyaml_env_tag==0.1 -pyzmq==25.1.2 -qtconsole==5.5.1 -QtPy==2.4.1 -redis==5.0.1 -referencing==0.32.0 -regex==2023.12.25 -requests==2.31.0 -requests-oauthlib==1.3.1 -rfc3339-validator==0.1.4 -rfc3986-validator==0.1.1 -rich==13.7.0 -rpds-py==0.16.2 -rsa==4.9 -safetensors==0.4.1 -scikit-learn==1.3.2 -scipy==1.11.4 -seaborn==0.13.1 -Send2Trash==1.8.2 -shap==0.44.0 -simple-parsing==0.1.4 -six==1.16.0 -slicer==0.0.7 -smart-open==6.4.0 -sniffio==1.3.0 -soupsieve==2.5 -spacy==3.7.2 -spacy-legacy==3.0.12 -spacy-loggers==1.0.5 -srsly==2.4.8 -stack-data==0.6.3 -stanio==0.3.0 -statsmodels==0.14.1 -sympy==1.12 -tabulate==0.9.0 -tenacity==8.2.3 -tensorboard==2.15.1 -tensorboard-data-server==0.7.2 -tensorflow==2.15.0 -tensorflow-estimator==2.15.0 -tensorflow-io-gcs-filesystem==0.34.0 -termcolor==2.4.0 -terminado==0.18.0 -thinc==8.2.2 -threadpoolctl==3.2.0 -tinycss2==1.2.1 -tokenizers==0.15.0 -torch==2.1.2 -torchvision==0.16.2 -tornado==6.4 -tqdm==4.66.1 -traitlets==5.14.0 -transformers==4.36.2 -typer==0.9.0 -types-python-dateutil==2.8.19.14 -typing-inspect==0.9.0 -typing_extensions==4.9.0 -tzdata==2023.4 -uri-template==1.3.0 -urllib3==2.1.0 -wasabi==1.1.2 -watchdog==3.0.0 -wcwidth==0.2.12 -weasel==0.3.4 -webcolors==1.13 -webencodings==0.5.1 -websocket-client==1.7.0 -Werkzeug==3.0.1 -widgetsnbextension==4.0.9 -wordcloud==1.9.3 -wrapt==1.14.1 -xgboost==2.0.3 -xyzservices==2023.10.1 +absl-py +annotated-types +anyio +appnope +argon2-cffi +argon2-cffi-bindings +arrow +asttokens +astunparse +async-lru +attrs +Babel +beautifulsoup4 +black +bleach +blis +bokeh +cachetools +catalogue +catboost +certifi +cffi +charset-normalizer +click +cloudpathlib +cloudpickle +cmdstanpy +comm +confection +contourpy +coverage +cycler +cymem +dataclasses +dataclasses-json +debugpy +decorator +defusedxml +dill +dm-tree +docstring-parser +eli5 +executing +fastjsonschema +filelock +flake8 +flatbuffers +fonttools +fqdn +fsspec +gast +ghp-import +gitchangelog +google-auth +google-auth-oauthlib +google-pasta +graphviz +grpcio +h5py +holidays +huggingface-hub +idna +imbalanced-learn +imblearn +importlib-resources +iniconfig +ipykernel +ipython +ipywidgets +isoduration +isort +jedi +Jinja2 +joblib +json5 +jsonpointer +jsonschema +jsonschema-specifications +jupyter 
+jupyter-console +jupyter-events +jupyter-lsp +jupyter_client +jupyter_core +jupyter_server +jupyter_server_terminals +jupyterlab +jupyterlab-widgets +jupyterlab_pygments +jupyterlab_server +keras +kiwisolver +langcodes +libclang +lightgbm +llvmlite +loguru +Markdown +markdown-it-py +MarkupSafe +marshmallow +matplotlib +matplotlib-inline +mccabe +mdurl +mergedeep +mistune +mkdocs +ml-dtypes +mpmath +murmurhash +mypy +mypy-extensions +namex +nbclient +nbconvert +nbformat +neo4j +nest-asyncio +networkx +nltk +notebook +notebook_shim +numba +numpy +oauthlib +opt-einsum +overrides +packaging +pandas +pandocfilters +parso +pathspec +patsy +pexpect +Pillow +platformdirs +plotly +pluggy +preshed +prometheus-client +prompt-toolkit +prophet +protobuf +psutil +psycopg2-binary +ptyprocess +pure-eval +pyarrow +PyWavelets +pyasn1 +pyasn1-modules +pycodestyle +pycparser +pydantic +pydantic_core +pyflakes +Pygments +pymongo +pyparsing +pytest +pytest-cov +python-dateutil +python-json-logger +pytz +PyYAML +pyyaml_env_tag +pyzmq +qtconsole +QtPy +redis +referencing +regex +requests +requests-oauthlib +rfc3339-validator +rfc3986-validator +rich +rpds-py +rsa +safetensors +scikit-learn +scipy +seaborn +Send2Trash +shap +simple-parsing +six +slicer +smart-open +sniffio +soupsieve +spacy +spacy-legacy +spacy-loggers +srsly +stack-data +stanio +statsmodels +sympy +tabulate +tenacity +tensorboard +tensorboard-data-server +tensorflow +tensorflow-estimator +tensorflow-io-gcs-filesystem +termcolor +terminado +thinc +threadpoolctl +tinycss2 +tokenizers +torch +torchvision +tornado +tqdm +traitlets +transformers +typer +types-python-dateutil +typing-inspect +typing_extensions +tzdata +uri-template +urllib3 +wasabi +watchdog +wcwidth +weasel +webcolors +webencodings +websocket-client +Werkzeug +widgetsnbextension +wordcloud +wrapt +xgboost +xyzservices
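Note: pod/train.py reads benchdata.json at module import time, so the snapshot baseline must be generated before training. A typical sequence (a sketch; the container name comes from the compose project above, and paths assume the repo is mounted at /pod): docker exec experiments-pod-1 python pod/find_bench_size.py to write benchdata.json (per-cell snapshot sizes/times), then docker exec experiments-pod-1 python pod/train.py --gamma 0.7 --alpha 0.1 to train the Q-learning podding model against that baseline, or bash experiments/train.sh from the host to sweep alpha/gamma and tee the logs into eval_logs/.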