Open
25 commits
63ded57
Skip forward fetch for channels with no new messages using conversati…
joshalbrecht Mar 19, 2026
1f12844
Eliminate closure ratchet bump by restructuring retry as module-level…
joshalbrecht Mar 19, 2026
0657897
Filter cached channels by membership when members_only=True
joshalbrecht Mar 19, 2026
c39f5d2
Only fetch channel info for channels being exported
joshalbrecht Mar 19, 2026
efeb7de
Fix membership filter test to use channels=None on second run
joshalbrecht Mar 19, 2026
466d5c3
Add progress logging for channel info, channel export, and thread rep…
joshalbrecht Mar 19, 2026
050d15f
Parallelize channel info fetch with message export using ConcurrencyG…
joshalbrecht Mar 19, 2026
b2fa055
Add slack-channels command to list channels sorted by activity
joshalbrecht Mar 19, 2026
082144e
Fix list_channels: deduplicate timestamp logic, fix created/updated u…
joshalbrecht Mar 19, 2026
ad226e3
Deduplicate channel fetch logic and document slack-channels in README
joshalbrecht Mar 19, 2026
02a19a2
Defer reaction checking to end of export, scanning only recent releva…
joshalbrecht Mar 19, 2026
30f9e74
Restore message-level reaction extraction from forward-fetched messages
joshalbrecht Mar 19, 2026
71cda68
Fix timestamp handling in slack-channels to auto-detect seconds vs mi…
joshalbrecht Mar 19, 2026
1ef7a10
Fix slack-channels to use actual latest message timestamps from conve…
joshalbrecht Mar 19, 2026
93a9430
Remove slack-channels script
joshalbrecht Mar 19, 2026
3fe7fd0
Retry on transient network errors (SSL reset, connection failures)
joshalbrecht Mar 19, 2026
fbbdb3b
Split space-separated --channels args and skip conversations.list whe…
joshalbrecht Mar 19, 2026
b640735
Fix test helpers and add cross-run test for deferred reaction pass
joshalbrecht Mar 19, 2026
33b5c2a
Add --recently-active-channels flag to restrict export to most active…
joshalbrecht Mar 19, 2026
3a4e69c
Document --recently-active-channels and fix empty-history edge case
joshalbrecht Mar 19, 2026
cfe78a5
Add relevant_thread_replies event stream for replies in user-relevant…
joshalbrecht Mar 19, 2026
b2f23e0
Rename all event folder names from plural to singular
joshalbrecht Mar 19, 2026
87d40b7
Re-trigger CI (flaky test in mng_mind unrelated to this branch)
joshalbrecht Mar 19, 2026
59691cb
Fix flaky test_main_delivers_events_from_subprocess race condition
joshalbrecht Mar 19, 2026
af17f90
Reduce state file polling timeout from 5s to 2s in flaky test fix
joshalbrecht Mar 19, 2026
46 changes: 25 additions & 21 deletions apps/slack_exporter/README.md
@@ -28,11 +28,14 @@ slack-exporter --since 2023-01-01
 # Custom output directory
 slack-exporter --output-dir my_slack_data

+# Export only the 10 most recently active channels (based on historical data)
+slack-exporter --recently-active-channels 10
+
 # Include channels you're not a member of (default: only member channels)
 slack-exporter --all

-# Control how many relevant threads to re-check for reaction changes (default: 10)
-slack-exporter --reaction-lookback 20
+# Control how many recent relevant threads to check for reactions (default: 50)
+slack-exporter --max-recent-threads-for-reactions 20

 # Force re-fetch of cached data (channels, users, identity)
 slack-exporter --refresh
@@ -53,10 +56,9 @@ slack-exporter -v
 5. Fetches the user list from Slack (via `users.list`) and saves only new users -- cached for `SLACK_EXPORTER_CACHE_TTL_SECONDS`
 6. For each configured channel, fetches new messages (via `conversations.history`) starting from the most recent message already exported (or the configured oldest date on first run). If the configured oldest date is earlier than the oldest date already searched from, also backfills older messages down to that date
 7. For messages with threads (reply_count > 0), uses the `latest_reply` field to skip threads with no new replies, then fetches replies (via `conversations.replies`) only for threads that have changed
-8. Extracts reactions from message and reply payloads (inline `reactions` field) and saves when new or changed
+8. Extracts reactions from fetched messages and saves when new or changed
 9. Detects threads relevant to the authenticated user (threads where the user replied or was mentioned) and records them as `relevant_threads` events
-10. Re-checks the most recent relevant threads for reaction changes (controlled by `--reaction-lookback`, default 10), even when the thread has no new replies
-11. Re-checks the most recent 100 messages per channel for reaction changes (single API call per channel)
+10. After all channels are exported, checks reactions on the most recent relevant threads (sorted by latest reply, controlled by `--max-recent-threads-for-reactions`, default 50)

 Use `--refresh` to bypass the cache and force re-fetching of all data.
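
The deferred reaction pass described in step 10 can be sketched roughly as follows. This is only an illustration of "sort by latest reply, then cap at N"; `RelevantThread` and `select_threads_for_reaction_check` are hypothetical names that do not appear in this diff, and the real exporter may track threads differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RelevantThread:
    """Hypothetical record for a thread the user replied in or was mentioned in."""

    channel_id: str
    thread_ts: str
    latest_reply_ts: str


def _ts_key(ts: str) -> tuple[int, int]:
    """Split a Slack timestamp such as "1700000000.000100" into integer parts.

    Comparing integers avoids float rounding at microsecond precision. This
    assumes the fractional part always has the same number of digits.
    """
    seconds, _, fraction = ts.partition(".")
    return int(seconds), int(fraction or "0")


def select_threads_for_reaction_check(
    threads: list[RelevantThread],
    max_recent_threads: int,
) -> list[RelevantThread]:
    """Return the most recently replied-to threads, newest first, capped at the limit."""
    if max_recent_threads <= 0:
        # A limit of 0 disables the pass (the test fixture in this PR sets it to 0).
        return []
    ordered = sorted(threads, key=lambda t: _ts_key(t.latest_reply_ts), reverse=True)
    return ordered[:max_recent_threads]
```

Running the selection once after all channels are exported, rather than per channel, bounds the extra API calls by the limit regardless of how many channels are configured.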

@@ -66,22 +68,24 @@ Data is stored in a directory with created/updated streams per type:

 ```
 slack_export/
-  channels/created/events.jsonl -- new channels (first seen)
-  channels/updated/events.jsonl -- all channel state changes (includes creates)
-  messages/created/events.jsonl -- new messages
-  messages/updated/events.jsonl -- all message state changes (includes creates)
-  reactions/created/events.jsonl -- new per-message reaction state (first seen)
-  reactions/updated/events.jsonl -- all reaction state changes (includes creates)
-  relevant_threads/created/events.jsonl -- threads user participated in (first seen)
-  relevant_threads/updated/events.jsonl -- all relevant thread changes (includes creates)
-  replies/created/events.jsonl -- new thread replies
-  replies/updated/events.jsonl -- all reply state changes (includes creates)
-  self_identity/created/events.jsonl -- authenticated user identity (first seen)
-  self_identity/updated/events.jsonl -- all identity state changes (includes creates)
-  unread_markers/created/events.jsonl -- new unread markers (first seen)
-  unread_markers/updated/events.jsonl -- all unread marker changes (includes creates)
-  users/created/events.jsonl -- new users (first seen)
-  users/updated/events.jsonl -- all user state changes (includes creates)
+  channel/created/events.jsonl -- new channels (first seen)
+  channel/updated/events.jsonl -- all channel state changes (includes creates)
+  message/created/events.jsonl -- new messages
+  message/updated/events.jsonl -- all message state changes (includes creates)
+  reaction/created/events.jsonl -- new per-message reaction state (first seen)
+  reaction/updated/events.jsonl -- all reaction state changes (includes creates)
+  relevant_thread/created/events.jsonl -- threads user participated in (first seen)
+  relevant_thread/updated/events.jsonl -- all relevant thread changes (includes creates)
+  relevant_thread_reply/created/events.jsonl -- replies in relevant threads (first seen)
+  relevant_thread_reply/updated/events.jsonl -- all relevant thread reply changes (includes creates)
+  reply/created/events.jsonl -- new thread replies
+  reply/updated/events.jsonl -- all reply state changes (includes creates)
+  self_identity/created/events.jsonl -- authenticated user identity (first seen)
+  self_identity/updated/events.jsonl -- all identity state changes (includes creates)
+  unread_marker/created/events.jsonl -- new unread markers (first seen)
+  unread_marker/updated/events.jsonl -- all unread marker changes (includes creates)
+  user/created/events.jsonl -- new users (first seen)
+  user/updated/events.jsonl -- all user state changes (includes creates)
 ```

 The `created` stream contains only first-seen items. The `updated` stream contains all state changes (including creates, since a create is logically an update from nothing). Subscribe to `created` for lower cardinality, or `updated` for the full change history.
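
The relationship between the two streams can be illustrated with a minimal sketch. It assumes one JSON object per line in each `events.jsonl` file and uses the singular folder names from this change; `write_event` and its `is_first_seen` parameter are hypothetical and are not taken from the exporter's code.

```python
import json
from pathlib import Path
from typing import Any


def write_event(
    output_dir: Path,
    stream_name: str,
    event: dict[str, Any],
    is_first_seen: bool,
) -> None:
    """Append an event to `updated`, and also to `created` when it is first seen."""
    # Every state change goes to `updated`; only first-seen items go to `created`.
    targets = ["created", "updated"] if is_first_seen else ["updated"]
    line = json.dumps(event, sort_keys=True)
    for target in targets:
        path = output_dir / stream_name / target / "events.jsonl"
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open("a", encoding="utf-8") as f:
            f.write(line + "\n")
```

With this layout, a consumer that only needs to know about new messages can tail `message/created/events.jsonl`, while one that needs edits as well reads `message/updated/events.jsonl` and never has to merge the two files.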
2 changes: 1 addition & 1 deletion apps/slack_exporter/conftest.py
@@ -23,6 +23,6 @@ def default_settings(temp_output_dir: Path) -> ExporterSettings:
         channels=(ChannelConfig(name=SlackChannelName("general")),),
         default_oldest=datetime(2024, 1, 1, tzinfo=timezone.utc),
         output_dir=temp_output_dir,
-        reaction_lookback=0,
+        max_recent_threads_for_reactions=0,
         cache_ttl_seconds=0,
     )
86 changes: 63 additions & 23 deletions apps/slack_exporter/imbue/slack_exporter/channels.py
@@ -1,8 +1,11 @@
 import logging
 from typing import Any

+from pydantic import Field
+
 from imbue.imbue_common.event_envelope import EventSource
 from imbue.imbue_common.event_envelope import EventType
+from imbue.imbue_common.frozen_model import FrozenModel
 from imbue.slack_exporter.data_types import ChannelEvent
 from imbue.slack_exporter.data_types import SelfIdentityEvent
 from imbue.slack_exporter.data_types import SlackApiCaller
@@ -23,8 +26,11 @@
 _SLACK_SOURCE = EventSource("slack")


-def fetch_channel_list(api_caller: SlackApiCaller, members_only: bool = True) -> list[ChannelEvent]:
-    """Fetch non-archived channels from Slack.
+def fetch_raw_channel_list(
+    api_caller: SlackApiCaller,
+    members_only: bool = True,
+) -> list[dict[str, Any]]:
+    """Fetch raw non-archived channel dicts from Slack via conversations.list.

     When members_only is True (default), only channels where the authenticated
     user is a member are returned.
@@ -37,6 +43,16 @@ def fetch_channel_list(api_caller: SlackApiCaller, members_only: bool = True) ->
     )
     if members_only:
         raw_channels = [ch for ch in raw_channels if ch.get("is_member", False)]
+    return raw_channels
+
+
+def fetch_channel_list(api_caller: SlackApiCaller, members_only: bool = True) -> list[ChannelEvent]:
+    """Fetch non-archived channels from Slack as ChannelEvent objects.
+
+    When members_only is True (default), only channels where the authenticated
+    user is a member are returned.
+    """
+    raw_channels = fetch_raw_channel_list(api_caller=api_caller, members_only=members_only)
     channels = [_make_channel_event(raw) for raw in raw_channels]
     logger.info("Fetched %d channels from Slack", len(channels))
     return channels
@@ -102,36 +118,60 @@ def fetch_self_identity(api_caller: SlackApiCaller) -> SelfIdentityEvent:
     )


-def fetch_unread_markers(
+class ChannelInfoResult(FrozenModel):
+    """Result of fetching per-channel info via conversations.info."""
+
+    unread_markers: tuple[UnreadMarkerEvent, ...] = Field(description="Unread marker events")
+    updated_channels: tuple[ChannelEvent, ...] = Field(
+        description="Channel events updated from conversations.info responses",
+    )
+
+
+def fetch_channel_info(
     api_caller: SlackApiCaller,
     channel_events: list[ChannelEvent],
-) -> list[UnreadMarkerEvent]:
-    """Fetch unread markers for each channel via conversations.info.
+) -> ChannelInfoResult:
+    """Fetch per-channel info via conversations.info.

-    The conversations.list API does not reliably include last_read, so we
-    fetch it per channel via conversations.info.
+    Returns unread markers and updated channel events from the conversations.info
+    responses (which include the full channel object).
     """
     markers: list[UnreadMarkerEvent] = []
-    for event in channel_events:
+    updated_channels: list[ChannelEvent] = []
+    total_channels = len(channel_events)
+    for channel_idx, event in enumerate(channel_events):
+        if total_channels > 1:
+            logger.info(" Fetching channel info %d/%d: %s", channel_idx + 1, total_channels, event.channel_name)
         data = api_caller("conversations.info", {"channel": str(event.channel_id)})
         channel_info = data.get("channel", {})
+
+        # Build an updated channel event from the conversations.info response.
+        # Strip user-specific fields (last_read, latest) so the raw dict is comparable
+        # to what conversations.list returns for stable diff comparisons.
+        if channel_info.get("id") and channel_info.get("name"):
+            channel_raw_for_event = {k: v for k, v in channel_info.items() if k not in ("last_read", "latest")}
+            updated_channels.append(_make_channel_event(channel_raw_for_event))
+
         last_read = channel_info.get("last_read")
-        if not last_read:
-            continue
-        markers.append(
-            UnreadMarkerEvent(
-                timestamp=make_iso_timestamp(),
-                type=EventType("unread_marker"),
-                event_id=make_event_id(),
-                source=_SLACK_SOURCE,
-                channel_id=event.channel_id,
-                channel_name=event.channel_name,
-                last_read_ts=SlackMessageTimestamp(last_read),
-                raw={"channel_id": str(event.channel_id), "last_read": last_read},
+        if last_read:
+            markers.append(
+                UnreadMarkerEvent(
+                    timestamp=make_iso_timestamp(),
+                    type=EventType("unread_marker"),
+                    event_id=make_event_id(),
+                    source=_SLACK_SOURCE,
+                    channel_id=event.channel_id,
+                    channel_name=event.channel_name,
+                    last_read_ts=SlackMessageTimestamp(last_read),
+                    raw={"channel_id": str(event.channel_id), "last_read": last_read},
+                )
             )
-        )
-    logger.info("Fetched %d unread markers from Slack", len(markers))
-    return markers
+
+    logger.info("Fetched info for %d channels (%d unread markers)", len(channel_events), len(markers))
+    return ChannelInfoResult(
+        unread_markers=tuple(markers),
+        updated_channels=tuple(updated_channels),
+    )
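
The comment in `fetch_channel_info` about stripping `last_read` and `latest` can be illustrated in isolation. The sketch below is an assumption about why the stripping helps: the two fields change whenever the user reads or someone posts, so leaving them in would make an otherwise unchanged channel look modified. `strip_user_specific_fields` and `has_channel_changed` are hypothetical helpers, not functions from this module.

```python
from typing import Any

# The two fields the diff removes before building a channel event.
USER_SPECIFIC_FIELDS = ("last_read", "latest")


def strip_user_specific_fields(channel_info: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a conversations.info channel dict without volatile fields."""
    return {k: v for k, v in channel_info.items() if k not in USER_SPECIFIC_FIELDS}


def has_channel_changed(previous_raw: dict[str, Any], channel_info: dict[str, Any]) -> bool:
    """Compare a previously stored raw channel dict with a fresh conversations.info dict."""
    return strip_user_specific_fields(channel_info) != previous_raw
```

Under this assumption, a channel is only reported as updated when a field such as its name or membership flag differs, not on every read or new message.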


 def _make_user_event(user_raw: dict[str, Any]) -> UserEvent:
31 changes: 16 additions & 15 deletions apps/slack_exporter/imbue/slack_exporter/channels_test.py
@@ -1,8 +1,8 @@
 import pytest

+from imbue.slack_exporter.channels import fetch_channel_info
 from imbue.slack_exporter.channels import fetch_channel_list
 from imbue.slack_exporter.channels import fetch_self_identity
-from imbue.slack_exporter.channels import fetch_unread_markers
 from imbue.slack_exporter.channels import fetch_user_list
 from imbue.slack_exporter.channels import resolve_channel_id
 from imbue.slack_exporter.errors import ChannelNotFoundError
@@ -194,36 +194,37 @@ def test_fetch_unread_markers_from_conversations_info() -> None:
     api_caller = make_fake_api_caller(
         {
             "conversations.info": [
-                {"ok": True, "channel": {"id": "C123", "last_read": "1700000000.000001"}},
-                {"ok": True, "channel": {"id": "C456", "last_read": "1700000000.000099"}},
+                {"ok": True, "channel": {"id": "C123", "name": "general", "last_read": "1700000000.000001"}},
+                {"ok": True, "channel": {"id": "C456", "name": "random", "last_read": "1700000000.000099"}},
             ],
         }
     )

-    markers = fetch_unread_markers(api_caller, channels)
+    result = fetch_channel_info(api_caller, channels)

-    assert len(markers) == 2
-    assert markers[0].channel_id == SlackChannelId("C123")
-    assert markers[0].last_read_ts == SlackMessageTimestamp("1700000000.000001")
-    assert markers[0].source == "slack"
-    assert markers[1].last_read_ts == SlackMessageTimestamp("1700000000.000099")
+    assert len(result.unread_markers) == 2
+    assert result.unread_markers[0].channel_id == SlackChannelId("C123")
+    assert result.unread_markers[0].last_read_ts == SlackMessageTimestamp("1700000000.000001")
+    assert result.unread_markers[0].source == "slack"
+    assert result.unread_markers[1].last_read_ts == SlackMessageTimestamp("1700000000.000099")
+    assert len(result.updated_channels) == 2


-def test_fetch_unread_markers_skips_channels_without_last_read() -> None:
+def test_fetch_channel_info_skips_channels_without_last_read() -> None:
     channels = [
         make_channel_event("C123", "general"),
         make_channel_event("C456", "random"),
     ]
     api_caller = make_fake_api_caller(
         {
             "conversations.info": [
-                {"ok": True, "channel": {"id": "C123", "last_read": "1700000000.000001"}},
-                {"ok": True, "channel": {"id": "C456"}},
+                {"ok": True, "channel": {"id": "C123", "name": "general", "last_read": "1700000000.000001"}},
+                {"ok": True, "channel": {"id": "C456", "name": "random"}},
             ],
         }
     )

-    markers = fetch_unread_markers(api_caller, channels)
+    result = fetch_channel_info(api_caller, channels)

-    assert len(markers) == 1
-    assert markers[0].channel_id == SlackChannelId("C123")
+    assert len(result.unread_markers) == 1
+    assert result.unread_markers[0].channel_id == SlackChannelId("C123")
10 changes: 7 additions & 3 deletions apps/slack_exporter/imbue/slack_exporter/data_types.py
@@ -46,6 +46,10 @@ class ExporterSettings(FrozenModel):
         default=None,
         description="Channels to export. When None, all channels from the fetched channel list are exported.",
     )
+    recently_active_channels: int | None = Field(
+        default=None,
+        description="If set, restrict to the N channels with the most recent messages (from historical data).",
+    )
     default_oldest: datetime = Field(
         description="Default earliest date to fetch messages from",
     )
@@ -61,9 +65,9 @@ class ExporterSettings(FrozenModel):
         default=True,
         description="Only export channels where the authenticated user is a member",
     )
-    reaction_lookback: int = Field(
-        default=10,
-        description="Number of recent relevant threads to re-check for reaction changes",
+    max_recent_threads_for_reactions: int = Field(
+        default=50,
+        description="Number of most recent relevant threads to check for reaction changes after export",
     )
     cache_ttl_seconds: int = Field(
         default=600,
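
One way the `recently_active_channels` setting could be applied is sketched below. It assumes the exporter can look up the latest already-exported message timestamp per channel. The function name, the input mapping, and the choice to drop channels with no exported history are all assumptions made for illustration, not behaviour confirmed by this diff.

```python
from __future__ import annotations


def _ts_key(ts: str) -> tuple[int, int]:
    """Split a Slack timestamp such as "1700000000.000100" into integer parts."""
    seconds, _, fraction = ts.partition(".")
    return int(seconds), int(fraction or "0")


def select_recently_active_channels(
    latest_ts_by_channel: dict[str, str | None],
    limit: int | None,
) -> list[str]:
    """Return up to `limit` channel names, most recently active first.

    When `limit` is None the setting is unset and every channel is kept in
    its original order.
    """
    if limit is None:
        return list(latest_ts_by_channel)
    # Assumption: channels with no exported messages cannot be ranked, so skip them.
    with_history = [(name, ts) for name, ts in latest_ts_by_channel.items() if ts]
    with_history.sort(key=lambda pair: _ts_key(pair[1]), reverse=True)
    return [name for name, _ in with_history[: max(limit, 0)]]
```

Because the ranking relies on previously exported data, a first run with no history would have nothing to rank in this sketch; how the exporter itself treats that case is not visible in the hunks shown here.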