-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathetl.py
More file actions
142 lines (108 loc) · 4.58 KB
/
etl.py
File metadata and controls
142 lines (108 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import glob
import psycopg2
import pandas as pd
from sql_queries import *
def get_files(filepath):
    """
    Deprecated: do not call this directly — process_data performs the
    same scan itself.

    Walk *filepath* recursively and return a list of absolute paths to
    every ``*.json`` file found beneath it.
    """
    matched = []
    for root, _dirs, _files in os.walk(filepath):
        pattern = os.path.join(root, '*.json')
        for json_path in glob.glob(pattern):
            matched.append(os.path.abspath(json_path))
    return matched
def process_song_file(cur, filepath):
    """
    Load one song JSON file and insert its song and artist records.

    Parameters
    ----------
    cur : database cursor used to execute the INSERT statements
    filepath : path to a single line-delimited JSON song file

    Inserts one row into the songs table (via ``song_table_insert``) and
    one row into the artists table (via ``artist_table_insert``).
    Song files are assumed to contain exactly one record each, hence the
    ``values[0]`` selection.
    """
    # open song file; read_json already returns a DataFrame, so the
    # original pd.DataFrame(...) wrapper was redundant. The stray
    # get_files(filepath) call was removed: filepath is a single file
    # here, and its result was never used.
    df = pd.read_json(filepath, lines=True)

    # insert song record
    song_data = df[['song_id', 'title', 'artist_id', 'year', 'duration']].values[0].tolist()
    cur.execute(song_table_insert, song_data)

    # insert artist record
    artist_data = df[['artist_id', 'artist_name', 'artist_location',
                      'artist_latitude', 'artist_longitude']].values[0].tolist()
    cur.execute(artist_table_insert, artist_data)
def process_log_file(cur, filepath):
    """
    Load one log JSON file and populate the time, user, and songplay tables.

    Parameters
    ----------
    cur : database cursor used to execute the INSERT/SELECT statements
    filepath : path to a single line-delimited JSON log file

    Drops rows with missing values, keeps only 'NextSong' events, expands
    the millisecond epoch timestamp into time-dimension rows, inserts user
    rows, then resolves (song, artist, length) to ids via ``song_select``
    and inserts one songplay row per event.
    """
    # open log file; read_json returns a DataFrame directly
    df = pd.read_json(filepath, lines=True)

    # drop rows with any missing values
    df = df.dropna()

    # filter by NextSong action
    df = df[df.page == 'NextSong']

    # convert timestamp column to datetime ONCE (the original converted it
    # twice on a full copy of the frame)
    df['ts'] = pd.to_datetime(df['ts'], unit='ms')
    t = df['ts']

    # insert time data records.
    # Series.dt.week was deprecated in pandas 1.1 and removed in 2.0;
    # isocalendar().week is the supported spelling of the same ISO week.
    time_data = [t, t.dt.hour, t.dt.day, t.dt.isocalendar().week,
                 t.dt.month, t.dt.year, t.dt.weekday]
    column_labels = ['start_time', 'hour', 'day', 'week', 'month', 'year', 'weekday']
    time_df = pd.DataFrame(dict(zip(column_labels, time_data)))
    for _, row in time_df.iterrows():
        cur.execute(time_table_insert, list(row))

    # load user table, dropping any remaining missing values
    user_df = df[['userId', 'firstName', 'lastName', 'gender', 'level']].dropna()

    # insert user records
    for _, row in user_df.iterrows():
        cur.execute(user_table_insert, row)

    # insert songplay records
    for _, row in df.iterrows():
        # get songid and artistid from song and artist tables
        cur.execute(song_select, (row.song, row.artist, row.length))
        results = cur.fetchone()
        if results:
            songid, artistid = results
        else:
            songid, artistid = None, None

        # insert songplay record
        songplay_data = (row.ts, row.userId, row.level, songid, artistid,
                         row.sessionId, row.location, row.userAgent)
        cur.execute(songplay_table_insert, songplay_data)
def process_data(cur, conn, filepath, func):
    """
    Walk *filepath*, collect every JSON file beneath it, and apply *func*.

    Parameters
    ----------
    cur : database cursor handed through to *func*
    conn : database connection; committed after each file is processed
    filepath : root directory to scan for ``*.json`` files
    func : callable of (cur, filepath) that processes one file

    Prints the number of files found, then a running progress line after
    each file is processed and committed.
    """
    # collect the absolute path of every *.json file under filepath
    json_files = []
    for root, _dirs, _files in os.walk(filepath):
        pattern = os.path.join(root, '*.json')
        json_files.extend(os.path.abspath(match) for match in glob.glob(pattern))

    # report how many files were discovered
    total = len(json_files)
    print('{} files found in {}'.format(total, filepath))

    # process each file, committing as we go
    for count, datafile in enumerate(json_files, 1):
        func(cur, datafile)
        conn.commit()
        print('{}/{} files processed.'.format(count, total))
def main():
    """
    Connect to the sparkifydb database and run both ETL pipelines:
    song files first (songs/artists tables), then log files
    (time/users/songplays tables).

    The connection is always closed, even if processing raises.
    """
    conn = psycopg2.connect("host=127.0.0.1 dbname=sparkifydb user=student password=student")
    try:
        cur = conn.cursor()

        process_data(cur, conn, filepath='data/song_data', func=process_song_file)
        process_data(cur, conn, filepath='data/log_data', func=process_log_file)
    finally:
        # fix: the original leaked the connection if processing raised
        conn.close()


if __name__ == "__main__":
    main()