@@ -110,6 +110,197 @@ def expire_snapshots_older_than(self, timestamp_ms: int) -> None:
110
110
111
111
txn ._apply ((RemoveSnapshotsUpdate (snapshot_ids = snapshots_to_expire ),))
112
112
113
+ def expire_snapshots_older_than_with_retention (
114
+ self ,
115
+ timestamp_ms : int ,
116
+ retain_last_n : Optional [int ] = None ,
117
+ min_snapshots_to_keep : Optional [int ] = None
118
+ ) -> None :
119
+ """Expire all unprotected snapshots with a timestamp older than a given value, with retention strategies.
120
+
121
+ Args:
122
+ timestamp_ms: Only snapshots with timestamp_ms < this value will be expired.
123
+ retain_last_n: Always keep the last N snapshots regardless of age.
124
+ min_snapshots_to_keep: Minimum number of snapshots to keep in total.
125
+ """
126
+ snapshots_to_expire = self ._get_snapshots_to_expire_with_retention (
127
+ timestamp_ms = timestamp_ms ,
128
+ retain_last_n = retain_last_n ,
129
+ min_snapshots_to_keep = min_snapshots_to_keep
130
+ )
131
+
132
+ if snapshots_to_expire :
133
+ with self .tbl .transaction () as txn :
134
+ from pyiceberg .table .update import RemoveSnapshotsUpdate
135
+
136
+ txn ._apply ((RemoveSnapshotsUpdate (snapshot_ids = snapshots_to_expire ),))
137
+
138
+ def retain_last_n_snapshots (self , n : int ) -> None :
139
+ """Keep only the last N snapshots, expiring all others.
140
+
141
+ Args:
142
+ n: Number of most recent snapshots to keep.
143
+
144
+ Raises:
145
+ ValueError: If n is less than 1.
146
+ """
147
+ if n < 1 :
148
+ raise ValueError ("Number of snapshots to retain must be at least 1" )
149
+
150
+ protected_ids = self ._get_protected_snapshot_ids (self .tbl .metadata )
151
+
152
+ # Sort snapshots by timestamp (most recent first)
153
+ sorted_snapshots = sorted (
154
+ self .tbl .metadata .snapshots ,
155
+ key = lambda s : s .timestamp_ms ,
156
+ reverse = True
157
+ )
158
+
159
+ # Keep the last N snapshots and all protected ones
160
+ snapshots_to_keep = set ()
161
+ snapshots_to_keep .update (protected_ids )
162
+
163
+ # Add the N most recent snapshots
164
+ for i , snapshot in enumerate (sorted_snapshots ):
165
+ if i < n :
166
+ snapshots_to_keep .add (snapshot .snapshot_id )
167
+
168
+ # Find snapshots to expire
169
+ snapshots_to_expire = []
170
+ for snapshot in self .tbl .metadata .snapshots :
171
+ if snapshot .snapshot_id not in snapshots_to_keep :
172
+ snapshots_to_expire .append (snapshot .snapshot_id )
173
+
174
+ if snapshots_to_expire :
175
+ with self .tbl .transaction () as txn :
176
+ from pyiceberg .table .update import RemoveSnapshotsUpdate
177
+
178
+ txn ._apply ((RemoveSnapshotsUpdate (snapshot_ids = snapshots_to_expire ),))
179
+
180
+ def _get_snapshots_to_expire_with_retention (
181
+ self ,
182
+ timestamp_ms : Optional [int ] = None ,
183
+ retain_last_n : Optional [int ] = None ,
184
+ min_snapshots_to_keep : Optional [int ] = None
185
+ ) -> List [int ]:
186
+ """Get snapshots to expire considering retention strategies.
187
+
188
+ Args:
189
+ timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
190
+ retain_last_n: Always keep the last N snapshots regardless of age.
191
+ min_snapshots_to_keep: Minimum number of snapshots to keep in total.
192
+
193
+ Returns:
194
+ List of snapshot IDs to expire.
195
+ """
196
+ protected_ids = self ._get_protected_snapshot_ids (self .tbl .metadata )
197
+
198
+ # Sort snapshots by timestamp (most recent first)
199
+ sorted_snapshots = sorted (
200
+ self .tbl .metadata .snapshots ,
201
+ key = lambda s : s .timestamp_ms ,
202
+ reverse = True
203
+ )
204
+
205
+ # Start with all snapshots that could be expired
206
+ candidates_for_expiration = []
207
+ snapshots_to_keep = set (protected_ids )
208
+
209
+ # Apply retain_last_n constraint
210
+ if retain_last_n is not None :
211
+ for i , snapshot in enumerate (sorted_snapshots ):
212
+ if i < retain_last_n :
213
+ snapshots_to_keep .add (snapshot .snapshot_id )
214
+
215
+ # Apply timestamp constraint
216
+ for snapshot in self .tbl .metadata .snapshots :
217
+ if (snapshot .snapshot_id not in snapshots_to_keep and
218
+ (timestamp_ms is None or snapshot .timestamp_ms < timestamp_ms )):
219
+ candidates_for_expiration .append (snapshot )
220
+
221
+ # Sort candidates by timestamp (oldest first) for potential expiration
222
+ candidates_for_expiration .sort (key = lambda s : s .timestamp_ms )
223
+
224
+ # Apply min_snapshots_to_keep constraint
225
+ total_snapshots = len (self .tbl .metadata .snapshots )
226
+ snapshots_to_expire = []
227
+
228
+ for candidate in candidates_for_expiration :
229
+ # Check if expiring this snapshot would violate min_snapshots_to_keep
230
+ remaining_after_expiration = total_snapshots - len (snapshots_to_expire ) - 1
231
+
232
+ if min_snapshots_to_keep is None or remaining_after_expiration >= min_snapshots_to_keep :
233
+ snapshots_to_expire .append (candidate .snapshot_id )
234
+ else :
235
+ # Stop expiring to maintain minimum count
236
+ break
237
+
238
+ return snapshots_to_expire
239
+
240
+ def expire_snapshots_with_retention_policy (
241
+ self ,
242
+ timestamp_ms : Optional [int ] = None ,
243
+ retain_last_n : Optional [int ] = None ,
244
+ min_snapshots_to_keep : Optional [int ] = None
245
+ ) -> List [int ]:
246
+ """Comprehensive snapshot expiration with multiple retention strategies.
247
+
248
+ This method provides a unified interface for snapshot expiration with various
249
+ retention policies to ensure operational resilience while allowing space reclamation.
250
+
251
+ Args:
252
+ timestamp_ms: Only snapshots with timestamp_ms < this value will be considered for expiration.
253
+ If None, all snapshots are candidates (subject to other constraints).
254
+ retain_last_n: Always keep the last N snapshots regardless of age.
255
+ Useful when regular snapshot creation occurs and users want to keep
256
+ the last few for rollback purposes.
257
+ min_snapshots_to_keep: Minimum number of snapshots to keep in total.
258
+ Acts as a guardrail to prevent aggressive expiration logic
259
+ from removing too many snapshots.
260
+
261
+ Returns:
262
+ List of snapshot IDs that were expired.
263
+
264
+ Raises:
265
+ ValueError: If retain_last_n or min_snapshots_to_keep is less than 1.
266
+
267
+ Examples:
268
+ # Keep last 5 snapshots regardless of age
269
+ maintenance.expire_snapshots_with_retention_policy(retain_last_n=5)
270
+
271
+ # Expire snapshots older than timestamp but keep at least 3 total
272
+ maintenance.expire_snapshots_with_retention_policy(
273
+ timestamp_ms=1234567890000,
274
+ min_snapshots_to_keep=3
275
+ )
276
+
277
+ # Combined policy: expire old snapshots but keep last 10 and at least 5 total
278
+ maintenance.expire_snapshots_with_retention_policy(
279
+ timestamp_ms=1234567890000,
280
+ retain_last_n=10,
281
+ min_snapshots_to_keep=5
282
+ )
283
+ """
284
+ if retain_last_n is not None and retain_last_n < 1 :
285
+ raise ValueError ("retain_last_n must be at least 1" )
286
+
287
+ if min_snapshots_to_keep is not None and min_snapshots_to_keep < 1 :
288
+ raise ValueError ("min_snapshots_to_keep must be at least 1" )
289
+
290
+ snapshots_to_expire = self ._get_snapshots_to_expire_with_retention (
291
+ timestamp_ms = timestamp_ms ,
292
+ retain_last_n = retain_last_n ,
293
+ min_snapshots_to_keep = min_snapshots_to_keep
294
+ )
295
+
296
+ if snapshots_to_expire :
297
+ with self .tbl .transaction () as txn :
298
+ from pyiceberg .table .update import RemoveSnapshotsUpdate
299
+
300
+ txn ._apply ((RemoveSnapshotsUpdate (snapshot_ids = snapshots_to_expire ),))
301
+
302
+ return snapshots_to_expire
303
+
113
304
def _get_protected_snapshot_ids (self , table_metadata : TableMetadata ) -> Set [int ]:
114
305
"""Get the IDs of protected snapshots.
115
306
0 commit comments