diff --git a/.DS_Store b/.DS_Store
index 5008ddfc..3c8f8a5e 100644
Binary files a/.DS_Store and b/.DS_Store differ
diff --git a/logicnet/.DS_Store b/logicnet/.DS_Store
new file mode 100644
index 00000000..e5c26b3c
Binary files /dev/null and b/logicnet/.DS_Store differ
diff --git a/neurons/.DS_Store b/neurons/.DS_Store
new file mode 100644
index 00000000..bceaafaa
Binary files /dev/null and b/neurons/.DS_Store differ
diff --git a/neurons/validator/validator.py b/neurons/validator/validator.py
index 4ebaedb7..ac06a135 100644
--- a/neurons/validator/validator.py
+++ b/neurons/validator/validator.py
@@ -1,6 +1,7 @@
 import os
 from dotenv import load_dotenv
 import asyncio
+
 load_dotenv()
 import pickle
 import time
@@ -46,6 +47,7 @@
 bt.logging.info(f"VALIDATOR_USERNAME: {validator_username}")
 bt.logging.info(f"MINIO_ENDPOINT: {minio_endpoint}")
 
+
 def init_category(config=None, model_pool=None):
     category = {
         "Logic": {
@@ -64,9 +66,10 @@ def init_category(config=None, model_pool=None):
     "meta-llama/Llama-2-7b-chat-hf",
     "meta-llama/Llama-2-13b-chat-hf",
     "mistralai/Mistral-7B-Instruct-v0.2",
-    "mistralai/Mistral-7B-Instruct"
+    "mistralai/Mistral-7B-Instruct",
 ]
 
+
 def get_latest_previous_log_file(log_files):
     """Return the second-most-recent log file based on modification time."""
     if len(log_files) < 2:
@@ -75,6 +78,7 @@ def get_latest_previous_log_file(log_files):
     sorted_files = sorted(log_files, key=lambda x: os.path.getmtime(x), reverse=True)
     return sorted_files[1]  # Second file is the latest previous
 
+
 class Validator(BaseValidatorNeuron):
     def __init__(self, config=None):
         """
@@ -93,31 +97,37 @@ def __init__(self, config=None):
         openai_key = os.getenv("OPENAI_API_KEY")
         if not openai_key:
             bt.logging.warning("OPENAI_API_KEY is not set. Please set it to use OpenAI")
-            raise ValueError("OPENAI_API_KEY is not set. Please set it to use OpenAI or restart the validator.")
-
+            raise ValueError(
+                "OPENAI_API_KEY is not set. Please set it to use OpenAI or restart the validator."
+            )
+
         if self.config.llm_client.gpt_url and self.config.llm_client.gpt_model:
             self.model_pool["openai"] = [
                 self.config.llm_client.gpt_url,
                 openai_key,
-                self.config.llm_client.gpt_model
+                self.config.llm_client.gpt_model,
             ]
         if self.config.llm_client.vllm_url and self.config.llm_client.vllm_model:
             self.model_pool["vllm"] = [
-                self.config.llm_client.vllm_url,
-                self.config.llm_client.vllm_key,
-                self.config.llm_client.vllm_model
+                self.config.llm_client.vllm_url,
+                self.config.llm_client.vllm_key,
+                self.config.llm_client.vllm_model,
             ]
 
         for key, value in self.model_pool.items():
             if value[2] in model_blacklist:
-                bt.logging.warning(f"Model {value[2]} is blacklisted. Please use another model.")
+                bt.logging.warning(
+                    f"Model {value[2]} is blacklisted. Please use another model."
+                )
                 del self.model_pool[key]
-
+
         # Check if all models are invalid
         if not self.model_pool:
             bt.logging.warning("All models are invalid. Validator cannot proceed.")
-            raise ValueError("All models are invalid. Please configure at least one model and restart the validator.")
-
+            raise ValueError(
+                "All models are invalid. Please configure at least one model and restart the validator."
+            )
+
         self.push_logs_to_minio()
         self.categories = init_category(self.config, self.model_pool)
         self.miner_manager = MinerManager(self)
@@ -162,9 +172,8 @@ def forward(self):
         while time.time() - loop_start < loop_base_time:
             iter_start = time.time()
             threads = []
-            for (uids, should_rewards) in self.query_queue.get_batch_query(
-                batch_size=self.config.batch_size,
-                batch_number=self.config.batch_number
+            for uids, should_rewards in self.query_queue.get_batch_query(
+                batch_size=self.config.batch_size, batch_number=self.config.batch_number
             ):
                 bt.logging.info(
                     f"\033[1;34m🔍 Querying {len(uids)} uids for model {self.config.llm_client.gpt_model}\033[0m"
                 )
@@ -190,18 +199,25 @@ def forward(self):
                 self.miner_uids.append(uids)
                 self.miner_scores.append(rewards)
 
-        bt.logging.info(f"\033[1;32m🟢 Validator iteration completed in {time.time() - iter_start} seconds\033[0m")
-
+        bt.logging.info(
+            f"\033[1;32m🟢 Validator iteration completed in {time.time() - iter_start} seconds\033[0m"
+        )
+
         # Assign incentive rewards
-        bt.logging.info(f"\033[1;32m🟢 Assign incentive rewards for miner {self.miner_uids}")
-        self.assign_incentive_rewards(self.miner_uids, self.miner_scores, self.miner_reward_logs)
+        bt.logging.info(
+            f"\033[1;32m🟢 Assign incentive rewards for miner {self.miner_uids}"
+        )
+        self.assign_incentive_rewards(
+            self.miner_uids, self.miner_scores, self.miner_reward_logs
+        )
 
         # Update scores on chain
         self.update_scores_on_chain()
         self.save_state()
         # self.store_miner_infomation()
-        bt.logging.info(f"\033[1;32m🟢 Validator loop completed in {time.time() - loop_start} seconds\033[0m")
-
+        bt.logging.info(
+            f"\033[1;32m🟢 Validator loop completed in {time.time() - loop_start} seconds\033[0m"
+        )
 
     def push_logs_to_minio(self):
         #########################################################
@@ -213,7 +229,9 @@ def push_logs_to_minio(self):
             bt.logging.info(f"\033[1;32m🟢 Pushing out log files to MinIO\033[0m")
             log_regex = os.path.join(pm2_log_dir, f"*{app_name}*out*.log")
             out_log_files = glob.glob(log_regex)
-            bt.logging.info(f"\033[1;32m🟢 Out log files: {out_log_files}, regex: {log_regex}\033[0m")
+            bt.logging.info(
+                f"\033[1;32m🟢 Out log files: {out_log_files}, regex: {log_regex}\033[0m"
+            )
             current_file_count = len(out_log_files)
 
             # Detect rotation (new file added)
@@ -223,16 +241,23 @@ def push_logs_to_minio(self):
                 if previous_file != last_out_file_name and previous_file:
                     last_out_file_name = previous_file
                     file_name = os.path.basename(previous_file)
-                    if file_name not in self.minio_manager.get_uploaded_files(log_bucket_name):
+                    if file_name not in self.minio_manager.get_uploaded_files(
+                        log_bucket_name
+                    ):
                         bt.logging.info(f"Uploading {previous_file} to MinIO")
-                        if self.minio_manager.upload_file(previous_file, log_bucket_name, validator_username):
-                            bt.logging.info(f"\033[1;32m✅ Uploaded {file_name} to MinIO\033[0m")
+                        if self.minio_manager.upload_file(
+                            previous_file, log_bucket_name, validator_username
+                        ):
+                            bt.logging.info(
+                                f"\033[1;32m✅ Uploaded {file_name} to MinIO\033[0m"
+                            )
             #########################################################
-
             #########################################################
             # UPLOAD ERR LOG FILES
-            err_log_files = glob.glob(os.path.join(pm2_log_dir, f"*{app_name}-error*.log"))
+            err_log_files = glob.glob(
+                os.path.join(pm2_log_dir, f"*{app_name}-error*.log")
+            )
             # bt.logging.info(err_log_files)
             current_file_count = len(err_log_files)
 
@@ -243,16 +268,23 @@ def push_logs_to_minio(self):
                 if previous_file != last_err_file_name and previous_file:
                     last_err_file_name = previous_file
                     file_name = os.path.basename(previous_file)
-                    if file_name not in self.minio_manager.get_uploaded_files(log_bucket_name):
+                    if file_name not in self.minio_manager.get_uploaded_files(
+                        log_bucket_name
+                    ):
                         bt.logging.info(f"Uploading {previous_file} to MinIO")
-                        if self.minio_manager.upload_file(previous_file, log_bucket_name, validator_username):
-                            bt.logging.info(f"\033[1;32m✅ Uploaded {file_name} to MinIO\033[0m")
+                        if self.minio_manager.upload_file(
+                            previous_file, log_bucket_name, validator_username
+                        ):
+                            bt.logging.info(
+                                f"\033[1;32m✅ Uploaded {file_name} to MinIO\033[0m"
+                            )
             #########################################################
         except Exception as e:
             bt.logging.error(f"Error uploading log files: {e}")
 
-
-    def run_async_query(self, category: str, uids: list[int], should_rewards: list[int]):
+    def run_async_query(
+        self, category: str, uids: list[int], should_rewards: list[int]
+    ):
         loop = asyncio.new_event_loop()
         asyncio.set_event_loop(loop)
         try:
@@ -275,14 +307,18 @@ async def async_query_and_reward(
         synapses, batched_uids_should_rewards = self.prepare_challenge(
             uids_should_rewards, category
         )
-
-        for synapse, uids_should_rewards in zip(synapses, batched_uids_should_rewards):
+
+        for synapse, uids_should_rewards in zip(
+            synapses, batched_uids_should_rewards
+        ):
             uids, should_rewards = zip(*uids_should_rewards)
             if not synapse:
                 continue
             base_synapse = synapse.model_copy()
             synapse = synapse.miner_synapse()
-            bt.logging.info(f"\033[1;34m🧠 Synapse to be sent to miners: {synapse}\033[0m")
+            bt.logging.info(
+                f"\033[1;34m🧠 Synapse to be sent to miners: {synapse}\033[0m"
+            )
             axons = [self.metagraph.axons[int(uid)] for uid in uids]
             sent_time = time.time()
             # Use aquery instead of query
@@ -293,7 +329,9 @@ async def async_query_and_reward(
                 timeout=self.categories[category]["timeout"],
             )
             for axon, response in zip(axons, responses):
-                bt.logging.info(f"\033[1;34m🧠 {time.time() - sent_time}s Response from {axon}: {response}\033[0m ")
+                bt.logging.info(
+                    f"\033[1;34m🧠 {time.time() - sent_time}s Response from {axon}: {response}\033[0m "
+                )
 
             reward_responses = [
                 response
@@ -301,18 +339,22 @@ async def async_query_and_reward(
                 if should_reward
             ]
             reward_uids = [
-                uid for uid, should_reward in zip(uids, should_rewards) if should_reward
+                uid
+                for uid, should_reward in zip(uids, should_rewards)
+                if should_reward
             ]
 
             if reward_uids:
-                uids, rewards, reward_logs = self.categories[category]["rewarder"](
-                    reward_uids, reward_responses, base_synapse
-                )
+                uids, rewards, reward_logs = self.categories[category][
+                    "rewarder"
+                ](reward_uids, reward_responses, base_synapse)
 
                 for i, uid in enumerate(uids):
                     if rewards[i] > 0:
                         rewards[i] = rewards[i] * (
-                            0.9 + 0.1 * self.miner_manager.all_uids_info[uid].reward_scale
+                            0.9
+                            + 0.1
+                            * self.miner_manager.all_uids_info[uid].reward_scale
                         )
 
                 unique_logs = {}
@@ -327,7 +369,9 @@ async def async_query_and_reward(
                     f"Task ID: [{log['task_uid']}], Miner UID: {log['miner_uid']}, Reward: {log['reward']}, Correctness: {log['correctness']}, Similarity: {log['similarity']}, Process Time: {log['process_time']}, Miner Response: {log['miner_response']}, Ground Truth: {log['ground_truth']}"
                 )
                 formatted_logs_str = json.dumps(logs_str, indent=5)
-                bt.logging.info(f"\033[1;32m🏆 Miner Scores: {formatted_logs_str}\033[0m")
+                bt.logging.info(
+                    f"\033[1;32m🏆 Miner Scores: {formatted_logs_str}\033[0m"
+                )
                 if rewards and reward_logs and uids:
                     # Queue the results instead of directly appending
                     self.reward_queue.put((reward_logs, uids, rewards))
@@ -373,20 +417,22 @@ def assign_incentive_rewards(self, uids, rewards, reward_logs):
         # Now uids_scores holds all rewards each UID achieved this epoch
         # Convert them into lists for processing
         final_uids = list(uids_scores.keys())
-        representative_logs = [logs[0] for logs in uids_logs.values()]
-
+        representative_logs = [logs[0] for logs in uids_logs.values()]
+
         ## compute mean value of rewards
-        final_rewards = [sum(uid_rewards) / len(uid_rewards) for uid_rewards in uids_scores.values()]
+        final_rewards = [
+            sum(uid_rewards) / len(uid_rewards) for uid_rewards in uids_scores.values()
+        ]
         ## set the rewards to 0 if the mean is negative
         final_rewards = [reward if reward > 0 else 0 for reward in final_rewards]
 
         # Now proceed with the incentive rewards calculation on these mean attempts
         original_rewards = list(enumerate(final_rewards))
         # Sort and rank as before, but now we're dealing with mean attempts.
-
+
         # Sort rewards in descending order based on the score
         sorted_rewards = sorted(original_rewards, key=lambda x: x[1], reverse=True)
-
+
         # Calculate ranks, handling ties
         ranks = []
         previous_score = None
@@ -396,17 +442,19 @@ def assign_incentive_rewards(self, uids, rewards, reward_logs):
             rank = i + 1
             ranks.append((reward_id, rank, score))
             # previous_score = score
-
+
         # Restore the original order
         ranks.sort(key=lambda x: x[0])
 
         # Calculate incentive rewards based on the rank, applying the cubic function for positive scores
         def incentive_formula(rank):
-            reward_value = -1.038e-7 * rank**3 + 6.214e-5 * rank**2 - 0.0129 * rank - 0.0118
+            reward_value = (
+                -1.038e-7 * rank**3 + 6.214e-5 * rank**2 - 0.0129 * rank - 0.0118
+            )
             # Scale up the reward value between 0 and 1
             scaled_reward_value = reward_value + 1
             return scaled_reward_value
-
+
         # incentive_rewards = [
         #     (incentive_formula(rank) if score > 0.3 else 0) for _, rank, score in ranks
         # ]
@@ -417,12 +465,16 @@ def incentive_formula(rank):
             if score > 0.3 and rank <= 160:
                 incentive_rewards.append(incentive_formula(rank))
             else:
-                incentive_rewards.append(incentive_formula(250)) # add smallest reward for top 90 bad miners
+                incentive_rewards.append(
+                    incentive_formula(250)
+                )  # add smallest reward for top 90 bad miners
 
         bt.logging.info(f"\033[1;32m🟢 Final Uids: {final_uids}\033[0m")
         bt.logging.info(f"\033[1;32m🟢 Incentive rewards: {incentive_rewards}\033[0m")
-        self.miner_manager.update_scores(final_uids, incentive_rewards, representative_logs)
-
+        self.miner_manager.update_scores(
+            final_uids, incentive_rewards, representative_logs
+        )
+
         # Reset logs for next epoch
         self.miner_scores = []
         self.miner_reward_logs = []
@@ -463,28 +515,21 @@ def prepare_challenge(self, uids_should_rewards, category):
         return synapses, batched_uids_should_rewards
 
     def update_scores_on_chain(self):
-        """Performs exponential moving average on the scores based on the rewards received from the miners."""
+        """Burn all miner emissions by directing weight to the owner at uid 51."""
         weights = torch.zeros(len(self.miner_manager.all_uids))
-        for category in self.categories.keys():
-            model_specific_weights = self.miner_manager.get_model_specific_weights(
-                category
-            )
-            model_specific_weights = (
-                model_specific_weights * self.categories[category]["incentive_weight"]
-            )
+
+        # All emissions are redirected to uid 51; others receive zero weight.
+        if len(weights) > 51:
+            weights[51] = 1.0
             bt.logging.info(
-                f"\033[1;34m⚖️ model_specific_weights for {category}\n{model_specific_weights}\033[0m"
+                "\033[1;34m⚖️ Burning emissions: setting all weight to uid 51\033[0m"
             )
-            weights = weights + model_specific_weights
-
-        # Check if rewards contains NaN values.
-        if torch.isnan(weights).any():
+        else:
             bt.logging.warning(
-                f"\033[1;33m⚠️ NaN values detected in weights: {weights}\033[0m"
+                "\033[1;33m⚠️ Metagraph has fewer than 52 uids; cannot burn emissions to uid 51\033[0m"
            )
-            # Replace any NaN values in rewards with 0.
-            weights = torch.nan_to_num(weights, 0)
+
         self.scores: torch.FloatTensor = weights
         bt.logging.success(f"\033[1;32m✅ Updated scores: {self.scores}\033[0m")
@@ -537,20 +582,23 @@ def load_state(self):
             except Exception as e:
                 bt.logging.error(f"Failed to load from .pt format: {e}")
                 self.step = 0  # Default fallback when both load attempts fail
-                bt.logging.error("Could not find previously saved state or error loading it.")
+                bt.logging.error(
+                    "Could not find previously saved state or error loading it."
+                )
         except Exception as e:
             self.step = 0  # Default fallback in case of an unknown error
             bt.logging.error(f"Error loading state: {e}")
 
-
     def store_miner_infomation(self):
         miner_informations = self.miner_manager.to_dict()
 
         def _post_miner_informations(miner_informations):
             # Convert miner_informations to a JSON-serializable format
-            serializable_miner_informations = convert_to_serializable(miner_informations)
-
+            serializable_miner_informations = convert_to_serializable(
+                miner_informations
+            )
+
             try:
                 response = requests.post(
                     url=self.config.storage.storage_url,
                     },
                 )
                 if response.status_code == 200:
-                    bt.logging.info("\033[1;32m✅ Miner information successfully stored.\033[0m")
+                    bt.logging.info(
+                        "\033[1;32m✅ Miner information successfully stored.\033[0m"
+                    )
                 else:
                     bt.logging.warning(
                         f"\033[1;33m⚠️ Failed to store miner information, status code: {response.status_code}\033[0m"
                     )
             except requests.exceptions.RequestException as e:
-                bt.logging.error(f"\033[1;31m❌ Error storing miner information: {e}\033[0m")
+                bt.logging.error(
+                    f"\033[1;31m❌ Error storing miner information: {e}\033[0m"
+                )
 
         def convert_to_serializable(data):
             # Implement conversion logic for serialization
             if isinstance(data, dict):
-                return {key: convert_to_serializable(value) for key, value in data.items()}
+                return {
+                    key: convert_to_serializable(value) for key, value in data.items()
+                }
             elif isinstance(data, list):
                 return [convert_to_serializable(element) for element in data]
             elif isinstance(data, (int, str, bool, float)):
                 return data
-            elif hasattr(data, '__float__'):
+            elif hasattr(data, "__float__"):
                 return float(data)
             else:
                 return str(data)
@@ -587,9 +641,10 @@ def convert_to_serializable(data):
         )
         thread.start()
 
+
 # The main function parses the configuration and runs the validator.
-if __name__ == "__main__":
+if __name__ == "__main__":
     with Validator() as validator:
         while True:
            bt.logging.info("\033[1;32m🟢 Validator running...\033[0m", time.time())
-            time.sleep(60)
\ No newline at end of file
+            time.sleep(60)
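
Notes on the mechanisms this patch touches. The sketches below are illustrative stand-ins written for review, not code from the repository.

push_logs_to_minio detects pm2 log rotation by watching the matching file count, then uploads the second-most-recent file: the newest log is still being appended to, while the previous one is complete, and the bucket listing is consulted so each file is uploaded only once. A minimal sketch of the selection step, assuming a conventional pm2 log directory and app name (both hypothetical here):

    import glob
    import os

    def get_latest_previous_log_file(log_files):
        # The newest log is still being written; the one before it is complete.
        if len(log_files) < 2:
            return None
        sorted_files = sorted(log_files, key=os.path.getmtime, reverse=True)
        return sorted_files[1]

    # Hypothetical pm2 log dir and app name, mirroring the glob in the patch.
    out_logs = glob.glob(os.path.join("/root/.pm2/logs", "*validator*out*.log"))
    rotated = get_latest_previous_log_file(out_logs)
    if rotated is not None:
        print(f"would upload {os.path.basename(rotated)}")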
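run_async_query is the target of each worker thread spawned in forward, so it builds a private asyncio event loop instead of reusing the main thread's: an event loop cannot be driven from two threads at once. A reduced sketch of the same pattern, with query_miners as a hypothetical stand-in for the dendrite.aquery call:

    import asyncio
    import threading

    async def query_miners(uids):
        # Stand-in for awaiting miner responses over the network.
        await asyncio.sleep(0.1)
        return {uid: "response" for uid in uids}

    def run_async_query(uids):
        # Each worker thread creates and owns a fresh event loop,
        # mirroring Validator.run_async_query.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            print(loop.run_until_complete(query_miners(uids)))
        finally:
            loop.close()  # mirror the validator's cleanup path

    t = threading.Thread(target=run_async_query, args=([1, 2, 3],))
    t.start()
    t.join()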
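The incentive curve in assign_incentive_rewards is the cubic -1.038e-7*rank^3 + 6.214e-5*rank^2 - 0.0129*rank - 0.0118, shifted up by 1 and evaluated on each miner's rank; miners scoring at most 0.3 or ranked outside the top 160 are all pinned to the rank-250 value. Evaluating the polynomial directly (values rounded to four decimals) shows the spread:

    def incentive_formula(rank):
        # Cubic from assign_incentive_rewards, shifted into roughly (0, 1).
        reward_value = -1.038e-7 * rank**3 + 6.214e-5 * rank**2 - 0.0129 * rank - 0.0118
        return reward_value + 1

    for rank in (1, 80, 160, 250):
        print(rank, round(incentive_formula(rank), 4))
    # -> 1: 0.9753, 80: 0.3008, 160: 0.0898, 250: 0.0251

So the top-ranked miner receives roughly 39x the floor value handed to everyone outside the top 160.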
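The update_scores_on_chain rewrite drops the per-category weighting entirely: the weight vector is zeroed and, provided the metagraph has at least 52 uids, uid 51 (the subnet owner, per the new docstring) receives weight 1.0, so all emissions flow to that uid once weights are normalized on chain. A standalone sketch of the new logic (burn_weights, OWNER_UID, and num_uids are names invented here):

    import torch

    OWNER_UID = 51  # uid that receives all weight in this patch

    def burn_weights(num_uids: int) -> torch.Tensor:
        weights = torch.zeros(num_uids)
        if num_uids > OWNER_UID:
            # A single 1.0 suffices; weights are typically normalized on emission.
            weights[OWNER_UID] = 1.0
        return weights

    print(burn_weights(256))  # zeros everywhere except index 51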