-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulated_annealing.py
More file actions
556 lines (486 loc) · 24.5 KB
/
simulated_annealing.py
File metadata and controls
556 lines (486 loc) · 24.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
from collections import Counter
from typing import Dict, List, Optional, Union, Any
import logging
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import random
import seaborn as sns
import time
import torch
from PepBD_surrogate import ScoreModel
from scipy.stats import entropy
class PeptideGenerator:
    def __init__(
        self,
        session_name: str = 'default',
        initial_temperature: float = 0.5,
        cooling_rate: float = 0.9,
        temperature_min: float = 0.1,
        max_iterations: int = 2500,
        steps_per_temp: int = 75,
        move_probabilities: Optional[Dict[str, float]] = None,
        cooling_schedule: str = 'exponential',
        log_dir: str = 'logs',
        seed: Optional[int] = None,
        score_threshold: float = -50.0,
        re_initiate: bool = False,
        plastic_type: Optional[str] = None
    ):
        """
        Initialize the PeptideGenerator with simulated annealing parameters.
        Parameters:
        - session_name: Name to identify this run
        - initial_temperature: Initial temperature for the SA algorithm
        - cooling_rate: Cooling rate for temperature reduction
        - temperature_min: Minimum temperature to terminate the algorithm
        - max_iterations: Maximum number of iterations
        - steps_per_temp: Number of steps at each temperature before cooling
        - move_probabilities: Dict specifying probabilities for different moves;
          defaults to {'substitution': 0.75, 'swap': 0.25} when None
        - cooling_schedule: Type of cooling schedule ('exponential', 'linear', 'logarithmic')
        - log_dir: Directory to save logs
        - seed: Random seed for reproducibility
        - score_threshold: Threshold for saving intermediate peptides (default: -50.0)
        - re_initiate: Whether to run optimization twice, using best peptide from first run (default: False)
        - plastic_type: Optional plastic type for the PeptideGenerator
        """
        self.session_name = session_name
        self.initial_temperature = initial_temperature
        self.temperature = initial_temperature  # current temperature; reset at the start of each SA run
        self.cooling_rate = cooling_rate
        self.temperature_min = temperature_min
        self.max_iterations = max_iterations
        self.steps_per_temp = steps_per_temp
        # A mutable default dict would be shared (and mutable) across all
        # instances; build a fresh dict per instance instead.
        if move_probabilities is None:
            move_probabilities = {'substitution': 0.75, 'swap': 0.25}
        self.move_probabilities = move_probabilities
        self.cooling_schedule = cooling_schedule
        self.log_dir = log_dir
        self.seed = seed
        self.score_threshold = score_threshold
        self.re_initiate = re_initiate
        self.initial_sequences = []  # Will store initial sequences for plotting
        self.plastic_type = plastic_type
        # Seed every RNG the optimizer touches so runs are reproducible.
        if seed is not None:
            np.random.seed(seed)
            random.seed(seed)
            torch.manual_seed(seed)
            if torch.cuda.is_available():
                torch.cuda.manual_seed(seed)
def setup_logging(self, hyperparameters_key: str):
"""Set up logging with hyperparameters key."""
self.result_dir = os.path.join('simulated_annealing', 'Results', self.session_name, hyperparameters_key)
os.makedirs(self.result_dir, exist_ok=True)
log_filename = os.path.join(self.result_dir, 'sa_log.txt')
logging.basicConfig(filename=log_filename, level=logging.INFO, filemode='w')
self.logger = logging.getLogger('PeptideGenerator')
def get_parameters_key(self, embedding_type: str) -> str:
"""Create a unique key string based on the hyperparameters."""
params = {
'initial_temperature': self.initial_temperature,
'cooling_rate': self.cooling_rate,
'temperature_min': self.temperature_min,
'max_iterations': self.max_iterations,
'substitution_prob': self.move_probabilities['substitution'],
'swap_prob': self.move_probabilities['swap'],
'cooling_schedule': self.cooling_schedule,
'embedding_type': embedding_type
}
if self.plastic_type is not None:
params['plastic_type'] = self.plastic_type
return '_'.join(f"{k}-{v}" for k, v in params.items())
def peptide_to_one_hot(self, peptide: str) -> np.ndarray:
    """Convert a peptide sequence to a fixed-size (12, 18) one-hot array.

    Parameters:
    - peptide: Sequence of at most 12 residues drawn from the 18-letter
      alphabet below. Shorter peptides leave trailing all-zero rows.

    Returns:
    - numpy array of shape (12, 18) with a single 1 per encoded position.

    Raises:
    - ValueError: if the peptide is longer than 12 residues or contains a
      residue outside the alphabet.
    """
    # 18 residues; C and P are absent from this alphabet — presumably by
    # design of the scoring model. TODO confirm with model owners.
    amino_acids = 'ADEFGHIKLMNQRSTVWY'
    aa_to_index = {aa: idx for idx, aa in enumerate(amino_acids)}
    # Validate length up front: without this, a >12-residue peptide would
    # surface as an opaque IndexError from numpy instead of a clear error.
    if len(peptide) > 12:
        raise ValueError(f'Peptide {peptide} exceeds maximum supported length of 12')
    one_hot = np.zeros((12, 18))
    for i, aa in enumerate(peptide):
        idx = aa_to_index.get(aa)
        if idx is not None:
            one_hot[i, idx] = 1
        else:
            raise ValueError(f'Invalid amino acid {aa} in peptide {peptide}')
    return one_hot
