-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
72 lines (61 loc) · 2.44 KB
/
db.py
File metadata and controls
72 lines (61 loc) · 2.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from flask_sqlalchemy import SQLAlchemy
import random
import string
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
db = SQLAlchemy()
# Define the CheckoutSession model
class CheckoutSession(db.Model):
__tablename__ = 'checkout_sessions'
id = db.Column(db.Integer, primary_key=True)
session_id = db.Column(db.String, unique=True, nullable=False)
status = db.Column(db.String, nullable=False)
type = db.Column(db.String, nullable=False)
payment_status = db.Column(db.String, nullable=False)
email = db.Column(db.String, nullable=True)
amount_total = db.Column(db.Integer, nullable=True) # In cents
created_at = db.Column(db.DateTime, default=datetime.utcnow)
access_codes = db.relationship('AccessCode', backref='checkout_session', lazy=True)
def generate_unique_code():
return ''.join(random.choices(string.ascii_uppercase, k=6))
# Define the AccessCode model
class AccessCode(db.Model):
__tablename__ = 'access_codes'
id = db.Column(db.Integer, primary_key=True)
code = db.Column(db.String(6), unique=True, nullable=False, default=generate_unique_code)
is_valid = db.Column(db.Boolean, default=True)
type = db.Column(db.String, nullable=False)
session_id = db.Column(db.Integer, db.ForeignKey('checkout_sessions.id'), nullable=False)
uitpas_number = db.Column(db.String, nullable=True)
scanned_at = db.Column(db.DateTime, nullable=True)
def init_db(app):
"""
Initialize the database with the given Flask app.
"""
logger.info("Initializing Database")
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///tsirk_tickets.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
with app.app_context():
db.create_all()
def get_sold_count(show_id):
"""
Returns the number of sold tickets for a given show_id (s1, s2, s3).
"""
# Mapping show_id to description string used in AccessCode.type
show_map = {
's1': 'SHOW 1',
's2': 'SHOW 2',
's3': 'SHOW 3'
}
search_str = show_map.get(show_id)
if not search_str: return 0
# Count valid (paid) tickets
# We join with CheckoutSession to ensure payment_status is paid
# Note: access_codes has a foreign key to checkout_session
query = db.session.query(AccessCode).join(CheckoutSession).filter(
CheckoutSession.payment_status == 'paid',
AccessCode.type.contains(search_str)
)
return query.count()