add prefect to organize into flows and tasks #17
* Adds prefect and new `flows` and `tasks` modules
* Adds boto3 and `src/aws.py` module to enable uploading data artifacts to S3
* Moves `duration_to_minutes` from `notebooks/meetings.ipynb` to `src/meetings.py`
* Adds more files to `.gitignore`
| - `data/`: local data artifacts | ||
| - `flows/`: prefect flows | ||
| - `notebooks/`: Jupyter notebooks for analysis and exploration | ||
| - `scripts/`: one off scripts for downloading, conversions, etc |
@jdungan : be sure to remove this line from the README.md too.
| - `src/`: Source code for the scraper | ||
| - `models/`: Pydantic models for data representation | ||
| - 'scripts`: one off scripts for downloading, conversions, etc | ||
| - `tasks/`: prefect tasks |
Is the next task to convert all scripts to prefect tasks?
Not necessarily.
The only thing I'm sure of is that we should move as much of the core logic (fetching, parsing, transforming, invoking models, writing outputs, etc.) as possible into either "src" or "functions" modules that do not import or depend on any orchestration library.
That way, we can keep our core logic as decoupled as possible from prefect, or airflow, or langchain, or whatever other orchestration tool we want to try.
So, I think it would be better to convert or refactor code from scripts into code in "src" or "functions" modules.
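A rough sketch of what that split could look like. The module paths and the body of `duration_to_minutes` are assumptions for illustration (the real implementation in `src/meetings.py` is not shown in this thread), and the `try`/`except ImportError` guard is only there so the sketch runs without prefect installed:

```python
# src/ side (hypothetical body): plain Python, no orchestration imports.
def duration_to_minutes(duration: str) -> int:
    """Convert an "H:MM:SS" or "MM:SS" string to whole minutes, dropping seconds."""
    parts = [int(p) for p in duration.split(":")]
    if len(parts) == 2:  # "MM:SS" has no hours field
        parts = [0] + parts
    if len(parts) != 3:
        raise ValueError(f"Unrecognized duration: {duration!r}")
    hours, minutes, _seconds = parts
    return hours * 60 + minutes


# tasks/ side (hypothetical): the only place that knows about prefect.
try:
    from prefect import task

    # Thin wrapper: the task adds orchestration, not logic.
    duration_to_minutes_task = task(duration_to_minutes)
except ImportError:
    # Without prefect, the core function above is still importable and testable.
    duration_to_minutes_task = None
```

With this layout, swapping prefect for another orchestrator would only touch the wrapper, and the core function can be unit tested without any orchestration library present.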
Nice! Really liking the structured way that tasks and flows are organized in Prefect, and its neat dashboard!
NIT - a few small comments.
Will try tackling one of the #TODO's to get a sense of converting a script to a task, and add to the translate_meetings() flow.
| @task | ||
| async def create_meetings_csv(): | ||
| meetings = await get_meetings() | ||
| print(f"Got meetings: {meetings}") |
Instead of print statements, do we want to consider importing logging for debug messages?
We might then connect logs to cloudwatch for db monitoring and alerts?
Yeah that's a good idea. I was using lazy print statements while developing, and prefect has the handy log_prints=True argument for flows which converts prints into logs that show up on the flow and task runs.
But yeah - we should do some "real" logging in our code without relying on prefect. It looks like we can also configure prefect to capture logging from our own code too, so we aren't depending on prefect for logging, but we ARE able to see our logging in prefect.
And then I assume we can have cloudwatch also watch our logging?
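For reference, a minimal sketch of the "real" logging idea using only the standard library. The logger name `src.meetings` and the helper `summarize_meetings` are made up for illustration. Prefect's docs describe a `PREFECT_LOGGING_EXTRA_LOGGERS` setting for surfacing loggers like this in flow runs, but that is worth verifying against the prefect version in use:

```python
import logging

# Module-level logger; the name "src.meetings" is an assumption for illustration.
logger = logging.getLogger("src.meetings")


def summarize_meetings(meetings: list[dict]) -> int:
    """Log a short summary at INFO and the full records only at DEBUG."""
    logger.info("Got %d meetings", len(meetings))
    for meeting in meetings:
        logger.debug("Meeting: %s", meeting)
    return len(meetings)


if __name__ == "__main__":
    # The entry point, not the library code, decides where logs go and at what level.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    summarize_meetings([{"title": "Regular meeting"}])
```

Because the core code only talks to `logging`, any handler (console, file, or a CloudWatch handler if we add one later) can be attached at the entry point without touching `src`.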
| @task | ||
| async def create_meetings_csv(): | ||
| meetings = await get_meetings() |
We might want to add error handling here in a `try`/`except` block.
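Something along these lines, as a sketch only. `get_meetings` here is a stand-in that always fails so the error path is exercised, the `fetch` parameter is added purely to make the example testable, and the exception types are guesses at what the real fetcher might raise:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def get_meetings() -> list[dict]:
    """Hypothetical stand-in for the real fetcher; always fails in this sketch."""
    raise ConnectionError("upstream site unavailable")


async def create_meetings_csv(fetch=get_meetings) -> list[dict]:
    try:
        meetings = await fetch()
    except (ConnectionError, TimeoutError) as exc:
        # Add context to the log, then re-raise so the caller still sees the failure.
        logger.error("Failed to fetch meetings: %s", exc)
        raise
    logger.info("Got %d meetings", len(meetings))
    return meetings
```

Re-raising rather than swallowing the exception keeps the failure visible to whatever is running the function; with prefect, that should let the task run be marked as failed and retried if `retries` is set on the task.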
| from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError | ||
| def is_aws_configured(): | ||
| required_vars = ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'] |
Do we also want to return the AWS_REGION?
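One possible shape for that check, as a sketch. `missing_aws_vars` and the `env` parameter are hypothetical additions, not part of the PR. As far as I know boto3 itself reads `AWS_DEFAULT_REGION`, so the sketch accepts either name:

```python
import os
from typing import Mapping, Optional

REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
# Either name is accepted; which one the project standardizes on is an open question.
REGION_VARS = ("AWS_REGION", "AWS_DEFAULT_REGION")


def missing_aws_vars(env: Optional[Mapping[str, str]] = None) -> list[str]:
    """Return the names of unset variables; an empty list means fully configured."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if not any(env.get(name) for name in REGION_VARS):
        missing.append(" or ".join(REGION_VARS))
    return missing


def is_aws_configured(env: Optional[Mapping[str, str]] = None) -> bool:
    return not missing_aws_vars(env)
```

Returning the list of missing names makes it easy to log exactly what needs to be set. Note that an environment-only check will report "not configured" for setups that rely on a shared credentials file or an instance role.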
| def create_bucket_if_not_exists(bucket_name): | ||
| s3 = boto3.client('s3') |
Maybe adding reusable get_s3_client() and get_aws_config() methods could be helpful for reusing in other files
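A sketch of what those two helpers might look like. The return shape of `get_aws_config` and the caching choice are assumptions, and boto3 is imported lazily so that code needing only the config does not have to install it:

```python
import os
from functools import lru_cache
from typing import Mapping, Optional


def get_aws_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect client keyword arguments from the environment (region only, in this sketch)."""
    env = os.environ if env is None else env
    region = env.get("AWS_REGION") or env.get("AWS_DEFAULT_REGION")
    return {"region_name": region} if region else {}


@lru_cache(maxsize=1)
def get_s3_client():
    """Create the S3 client once and reuse it across callers."""
    import boto3  # lazy import: only needed when a client is actually requested

    return boto3.client("s3", **get_aws_config())
```

Functions such as `create_bucket_if_not_exists` and `upload_to_s3` could then call `get_s3_client()` instead of each constructing their own client.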
| print(f"Client error: {e}") | ||
| def upload_to_s3(file_path, bucket_name, s3_path): |
Do we also want to add a download_from_s3() method?
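A possible counterpart, sketched with the same argument order as `upload_to_s3`. The optional `s3_client` parameter and the `Path` return value are assumptions made so the function can be exercised with a fake client; errors from boto3 are left to propagate here rather than being caught:

```python
from pathlib import Path


def download_from_s3(file_path, bucket_name, s3_path, s3_client=None) -> Path:
    """Download s3://bucket_name/s3_path to file_path, creating parent directories."""
    if s3_client is None:
        import boto3  # lazy import so a fake client can be injected without boto3 installed

        s3_client = boto3.client("s3")
    target = Path(file_path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # boto3's S3 client signature is download_file(Bucket, Key, Filename).
    s3_client.download_file(bucket_name, s3_path, str(target))
    return target
```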
Merging this now so we can build on top of it. We'll make some of the improvements @kaizengrowth suggested as we go.