def objective_function(self, peptide: str, embedding: Any, embedding_type: str, score_model: Any) -> float:
    """Compute the energy (predicted score) of a peptide sequence; lower is better.

    Parameters:
    - peptide: Peptide sequence to score
    - embedding: Embedding object (unused on the 'one_hot' path shown here)
    - embedding_type: Embedding name; only 'one_hot' is handled in this code
    - score_model: Either a single model exposing .predict(), or a list of
      (plastic_name, model) pairs for multi-plastic ensemble scoring —
      presumably built by the caller; verify against call sites.

    Returns:
    - A float score. For ensembles the combination rule depends on
      self.plastic_type (see branch comments below).

    Raises:
    - ValueError: unknown embedding type, or a required plastic model is
      missing from the ensemble list.
    """
    if embedding_type == 'one_hot':
        one_hot = self.peptide_to_one_hot(peptide)
        # Add a leading batch dimension: (12, 18) -> (1, 12, 18).
        embedding_output = one_hot[np.newaxis, :, :]
    else:
        raise ValueError(f'Unknown embedding type: {embedding_type}')
    # Ensemble path: score_model is a list of (plastic, model) pairs.
    if embedding_type == 'one_hot' and isinstance(score_model, list):
        individual_scores = {}
        if self.plastic_type == 'All':
            # 'All': plain average over every plastic-specific model.
            total = 0.0
            for plastic, model in score_model:
                s = model.predict(embedding_output)[0]
                individual_scores[plastic] = s
                total += s
            avg_score = total / len(score_model)
            self.logger.info(f"Peptide {peptide}: individual scores: {individual_scores}, average score: {avg_score}")
            return avg_score
        elif self.plastic_type == 'PET_only':
            # 'PET_only': reward PET affinity while penalizing affinity to
            # the other plastics: (k*PET - sum(others)) / (k+1), k = #others.
            pet_score = None
            other_scores = []
            for plastic, model in score_model:
                s = model.predict(embedding_output)[0]
                individual_scores[plastic] = s
                if plastic == 'PET':
                    pet_score = s
                else:
                    other_scores.append(s)
            if pet_score is None:
                raise ValueError("PET model not found for PET_only scoring.")
            num_other = len(other_scores)
            # Falls back to the raw PET score when PET is the only model.
            combined_score = (num_other * pet_score - np.sum(other_scores)) / (num_other + 1) if num_other > 0 else pet_score
            self.logger.info(f"Peptide {peptide}: individual scores: {individual_scores}, PET_only combined score: {combined_score}")
            return combined_score
        elif self.plastic_type == 'PP-PET':
            # 'PP-PET': simple difference between the PP and PET scores.
            pet_score = None
            pp_score = None
            for plastic, model in score_model:
                s = model.predict(embedding_output)[0]
                individual_scores[plastic] = s
                if plastic == 'PET':
                    pet_score = s
                elif plastic == 'PP':
                    pp_score = s
            if pet_score is None or pp_score is None:
                raise ValueError("PET or PP model not found for PP-PET scoring.")
            combined_score = pp_score - pet_score
            self.logger.info(f"Peptide {peptide}: individual scores: {individual_scores}, PP-PET combined score: {combined_score}")
            return combined_score
        elif self.plastic_type is not None:
            # A single named plastic: return that model's score alone.
            for plastic, model in score_model:
                if plastic == self.plastic_type:
                    s = model.predict(embedding_output)[0]
                    self.logger.info(f"Peptide {peptide}: score for plastic {plastic}: {s}")
                    return s
            raise ValueError(f"Model for specified plastic {self.plastic_type} not found.")
        else:
            # plastic_type is None with an ensemble: same averaging as 'All'.
            total = 0.0
            for plastic, model in score_model:
                s = model.predict(embedding_output)[0]
                individual_scores[plastic] = s
                total += s
            avg_score = total / len(score_model)
            self.logger.info(f"Peptide {peptide}: individual scores: {individual_scores}, average score: {avg_score}")
            return avg_score
    else:
        # Single-model path: one prediction, no logging.
        score = score_model.predict(embedding_output)[0]
        return score
