-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix: Fix cohort membership Kafka message format #42254
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes a critical bug in the realtime cohort calculation workflow that was sending Kafka messages with incorrect field names to the cohort_membership_changed topic. The consumer expected camelCase field names matching its Zod schema validation, but the workflow was sending snake_case names, causing validation errors.
Key Changes:
- Updated Kafka message payload format in the realtime cohort workflow to use camelCase field names (
teamId,cohortId,personId,cohort_membership_changed) instead of snake_case - Removed the unused
last_updatedfield from the payload - Increased kafka_ui HTTP request header size limit to fix UI loading issues
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
posthog/temporal/messaging/realtime_cohort_calculation_workflow.py |
Fixed Kafka message payload format to match consumer schema expectations by converting field names to camelCase and renaming status to cohort_membership_changed |
docker-compose.dev.yml |
Added environment variable to increase kafka_ui HTTP request header size limit |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.
posthog/temporal/messaging/realtime_cohort_calculation_workflow.py
Outdated
Show resolved
Hide resolved
posthog/temporal/messaging/realtime_cohort_calculation_workflow.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 files reviewed, no comments
Problem
The realtime cohort calculation workflow was sending Kafka messages to the
cohort_membership_changedtopic with snake_case field names, causing validation errors in the CDP cohort membership consumer.The consumer expected camelCase field names (
personId,cohortId,teamId,cohort_membership_changed) matching its Zod schema, but the workflow was sending snake_case names (person_id,cohort_id,team_id,status).This resulted in validation errors:
Error: Invalid cohort membership change message:
Changes
Updated consumer schema to use snake_case (
cdp-cohort-membership.consumer.ts):Updated workflow to produce snake_case messages (
realtime_cohort_calculation_workflow.py)How did you test this code?