Skip to content

Commit 9456a25

Browse files
committed
Set threshold for keeping un-processed records
Only keep un-processed records if usage event table size is below threshold. This is a safeguard to ensure row count does not grow unbounded in the event of a zombie consumer.
1 parent c1a8556 commit 9456a25

File tree

16 files changed

+194
-53
lines changed

16 files changed

+194
-53
lines changed

app/jobs/runtime/app_usage_events_cleanup.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@ module VCAP::CloudController
44
module Jobs
55
module Runtime
66
class AppUsageEventsCleanup < VCAP::CloudController::Jobs::CCJob
7-
attr_accessor :cutoff_age_in_days
7+
attr_accessor :cutoff_age_in_days, :threshold_for_keeping_unprocessed_records
88

9-
def initialize(cutoff_age_in_days)
9+
def initialize(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
1010
@cutoff_age_in_days = cutoff_age_in_days
11+
@threshold_for_keeping_unprocessed_records = threshold_for_keeping_unprocessed_records
1112
end
1213

1314
def perform
1415
logger = Steno.logger('cc.background')
1516
logger.info('Cleaning up old AppUsageEvent rows')
1617

1718
repository = Repositories::AppUsageEventRepository.new
18-
repository.delete_events_older_than(cutoff_age_in_days)
19+
repository.delete_events_older_than(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
1920
end
2021

2122
def job_name_in_configuration

app/jobs/v2/services/service_usage_events_cleanup.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@ module VCAP::CloudController
44
module Jobs
55
module Services
66
class ServiceUsageEventsCleanup < VCAP::CloudController::Jobs::CCJob
7-
attr_accessor :cutoff_age_in_days
7+
attr_accessor :cutoff_age_in_days, :threshold_for_keeping_unprocessed_records
88

9-
def initialize(cutoff_age_in_days)
9+
def initialize(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
1010
@cutoff_age_in_days = cutoff_age_in_days
11+
@threshold_for_keeping_unprocessed_records = threshold_for_keeping_unprocessed_records
1112
end
1213

1314
def perform
1415
logger = Steno.logger('cc.background')
1516
logger.info('Cleaning up old ServiceUsageEvent rows')
1617

1718
repository = Repositories::ServiceUsageEventRepository.new
18-
repository.delete_events_older_than(cutoff_age_in_days)
19+
repository.delete_events_older_than(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
1920
end
2021

2122
def job_name_in_configuration

app/repositories/app_usage_event_repository.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ def purge_and_reseed_started_apps!
151151
AppUsageEvent.insert(column_map.keys, usage_query)
152152
end
153153

154-
def delete_events_older_than(cutoff_age_in_days)
154+
def delete_events_older_than(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
155155
Database::OldRecordCleanup.new(AppUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true,
156-
keep_unprocessed_records: true).delete
156+
keep_unprocessed_records: true, threshold_for_keeping_unprocessed_records: threshold_for_keeping_unprocessed_records).delete
157157
end
158158

159159
private

app/repositories/service_usage_event_repository.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,10 @@ def purge_and_reseed_service_instances!
9191
ServiceUsageEvent.insert(column_map.keys, usage_query)
9292
end
9393

94-
def delete_events_older_than(cutoff_age_in_days)
94+
def delete_events_older_than(cutoff_age_in_days, threshold_for_keeping_unprocessed_records)
9595
Database::OldRecordCleanup.new(ServiceUsageEvent, cutoff_age_in_days: cutoff_age_in_days, keep_at_least_one_record: true, keep_running_records: true,
96-
keep_unprocessed_records: true).delete
96+
keep_unprocessed_records: true,
97+
threshold_for_keeping_unprocessed_records: threshold_for_keeping_unprocessed_records).delete
9798
end
9899
end
99100
end

config/bosh-lite.yml

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ newrelic_enabled: false
1010
external_protocol: https
1111
external_domain: api.bosh-lite.com
1212

13-
system_domain_organization:
13+
system_domain_organization:
1414
system_domain: bosh-lite.com
1515
app_domains:
1616
- name: bosh-lite.com
@@ -21,9 +21,11 @@ jobs:
2121

2222
app_usage_events:
2323
cutoff_age_in_days: 31
24+
threshold_for_keeping_unprocessed_records: 5000000
2425

2526
service_usage_events:
2627
cutoff_age_in_days: 31
28+
threshold_for_keeping_unprocessed_records: 5000000
2729

2830
audit_events:
2931
cutoff_age_in_days: 31
@@ -133,8 +135,8 @@ resource_pool:
133135
maximum_size: 536870912
134136
resource_directory_key: bosh-lite.com-cc-resources
135137
cdn:
136-
uri:
137-
key_pair_id:
138+
uri:
139+
key_pair_id:
138140
private_key: ""
139141
fog_connection: {}
140142

@@ -150,8 +152,8 @@ packages:
150152
max_valid_packages_stored: 5
151153
max_package_size: 1073741824
152154
cdn:
153-
uri:
154-
key_pair_id:
155+
uri:
156+
key_pair_id:
155157
private_key: ""
156158
fog_connection: {}
157159

@@ -165,8 +167,8 @@ droplets:
165167
password: blobstore-password
166168
droplet_directory_key: bosh-lite.com-cc-droplets
167169
cdn:
168-
uri:
169-
key_pair_id:
170+
uri:
171+
key_pair_id:
170172
private_key: ""
171173
fog_connection: {}
172174
max_staged_droplets_stored: 5
@@ -181,8 +183,8 @@ buildpacks:
181183
password: blobstore-password
182184
buildpack_directory_key: bosh-lite.com-cc-buildpacks
183185
cdn:
184-
uri:
185-
key_pair_id:
186+
uri:
187+
key_pair_id:
186188
private_key: ""
187189
fog_connection: {}
188190

@@ -211,7 +213,7 @@ default_app_ssh_access: true
211213

212214
skip_cert_verify: true
213215

214-
install_buildpacks:
216+
install_buildpacks:
215217
- name: staticfile_buildpack
216218
package: staticfile-buildpack
217219
- name: java_buildpack

config/cloud_controller.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@ jobs:
3838

3939
app_usage_events:
4040
cutoff_age_in_days: 31
41+
threshold_for_keeping_unprocessed_records: 5000000
4142

4243
service_usage_events:
4344
cutoff_age_in_days: 31
45+
threshold_for_keeping_unprocessed_records: 5000000
4446

4547
audit_events:
4648
cutoff_age_in_days: 31

lib/cloud_controller/clock/scheduler.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
module VCAP::CloudController
66
class Scheduler
77
CLEANUPS = [
8-
{ name: 'app_usage_events', class: Jobs::Runtime::AppUsageEventsCleanup, time: '18:00', arg_from_config: %i[app_usage_events cutoff_age_in_days] },
8+
{ name: 'app_usage_events', class: Jobs::Runtime::AppUsageEventsCleanup, time: '18:00',
9+
arg_from_config: [%i[app_usage_events cutoff_age_in_days], %i[app_usage_events threshold_for_keeping_unprocessed_records]] },
910
{ name: 'audit_events', class: Jobs::Runtime::EventsCleanup, time: '20:00', arg_from_config: %i[audit_events cutoff_age_in_days] },
10-
{ name: 'service_usage_events', class: Jobs::Services::ServiceUsageEventsCleanup, time: '22:00', arg_from_config: %i[service_usage_events cutoff_age_in_days] },
11+
{ name: 'service_usage_events', class: Jobs::Services::ServiceUsageEventsCleanup, time: '22:00',
12+
arg_from_config: [%i[service_usage_events cutoff_age_in_days], %i[service_usage_events threshold_for_keeping_unprocessed_records]] },
1113
{ name: 'completed_tasks', class: Jobs::Runtime::PruneCompletedTasks, time: '23:00', arg_from_config: %i[completed_tasks cutoff_age_in_days] },
1214
{ name: 'expired_blob_cleanup', class: Jobs::Runtime::ExpiredBlobCleanup, time: '00:00' },
1315
{ name: 'expired_resource_cleanup', class: Jobs::Runtime::ExpiredResourceCleanup, time: '00:30' },
@@ -87,7 +89,13 @@ def start_daily_jobs
8789
klass = cleanup_config[:class]
8890

8991
if cleanup_config[:arg_from_config]
90-
klass.new(@config.get(*cleanup_config[:arg_from_config]))
92+
args = cleanup_config[:arg_from_config]
93+
if args.first.is_a?(Array)
94+
args = args.map { |arg| @config.get(*arg) }
95+
klass.new(*args)
96+
else
97+
klass.new(@config.get(*args))
98+
end
9199
else
92100
klass.new
93101
end

lib/cloud_controller/config_schemas/clock_schema.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ class ClockSchema < VCAP::Config
1515
clock: Integer
1616
},
1717
app_usage_events: {
18-
cutoff_age_in_days: Integer
18+
cutoff_age_in_days: Integer,
19+
threshold_for_keeping_unprocessed_records: Integer
1920
},
2021
audit_events: {
2122
cutoff_age_in_days: Integer
@@ -206,7 +207,10 @@ class ClockSchema < VCAP::Config
206207
frequency_in_seconds: Integer
207208
},
208209

209-
service_usage_events: { cutoff_age_in_days: Integer },
210+
service_usage_events: {
211+
cutoff_age_in_days: Integer,
212+
threshold_for_keeping_unprocessed_records: Integer
213+
},
210214
default_app_ssh_access: bool,
211215
allow_app_ssh_access: bool,
212216
jobs: {

lib/database/old_record_cleanup.rb

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
module Database
44
class OldRecordCleanup
55
class NoCurrentTimestampError < StandardError; end
6-
attr_reader :model, :days_ago, :keep_at_least_one_record, :keep_running_records, :keep_unprocessed_records
6+
attr_reader :model, :days_ago, :keep_at_least_one_record, :keep_running_records, :keep_unprocessed_records, :threshold_for_keeping_unprocessed_records
77

8-
def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false, keep_unprocessed_records: false)
8+
def initialize(model, cutoff_age_in_days:, keep_at_least_one_record: false, keep_running_records: false, keep_unprocessed_records: false,
9+
threshold_for_keeping_unprocessed_records: nil)
910
@model = model
1011
@days_ago = cutoff_age_in_days
1112
@keep_at_least_one_record = keep_at_least_one_record
1213
@keep_running_records = keep_running_records
1314
@keep_unprocessed_records = keep_unprocessed_records
15+
@threshold_for_keeping_unprocessed_records = threshold_for_keeping_unprocessed_records
1416
end
1517

1618
def delete
@@ -97,16 +99,22 @@ def exclude_unprocessed_records(old_records)
9799

98100
return old_records unless consumer_model
99101

100-
# Find the usage event record with the lowest ID
101-
# of any that are referenced by a consumer
102-
referenced_event = model.
103-
join(consumer_model.table_name.to_sym, last_processed_guid: :guid).
104-
select(Sequel.function(:min, Sequel.qualify(model.table_name, :id)).as(:min_id)).
105-
first
106-
107-
return old_records if referenced_event[:min_id].nil?
108-
109-
old_records.where { id < referenced_event[:min_id] }
102+
if approximate_row_count(model) < threshold_for_keeping_unprocessed_records.to_i
103+
# Find the usage event record with the lowest ID
104+
# of any that are referenced by a consumer
105+
referenced_event = model.
106+
join(consumer_model.table_name.to_sym, last_processed_guid: :guid).
107+
select(Sequel.function(:min, Sequel.qualify(model.table_name, :id)).as(:min_id)).
108+
first
109+
110+
return old_records if referenced_event[:min_id].nil?
111+
112+
old_records.where { id < referenced_event[:min_id] }
113+
else
114+
# When above threshold, we don't exclude any records
115+
# Associated consumers will be automatically deleted by Sequel
116+
old_records
117+
end
110118
end
111119

112120
def registered_consumer_model(model)
@@ -116,5 +124,27 @@ def registered_consumer_model(model)
116124

117125
false
118126
end
127+
128+
# Returns an Integer estimate of the number of rows in +model+'s table.
#
# The estimate is read from the database's own table statistics so that no
# full table scan is needed:
#   * :postgres        - pg_class.reltuples for the table
#   * :mysql / :mysql2 - information_schema.tables.table_rows for the table
#
# When no usable estimate is available (unsupported adapter, no statistics
# row for the table, a NULL value, or a negative value) this falls back to
# an exact +model.count+, so callers can always compare the result against
# an Integer threshold.
#
# @param model [Class] a Sequel model class; must respond to +db+,
#   +table_name+ and +count+
# @return [Integer] estimated (or, on fallback, exact) row count
def approximate_row_count(model)
  estimate_sql =
    case model.db.database_type
    when :postgres
      'SELECT reltuples::bigint AS estimate FROM pg_class WHERE relname = ?'
    when :mysql, :mysql2
      # Aliased so the result key does not depend on how the server cases
      # information_schema column names.
      'SELECT table_rows AS estimate FROM information_schema.tables ' \
      'WHERE table_schema = DATABASE() AND table_name = ?'
    end

  # Adapters without a known statistics query get an exact count instead of nil.
  return model.count unless estimate_sql

  # The table name is bound as a placeholder rather than interpolated.
  row = model.db.fetch(estimate_sql, model.table_name.to_s).first
  estimate = row && row[:estimate]

  # nil: no statistics row or NULL value. Negative: PostgreSQL reports the
  # row count as unknown (table never vacuumed/analyzed).
  return model.count if estimate.nil? || estimate.to_i.negative?

  estimate.to_i
end
119149
end
120150
end

spec/fixtures/config/port_8181_config.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ jobs:
3030

3131
app_usage_events:
3232
cutoff_age_in_days: 31
33+
threshold_for_keeping_unprocessed_records: 5000000
3334

3435
audit_events:
3536
cutoff_age_in_days: 31

0 commit comments

Comments
 (0)