def move_operator(self, peptide: str) -> str:
"""Generate a neighboring peptide by applying a move operator."""
move_type = random.choices(
population=['substitution', 'swap'],
weights=[self.move_probabilities['substitution'], self.move_probabilities['swap']]
)[0]
if move_type == 'substitution':
return self.substitution(peptide)
elif move_type == 'swap':
return self.swap(peptide)
else:
raise ValueError(f'Unknown move type: {move_type}')
def substitution(self, peptide: str) -> str:
    """Replace one randomly chosen residue with a different amino acid.

    The replacement is drawn uniformly from the alphabet minus the residue
    currently at the chosen position, so the result always differs there.
    """
    alphabet = 'ADEFGHIKLMNQRSTVWY'
    pos = random.randint(0, len(peptide) - 1)
    candidates = list(alphabet.replace(peptide[pos], ''))
    replacement = random.choice(candidates)
    return f'{peptide[:pos]}{replacement}{peptide[pos + 1:]}'
def swap(self, peptide: str) -> str:
    """Exchange the residues at two distinct, randomly chosen positions."""
    first, second = random.sample(range(len(peptide)), 2)
    residues = list(peptide)
    residues[first], residues[second] = residues[second], residues[first]
    return ''.join(residues)
def acceptance_criterion(self, delta_energy: float) -> bool:
"""Decide whether to accept the new solution."""
if delta_energy <= 0:
return True
else:
probability = np.exp(-delta_energy / self.temperature)
return random.random() < probability
def update_temperature(self, iteration: int):
"""Update the temperature according to the cooling schedule."""
if self.cooling_schedule == 'exponential':
self.temperature *= self.cooling_rate
elif self.cooling_schedule == 'linear':
self.temperature -= self.cooling_rate
self.temperature = max(0, self.temperature)
elif self.cooling_schedule == 'logarithmic':
self.temperature = self.initial_temperature / np.log(iteration + 2)
else:
raise ValueError(f'Unknown cooling schedule: {self.cooling_schedule}')
def run_simulated_annealing(
    self,
    initial_peptide: str,
    embedding: Any,
    embedding_type: str,
    score_model: Any
) -> tuple[str, float, List[tuple[str, float]], List[tuple[int, float]], dict]:
    """
    Run the simulated annealing algorithm for a single peptide.
    If re_initiate is True, runs optimization twice using best peptide from first run.
    Returns:
    - best_peptide: The best peptide found
    - best_energy: The energy of the best peptide
    - intermediate_peptides: List of (peptide, score) tuples for peptides meeting threshold
    - trajectory: List of (iteration, score) tuples for accepted peptides
    - stats: Dictionary containing run statistics
    """
    start_time = time.time()
    self.temperature = self.initial_temperature
    # NOTE(review): this outer current_peptide is never read again — only the
    # inner round's local of the same name is used.
    current_peptide = initial_peptide
    best_peptide = initial_peptide
    best_energy = self.objective_function(initial_peptide, embedding, embedding_type, score_model)
    intermediate_peptides = []  # (peptide, score) pairs scoring below self.score_threshold
    trajectory = []  # (iteration, score) for every accepted move, shared across rounds
    # Initialize statistics
    stats = {
        'total_iterations': 0,
        'accepted_moves': 0,
        'rejected_moves': 0,
        'total_sequences_sampled': 0,
        'sequences_below_threshold': 0,
        'runtime_seconds': 0.0,
        'best_peptide': best_peptide,
        'best_score': best_energy
    }
    # Function to run one round of optimization
    def run_optimization_round(start_peptide: str, start_iteration: int) -> tuple[str, float, int]:
        # NOTE(review): nonlocal is not strictly needed here — these names are
        # only mutated in place, never rebound — but it is harmless.
        nonlocal self, trajectory, intermediate_peptides, stats
        print(f"Starting optimization round with peptide: {start_peptide}")
        current_peptide = start_peptide
        current_best_peptide = start_peptide
        current_best_energy = self.objective_function(start_peptide, embedding, embedding_type, score_model)
        iteration = start_iteration
        steps_at_current_temp = 0
        self.temperature = self.initial_temperature  # Reset temperature
        # Main SA loop: stop when the system is cold or the iteration budget
        # (shared across rounds via start_iteration) is exhausted.
        while self.temperature > self.temperature_min and iteration < self.max_iterations:
            new_peptide = self.move_operator(current_peptide)
            stats['total_sequences_sampled'] += 1
            # NOTE(review): current_peptide's energy is re-evaluated on every
            # iteration; caching it across iterations would halve the number
            # of model calls without changing results.
            current_energy = self.objective_function(current_peptide, embedding, embedding_type, score_model)
            new_energy = self.objective_function(new_peptide, embedding, embedding_type, score_model)
            delta_energy = new_energy - current_energy
            if self.acceptance_criterion(delta_energy):
                current_peptide = new_peptide
                stats['accepted_moves'] += 1
                self.logger.info(f'Iteration {iteration}: Accepted new peptide {new_peptide} with energy {new_energy}')
                # Track trajectory for accepted moves
                trajectory.append((iteration, new_energy))
                # Save intermediate peptides meeting threshold
                if new_energy < self.score_threshold:
                    intermediate_peptides.append((new_peptide, new_energy))
                    stats['sequences_below_threshold'] += 1
                # Track the best-so-far within this round.
                if new_energy < current_best_energy:
                    current_best_peptide = new_peptide
                    current_best_energy = new_energy
                    stats['best_peptide'] = current_best_peptide
                    stats['best_score'] = current_best_energy
                    self.logger.info(f'New best peptide {new_peptide} with energy {new_energy}')
            else:
                stats['rejected_moves'] += 1
                self.logger.info(f'Iteration {iteration}: Rejected peptide {new_peptide} with energy {new_energy}')
            steps_at_current_temp += 1
            # Cool only after steps_per_temp moves at the current temperature.
            if steps_at_current_temp >= self.steps_per_temp:
                self.update_temperature(iteration)
                steps_at_current_temp = 0
            iteration += 1
            stats['total_iterations'] += 1
        return current_best_peptide, current_best_energy, iteration
    # Run first optimization round
    best_peptide, best_energy, last_iteration = run_optimization_round(initial_peptide, 0)
    # If re_initiate is True, run second optimization round starting from best peptide
    if self.re_initiate:
        self.logger.info(f'Starting second optimization round from best peptide: {best_peptide}')
        # NOTE(review): the second round starts at last_iteration, so it runs
        # zero iterations when the first round already consumed
        # max_iterations — confirm this is intended.
        second_best_peptide, second_best_energy, _ = run_optimization_round(best_peptide, last_iteration)
        # Update overall best if second round found better solution
        if second_best_energy < best_energy:
            best_peptide = second_best_peptide
            best_energy = second_best_energy
            stats['best_peptide'] = best_peptide
            stats['best_score'] = best_energy
    # Record total runtime
    stats['runtime_seconds'] = time.time() - start_time
    return best_peptide, best_energy, intermediate_peptides, trajectory, stats
