Skip to content
This repository was archived by the owner on Sep 26, 2022. It is now read-only.

Commit db531be

Browse files
authored
Merge pull request #68 from loads/feat/48
add more attack node tags (issue #48)
2 parents b426662 + 07f8ac9 commit db531be

File tree

14 files changed

+146
-36
lines changed

14 files changed

+146
-36
lines changed

loadsbroker/aws.py

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import time
2626
from collections import defaultdict, namedtuple
2727
from datetime import datetime, timedelta
28+
from typing import Dict, Optional
2829

2930
from boto.ec2 import connect_to_region
3031
from tornado import gen
@@ -52,6 +53,13 @@
5253
AWS_AMI_IDS = {k: {} for k in AWS_REGIONS}
5354

5455

56+
# How long after est. run times to trigger the reaper
57+
REAPER_DELTA = timedelta(hours=5)
58+
# Force the reaper for run times less than
59+
REAPER_FORCE = timedelta(hours=24)
60+
REAPER_STATE = 'ThirdState'
61+
62+
5563
def populate_ami_ids(aws_access_key_id=None, aws_secret_access_key=None,
5664
port=None, owner_id="595879546273", use_filters=True):
5765
"""Populate all the AMI ID's with the latest CoreOS stable info.
@@ -355,7 +363,7 @@ def __init__(self, broker_id, access_key=None, secret_key=None,
355363
self.security = security
356364
self.user_data = user_data
357365
self._instances = defaultdict(list)
358-
self._tag_filters = {"tag:Name": "loads-%s" % self.broker_id,
366+
self._tag_filters = {"tag:Name": "loads-%s*" % self.broker_id,
359367
"tag:Project": "loads"}
360368
self._conns = {}
361369
self._recovered = {}
@@ -493,7 +501,7 @@ def _locate_existing_instances(self, count, inst_type, region):
493501

494502
for inst in region_instances:
495503
if available_instance(inst) and inst_type == inst.instance_type:
496-
instances.append(inst)
504+
instances.append(inst)
497505
else:
498506
remaining.append(inst)
499507

@@ -518,12 +526,15 @@ async def _allocate_instances(self, conn, count, inst_type, region):
518526
return reservations.instances
519527

520528
async def request_instances(self,
521-
run_id,
522-
uuid,
529+
run_id: str,
530+
uuid: str,
523531
count=1,
524532
inst_type="t1.micro",
525533
region="us-west-2",
526-
allocate_missing=True):
534+
allocate_missing=True,
535+
plan: Optional[str] = None,
536+
owner: Optional[str] = None,
537+
run_max_time: Optional[int] = None):
527538
"""Allocate a collection of instances.
528539
529540
:param run_id: Run ID for these instances
@@ -535,6 +546,10 @@ async def request_instances(self,
535546
If there's insufficient existing instances for this uuid,
536547
whether existing or new instances should be allocated to the
537548
collection.
549+
:param plan: Name of the instances' plan
550+
:param owner: Owner name of the instances
551+
:param run_max_time: Maximum expected run-time of instances in
552+
seconds
538553
:returns: Collection of allocated instances
539554
:rtype: :class:`EC2Collection`
540555
@@ -563,26 +578,33 @@ async def request_instances(self,
563578
if num > 0:
564579
new_instances = await self._allocate_instances(
565580
conn, num, inst_type, region)
566-
logger.debug("Allocated instances: %s", new_instances)
581+
logger.debug("Allocated instances%s: %s",
582+
" (Owner: %s)" % owner if owner else "",
583+
new_instances)
567584
instances.extend(new_instances)
568585

569586
# Tag all the instances
570587
if self.use_filters:
571-
# Sometimes, we can get instance data back before the AWS API fully
572-
# recognizes it, so we wait as needed.
588+
tags = {
589+
"Name": "loads-{}{}".format(self.broker_id,
590+
"-" + plan if plan else ""),
591+
"Project": "loads",
592+
"RunId": run_id,
593+
"Uuid": uuid,
594+
}
595+
if owner:
596+
tags["Owner"] = owner
597+
if run_max_time is not None:
598+
self._tag_for_reaping(tags, run_max_time)
599+
600+
# Sometimes, we can get instance data back before the AWS
601+
# API fully recognizes it, so we wait as needed.
573602
async def tag_instance(instance):
574603
retries = 0
575604
while True:
576605
try:
577606
await self._run_in_executor(
578-
conn.create_tags, [instance.id],
579-
{
580-
"Name": "loads-%s" % self.broker_id,
581-
"Project": "loads",
582-
"RunId": run_id,
583-
"Uuid": uuid
584-
}
585-
)
607+
conn.create_tags, [instance.id], tags)
586608
break
587609
except:
588610
if retries > 5:
@@ -592,6 +614,23 @@ async def tag_instance(instance):
592614
await gen.multi([tag_instance(x) for x in instances])
593615
return EC2Collection(run_id, uuid, conn, instances, self._loop)
594616

617+
def _tag_for_reaping(self,
618+
tags: Dict[str, str],
619+
run_max_time: int) -> None:
620+
"""Tag an instance for the mozilla-services reaper
621+
622+
Set instances to stop after run_max_time + REAPER_DELTA
623+
624+
"""
625+
now = datetime.utcnow()
626+
reap = now + timedelta(seconds=run_max_time) + REAPER_DELTA
627+
tags['REAPER'] = "{}|{:{dfmt}}".format(
628+
REAPER_STATE, reap, dfmt="%Y-%m-%d %I:%M%p UTC")
629+
if reap < now + REAPER_FORCE:
630+
# the reaper ignores instances younger than REAPER_FORCE
631+
# unless forced w/ REAP_ME
632+
tags['REAP_ME'] = ""
633+
595634
async def release_instances(self, collection):
596635
"""Return a collection of instances to the pool.
597636

loadsbroker/broker.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,12 @@ class RunHelpers:
9898

9999

100100
class Broker:
101-
def __init__(self, io_loop, sqluri, ssh_key,
101+
def __init__(self, name, io_loop, sqluri, ssh_key,
102102
heka_options, influx_options, aws_port=None,
103103
aws_owner_id="595879546273", aws_use_filters=True,
104104
aws_access_key=None, aws_secret_key=None, initial_db=None):
105+
self.name = name
106+
logger.debug("loads-broker (%s)", self.name)
105107

106108
self.loop = io_loop
107109
self._base_env = BASE_ENV.copy()
@@ -133,7 +135,7 @@ def __init__(self, io_loop, sqluri, ssh_key,
133135
raise ImportError('You need to install the influx lib')
134136
self.influx = InfluxDBClient(**influx_args)
135137

136-
self.pool = aws.EC2Pool("1234", user_data=user_data,
138+
self.pool = aws.EC2Pool(self.name, user_data=user_data,
137139
io_loop=self.loop, port=aws_port,
138140
owner_id=aws_owner_id,
139141
use_filters=aws_use_filters,
@@ -228,7 +230,7 @@ def run_plan(self, strategy_id, create_db=True, **kwargs):
228230

229231
log_threadid("Running strategy: %s" % strategy_id)
230232
uuid = kwargs.pop('run_uuid', None)
231-
creator = kwargs.pop('creator', None)
233+
owner = kwargs.pop('owner', None)
232234

233235
# now we can start a new run
234236
try:
@@ -240,7 +242,7 @@ def run_plan(self, strategy_id, create_db=True, **kwargs):
240242
plan_uuid=strategy_id,
241243
run_uuid=uuid,
242244
additional_env=kwargs,
243-
creator=creator)
245+
owner=owner)
244246
except NoResultFound as e:
245247
raise LoadsException(str(e))
246248

@@ -335,7 +337,7 @@ def _get_state(self):
335337

336338
@classmethod
337339
def new_run(cls, run_helpers, db_session, pool, io_loop, plan_uuid,
338-
run_uuid=None, additional_env=None, creator=None):
340+
run_uuid=None, additional_env=None, owner=None):
339341
"""Create a new run manager for the given strategy name
340342
341343
This creates a new run for this strategy and initializes it.
@@ -354,7 +356,7 @@ def new_run(cls, run_helpers, db_session, pool, io_loop, plan_uuid,
354356
"""
355357
# Create the run for this manager
356358
logger.debug('Starting a new run manager')
357-
run = Run.new_run(db_session, plan_uuid, creator)
359+
run = Run.new_run(db_session, plan_uuid, owner)
358360
if run_uuid:
359361
run.uuid = run_uuid
360362
db_session.add(run)
@@ -391,10 +393,15 @@ async def _get_steps(self):
391393
logger.debug('Getting steps & collections')
392394
steps = self.run.plan.steps
393395
collections = await gen.multi(
394-
[self._pool.request_instances(self.run.uuid, s.uuid,
395-
count=s.instance_count,
396-
inst_type=s.instance_type,
397-
region=s.instance_region)
396+
[self._pool.request_instances(
397+
self.run.uuid,
398+
s.uuid,
399+
count=s.instance_count,
400+
inst_type=s.instance_type,
401+
region=s.instance_region,
402+
plan=self.run.plan.name,
403+
owner=self.run.owner,
404+
run_max_time=s.run_delay + s.run_max_time)
398405
for s in steps])
399406

400407
try:

loadsbroker/client/cmd_delete.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ def __call__(self, args):
99
url = '/run/' + args.run_id + '?purge=1'
1010
return self.session.delete(self.root + url).json()
1111

12+
1213
cmd = Delete

loadsbroker/client/cmd_run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,5 @@ def __call__(self, args):
2424
data=json.dumps(options), headers=headers)
2525
return r.json()
2626

27+
2728
cmd = Run

loadsbroker/client/cmd_status.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ def __call__(self, args):
1212
res = self.session.get(url)
1313
return res.json()
1414

15+
1516
cmd = Status

loadsbroker/db.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ class Run(Base):
342342
"""
343343
state = Column(Integer, default=INITIALIZING)
344344

345-
creator = Column(String, nullable=True)
345+
owner = Column(String, nullable=True)
346346

347347
created_at = Column(DateTime, default=datetime.datetime.utcnow,
348348
doc="When the Run was created.")
@@ -358,7 +358,7 @@ class Run(Base):
358358
plan_id = Column(Integer, ForeignKey("plan.id"))
359359

360360
@classmethod
361-
def new_run(cls, session, plan_uuid, creator=None):
361+
def new_run(cls, session, plan_uuid, owner=None):
362362
"""Create a new run with appropriate running container set
363363
linkage for a given strategy"""
364364
plan = Plan.load_with_steps(session, plan_uuid)
@@ -367,7 +367,7 @@ def new_run(cls, session, plan_uuid, creator=None):
367367

368368
run = cls()
369369
run.plan = plan
370-
run.creator = creator
370+
run.owner = owner
371371

372372
# Setup step records for each step in this plan
373373
run.step_records = [StepRecord.from_step(step) for step in plan.steps]
@@ -383,7 +383,7 @@ def json(self, fields=None):
383383
'completed_at': self._datetostr(self.completed_at),
384384
'started_at': self._datetostr(self.started_at),
385385
'plan_id': self.plan_id, 'plan_name': self.plan.name,
386-
'creator': self.creator}
386+
'owner': self.owner}
387387

388388

389389
run_table = Run.__table__

loadsbroker/extensions.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,6 @@ async def start(self,
359359

360360
# Upload heka config to all the instances
361361
def upload_files(inst):
362-
hostname = ""
363362
hostname = "%s%s" % (
364363
series_name,
365364
inst.instance.ip_address.replace('.', '_')

loadsbroker/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ def _parse(sysargs=None):
1717
sysargs = sys.argv[1:]
1818

1919
parser = argparse.ArgumentParser(description='Runs a Loads broker.')
20+
parser.add_argument('--name', help="Name of this broker instance",
21+
type=str, default="1234")
2022
parser.add_argument('-p', '--port', help='HTTP Port', type=int,
2123
default=8080)
2224
parser.add_argument('--debug', help='Debug Info.', action='store_true',
@@ -86,7 +88,7 @@ def main(sysargs=None):
8688
args.influx_user, args.influx_password,
8789
args.influx_secure)
8890

89-
application.broker = Broker(loop, args.database, args.ssh_key,
91+
application.broker = Broker(args.name, loop, args.database, args.ssh_key,
9092
heka_options,
9193
influx_options,
9294
aws_port=args.aws_port,

loadsbroker/tests/fakedocker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,5 +106,6 @@ def main():
106106
except KeyboardInterrupt:
107107
pass
108108

109+
109110
if __name__ == "__main__":
110111
main()

loadsbroker/tests/fakeinflux.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,5 +81,6 @@ def main():
8181
except KeyboardInterrupt:
8282
pass
8383

84+
8485
if __name__ == "__main__":
8586
main()

0 commit comments

Comments
 (0)