Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions dtable_events/automations/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import os
from copy import deepcopy
from dataclasses import dataclass, field
from datetime import datetime, date, timedelta, timezone
from queue import Full
from urllib.parse import unquote, urlparse, parse_qs
Expand Down Expand Up @@ -4330,6 +4331,29 @@ class RuleInvalidException(Exception):
pass


@dataclass
class AutomationResult:
rule_id: int
rule_name: str
dtable_uuid: str
run_condition: str
org_id: int
owner: str
with_test: bool

is_exceed_system_resource_limit: bool = False

run_time: float = 0.0
trigger_time: datetime = None
trigger_date: date = None

success: bool = True
warnings: list = field(default_factory=list)

is_valid: bool = True
invalid_type: str = None


class AutomationRule:

def __init__(self, data, raw_trigger, raw_actions, options):
Expand Down Expand Up @@ -4713,7 +4737,7 @@ def do_actions(self, db_session, with_test=False):

self.db_session = db_session

auto_rule_result = {
auto_rule_result = AutomationResult(**{
'rule_id': self.rule_id,
'rule_name': self.rule_name,
'dtable_uuid': self.dtable_uuid,
Expand All @@ -4723,7 +4747,7 @@ def do_actions(self, db_session, with_test=False):
'trigger_time': datetime.utcnow(),
'trigger_date': date.today().replace(day=1),
'with_test': with_test,
}
})

do_actions_start = datetime.now()
for action_info in self.action_infos:
Expand Down Expand Up @@ -4962,7 +4986,8 @@ def do_actions(self, db_session, with_test=False):
'type': 'rule_invalid',
'invalid_type': invalid_type
})
auto_rule_result.update({'invalid_type': invalid_type})
auto_rule_result.invalid_type = invalid_type
auto_rule_result.is_valid = False
break
except Exception as e:
self.task_run_success = False
Expand All @@ -4974,12 +4999,7 @@ def do_actions(self, db_session, with_test=False):
if duration.seconds >= 5:
auto_rule_logger.warning('the running time of rule %s is too long, for %s. SQL queries are %s', self.rule_id, duration, f"\n{'\n'.join(self.query_stats)}")


if not with_test:
auto_rule_result.update({
'success': self.task_run_success,
'warnings': self.warnings,
'is_valid': self.current_valid
})
auto_rule_result.success = self.task_run_success
auto_rule_result.warnings = self.warnings

return auto_rule_result
103 changes: 82 additions & 21 deletions dtable_events/automations/automations_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
from queue import Queue
from concurrent.futures import ThreadPoolExecutor
from threading import Thread, Lock
from typing import Dict

from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import text

from dtable_events.app.config import DTABLE_WEB_SERVICE_URL
from dtable_events.app.event_redis import RedisClient
from dtable_events.app.log import auto_rule_logger
from dtable_events.automations.actions import AutomationRule
from dtable_events.automations.automations_stats_helper import AutomationsStatsHelper
from dtable_events.automations.actions import AutomationRule, AutomationResult
from dtable_events.automations.automations_stats_manager import AutomationsStatsManager
from dtable_events.ccnet.organization import get_org_admins
from dtable_events.db import init_db_session_class
from dtable_events.utils import get_dtable_owner_org_id
from dtable_events.utils.dtable_web_api import DTableWebAPI
from dtable_events.utils.utils_metric import AUTOMATION_RULES_QUEUE_METRIC_HELP, REALTIME_AUTOMATION_RULES_HEARTBEAT_HELP, \
REALTIME_AUTOMATION_RULES_TRIGGERED_COUNT_HELP, SCHEDULED_AUTOMATION_RULES_TRIGGERED_COUNT_HELP, publish_metric

Expand Down Expand Up @@ -66,7 +68,7 @@ class AutomationsPipeline:
def __init__(self, config):
self.workers = 5
self.automations_queue: Queue[AutomationRule] = Queue()
self.results_queue: Queue[Dict] = Queue()
self.results_queue: Queue[AutomationResult] = Queue()

self._db_session_class = init_db_session_class(config)

Expand All @@ -75,7 +77,7 @@ def __init__(self, config):

self.rate_limiter = RateLimiter()

self.automations_stats_helper = AutomationsStatsHelper()
self.automations_stats_manager = AutomationsStatsManager()