def analyze_unique_sequences(self, results_df: pd.DataFrame) -> tuple[dict, pd.DataFrame]:
    """
    Analyze unique sequences generated for each original sequence.
    Parameters:
    - results_df: DataFrame with columns 'Original_Sequence',
      'Generated_Sequence' and 'Predicted_Score'; rows with an empty
      Generated_Sequence (the originals) are excluded from the counts.
    Returns:
    - A (stats, dataframe) tuple: per-original-sequence counts of total and
      unique generated sequences, and a flat DataFrame of the unique
      (sequence, score) rows ready for CSV export.
      (Note: the original annotation said '-> dict' but the function has
      always returned this tuple.)
    """
    unique_stats = {}
    unique_sequences_data = []
    # Group by original sequence
    for orig_seq in results_df['Original_Sequence'].unique():
        # Get all generated sequences for this original sequence
        seq_group = results_df[
            (results_df['Original_Sequence'] == orig_seq) &
            (results_df['Generated_Sequence'] != '')  # Exclude empty sequences
        ]
        # Get unique generated sequences with their scores; duplicates are
        # judged on the (sequence, score) pair.
        unique_sequences = seq_group[['Generated_Sequence', 'Predicted_Score']].drop_duplicates()
        # Store statistics
        unique_stats[orig_seq] = {
            'total_generated': len(seq_group),
            'unique_generated': len(unique_sequences),
        }
        # Store sequence data for CSV
        for _, row in unique_sequences.iterrows():
            unique_sequences_data.append({
                'Original_Sequence': orig_seq,
                'Generated_Sequence': row['Generated_Sequence'],
                'Predicted_Score': row['Predicted_Score'],
                'Total_Generated': len(seq_group),
                'Unique_Generated': len(unique_sequences)
            })
    return unique_stats, pd.DataFrame(unique_sequences_data)
