|
| 1 | +using Dapper; |
| 2 | +using System; |
| 3 | +using System.Collections.Generic; |
| 4 | +using System.Data; |
| 5 | +using System.Data.Common; |
| 6 | +using System.Linq; |
| 7 | +using System.Threading; |
| 8 | +using System.Threading.Tasks; |
| 9 | + |
| 10 | +namespace Quartz.Plugins.RecentHistory.Impl |
| 11 | +{ |
| 12 | + [Serializable] |
| 13 | + public class SqlServerExecutionHistoryStore : IExecutionHistoryStore |
| 14 | + { |
| 15 | + public string SchedulerName { get; set; } |
| 16 | + |
| 17 | + |
| 18 | + DateTime _nextPurgeTime = DateTime.UtcNow; |
| 19 | + int _updatesFromLastPurge; |
| 20 | + |
| 21 | + int _totalJobsExecuted = 0, _totalJobsFailed = 0; |
| 22 | + private const string _prefix = "tb_quartz_"; |
| 23 | + private readonly IDbConnection _dbConnection; |
| 24 | + public Task<ExecutionHistoryEntry> Get(string fireInstanceId) |
| 25 | + { |
| 26 | + var sql = $"SELECT * FROM {_prefix}ExecutionHistoryStore WHERE fire_instance_id = @FireInstanceId"; |
| 27 | + return _dbConnection.QuerySingleOrDefaultAsync<ExecutionHistoryEntry>(sql, new { FireInstanceId = fireInstanceId }); |
| 28 | + } |
| 29 | + |
| 30 | + public async Task Purge() |
| 31 | + { |
| 32 | + var ids = new HashSet<string>((await FilterLastOfEveryTrigger(10)).Select(x => x.FireInstanceId)); |
| 33 | + var sql = $"DELETE FROM {_prefix}ExecutionHistoryStore WHERE id NOT IN @Ids"; |
| 34 | + await _dbConnection.ExecuteAsync(sql, new { Ids = ids }); |
| 35 | + } |
| 36 | + |
| 37 | + public async Task Save(ExecutionHistoryEntry entry) |
| 38 | + { |
| 39 | + _updatesFromLastPurge++; |
| 40 | + |
| 41 | + if (_updatesFromLastPurge >= 10 || _nextPurgeTime < DateTime.UtcNow) |
| 42 | + { |
| 43 | + _nextPurgeTime = DateTime.UtcNow.AddMinutes(1); |
| 44 | + _updatesFromLastPurge = 0; |
| 45 | + await Purge(); |
| 46 | + } |
| 47 | + var sql = $@" |
| 48 | + INSERT INTO {_prefix}ExecutionHistoryStore ( |
| 49 | + fire_instance_id, |
| 50 | + scheduler_instance_id, |
| 51 | + scheduler_name, |
| 52 | + job, |
| 53 | + trigger, |
| 54 | + scheduled_fire_time_utc, |
| 55 | + actual_fire_time_utc, |
| 56 | + recovering, |
| 57 | + vetoed, |
| 58 | + finished_time_utc, |
| 59 | + exception_message |
| 60 | + ) VALUES ( |
| 61 | + @FireInstanceId, |
| 62 | + @SchedulerInstanceId, |
| 63 | + @SchedulerName, |
| 64 | + @Job, |
| 65 | + @Trigger, |
| 66 | + @ScheduledFireTimeUtc, |
| 67 | + @ActualFireTimeUtc, |
| 68 | + @Recovering, |
| 69 | + @Vetoed, |
| 70 | + @FinishedTimeUtc, |
| 71 | + @ExceptionMessage |
| 72 | + )"; |
| 73 | + await _dbConnection.ExecuteAsync(sql, entry); |
| 74 | + } |
| 75 | + |
| 76 | + public Task<IEnumerable<ExecutionHistoryEntry>> FilterLastOfEveryJob(int limitPerJob) |
| 77 | + { |
| 78 | + var sql = $@" |
| 79 | +SELECT * |
| 80 | +FROM ( |
| 81 | + SELECT |
| 82 | + *, |
| 83 | + ROW_NUMBER() OVER ( |
| 84 | + PARTITION BY job |
| 85 | + ORDER BY actual_fire_time_utc DESC |
| 86 | + ) AS rn |
| 87 | + FROM {_prefix}ExecutionHistoryStore |
| 88 | + WHERE scheduler_name = @SchedulerName |
| 89 | +) t |
| 90 | +WHERE t.rn <= @LimitPerJob |
| 91 | +ORDER BY job, actual_fire_time_utc ASC; |
| 92 | +"; |
| 93 | + |
| 94 | + var result = _dbConnection.QueryAsync<ExecutionHistoryEntry>( |
| 95 | + sql, new { SchedulerName = SchedulerName, LimitPerJob = limitPerJob }); |
| 96 | + return result; |
| 97 | + } |
| 98 | + |
| 99 | + public Task<IEnumerable<ExecutionHistoryEntry>> FilterLastOfEveryTrigger(int limitPerTrigger) |
| 100 | + { |
| 101 | + |
| 102 | + var sql = $@" |
| 103 | +SELECT * |
| 104 | +FROM ( |
| 105 | + SELECT |
| 106 | + *, |
| 107 | + ROW_NUMBER() OVER ( |
| 108 | + PARTITION BY trigger |
| 109 | + ORDER BY actual_fire_time_utc DESC |
| 110 | + ) AS rn |
| 111 | + FROM {_prefix}ExecutionHistoryStore |
| 112 | + WHERE scheduler_name = @SchedulerName |
| 113 | +) t |
| 114 | +WHERE t.rn <= @LimitPerTrigger |
| 115 | +ORDER BY trigger, actual_fire_time_utc ASC; |
| 116 | +"; |
| 117 | + |
| 118 | + var result = _dbConnection.QueryAsync<ExecutionHistoryEntry>( |
| 119 | + sql, new { SchedulerName, LimitPerTrigger = limitPerTrigger }); |
| 120 | + return result; |
| 121 | + } |
| 122 | + |
| 123 | + public Task<IEnumerable<ExecutionHistoryEntry>> FilterLast(int limit) |
| 124 | + { |
| 125 | + |
| 126 | + var sql = $@" |
| 127 | +SELECT * |
| 128 | +FROM ( |
| 129 | + SELECT * |
| 130 | + FROM {_prefix}ExecutionHistoryStore |
| 131 | + WHERE scheduler_name = @SchedulerName |
| 132 | + ORDER BY actual_fire_time_utc DESC |
| 133 | + LIMIT @Limit |
| 134 | +) t |
| 135 | +ORDER BY actual_fire_time_utc ASC; |
| 136 | +"; |
| 137 | + |
| 138 | + var result = _dbConnection.QueryAsync<ExecutionHistoryEntry>( |
| 139 | + sql, new { SchedulerName, Limit = limit }); |
| 140 | + return result; |
| 141 | + } |
| 142 | + |
| 143 | + private const int StatsId = 1; // 只有一行,ID恒为1 |
| 144 | + |
| 145 | + public async Task<int> GetTotalJobsExecuted() |
| 146 | + { |
| 147 | + var sql = $@"SELECT total_jobs_executed AS TotalJobsExecuted, total_jobs_failed AS TotalJobsFailed |
| 148 | + FROM {_prefix}JobStats WHERE id = @Id"; |
| 149 | + var js= await _dbConnection.QuerySingleAsync<JobStats>(sql, new { Id = StatsId }); |
| 150 | + return js.TotalJobsExecuted; |
| 151 | + } |
| 152 | + public async Task<int> GetTotalJobsFailed() |
| 153 | + { |
| 154 | + var sql = $@"SELECT total_jobs_executed AS TotalJobsExecuted, total_jobs_failed AS TotalJobsFailed |
| 155 | + FROM {_prefix}JobStats WHERE id = @Id"; |
| 156 | + var js = await _dbConnection.QuerySingleAsync<JobStats>(sql, new { Id = StatsId }); |
| 157 | + return js.TotalJobsFailed; |
| 158 | + } |
| 159 | + |
| 160 | + public async Task IncrementTotalJobsExecuted() |
| 161 | + { |
| 162 | + var sql = $@"UPDATE {_prefix}JobStats SET total_jobs_executed = total_jobs_executed + 1 WHERE id = @Id"; |
| 163 | + await _dbConnection.ExecuteAsync(sql, new { Id = StatsId }); |
| 164 | + } |
| 165 | + |
| 166 | + public async Task IncrementTotalJobsFailed() |
| 167 | + { |
| 168 | + var sql = $@"UPDATE {_prefix}JobStats SET total_jobs_failed = total_jobs_failed + 1 WHERE id = @Id"; |
| 169 | + await _dbConnection.ExecuteAsync(sql, new { Id = StatsId }); |
| 170 | + } |
| 171 | + } |
| 172 | +} |
0 commit comments