Create rss collection with feed syncing #368

Open: wants to merge 14 commits into base: main

51 changes: 47 additions & 4 deletions packages/rss-db-collection/README.md
@@ -5,8 +5,9 @@ RSS/Atom feed collection for TanStack DB - sync data from RSS and Atom feeds wit
## Features

- **📡 RSS & Atom Support**: Dedicated option creators for RSS 2.0 and Atom 1.0 feeds
- **🔄 Automatic Polling**: Configurable polling intervals with intelligent error recovery and manual refresh capability
- **✨ Deduplication**: Built-in deduplication based on feed item IDs/GUIDs

Review comment (Collaborator): Can we make this README accessible from the website? If not, or if that's awkward (broken links and so on), let's move these docs in with the rest of the docs and have the README just link to Tanstack.com.

- **🔄 Smart Polling**: Configurable polling intervals with automatic detection based on feed metadata (`sy:updatePeriod`/`sy:updateFrequency`)
- **✨ Content-Aware Deduplication**: Built-in deduplication that detects content changes for existing GUIDs and treats them as updates
- **📅 RFC-Compliant Date Parsing**: Strict RFC 2822/3339 date parsing for reliable timezone handling
- **🔧 Transform Functions**: Custom transform functions to normalize feed data to your schema
- **📝 Full TypeScript Support**: Complete type safety with schema inference
- **🎛️ Mutation Handlers**: Support for `onInsert`, `onUpdate`, and `onDelete` callbacks
@@ -83,9 +84,51 @@ const atomFeed = createCollection({
})
```

## Smart Features

### Smart Polling Intervals

When `pollingInterval` is not set, the RSS collection derives its polling interval from feed metadata:

- **RSS Syndication**: Uses `<sy:updatePeriod>` and `<sy:updateFrequency>` tags when available
- **Default**: 5 minutes for all feeds when syndication tags are not present

```typescript
// With no pollingInterval set, the collection picks the interval itself
const feed = createCollection({
  ...rssCollectionOptions({
    feedUrl: "https://blog.example.com/feed.xml",
    // Uses sy:updatePeriod/sy:updateFrequency when the feed provides them,
    // otherwise falls back to the 5-minute default
  }),
})
```
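
As a rough illustration of how syndication tags could translate into a polling interval, the sketch below divides the length of the period by the update frequency. The helper `intervalFromSyndication` and its lookup table are hypothetical and are not the package's `detectSmartPollingInterval`; the 30-day month and 365-day year are approximations chosen for the example.

```typescript
// Hypothetical helper for illustration only; not part of @tanstack/rss-db-collection.
const PERIOD_MS: Record<string, number> = {
  hourly: 60 * 60 * 1000,
  daily: 24 * 60 * 60 * 1000,
  weekly: 7 * 24 * 60 * 60 * 1000,
  monthly: 30 * 24 * 60 * 60 * 1000, // approximation
  yearly: 365 * 24 * 60 * 60 * 1000, // approximation
}

// The README's documented fallback when no syndication tags are present.
const DEFAULT_INTERVAL_MS = 5 * 60 * 1000

function intervalFromSyndication(
  updatePeriod?: string,
  updateFrequency?: number
): number {
  if (!updatePeriod) return DEFAULT_INTERVAL_MS

  const periodMs = PERIOD_MS[updatePeriod.trim().toLowerCase()]
  if (periodMs === undefined) return DEFAULT_INTERVAL_MS

  // sy:updateFrequency counts updates per period; treat missing or invalid values as 1.
  const frequency =
    updateFrequency !== undefined &&
    Number.isFinite(updateFrequency) &&
    updateFrequency > 0
      ? updateFrequency
      : 1

  return Math.round(periodMs / frequency)
}

console.log(intervalFromSyndication("hourly", 2)) // 1800000 (every 30 minutes)
console.log(intervalFromSyndication()) // 300000 (5-minute default)
```

An explicit `pollingInterval` in the collection options would bypass this kind of detection entirely.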

### Content-Aware Deduplication

Unlike simple GUID-based deduplication, this collection detects when feed items with the same GUID have changed content and treats them as updates:

- **New Items**: Items with unseen GUIDs are inserted
- **Content Changes**: Items with existing GUIDs but changed content are updated
- **No Changes**: Items with existing GUIDs and unchanged content are ignored

This ensures that corrections, updates, or content changes in feed items are properly reflected in your database.
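
The three cases above can be sketched as a lookup against a map of previously seen items. Everything in this block is an assumption made for illustration: `classifyItem`, `hashContent`, and the FNV-1a-style hash are hypothetical stand-ins, and the package's own `getContentHash` may work differently.

```typescript
// Hypothetical sketch; names and hashing scheme are illustrative, not the package's API.
type SeenEntry = { lastSeen: number; contentHash: string }
type SyncAction = "insert" | "update" | "ignore"

// FNV-1a-style 32-bit hash over UTF-16 code units, standing in for a real content hash.
function hashContent(item: Record<string, unknown>): string {
  // Sort top-level keys so property order does not change the hash.
  const entries = Object.entries(item).sort(([a], [b]) =>
    a < b ? -1 : a > b ? 1 : 0
  )
  const text = JSON.stringify(entries)

  let hash = 0x811c9dc5
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash.toString(16)
}

// Decide what to do with an item and record it as seen.
function classifyItem(
  seen: Map<string, SeenEntry>,
  id: string,
  contentHash: string,
  now: number
): SyncAction {
  const previous = seen.get(id)
  seen.set(id, { lastSeen: now, contentHash })

  if (!previous) return "insert"
  return previous.contentHash === contentHash ? "ignore" : "update"
}

const seenItems = new Map<string, SeenEntry>()
console.log(classifyItem(seenItems, "guid-1", hashContent({ title: "Hello" }), 1)) // insert
console.log(classifyItem(seenItems, "guid-1", hashContent({ title: "Hello" }), 2)) // ignore
console.log(classifyItem(seenItems, "guid-1", hashContent({ title: "Hello!" }), 3)) // update
```

Storing only a hash per GUID keeps the tracking map small, at the cost of a small chance that two different versions of an item hash to the same value.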

### RFC-Compliant Date Parsing

The collection uses strict RFC 2822 (RSS) and RFC 3339 (Atom) date parsing to avoid locale-dependent issues:

```typescript
import { parseFeedDate } from "@tanstack/rss-db-collection"

// Handles various date formats reliably
const date1 = parseFeedDate("Mon, 25 Dec 2023 10:30:00 GMT") // RFC 2822
const date2 = parseFeedDate("2023-12-25T10:30:00Z") // RFC 3339
const date3 = parseFeedDate("2023-12-25T10:30:00+01:00") // RFC 3339 with offset
```
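
To show what strict parsing can mean in practice, here is a minimal sketch of an RFC 3339 parser that rejects anything outside the expected shape instead of handing arbitrary strings to `new Date()`. The function `parseRfc3339Strict` is hypothetical and is not the implementation of `parseFeedDate`; as a simplification it rejects leap seconds and years below 0100.

```typescript
// Hypothetical sketch; not the implementation of parseFeedDate.
const RFC3339_PATTERN =
  /^(\d{4})-(\d{2})-(\d{2})T(\d{2}):(\d{2}):(\d{2})(\.\d+)?(Z|[+-]\d{2}:\d{2})$/

function parseRfc3339Strict(input: string): Date | undefined {
  const match = RFC3339_PATTERN.exec(input.trim())
  if (!match) return undefined

  const [year, month, day, hour, minute, second] = match
    .slice(1, 7)
    .map(Number)
  const fraction = match[7] ?? ""
  const zone = match[8]

  if (hour > 23 || minute > 59 || second > 59) return undefined

  // Keep at most millisecond precision: ".5" becomes 500, ".123456" becomes 123.
  const millis = Number(fraction.slice(1, 4).padEnd(3, "0"))

  // Build the wall-clock time as if it were UTC, then reject rolled-over dates
  // such as February 30 or month 13.
  const wallClock = Date.UTC(year, month - 1, day, hour, minute, second, millis)
  const check = new Date(wallClock)
  if (
    check.getUTCFullYear() !== year ||
    check.getUTCMonth() !== month - 1 ||
    check.getUTCDate() !== day
  ) {
    return undefined
  }

  // Convert the local offset into minutes and shift back to UTC.
  let offsetMinutes = 0
  if (zone !== "Z") {
    const sign = zone.startsWith("-") ? -1 : 1
    offsetMinutes =
      sign * (Number(zone.slice(1, 3)) * 60 + Number(zone.slice(4, 6)))
  }
  return new Date(wallClock - offsetMinutes * 60000)
}

console.log(parseRfc3339Strict("2023-12-25T10:30:00+01:00")?.toISOString())
// 2023-12-25T09:30:00.000Z
console.log(parseRfc3339Strict("12/25/2023")) // undefined
```

Returning `undefined` for malformed input leaves it to the caller to decide whether to skip the item or keep the raw string.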

## Configuration Options

### RSS Collection Configuration
### RSS Collection Options

```typescript
interface RSSCollectionConfig {
@@ -94,7 +137,7 @@ interface RSSCollectionConfig {
getKey: (item: T) => string // Extract unique key from item

// Optional
pollingInterval?: number // Polling interval in ms (default: 300000 = 5 minutes)
pollingInterval?: number // Polling interval in ms (default: 5 minutes, or based on sy:updatePeriod/sy:updateFrequency)
startPolling?: boolean // Start polling immediately (default: true)
maxSeenItems?: number // Max items to track for deduplication (default: 1000)

21 changes: 9 additions & 12 deletions packages/rss-db-collection/src/index.ts
@@ -60,26 +60,23 @@
*/

// RSS collection functionality
export {
rssCollectionOptions,
type RSSCollectionConfig,
type RSSItem,
} from "./rss"
export { rssCollectionOptions, type RSSCollectionConfig } from "./rss"

// Atom collection functionality
export {
atomCollectionOptions,
type AtomCollectionConfig,
type AtomItem,
} from "./rss"
export { atomCollectionOptions, type AtomCollectionConfig } from "./rss"

// Shared types and utilities
export { type FeedCollectionUtils } from "./rss"

// Feed item types
export {
type RSSItem,
type AtomItem,
type FeedItem,
type FeedType,
type HTTPOptions,
type FeedCollectionUtils,
} from "./rss"
type ParsedFeedData,
} from "./types"

// Error types
export {
119 changes: 55 additions & 64 deletions packages/rss-db-collection/src/rss.ts
@@ -8,6 +8,11 @@ import {
InvalidPollingIntervalError,
UnsupportedFeedFormatError,
} from "./errors"
import {
detectSmartPollingInterval,
getContentHash,
parseFeedDate,
} from "./utils"
import type {
CollectionConfig,
DeleteMutationFnParams,
@@ -17,66 +22,10 @@ import type {
UtilsRecord,
} from "@tanstack/db"
import type { StandardSchemaV1 } from "@standard-schema/spec"
import type { AtomItem, FeedItem, HTTPOptions, RSSItem } from "./types"

const debug = DebugModule.debug(`ts/db:rss`)

/**
* Types for RSS feed items
*/
export interface RSSItem {
title?: string
description?: string
link?: string
guid?: string
pubDate?: string | Date
author?: string
category?: string | Array<string>
enclosure?: {
url: string
type?: string
length?: string
}
[key: string]: any
}

/**
* Types for Atom feed items
*/
export interface AtomItem {
title?: string | { $text?: string; type?: string }
summary?: string | { $text?: string; type?: string }
content?: string | { $text?: string; type?: string }
link?:
| string
| { href?: string; rel?: string; type?: string }
| Array<{ href?: string; rel?: string; type?: string }>
id?: string
updated?: string | Date
published?: string | Date
author?: string | { name?: string; email?: string; uri?: string }
category?:
| string
| { term?: string; label?: string }
| Array<{ term?: string; label?: string }>
[key: string]: any
}

export type FeedItem = RSSItem | AtomItem

/**
* Feed type detection
*/
export type FeedType = `rss` | `atom` | `auto`

/**
* HTTP options for fetching feeds
*/
export interface HTTPOptions {
timeout?: number
headers?: Record<string, string>
userAgent?: string
}

/**
* Base configuration interface for feed collection options
*/
@@ -305,7 +254,7 @@ function parseFeed(xmlContent: string, parserOptions: any = {}): ParsedFeed {
function defaultRSSTransform(item: RSSItem): RSSItem {
return {
...item,
pubDate: item.pubDate ? new Date(item.pubDate) : undefined,
pubDate: item.pubDate ? parseFeedDate(item.pubDate) : undefined,
}
}

@@ -340,10 +289,10 @@ function defaultAtomTransform(item: AtomItem): AtomItem {

// Handle dates
if (item.updated) {
normalized.updated = new Date(item.updated)
normalized.updated = parseFeedDate(item.updated)
}
if (item.published) {
normalized.published = new Date(item.published)
normalized.published = parseFeedDate(item.published)
}

// Handle author
@@ -447,7 +396,7 @@ function createFeedCollectionOptions<
) {
const {
feedUrl,
pollingInterval = 300000, // 5 minutes default
pollingInterval: userPollingInterval,
httpOptions = {},
startPolling = true,
maxSeenItems = 1000,
@@ -461,6 +410,10 @@ function createFeedCollectionOptions<
...restConfig
} = config

// Smart polling interval detection
let pollingInterval =
userPollingInterval !== undefined ? userPollingInterval : 300000 // Default 5 minutes

// Validation
if (!feedUrl) {
throw new FeedURLRequiredError()
@@ -470,7 +423,10 @@ function createFeedCollectionOptions<
}

// State management
let seenItems = new Map<string, { id: string; lastSeen: number }>()
let seenItems = new Map<
string,
{ id: string; lastSeen: number; contentHash: string }
>()
let syncParams:
| Parameters<
SyncConfig<ResolveType<TExplicit, TSchema, TFallback>, TKey>[`sync`]
@@ -544,10 +500,22 @@ function createFeedCollectionOptions<
throw new UnsupportedFeedFormatError(feedUrl)
}

// Detect smart polling interval on first fetch
if (!userPollingInterval) {
const parser = new XMLParser(parserOptions)
const feedData = parser.parse(xmlContent)
const smartInterval = detectSmartPollingInterval(feedData)
if (smartInterval !== pollingInterval) {
pollingInterval = smartInterval
debug(`Updated polling interval to ${pollingInterval}ms`)
}
}

const { begin, write, commit } = params
begin()

let newItemsCount = 0
let updatedItemsCount = 0
const currentTime = Date.now()

for (const rawItem of parsedFeed.items) {
@@ -572,22 +540,41 @@ function createFeedCollectionOptions<

// Generate unique ID for deduplication
const itemId = getItemId(rawItem, parsedFeed.type)
const contentHash = getContentHash(rawItem)

// Check if we've seen this item before
const seen = seenItems.get(itemId)

if (!seen) {
// New item
seenItems.set(itemId, { id: itemId, lastSeen: currentTime })
seenItems.set(itemId, {
id: itemId,
lastSeen: currentTime,
contentHash,
})

write({
type: `insert`,
value: transformedItem,
})

newItemsCount++
} else if (seen.contentHash !== contentHash) {
// Item exists but content has changed - treat as update
seenItems.set(itemId, {
...seen,
lastSeen: currentTime,
contentHash,
})

write({
type: `update`,
value: transformedItem,
})

updatedItemsCount++
} else {
// Update last seen time
// Item exists and content hasn't changed - just update last seen time
seenItems.set(itemId, { ...seen, lastSeen: currentTime })
}
}
@@ -597,6 +584,9 @@ function createFeedCollectionOptions<
if (newItemsCount > 0) {
debug(`Added ${newItemsCount} new items from feed`)
}
if (updatedItemsCount > 0) {
debug(`Updated ${updatedItemsCount} existing items from feed`)
}

// Clean up old items periodically
cleanupSeenItems()
@@ -694,6 +684,7 @@ function createFeedCollectionOptions<
getKey,
sync,
startSync: true,
rowUpdateMode: `full`,
onInsert,
onUpdate,
onDelete,