-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathseatgeekRedshift.py
More file actions
86 lines (70 loc) · 3.22 KB
/
seatgeekRedshift.py
File metadata and controls
86 lines (70 loc) · 3.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# script to update seatgeek tables daily
# Establish cluster connection
import pandas as pd
from sqlalchemy import *
import datetime as dt
# create engine
# NOTE(review): the connection URL below embeds the database password in
# source control; consider loading it from the environment or a secrets store.
engine = create_engine(
    'postgresql+psycopg2://awsuser:Capstone1@redshift-cluster-1.cah6qt0iybst.us-east-2.redshift.amazonaws.com:5439/dev')

# Take a single snapshot of the run date so that the month, day and year used
# in the extract file names always belong to the same calendar day.
today = dt.datetime.today()
# get day (kept as a module-level name for backward compatibility)
day = today.day

# COPY statement template: loads one CSV extract from S3 into a staging table,
# skipping the header row.
_COPY_SQL = """COPY {staging} FROM '{path}'
CREDENTIALS 'aws_iam_role=arn:aws:iam::148285915521:role/myRedshiftRole'
DELIMITER ',' REGION 'us-east-2'
IGNOREHEADER 1
CSV;"""

# INSERT statement template: copies staged rows whose key value is not already
# present in the target table.
_INSERT_NEW_SQL = """INSERT INTO {target}
SELECT w.*
FROM {staging} AS w
WHERE
NOT EXISTS (SELECT 1
FROM {target} AS s
WHERE s.{key} = w.{key});"""

# (target table in the seatgeek schema, extract file stem, de-duplication key)
# Note the venues extract is named "venueDF" (singular) in S3.
_TABLES = (
    ('venues_df', 'venueDF', 'venue_id'),
    ('prices_df', 'pricesDF', 'event_id'),
    ('events_df', 'eventsDF', 'event_id'),
)


def _daily_extract_path(file_stem, run_date):
    """Return the S3 path of the CSV extract for ``file_stem`` on ``run_date``.

    The name follows the ``SeatGeek_<stem>_<month>_<day>_<year>.csv`` pattern.
    Month and day are written without zero padding, matching the original
    ``9_{day}_2018`` file names.
    NOTE(review): confirm the uploader uses the same unpadded naming for
    two-digit months and for years other than 2018.

    Args:
        file_stem: Middle part of the file name, e.g. ``'venueDF'``.
        run_date: Object exposing ``month``, ``day`` and ``year`` attributes
            (a ``datetime.date`` or ``datetime.datetime``).

    Returns:
        The ``s3://`` path as a string.
    """
    return 's3://nycdsa.ta-am/SeatGeek_{stem}_{month}_{day}_{year}.csv'.format(
        stem=file_stem,
        month=run_date.month,
        day=run_date.day,
        year=run_date.year,
    )


def _refresh_table(table, file_stem, key_column, run_date):
    """Load one daily extract into ``seatgeek.<table>`` via a staging table.

    Steps, in order: create ``working.sg_<table>`` if it is missing, COPY the
    day's CSV into it, insert the staged rows whose ``key_column`` value is
    not yet present in the target, then truncate the staging table.

    Errors raised by ``engine.execute`` are not caught here and propagate to
    the caller.

    Args:
        table: Target table name inside the ``seatgeek`` schema.
        file_stem: Extract file stem passed to ``_daily_extract_path``.
        key_column: Column used to decide whether a staged row already exists
            in the target table.
        run_date: Date whose extract should be loaded.
    """
    target = 'seatgeek.{}'.format(table)
    staging = 'working.sg_{}'.format(table)

    # make sure working table exists
    engine.execute(
        "CREATE TABLE IF NOT EXISTS {staging} (LIKE {target});".format(
            staging=staging, target=target))

    # fill working table
    copy_sql = _COPY_SQL.format(
        staging=staging, path=_daily_extract_path(file_stem, run_date))
    engine.execute(text(copy_sql).execution_options(autocommit=True))

    # fill only rows whose key is not already in the target table
    insert_sql = _INSERT_NEW_SQL.format(
        target=target, staging=staging, key=key_column)
    engine.execute(text(insert_sql).execution_options(autocommit=True))

    # empty working table
    engine.execute("TRUNCATE {staging};".format(staging=staging))


# =============================================================================================================
# SEATGEEK.VENUES_DF, SEATGEEK.PRICES_DF, SEATGEEK.EVENTS_DF
# Same order as the original script: venues, then prices, then events.
for _table, _stem, _key in _TABLES:
    _refresh_table(_table, _stem, _key, today)