self.log_none_message_timeout = 10 * 60

Expand All @@ -88,6 +90,24 @@ def __init__(self, config):

self.parse_config()

self.exceed_system_resource_limit_entities = None
self.reset_exceed_system_resource_limit_entities()

def reset_exceed_system_resource_limit_entities(self):
self.exceed_system_resource_limit_entities = {'orgs_map': {}, 'owners_map': {}}

def add_exceed_system_resource_limit_entity(self, owner, org_id):
if org_id != -1:
if org_id in self.exceed_system_resource_limit_entities['orgs_map']:
self.exceed_system_resource_limit_entities['orgs_map'][org_id] += 1
else:
self.exceed_system_resource_limit_entities['orgs_map'][org_id] = 0
else:
if owner in self.exceed_system_resource_limit_entities['owners_map']:
self.exceed_system_resource_limit_entities['owners_map'][owner] += 1
else:
self.exceed_system_resource_limit_entities['owners_map'][owner] = 0

def parse_config(self):
try:
self.workers = int(os.environ.get('AUTOMATION_WORKERS', self.workers))
Expand Down Expand Up @@ -146,14 +166,31 @@ def receive(self):
try:
dtable_uuid = event.get('dtable_uuid')
owner_info = get_dtable_owner_org_id(dtable_uuid, db_session)
event.update(owner_info)
automation_rule = self.get_automation_rule(db_session, event)
if not automation_rule:
continue
if not self.rate_limiter.is_allowed(owner_info['owner'], owner_info['org_id']):
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} rate limit exceed, event {event} will not trigger")
automation_rule.append_warning({'type': 'exceed_system_resource_limit'})
self.results_queue.put(AutomationResult(
rule_id=automation_rule.rule_id,
rule_name=automation_rule.rule_name,
dtable_uuid=automation_rule.dtable_uuid,
run_condition=automation_rule.run_condition,
org_id=automation_rule.org_id,
owner=automation_rule.owner,
with_test=False,
success=False,
is_exceed_system_resource_limit=True,
trigger_time=datetime.utcnow(),
warnings=automation_rule.warnings
))
self.add_exceed_system_resource_limit_entity(automation_rule.owner, automation_rule.org_id)
continue
if self.automations_stats_helper.is_exceed(db_session, owner_info['owner'], owner_info['org_id']):
if self.automations_stats_manager.is_exceed(db_session, owner_info['owner'], owner_info['org_id']):
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger count limit exceed, {event} will not trigger")
continue
event.update(owner_info)
automation_rule = self.get_automation_rule(db_session, event)
if not automation_rule.can_do_actions():
auto_rule_logger.info(f"owner {owner_info['owner']} org {owner_info['org_id']} trigger run condition missed, {event} will not trigger")
continue
Expand Down Expand Up @@ -184,7 +221,7 @@ def worker(self):
run_time = time.time() - start_time
auto_rule_logger.info(f"Automation {automation.rule_id} with data {automation.data} result is {result} run for {run_time}")
if result:
result['run_time'] = run_time
result.run_time = run_time
self.results_queue.put(result)
except Exception as e:
auto_rule_logger.exception('Handle automation rule with data %s failed: %s', automation.data, e)
Expand All @@ -198,7 +235,6 @@ def start_workers(self):
auto_rule_logger.info(f"Started {self.workers} automation workers")