def plot_trajectories(self, trajectories: List[List[tuple[int, float]]], save_path: str):
    """Plot optimization trajectories for up to 10 runs and save as a PNG.

    Parameters:
    - trajectories: One list of (iteration, score) points per run, aligned
      by index with self.initial_sequences for the legend labels.
    - save_path: File path the figure is written to (300 dpi).
    """
    plt.figure(figsize=(15, 10))
    for idx, trajectory in enumerate(trajectories[:10]):  # Plot first 10 trajectories
        # A run in which no move was ever accepted produces an empty
        # trajectory; zip(*[]) would raise ValueError, so skip it.
        if not trajectory:
            continue
        iterations, scores = zip(*trajectory)
        # Guard against trajectories without a stored initial sequence so a
        # short self.initial_sequences list cannot raise IndexError.
        label = (f'Sequence: {self.initial_sequences[idx]}'
                 if idx < len(self.initial_sequences) else f'Run {idx + 1}')
        plt.plot(iterations, scores,
                 label=label,
                 alpha=0.7,
                 linewidth=2,
                 marker='o',
                 markersize=6,
                 markevery=max(1, len(iterations) // 20))
    plt.xlabel('Iteration', fontsize=14, fontweight='bold')
    plt.ylabel('Score', fontsize=14, fontweight='bold')
    plt.title('Simulated Annealing Optimization Trajectories',
              fontsize=16, fontweight='bold')
    plt.legend(bbox_to_anchor=(1.05, 1),
               loc='upper left',
               fontsize=12)
    plt.grid(True, alpha=0.3)
    plt.xticks(fontsize=12)
    plt.yticks(fontsize=12)
    plt.tight_layout()
    plt.savefig(save_path, dpi=300, bbox_inches='tight')
    plt.close()
def generate_peptides(
    self,
    initial_sequences: List[str],
    initial_true_scores: List[float],
    embedding: Any,
    embedding_type: str,
    score_model: Any
) -> str:
    """
    Generate peptides using simulated annealing and analyze results.
    Parameters:
    - initial_sequences: List of initial peptide sequences
    - initial_true_scores: List of true scores for initial sequences
      (must align index-for-index with initial_sequences)
    - embedding: Embedding object (QuantumReservoir, ScoreHandlerProtBERT, etc.)
    - embedding_type: Type of embedding ('qr', 'protbert', 'esm')
    - score_model: Instance of ScoreModel class
    Returns:
    - Path to the results directory

    Side effects: creates the result directory, writes four CSV files and a
    trajectory PNG into it, and logs progress via self.logger.
    NOTE(review): an empty initial_sequences list would raise
    ZeroDivisionError at the average-statistics step below — confirm callers
    never pass one.
    """
    total_start_time = time.time()
    # Setup logging with parameters key
    hyperparameters_key = self.get_parameters_key(embedding_type)
    self.setup_logging(hyperparameters_key)
    # Store initial sequences for plotting
    self.initial_sequences = initial_sequences
    # Initialize results storage
    all_results = []
    trajectories = []
    all_stats = []
    # Process each initial sequence
    for idx, seq in enumerate(initial_sequences):
        initial_true_score = initial_true_scores[idx]
        # Get initial predicted score
        initial_predicted_score = self.objective_function(seq, embedding, embedding_type, score_model)
        # Run simulated annealing
        best_peptide, best_energy, intermediate_peptides, trajectory, stats = self.run_simulated_annealing(
            seq, embedding, embedding_type, score_model
        )
        # Store trajectory and stats
        trajectories.append(trajectory)
        all_stats.append(stats)
        # Store initial sequence results
        all_results.append({
            'Original_Sequence': seq,
            'Generated_Sequence': '',  # Will be filled with intermediate peptides
            'True_Score': initial_true_score,
            'Predicted_Score': initial_predicted_score,
            'Is_Original': True,
            'Is_Best': False
        })
        # Store best peptide result
        all_results.append({
            'Original_Sequence': seq,
            'Generated_Sequence': best_peptide,
            'True_Score': None,  # generated peptides have no ground-truth score
            'Predicted_Score': best_energy,
            'Is_Original': False,
            'Is_Best': True
        })
        # Store intermediate results
        for peptide, score in intermediate_peptides:
            if peptide != best_peptide:  # Skip if already added as best peptide
                all_results.append({
                    'Original_Sequence': seq,
                    'Generated_Sequence': peptide,
                    'True_Score': None,
                    'Predicted_Score': score,
                    'Is_Original': False,
                    'Is_Best': False
                })
        self.logger.info(f"Processed sequence {idx + 1}/{len(initial_sequences)}")
        self.logger.info(f"Initial sequence: {seq}, score: {initial_true_score}")
        self.logger.info(f"Best generated sequence: {best_peptide}, predicted score: {best_energy}")
    # Calculate total runtime
    total_runtime = time.time() - total_start_time
    # Save results to CSV
    results_df = pd.DataFrame(all_results)
    csv_path = os.path.join(self.result_dir, 'generated_peptides.csv')
    results_df.to_csv(csv_path, index=False)
    # Analyze and save statistics: average each numeric per-run stat.
    avg_stats = {
        key: sum(stat[key] for stat in all_stats) / len(all_stats)
        for key in all_stats[0].keys()
        if key not in ['best_peptide', 'best_score']  # Exclude non-numeric fields
    }
    avg_stats['total_runtime_seconds'] = total_runtime
    stats_df = pd.DataFrame([avg_stats])
    stats_df.to_csv(os.path.join(self.result_dir, 'average_statistics.csv'), index=False)
    # Save detailed statistics for each sequence
    detailed_stats = []
    for idx, stats in enumerate(all_stats):
        seq_stats = {
            'Original_Sequence': initial_sequences[idx],
            'Best_Generated_Sequence': stats['best_peptide'],
            'Best_Score': stats['best_score'],
            'Runtime_Seconds': stats['runtime_seconds'],
            'Total_Iterations': stats['total_iterations'],
            'Accepted_Moves': stats['accepted_moves'],
            'Rejected_Moves': stats['rejected_moves'],
            'Total_Sequences_Sampled': stats['total_sequences_sampled'],
            'Sequences_Below_Threshold': stats['sequences_below_threshold']
        }
        detailed_stats.append(seq_stats)
    detailed_stats_df = pd.DataFrame(detailed_stats)
    detailed_stats_df.to_csv(os.path.join(self.result_dir, 'detailed_statistics.csv'), index=False)
    # Analyze unique sequences and save to CSV
    unique_stats, unique_sequences_df = self.analyze_unique_sequences(results_df)
    # Sort by Original_Sequence and Predicted_Score for better readability
    unique_sequences_df = unique_sequences_df.sort_values(
        ['Original_Sequence', 'Predicted_Score'],
        ascending=[True, True]
    )
    # Save unique sequences analysis to CSV
    unique_sequences_df.to_csv(
        os.path.join(self.result_dir, 'unique_sequences_analysis.csv'),
        index=False
    )
    # Plot trajectories
    plot_path = os.path.join(self.result_dir, 'optimization_trajectories.png')
    self.plot_trajectories(trajectories, plot_path)
    return self.result_dir