Skip to content

Commit 4daae5a

Browse files
authored
Extract app from main.py to src/app.py and fix import of src module
1 parent d5c2ae7 commit 4daae5a

File tree

1 file changed

+321
-0
lines changed

1 file changed

+321
-0
lines changed

src/app.py

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
import typer
2+
import os
3+
import threading
4+
import json
5+
from rich.console import Console
6+
from rich.table import Table
7+
from datetime import datetime, timedelta, timezone
8+
from dotenv import load_dotenv
9+
10+
from .api import get_user_id, get_user_profile, get_user_posts, get_post_insights, fetch_all_posts, create_post, get_post_replies, get_post_replies_count
11+
from .utils import convert_to_locale
12+
13+
app = typer.Typer()
14+
console = Console()
15+
load_dotenv()
16+
17+
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
18+
HEADERS = {
19+
'Authorization': f'Bearer {ACCESS_TOKEN}'
20+
}
21+
DRAFTS_FILE = 'drafts.json'
22+
SERVER_PROCESS_TIME = 10
23+
24+
@app.command()
25+
def get_profile():
26+
"""
27+
Retrieve and display user profile information, including the last post made by the user.
28+
"""
29+
user_id = get_user_id(HEADERS)
30+
profile = get_user_profile(user_id, HEADERS)
31+
last_post = get_user_posts(user_id, HEADERS, limit=1)[0]
32+
33+
profile_table = Table(title=f'{profile["username"]}\'s Profile')
34+
profile_table.add_column("Field", style="cyan", no_wrap=True)
35+
profile_table.add_column("Value", style="magenta")
36+
37+
profile_table.add_row("ID", profile.get("id", "N/A"))
38+
profile_table.add_row("Username", profile.get("username", "N/A"))
39+
profile_table.add_row("Profile Picture URL", profile.get("threads_profile_picture_url", "N/A"))
40+
profile_table.add_row("Biography", profile.get("threads_biography", "N/A"))
41+
if last_post:
42+
profile_table.add_row("Last Post ID", last_post.get("id", "N/A"))
43+
profile_table.add_row("Post Type", last_post.get("media_type", "N/A"))
44+
profile_table.add_row("Post Text", last_post.get("text", "N/A"))
45+
profile_table.add_row("Post Permalink", last_post.get("permalink", "N/A"))
46+
profile_table.add_row("Post Timestamp", convert_to_locale(last_post.get("timestamp", "N/A")))
47+
else:
48+
profile_table.add_row("Message", "No posts found")
49+
50+
console.print(profile_table)
51+
52+
@app.command()
53+
def get_recent_posts(limit: int = 5):
54+
"""
55+
Retrieve the most recent posts.
56+
"""
57+
user_id = get_user_id(HEADERS)
58+
posts = get_user_posts(user_id, HEADERS, limit=limit)
59+
60+
table = Table(title="Recent Posts")
61+
table.add_column("ID", style="cyan", no_wrap=True)
62+
table.add_column("Username", style="cyan", no_wrap=True)
63+
table.add_column("Timestamp", style="magenta")
64+
table.add_column("Type", style="green")
65+
table.add_column("Text", style="yellow")
66+
table.add_column("Permalink", style="blue")
67+
table.add_column("Replies", style="red")
68+
69+
for post in posts:
70+
if post.get('media_type') == 'REPOST_FACADE':
71+
continue
72+
timestamp = convert_to_locale(post.get('timestamp', 'N/A'))
73+
replies_count = get_post_replies_count(post['id'], HEADERS)
74+
table.add_row(
75+
post.get('id', 'N/A'),
76+
post.get('username', 'N/A'),
77+
timestamp,
78+
post.get('media_type', 'N/A'),
79+
post.get('text', 'N/A'),
80+
post.get('permalink', 'N/A'),
81+
str(replies_count)
82+
)
83+
84+
console.print(table)
85+
86+
@app.command()
87+
def get_top_liked_posts(limit: int = 5, time_range: str = None):
88+
"""
89+
Retrieve the top liked posts of all time or within a specific time range.
90+
"""
91+
user_id = get_user_id(HEADERS)
92+
all_posts = fetch_all_posts(user_id, HEADERS)
93+
94+
if time_range:
95+
now = datetime.now(timezone.utc)
96+
if time_range.endswith('w'):
97+
weeks = int(time_range[:-1])
98+
start_time = now - timedelta(weeks=weeks)
99+
elif time_range.endswith('d'):
100+
days = int(time_range[:-1])
101+
start_time = now - timedelta(days=days)
102+
elif time_range.endswith('h'):
103+
hours = int(time_range[:-1])
104+
start_time = now - timedelta(hours=hours)
105+
elif time_range.endswith('m'):
106+
months = int(time_range[:-1])
107+
start_time = now - timedelta(days=30 * months)
108+
else:
109+
typer.echo("Invalid time range format. Use '2w' for 2 weeks, '7d' for 7 days, '24h' for 24 hours, or '7m' for 7 months.")
110+
return
111+
112+
all_posts = [post for post in all_posts if datetime.strptime(post['timestamp'], '%Y-%m-%dT%H:%M:%S%z') >= start_time]
113+
114+
posts_with_likes = []
115+
for post in all_posts:
116+
if post.get('media_type') == 'REPOST_FACADE':
117+
continue
118+
insights = get_post_insights(post['id'], HEADERS)
119+
if 'likes' in insights:
120+
posts_with_likes.append((post, insights['likes']))
121+
122+
posts_with_likes.sort(key=lambda x: x[1], reverse=True)
123+
top_liked_posts = posts_with_likes[:limit]
124+
125+
table = Table(title="Top Liked Posts")
126+
table.add_column("Username", style="cyan", no_wrap=True)
127+
table.add_column("Timestamp", style="magenta")
128+
table.add_column("Type", style="green")
129+
table.add_column("Text", style="yellow")
130+
table.add_column("Permalink", style="blue")
131+
table.add_column("Likes", style="red")
132+
table.add_column("Replies", style="green")
133+
table.add_column("Reposts", style="blue")
134+
table.add_column("Quotes", style="yellow")
135+
table.add_column("Views", style="cyan")
136+
137+
for post, likes in top_liked_posts:
138+
timestamp = convert_to_locale(post.get('timestamp', 'N/A'))
139+
insights = get_post_insights(post['id'], HEADERS)
140+
table.add_row(
141+
post.get('username', 'N/A'),
142+
timestamp,
143+
post.get('media_type', 'N/A'),
144+
post.get('text', 'N/A'),
145+
post.get('permalink', 'N/A'),
146+
str(insights.get('likes', 'N/A')),
147+
str(insights.get('replies', 'N/A')),
148+
str(insights.get('reposts', 'N/A')),
149+
str(insights.get('quotes', 'N/A')),
150+
str(insights.get('views', 'N/A'))
151+
)
152+
153+
console.print(table)
154+
155+
@app.command()
156+
def create_text_post(text: str):
157+
"""
158+
Create a post with text.
159+
"""
160+
user_id = get_user_id(HEADERS)
161+
payload = {
162+
"media_type": "TEXT",
163+
"text": text
164+
}
165+
post = create_post(user_id, HEADERS, payload)
166+
typer.echo(f"Post created with ID: {post['id']}")
167+
168+
@app.command()
169+
def create_image_post(text: str, image_url: str):
170+
"""
171+
Create a post with an image.
172+
"""
173+
user_id = get_user_id(HEADERS)
174+
payload = {
175+
"media_type": "IMAGE",
176+
"image_url": image_url,
177+
"text": text
178+
}
179+
post = create_post(user_id, HEADERS, payload)
180+
typer.echo(f"Post created with ID: {post['id']}")
181+
182+
@app.command()
183+
def get_latest_replies(media_id: str, limit: int = 5):
184+
"""
185+
Retrieve the latest replies for a specific media post.
186+
"""
187+
replies = get_post_replies(media_id, HEADERS, limit=limit)
188+
189+
table = Table(title="Latest Replies")
190+
table.add_column("Username", style="cyan", no_wrap=True)
191+
table.add_column("Media ID", style="cyan", no_wrap=True)
192+
table.add_column("Timestamp", style="magenta")
193+
table.add_column("Text", style="yellow")
194+
table.add_column("Permalink", style="blue")
195+
196+
for reply in replies:
197+
timestamp = convert_to_locale(reply.get('timestamp', 'N/A'))
198+
table.add_row(
199+
reply.get('username', 'N/A'),
200+
reply.get('id', 'N/A'),
201+
timestamp,
202+
reply.get('text', 'N/A'),
203+
reply.get('permalink', 'N/A')
204+
)
205+
206+
console.print(table)
207+
208+
@app.command()
209+
def send_reply(media_id: str, text: str):
210+
"""
211+
Send a reply to a specific media post.
212+
"""
213+
user_id = get_user_id(HEADERS)
214+
payload = {
215+
"media_type": "TEXT",
216+
"text": text,
217+
"reply_to_id": media_id
218+
}
219+
reply = create_post(user_id, HEADERS, payload)
220+
typer.echo(f"Reply created with ID: {reply['id']}")
221+
222+
def job_create_text_post(text: str):
223+
"""
224+
Job function to create a post with text.
225+
"""
226+
create_text_post(text)
227+
228+
@app.command()
229+
def schedule_post(text: str, post_time: str):
230+
"""
231+
Schedule a post with text at a specific time.
232+
"""
233+
post_time_dt = datetime.strptime(post_time, '%Y-%m-%d %H:%M:%S')
234+
current_time = datetime.now()
235+
delay = (post_time_dt - current_time).total_seconds()
236+
237+
if delay <= 0:
238+
typer.echo("Scheduled time must be in the future.")
239+
return
240+
241+
timer = threading.Timer(delay, job_create_text_post, [text])
242+
timer.start()
243+
typer.echo(f"Post scheduled for {post_time} with text: '{text}'")
244+
245+
@app.command()
246+
def create_draft(text: str, drafts_file: str = DRAFTS_FILE):
247+
'''
248+
Create a draft with the given text and save it to the drafts file.
249+
'''
250+
if os.path.exists(drafts_file):
251+
with open(drafts_file, 'r') as file:
252+
drafts = json.load(file)
253+
else:
254+
drafts = []
255+
256+
next_id = max([draft['id'] for draft in drafts], default=0) + 1
257+
258+
draft = {
259+
"id": next_id,
260+
"text": text,
261+
"timestamp": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
262+
}
263+
drafts.append(draft)
264+
265+
with open(drafts_file, 'w') as file:
266+
json.dump(drafts, file, indent=4)
267+
268+
typer.echo(f"Draft created with ID: {next_id}")
269+
270+
@app.command()
271+
def get_drafts(drafts_file: str = DRAFTS_FILE):
272+
'''
273+
Get all drafts from the drafts file.
274+
'''
275+
if not os.path.exists(drafts_file):
276+
typer.echo("No drafts found.")
277+
return
278+
279+
with open(drafts_file, 'r') as file:
280+
drafts = json.load(file)
281+
282+
table = Table(title="Drafts")
283+
table.add_column("ID", style="cyan", no_wrap=True)
284+
table.add_column("Text", style="yellow")
285+
table.add_column("Timestamp", style="magenta")
286+
287+
for draft in drafts:
288+
table.add_row(
289+
str(draft['id']),
290+
draft['text'],
291+
draft['timestamp']
292+
)
293+
294+
console.print(table)
295+
296+
@app.command()
297+
def send_draft(draft_id: int, drafts_file: str = DRAFTS_FILE):
298+
'''
299+
Send a draft with the given ID and remove it from the drafts file.
300+
'''
301+
if not os.path.exists(drafts_file):
302+
typer.echo("No drafts found.")
303+
raise typer.Exit(1)
304+
305+
with open(drafts_file, 'r') as file:
306+
drafts = json.load(file)
307+
308+
draft = next((draft for draft in drafts if draft['id'] == draft_id), None)
309+
310+
if draft is None:
311+
typer.echo(f"Draft with ID {draft_id} not found.")
312+
raise typer.Exit(1)
313+
314+
create_text_post(draft['text'])
315+
316+
drafts = [draft for draft in drafts if draft['id'] != draft_id]
317+
318+
with open(drafts_file, 'w') as file:
319+
json.dump(drafts, file, indent=4)
320+
321+
typer.echo(f"Draft with ID {draft_id} sent and removed from drafts.")

0 commit comments

Comments
 (0)