def scan_rules(self):
db_session = self._db_session_class()
sql = '''
SELECT `dar`.`id`, `run_condition`, `trigger`, `actions`, `dtable_uuid`, w.`owner`, w.`org_id` FROM dtable_automation_rules dar
JOIN dtables d ON dar.dtable_uuid=d.uuid
Expand Down Expand Up @@ -245,7 +281,7 @@ def scan_rules(self):
self.automations_queue.put(automation)
self.scheduled_trigger_count += 1
continue
if self.automations_stats_helper.is_exceed(db_session, rule.owner, rule.org_id):
if self.automations_stats_manager.is_exceed(db_session, rule.owner, rule.org_id):
cached_exceed_keys_set.add(exceed_key)
continue
self.automations_queue.put(automation)
Expand Down Expand Up @@ -273,24 +309,49 @@ def stats(self):
auto_rule_logger.info("Start to stats thread")
while True:
result = self.results_queue.get()
if result.get('run_condition') == 'per_update':
owner = result.get('owner')
org_id = result.get('org_id')
run_time = result.get('run_time')
if result.run_condition == 'per_update' and not result.is_exceed_system_resource_limit:
owner = result.owner
org_id = result.org_id
run_time = result.run_time
self.rate_limiter.record_time(owner, org_id, run_time)
auto_rule_logger.debug(f"owner {owner} org_id {org_id} usage percent {self.rate_limiter.get_percent(owner, org_id)}")
db_session = self._db_session_class()
try:
self.automations_stats_helper.update_stats(db_session, result)
self.automations_stats_manager.update_stats(db_session, result)
except Exception as e:
auto_rule_logger.exception(e)
finally:
db_session.close()

    def send_exceed_system_resource_limit_notifications(self):
        """Periodically notify admins/owners about runs skipped by the resource limit.

        Starts a BlockingScheduler (so this must run in its own thread) that
        every 10 minutes flushes the accumulated skip counters in
        ``self.exceed_system_resource_limit_entities`` into
        'auto_exceed_sys_res_limit' notifications, then resets the counters.
        """
        sched = BlockingScheduler()

        @sched.scheduled_job('cron', day_of_week='*', hour='*', minute='*/10', misfire_grace_time=60)
        def timed_job():
            # NOTE(review): these dicts are also mutated by the receive thread
            # while we iterate them here — confirm that concurrent updates
            # during iteration cannot occur (or snapshot before iterating).
            orgs_map = self.exceed_system_resource_limit_entities['orgs_map']
            db_session = self._db_session_class()
            dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL)
            try:
                # Org-owned bases: notify every org admin.
                for org_id, missing_count in orgs_map.items():
                    admins = get_org_admins(db_session, org_id)
                    if admins:
                        dtable_web_api.internal_add_notification(admins, 'auto_exceed_sys_res_limit', {'missing_count': missing_count})
                # Personal bases: notify the owner directly.
                owners_map = self.exceed_system_resource_limit_entities['owners_map']
                for owner, missing_count in owners_map.items():
                    dtable_web_api.internal_add_notification([owner], 'auto_exceed_sys_res_limit', {'missing_count': missing_count})
            except Exception as e:
                # Best-effort: one failing notification aborts the rest of this
                # batch; the error is logged and the next run starts fresh.
                auto_rule_logger.exception(e)
            finally:
                db_session.close()
                # NOTE(review): counters are reset even when the try-block
                # failed, so skips from a failed batch are never reported —
                # confirm this loss is acceptable.
                self.reset_exceed_system_resource_limit_entities()

        sched.start()

def start(self):
auto_rule_logger.info("Start automations pipeline")
self.start_workers()
Thread(target=self.receive, daemon=True).start()
Thread(target=self.scheduled_scan, daemon=True).start()
Thread(target=self.stats, daemon=True).start()
Thread(target=self.publish_metrics, daemon=True).start()
self.start_workers() # auto rules do action from automations_queue
Thread(target=self.receive, daemon=True).start() # add normal action to automations_queue
Thread(target=self.scheduled_scan, daemon=True).start() # add cron action to automations_queue
Thread(target=self.stats, daemon=True).start() # update status
Thread(target=self.publish_metrics, daemon=True).start() # update metrics
Thread(target=self.send_exceed_system_resource_limit_notifications, daemon=True).start() # send notifications
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
from dtable_events.utils import get_dtable_admins

from dtable_events.app.config import CCNET_DB_NAME, DTABLE_WEB_SERVICE_URL
from dtable_events.automations.actions import AutomationResult
from dtable_events.utils.dtable_web_api import DTableWebAPI


class AutomationsStatsHelper:
class AutomationsStatsManager:

def __init__(self):
self.dtable_web_api = DTableWebAPI(DTABLE_WEB_SERVICE_URL)
Expand Down Expand Up @@ -111,7 +112,7 @@ def check_org_reach_warning(self, db_session, org_id):
db_session.execute(text(sql), {'org_id': org_id, 'warning_limit': limit, 'trigger_date': date.today().replace(day=1)})
db_session.commit()

def update_stats(self, db_session, auto_rule_result):
def update_stats(self, db_session, auto_rule_result: AutomationResult):
# update rule, rule_log, stats
statistic_sql_user = '''
INSERT INTO user_auto_rules_statistics (username, trigger_date, trigger_count, update_at) VALUES
Expand All @@ -134,27 +135,32 @@ def update_stats(self, db_session, auto_rule_result):
INSERT INTO auto_rules_task_log (trigger_time, success, rule_id, run_condition, dtable_uuid, org_id, owner, warnings) VALUES
(:trigger_time, :success, :rule_id, :run_condition, :dtable_uuid, :org_id, :owner, :warnings)
'''
org_id = auto_rule_result.get('org_id')
owner = auto_rule_result.get('owner')
sqls = [update_rule_sql, insert_rule_log]
if org_id:
if org_id == -1:
if '@seafile_group' not in owner:
sqls.append(statistic_sql_user)
else:
sqls.append(statistic_sql_org)
org_id = auto_rule_result.org_id
owner = auto_rule_result.owner
sqls = []
if not auto_rule_result.is_exceed_system_resource_limit:
sqls.append(update_rule_sql)
sqls.append(insert_rule_log)
if org_id:
if org_id == -1:
if '@seafile_group' not in owner:
sqls.append(statistic_sql_user)
else:
sqls.append(statistic_sql_org)
else:
sqls.append(insert_rule_log)
params = {
'rule_id': auto_rule_result.get('rule_id'),
'rule_id': auto_rule_result.rule_id,
'username': owner,
'dtable_uuid': auto_rule_result.get('dtable_uuid'),
'dtable_uuid': auto_rule_result.dtable_uuid,
'org_id': org_id,
'owner': auto_rule_result.get('owner'),
'trigger_time': auto_rule_result.get('trigger_time'),
'trigger_date': auto_rule_result.get('trigger_date'),
'is_valid': auto_rule_result.get('is_valid'),
'success': 1 if auto_rule_result.get('success') else 0,
'run_condition': auto_rule_result.get('run_condition'),
'warnings': json.dumps(auto_rule_result.get('warnings')) if json.dumps(auto_rule_result.get('warnings')) else None
'owner': auto_rule_result.owner,
'trigger_time': auto_rule_result.trigger_time,
'trigger_date': auto_rule_result.trigger_date,
'is_valid': auto_rule_result.is_valid,
'success': 1 if auto_rule_result.success else 0,
'run_condition': auto_rule_result.run_condition,
'warnings': json.dumps(auto_rule_result.warnings) if auto_rule_result.warnings else None
}
for sql in sqls:
db_session.execute(text(sql), params)
Expand All @@ -167,16 +173,16 @@ def update_stats(self, db_session, auto_rule_result):
self.check_org_reach_warning(db_session, org_id)

# send invalid warning
if auto_rule_result.get('is_valid') == False:
admins = get_dtable_admins(auto_rule_result.get('dtable_uuid'), db_session)
invalid_type = auto_rule_result.get('invalid_type') or ''
send_notification(auto_rule_result.get('dtable_uuid'), [{
if auto_rule_result.is_valid == False:
admins = get_dtable_admins(auto_rule_result.dtable_uuid, db_session)
invalid_type = auto_rule_result.invalid_type or ''
send_notification(auto_rule_result.dtable_uuid, [{
'to_user': user,
'msg_type': 'auto_rule_invalid',
'detail': {
'author': 'Automation Rule',
'rule_id': auto_rule_result.get('rule_id'),
'rule_name': auto_rule_result.get('rule_name'),
'rule_id': auto_rule_result.rule_id,
'rule_name': auto_rule_result.rule_name,
'invalid_type': invalid_type
}
} for user in admins])
10 changes: 10 additions & 0 deletions dtable_events/ccnet/organization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from sqlalchemy import text

from dtable_events.app.config import CCNET_DB_NAME


def get_org_admins(db_session, org_id):
    """Return the email addresses of all staff (admin) users of the given org."""
    query = (
        f"SELECT `email` FROM `{CCNET_DB_NAME}`.`OrgUser` "
        "WHERE `org_id`=:org_id AND `is_staff`=1"
    )
    result = db_session.execute(text(query), {'org_id': org_id})
    return [record.email for record in result]