11use serde_json:: json;
2+ use std:: path:: { Path , PathBuf } ;
23use std:: sync:: Arc ;
34
45use super :: { AsOf , Query , QueryResult , WhereCondition } ;
@@ -132,12 +133,66 @@ impl Engine {
132133 Query :: BackupTable {
133134 table,
134135 destination,
135- compression : _ ,
136+ compression,
136137 } => {
137- // Table-specific backup would be implemented by copying just that table's files
138- // For now, return a placeholder
138+ // Verify table exists
139+ if !self . tables . contains_key ( & table) {
140+ return Ok ( QueryResult :: Error {
141+ message : format ! ( "Table '{}' not found" , table) ,
142+ } ) ;
143+ }
144+
145+ // Create a BackupManager and backup just this table
146+ let metrics = Arc :: new ( Metrics :: new ( ) ) ;
147+ let _backup_manager = BackupManager :: new ( self . base_path ( ) , metrics) ;
148+
149+ // Create backup directory
150+ std:: fs:: create_dir_all ( & destination) . map_err ( |e| {
151+ crate :: errors:: DriftError :: Other ( format ! ( "Failed to create backup directory: {}" , e) )
152+ } ) ?;
153+
154+ // Call the private backup_table_full method via a new public wrapper
155+ // For now, we'll use the same approach as full backup but only for one table
156+ let src_table_dir = self . base_path ( ) . join ( "tables" ) . join ( & table) ;
157+ let dst_table_dir = PathBuf :: from ( & destination) . join ( "tables" ) . join ( & table) ;
158+
159+ std:: fs:: create_dir_all ( & dst_table_dir) . map_err ( |e| {
160+ crate :: errors:: DriftError :: Other ( format ! ( "Failed to create table backup directory: {}" , e) )
161+ } ) ?;
162+
163+ // Copy all table files
164+ let mut files_copied = 0 ;
165+ if src_table_dir. exists ( ) {
166+ for entry in std:: fs:: read_dir ( & src_table_dir) ? {
167+ let entry = entry?;
168+ let src_path = entry. path ( ) ;
169+ let file_name = entry. file_name ( ) ;
170+ let dst_path = dst_table_dir. join ( file_name) ;
171+
172+ if src_path. is_file ( ) {
173+ std:: fs:: copy ( & src_path, & dst_path) ?;
174+ files_copied += 1 ;
175+ } else if src_path. is_dir ( ) {
176+ // Recursively copy directories (like segments/)
177+ Self :: copy_dir_recursive ( & src_path, & dst_path) ?;
178+ files_copied += 1 ;
179+ }
180+ }
181+ }
182+
183+ // Create simple metadata
184+ let metadata = serde_json:: json!( {
185+ "table" : table,
186+ "timestamp" : chrono:: Utc :: now( ) . to_rfc3339( ) ,
187+ "compression" : format!( "{:?}" , compression) ,
188+ "files_copied" : files_copied,
189+ } ) ;
190+
191+ let metadata_path = PathBuf :: from ( & destination) . join ( "metadata.json" ) ;
192+ std:: fs:: write ( & metadata_path, serde_json:: to_string_pretty ( & metadata) ?) ?;
193+
139194 Ok ( QueryResult :: Success {
140- message : format ! ( "Table '{}' backup created at '{}'" , table, destination) ,
195+ message : format ! ( "Table '{}' backed up to '{}' ({} files) " , table, destination, files_copied ) ,
141196 } )
142197 }
143198 Query :: RestoreDatabase {
@@ -158,14 +213,79 @@ impl Engine {
158213 } )
159214 }
160215 Query :: RestoreTable {
161- table : _ ,
162- source : _ ,
163- target : _ ,
164- verify : _ ,
216+ table,
217+ source,
218+ target,
219+ verify,
165220 } => {
166- // For now, return error indicating feature not fully implemented
167- Ok ( QueryResult :: Error {
168- message : "Table-level restore not yet implemented. Use full database restore instead." . to_string ( ) ,
221+ // Verify source backup exists
222+ let source_path = PathBuf :: from ( & source) ;
223+ if !source_path. exists ( ) {
224+ return Ok ( QueryResult :: Error {
225+ message : format ! ( "Backup source '{}' not found" , source) ,
226+ } ) ;
227+ }
228+
229+ // Read metadata if available
230+ let metadata_path = source_path. join ( "metadata.json" ) ;
231+ if metadata_path. exists ( ) {
232+ let metadata_content = std:: fs:: read_to_string ( & metadata_path) ?;
233+ let metadata: serde_json:: Value = serde_json:: from_str ( & metadata_content) ?;
234+
235+ // Verify it's the right table
236+ if let Some ( backup_table) = metadata. get ( "table" ) . and_then ( |t| t. as_str ( ) ) {
237+ if backup_table != table {
238+ return Ok ( QueryResult :: Error {
239+ message : format ! ( "Backup is for table '{}', but trying to restore '{}'" , backup_table, table) ,
240+ } ) ;
241+ }
242+ }
243+ }
244+
245+ // Determine target table name
246+ let target_table = target. as_deref ( ) . unwrap_or ( & table) ;
247+
248+ // Check if target table already exists
249+ if self . tables . contains_key ( target_table) {
250+ return Ok ( QueryResult :: Error {
251+ message : format ! ( "Target table '{}' already exists. Drop it first or use a different target name." , target_table) ,
252+ } ) ;
253+ }
254+
255+ // Restore the table files
256+ let src_table_dir = source_path. join ( "tables" ) . join ( & table) ;
257+ if !src_table_dir. exists ( ) {
258+ return Ok ( QueryResult :: Error {
259+ message : format ! ( "Table '{}' not found in backup" , table) ,
260+ } ) ;
261+ }
262+
263+ let dst_table_dir = self . base_path ( ) . join ( "tables" ) . join ( target_table) ;
264+
265+ // Verify backup integrity if requested
266+ if verify {
267+ // Basic verification: check if required files exist
268+ let schema_file = src_table_dir. join ( "schema.json" ) ;
269+ if !schema_file. exists ( ) {
270+ return Ok ( QueryResult :: Error {
271+ message : format ! ( "Backup verification failed: schema.json not found" ) ,
272+ } ) ;
273+ }
274+ }
275+
276+ // Copy all table files
277+ Self :: copy_dir_recursive ( & src_table_dir, & dst_table_dir) ?;
278+
279+ // Reload the table into the engine
280+ // Note: This requires the Engine to be mutable, which it is in execute_query
281+ // We'll need to use interior mutability or restructure this
282+ // For now, return success and note that engine restart may be needed
283+ Ok ( QueryResult :: Success {
284+ message : format ! (
285+ "Table '{}' restored to '{}'. Restart the engine or reload the table to use it." ,
286+ table,
287+ target_table
288+ ) ,
169289 } )
170290 }
171291 Query :: ShowBackups { directory } => {
@@ -409,4 +529,23 @@ impl Engine {
409529 // Return all column names from the schema
410530 Ok ( schema. columns . iter ( ) . map ( |c| c. name . clone ( ) ) . collect ( ) )
411531 }
532+
533+ /// Recursively copy a directory
534+ fn copy_dir_recursive ( src : & Path , dst : & Path ) -> Result < ( ) > {
535+ std:: fs:: create_dir_all ( dst) ?;
536+
537+ for entry in std:: fs:: read_dir ( src) ? {
538+ let entry = entry?;
539+ let src_path = entry. path ( ) ;
540+ let dst_path = dst. join ( entry. file_name ( ) ) ;
541+
542+ if src_path. is_file ( ) {
543+ std:: fs:: copy ( & src_path, & dst_path) ?;
544+ } else if src_path. is_dir ( ) {
545+ Self :: copy_dir_recursive ( & src_path, & dst_path) ?;
546+ }
547+ }
548+
549+ Ok ( ( ) )
550+ }
412551}
0 commit comments