-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSimulation.py
More file actions
288 lines (228 loc) · 13.8 KB
/
Simulation.py
File metadata and controls
288 lines (228 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# from Structure.SmartAgent import SmartAgent
from Agents.AbstractAgent import AbstractAgent
from Structure.Blockchain import Blockchain
from Structure.BlocktimeOracle import BlocktimeOracle
from Structure.Block import Block
import logging as lg
from Structure.Result import SimResult
from tqdm import tqdm
class Simulator:
def __init__(self, display: bool, **kwargs):
self.number_of_periods: int = kwargs['periods']
self.agents: list[AbstractAgent] = kwargs['agents']
self.WINDOW_SIZE: int = kwargs['window_size']
self.TIME_PER_BLOCK: float = kwargs['expected_block_time']
self.difficulty: float = kwargs['init_difficulty']
self.display: bool = display
self.period_lengths = [0.0 for _ in range(self.number_of_periods)]
self.blockchain: Blockchain = Blockchain()
self.blocktime_oracle: BlocktimeOracle = BlocktimeOracle(agents=self.agents, difficulty=self.difficulty)
self.orphan_blocks = {_: {"selfish": 0, "honest": 0} for _ in range(self.number_of_periods)}
self.difficulties: list[float] = []
self.BLOCK_REWARD: float = 6.25
self.COIN_VALUE: float = 45038.0
self.COST_PER_BLOCK: float = 9000.0
self.selfish_blocks =[0 for _ in range(self.number_of_periods)]
# this method sets instructions of what the agent will do when get_longest_published_chain is called
def transmit_block_to_all_agents(self, payload: dict) -> None:
for agent in self.agents:
# do not transmit to the original sender
if agent != payload["agent"]:
agent.receive_blocks(payload)
def get_longest_published_chain(self) -> list[tuple[AbstractAgent, int]]:
POSITION_OF_LEN = 1
# we are receiving list[blocks] from each agent
received_blocks = []
# only largest pp_size gets accepted
for agent in self.agents:
# (agent and transmitted blocks)
# TODO: transmit actual chain instead of just the size? (to keep timestamps)
received_blocks.append(agent.broadcast) # type tuple(AbstractAgent, int)
# find pp_size maxes
# structure: [ (agent, int), ... , (agent, int)]
max_len = max(received_blocks, key=lambda x: x[POSITION_OF_LEN]) # len of transmitted blocks is pp_size
max_blocks = [x for x in received_blocks if x[POSITION_OF_LEN] == max_len[POSITION_OF_LEN]]
return max_blocks
def get_longest_internal_chain(self, internal_state: dict) -> list[tuple[AbstractAgent, int]]:
max_len: int = internal_state[
max(internal_state, key=lambda x: internal_state[x])] # len of transmitted blocks is pp_size
return [(x, internal_state[x]) for x in internal_state if internal_state[x] == max_len]
@staticmethod
def explicit_val_to_payload(agent: AbstractAgent, pp_size: int) -> dict:
return {"agent": agent, "pp_size": pp_size}
@staticmethod
def tuple_to_payload(input_tuple: tuple[AbstractAgent, int]) -> dict:
return {"agent": input_tuple[0], "pp_size": input_tuple[1]}
def update_difficulty(self, period_index: int) -> None:
period_time = self.period_lengths[period_index]
# if period_index > 0:
# period_time: float = self.period_lengths[period_index] - self.period_lengths[period_index - 1]
self.difficulty = self.difficulty * \
((self.WINDOW_SIZE * self.TIME_PER_BLOCK) / period_time)
# must be run at every period iteration
def decum_periods(self, period_index: int):
prev_sum: float = 0.0
for index in range(period_index):
prev_sum += self.period_lengths[index]
if period_index > 0:
self.period_lengths[period_index] -= prev_sum
def transmit_difficulty(self):
for agent in self.agents:
agent.receive_difficulty(self.difficulty)
def reset_all_broadcast(self):
for agent in self.agents:
agent.reset_broadcast()
from Agents.SelfishAgent import SelfishAgent
def get_selfish_agent(self) -> SelfishAgent:
for agent in self.agents:
if agent.type == "selfish":
return agent
def get_reward_leak(self) -> float:
blocks_mined_if_honest: float = self.get_selfish_agent().alpha * self.number_of_periods * self.WINDOW_SIZE
print(f"blocks_if_honest: {blocks_mined_if_honest}")
blocks_mined: int = 0
for i in range(self.number_of_periods):
blocks_mined += self.selfish_blocks[i]
print(f"selfish_blocks {blocks_mined}")
reward_leak = (blocks_mined - blocks_mined_if_honest) * self.BLOCK_REWARD * self.COIN_VALUE
return reward_leak
def get_wasted_power(self) -> float:
total_orphans: int = 0
for _ in self.orphan_blocks:
total_orphans += self.orphan_blocks[_]["selfish"]
total_orphans += self.orphan_blocks[_]["honest"]
wasted_power = total_orphans * self.COST_PER_BLOCK
return wasted_power
def run(self) -> None:
# Run this loop for as many periods we want to simulate
iterations = list(range(self.number_of_periods))
for period_index in tqdm(iterations):
self.period_lengths[period_index] = 0.0
# Keep looping until the length of the blockchain is equal to the window size.
while len(self.blockchain) < self.WINDOW_SIZE:
transmission: Block = self.blocktime_oracle.next_time()
self.period_lengths[period_index] = transmission.timestamp
# agent needs to be aware of the block they mined
lg.debug("winner: " + transmission.winning_agent.__str__())
transmission.winning_agent.receive_blocks_from_oracle([transmission])
# if the agent chooses to transmit it publicly to the rest of the miners, we trigger the while loop
if not transmission.winning_agent.publish_block:
continue
# Make the agents remember the lengths of their private chains before publishing begins
for agent in self.agents:
agent.freeze_lengths()
agent.delta = agent.store_length
# Pops the transmitted block from the mining queue of the agent
# should always be non_empty
transmission.winning_agent.mining_queue.get_nowait()
# Payload variable is passed around between receive and transmit.
payload = {"agent": transmission.winning_agent, "pp_size": 1}
# set internal state for each time-step to 0's
internal_state = dict.fromkeys(self.agents, 0)
# this is the honest miner that found one. hard-coding the win before iterating further
internal_state[payload["agent"]] = 1
# defected blocks - these will be added to the honest agent wins at the end of the do-while
# FIXME: This only applies to 2-agent cases
# TODO: Look at internal state and count every entry that is not the max to get the defector blocks
defector_blocks: int = 0
# do-while
while True:
self.reset_all_broadcast()
# Send all agents the most recent payload
self.transmit_block_to_all_agents(payload)
# Collect published chains from each agent and select the longest one
longest_published_chain: list[tuple[AbstractAgent, int]] = self.get_longest_published_chain()
# base case
# check that all received values are 0; [1] b/c looking at int in tuple[AbstractAgent, int]
# TODO: make sure this isn't buggy...
INT_POSITION = 1
if longest_published_chain == [x for x in longest_published_chain if x[INT_POSITION] == 0]:
# return the longest chain(s); will be used to create forks otherwise
longest_state_chains: list[tuple[AbstractAgent, int]] = self.get_longest_internal_chain(
internal_state)
# If the longest chain is unique; no need to fork
if len(longest_state_chains) == 1:
# payload = self.tuple_to_payload(next(iter(longest_state_chains)))
# At this point of the code we have the winner.
# TODO: Pass in time information of the blocks
# FIXME: This renders the blockchain class useless. Restructure
winner: list[tuple[AbstractAgent, int]] = self.get_longest_internal_chain(internal_state)
# Defector_blocks is zero when the winner is selfish
for _ in range(next(iter(winner))[1] - defector_blocks):
self.blockchain.add_block(Block(winning_agent=next(iter(winner[0]))))
# --------------------------
# Extract the honest agent object
# FIXME: this is just so so bad
other_agent = None
for agent in self.agents:
if agent != winner[0]:
other_agent = agent
assert (other_agent != None)
# --------------------------
# Only triggered when the winner is selfish
for _ in range(defector_blocks):
self.blockchain.add_block(Block(winning_agent=other_agent))
# Update the period time with the latest block time from the Blocktime Oracle
self.period_lengths[period_index] = self.blocktime_oracle.current_time
# get orphan blocks by reading internal state of all other agents
for agent in self.agents:
if agent != next(iter(winner))[0]:
# self.orphan_blocks[period_index][agent.type] += agent.store_length
self.orphan_blocks[period_index][agent.type] += internal_state[agent]
# assert(agent.store_length == internal_state[agent])
# Set all agent variables to their default values
for agent in self.agents:
agent.reset()
break
# If longest chain not unique, we must fork to establish a winner
elif len(longest_state_chains) > 1:
winning_chain_agent, winning_agent, min_time = self.blocktime_oracle.fork(self.difficulty,
self.agents)
lg.debug("fork winner: " + winning_agent.__str__())
# winning_agent could be honest(defector), winning_chain_agent is selfish
# defectors won. ex: honest win, but selfish-miner keeps his mined block
if winning_chain_agent != winning_agent:
# winning_chain_agent.defected_blocks += 1
defector_blocks += 1
# increment internal_state regardless
internal_state[winning_chain_agent] += 1
# fork will only have one winner, hence the 1
payload = self.explicit_val_to_payload(winning_chain_agent, 1)
# error-handling
else:
raise (Exception(f"longest_state_chains: {len(longest_state_chains)}. Value"
f"must be greater than 0."))
# If each entry in the longest chain isn't a pp_size of zero:
else:
payload = self.tuple_to_payload(next(iter(longest_published_chain)))
internal_state[payload["agent"]] += payload["pp_size"]
self.decum_periods(period_index)
# print(self.period_lengths[period_index])
self.update_difficulty(period_index)
self.transmit_difficulty()
lg.debug(str(self.difficulty))
self.difficulties.append(self.difficulty)
honest_win: int = 0
selfish_win: int = 0
for i in range(len(self.blockchain)):
if self.blockchain.chain.pop().winning_agent.type == "selfish":
selfish_win += 1
else:
honest_win += 1
lg.debug(f"honest win: {honest_win}")
lg.debug(f"selfish win: {selfish_win}")
lg.debug("_" * 40)
self.selfish_blocks[period_index] = selfish_win
if self.display:
for i in range(self.number_of_periods):
print(
"Period: " + str(i) + " | time: " + str(self.period_lengths[i]).rjust(5) + " | difficulty: " + str(
self.difficulties[i]).rjust(5))
print("Orphans: " + "Selfish: " + str(self.orphan_blocks[i]["selfish"]).rjust(5))
print("Orphans: " + "Honest: " + str(self.orphan_blocks[i]["honest"]).rjust(5))
print("_" * 40)
# reward leak & wasted power
print("🚰 Leaked Rewards: " + str(self.get_reward_leak()).rjust(2, " "))
print("🔋 Wasted Power: " + str(self.get_wasted_power()).rjust(12, " "))
SimResult(periods=self.period_lengths, difficulties=self.difficulties,
agents=self.agents, orphan_blocks=self.orphan_blocks)