-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path1. data_utils.py
More file actions
181 lines (138 loc) · 5.52 KB
/
1. data_utils.py
File metadata and controls
181 lines (138 loc) · 5.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from kaggle.api.kaggle_api_extended import KaggleApi as ka
import pandas as pd
from pathlib import Path
import os
import curses
import numpy as np
def _read_line(stdscr, prompt):
    """Show ``prompt`` and return the stripped line typed by the user.

    Echo is switched on only while the line is being read and is always
    switched off again, even if reading or decoding raises.
    """
    stdscr.addstr(prompt)
    stdscr.refresh()
    curses.echo()
    try:
        return stdscr.getstr().decode('utf-8').strip()
    finally:
        curses.noecho()


def _notify(stdscr, message):
    """Write ``message`` to the window and block until a key is pressed."""
    stdscr.addstr(message)
    stdscr.refresh()
    stdscr.getch()


def kaggle_connect(stdscr):
    """Search Kaggle, download one dataset, and load its first CSV file.

    Intended to be run through ``curses.wrapper``. The user is asked for a
    search term, a dataset number from the result list, and a folder name;
    the dataset is downloaded and unzipped into ``./<folder name>``.

    Args:
        stdscr: The curses window used for every prompt and message.

    Returns:
        A DataFrame read from the first ``*.csv`` file (in sorted order)
        found directly inside the download folder, or None when the user
        gives empty/invalid input, nothing is found, or any error occurs.
    """
    try:
        # Let the window scroll instead of raising curses.error when the
        # dataset list (or an error message) runs past the last row.
        stdscr.scrollok(True)

        # "Download" base path
        base_folder = Path("./")

        # Initialize the API and authenticate
        api = ka()
        api.authenticate()

        # Clear the screen for the curses menu
        stdscr.clear()
        stdscr.refresh()

        search_term = _read_line(stdscr, "Search for data (press Enter to skip): ")
        if not search_term:
            _notify(stdscr, "No search term entered. Exiting...")
            return None

        # List datasets related to the search term (as a list, for indexing)
        datasets = list(api.dataset_list(search=search_term) or [])
        if not datasets:
            _notify(stdscr, "No datasets found for the search term.")
            return None

        # Display datasets and prompt for selection
        stdscr.addstr("Datasets found:\n\n")
        for number, dataset in enumerate(datasets, start=1):
            stdscr.addstr(f"{number}){dataset.ref}\n")
        stdscr.refresh()

        try:
            option = int(
                _read_line(stdscr, "\nEnter the number of the dataset to download: ")
            )
        except ValueError:
            _notify(stdscr, "Invalid input. Please enter a number.")
            return None
        if not 1 <= option <= len(datasets):
            _notify(stdscr, "Invalid selection. Exiting.")
            return None
        data_ref = datasets[option - 1].ref

        # Destination folder for the download
        new_folder = _read_line(
            stdscr, "\nEnter the name of the new folder to store the dataset: "
        )
        if not new_folder:
            _notify(stdscr, "Folder name cannot be empty. Exiting.")
            return None
        download_path = base_folder / new_folder
        download_path.mkdir(parents=True, exist_ok=True)

        # Download the dataset and unzip it in the specified folder
        stdscr.addstr("\nDownloading dataset...")
        stdscr.refresh()
        api.dataset_download_files(data_ref, path=str(download_path), unzip=True)

        # glob() yields files in arbitrary order; sort so that "first CSV"
        # is the same file on every run.
        csv_files = sorted(download_path.glob('*.csv'))
        if not csv_files:
            # Show the message and wait, otherwise the screen is torn down
            # before the user can read it.
            _notify(stdscr, "No CSV files found in the dataset.")
            return None

        csv_file = csv_files[0]
        stdscr.addstr(f"\nLoading dataset from: {csv_file}")
        stdscr.refresh()

        # Load the dataset into a DataFrame
        df = pd.read_csv(csv_file)
        _notify(stdscr, "\nDataset loaded successfully.")
        return df
    except Exception as e:
        # UI boundary: report any failure on screen instead of crashing
        # out of the curses session.
        _notify(stdscr, f"An error occurred: {e}")
        return None
def col_name(folder_path):
    """Normalise the header names of a user-chosen CSV and save a copy.

    Prints the regular files found in ``folder_path`` and asks on stdin which
    one to load. Each column name is lower-cased, its spaces become
    underscores, and dots and parentheses are removed; a name that would end
    up empty keeps its original text. The result is written to
    ``modified_data.csv`` inside ``folder_path``.

    Returns None in every case; an unknown file name only prints a message.
    """
    # Keep regular files only (directories are skipped)
    with os.scandir(folder_path) as entries:
        files = [entry.name for entry in entries if entry.is_file()]
    print("Available files:", files)

    chosen = input("Enter the name of the CSV file to modify (include .csv extension): ").strip()
    if chosen not in files:
        print("File not found. Exiting.")
        return None

    df = pd.read_csv(os.path.join(folder_path, chosen))

    # One translation table: space -> underscore; '.', '(' and ')' dropped
    cleanup = str.maketrans({" ": "_", ".": None, "(": None, ")": None})

    print("\nRename columns:")
    renamed = []
    for original in df.columns:
        candidate = original.lower().translate(cleanup)
        print(f"{original} -> {candidate}")
        # Fall back to the original name when nothing is left after cleanup
        renamed.append(candidate or original)
    df.columns = renamed

    modified_file_path = os.path.join(folder_path, "modified_data.csv")
    df.to_csv(modified_file_path, index=False)
    print(f"\nModified dataset saved as: {modified_file_path}")
def clean_data(folder_path):
    """Fill the gaps in ``modified_data.csv`` and save it as ``clean_data.csv``.

    In every column, cells equal to "N.A." are first turned into NaN and
    then all NaN cells are replaced with 0. Both files live in
    ``folder_path``; the output path is printed when done.
    """
    source_path = os.path.join(folder_path, "modified_data.csv")
    frame = pd.read_csv(source_path)

    for column in frame.columns:
        frame[column] = frame[column].replace("N.A.", np.nan).fillna(0)

    clean_file_path = os.path.join(folder_path, "clean_data.csv")
    frame.to_csv(clean_file_path, index=False)
    print(f"\nClean dataset saved as: {clean_file_path}")
if __name__ == "__main__":
    # NOTE: kaggle_connect stores the download under ./<folder name typed by
    # the user>, while the steps below only look inside folder_path; type
    # "data" at the folder prompt so that both refer to the same place.
    folder_path = "./data/"
    curses.wrapper(kaggle_connect)

    if not os.path.isdir(folder_path):
        # Nothing to process: avoid a FileNotFoundError from os.listdir
        print(f"Folder not found: {folder_path}. Exiting.")
    else:
        col_name(folder_path)
        # col_name returns without writing anything when the chosen file is
        # unknown, so only clean when its output actually exists.
        if os.path.isfile(os.path.join(folder_path, "modified_data.csv")):
            clean_data(folder_path)