Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/khaki-cheetahs-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@segment/analytics-signals': minor
---

* Clear signal buffer at start of new session
* Prune signalBuffer to maxBufferSize on new session (if different)
* Add sessionStorage storage type
117 changes: 107 additions & 10 deletions packages/signals/signals/src/core/buffer/__tests__/buffer.test.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,123 @@
import { sleep } from '@segment/analytics-core'
import { range } from '../../../test-helpers/range'
import { createInteractionSignal } from '../../../types/factories'
import { getSignalBuffer, SignalBuffer } from '../index'

describe(getSignalBuffer, () => {
let buffer: SignalBuffer
beforeEach(async () => {
sessionStorage.clear()
buffer = getSignalBuffer({
maxBufferSize: 10,
})
await buffer.clear()
})
describe('indexDB', () => {
it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
})
it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})

it('should delete older signals when maxBufferSize is exceeded', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})

it('should delete older signals on initialize if current number exceeds maxBufferSize', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

// Re-initialize buffer
buffer = getSignalBuffer({
maxBufferSize: 10,
})

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})

it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
it('should clear signal buffer if there is a new session according to session storage', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])

// Simulate a new session by clearing session storage and re-initializing the buffer
sessionStorage.clear()
await sleep(100)
buffer = getSignalBuffer({
maxBufferSize: 10,
})

await expect(buffer.getAll()).resolves.toHaveLength(0)
})
})
it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
describe('sessionStorage', () => {
it('should instantiate without throwing an error', () => {
expect(buffer).toBeTruthy()
})

it('should add and clear', async () => {
const mockSignal = createInteractionSignal({
eventType: 'submit',
target: {},
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})

it('should delete older signals when maxBufferSize is exceeded', async () => {
const signals = range(15).map((_, idx) =>
createInteractionSignal({
idx: idx,
eventType: 'change',
target: {},
})
)

for (const signal of signals) {
await buffer.add(signal)
}

const storedSignals = await buffer.getAll()
expect(storedSignals).toHaveLength(10)
expect(storedSignals).toEqual(signals.slice(-10).reverse())
})
await buffer.add(mockSignal)
await expect(buffer.getAll()).resolves.toEqual([mockSignal])
await buffer.clear()
await expect(buffer.getAll()).resolves.toHaveLength(0)
})
})
176 changes: 133 additions & 43 deletions packages/signals/signals/src/core/buffer/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Signal } from '@segment/analytics-signals-runtime'
import { openDB, DBSchema, IDBPDatabase } from 'idb'
import { openDB, DBSchema, IDBPDatabase, IDBPObjectStore } from 'idb'
import { logger } from '../../lib/logger'
import { WebStorage } from '../../lib/storage/web-storage'

interface SignalDatabase extends DBSchema {
signals: {
Expand All @@ -15,77 +16,147 @@ export interface SignalPersistentStorage {
clear(): void
}

export class SignalStore implements SignalPersistentStorage {
interface IDBPDatabaseSignals extends IDBPDatabase<SignalDatabase> {}
interface IDBPObjectStoreSignals
extends IDBPObjectStore<
SignalDatabase,
['signals'],
'signals',
'readonly' | 'readwrite' | 'versionchange'
> {}

interface StoreSettings {
maxBufferSize: number
}
export class SignalStoreIndexDB implements SignalPersistentStorage {
static readonly DB_NAME = 'Segment Signals Buffer'
static readonly STORE_NAME = 'signals'
private signalStore: Promise<IDBPDatabase<SignalDatabase>>
private signalCount = 0
private db: Promise<IDBPDatabaseSignals>
private maxBufferSize: number

public length() {
return this.signalCount
}

private sessionKeyStorage = new WebStorage(window.sessionStorage)
static deleteDatabase() {
return indexedDB.deleteDatabase(SignalStore.DB_NAME)
return indexedDB.deleteDatabase(SignalStoreIndexDB.DB_NAME)
}

constructor(settings: { maxBufferSize?: number } = {}) {
this.maxBufferSize = settings.maxBufferSize ?? 50
this.signalStore = this.createSignalStore()
void this.initializeSignalCount()
async getStore(
permission: IDBTransactionMode,
database?: IDBPDatabaseSignals
): Promise<IDBPObjectStoreSignals> {
const db = database ?? (await this.db)
const store = db
.transaction(SignalStoreIndexDB.STORE_NAME, permission)
.objectStore(SignalStoreIndexDB.STORE_NAME)
return store
}

private getStore() {
return this.signalStore
constructor(settings: StoreSettings) {
this.maxBufferSize = settings.maxBufferSize
this.db = this.initSignalDB()
}

private async createSignalStore() {
const db = await openDB<SignalDatabase>(SignalStore.DB_NAME, 1, {
private async initSignalDB(): Promise<IDBPDatabaseSignals> {
const db = await openDB<SignalDatabase>(SignalStoreIndexDB.DB_NAME, 1, {
upgrade(db) {
db.createObjectStore(SignalStore.STORE_NAME, { autoIncrement: true })
db.createObjectStore(SignalStoreIndexDB.STORE_NAME, {
autoIncrement: true,
})
},
})
logger.debug('Signals Buffer (indexDB) initialized')
// if the signal buffer is too large, delete the oldest signals (e.g, the settings have changed)
const store = await this.getStore('readwrite', db)
await this.clearStoreIfNeeded(store)
await this.countAndDeleteOldestIfNeeded(store, true)
await store.transaction.done
return db
}

private async initializeSignalCount() {
const store = await this.signalStore
this.signalCount = await store.count(SignalStore.STORE_NAME)
logger.debug(
`Signal count initialized with ${this.signalCount} signals (max: ${this.maxBufferSize})`
)
private async clearStoreIfNeeded(store: IDBPObjectStoreSignals) {
// prevent the signals buffer from persisting across sessions (e.g, user closes tab and reopens)
const sessionKey = 'segment_signals_db_session_key'
if (!sessionStorage.getItem(sessionKey)) {
this.sessionKeyStorage.setItem(sessionKey, true)
await store.clear!()
logger.debug('New Session, so signals buffer cleared')
}
}

async add(signal: Signal): Promise<void> {
const store = await this.signalStore
if (this.signalCount >= this.maxBufferSize) {
// Get the key of the oldest signal and delete it
const oldestKey = await store
.transaction(SignalStore.STORE_NAME)
.store.getKey(IDBKeyRange.lowerBound(0))
if (oldestKey !== undefined) {
await store.delete(SignalStore.STORE_NAME, oldestKey)
} else {
this.signalCount--
const store = await this.getStore('readwrite')
await store.add!(signal)
await this.countAndDeleteOldestIfNeeded(store)
return store.transaction.done
}

private async countAndDeleteOldestIfNeeded(
store: IDBPObjectStoreSignals,
deleteMultiple = false
): Promise<void> {
let count = await store.count()
if (count > this.maxBufferSize) {
const cursor = await store.openCursor()
if (cursor) {
// delete up to maxItems
if (deleteMultiple) {
while (count > this.maxBufferSize) {
await cursor.delete!()
await cursor.continue()
count--
}
logger.debug(
`Signals Buffer: Purged signals to max buffer size of ${this.maxBufferSize}`
)
} else {
// just delete the oldest item
await cursor.delete!()
count--
}
}
}
await store.add(SignalStore.STORE_NAME, signal)
this.signalCount++
}

/**
* Get list of signals from the store, with the newest signals first.
*/
async getAll(): Promise<Signal[]> {
const store = await this.getStore()
return (await store.getAll(SignalStore.STORE_NAME)).reverse()
const store = await this.getStore('readonly')
const signals = await store.getAll()
await store.transaction.done
return signals.reverse()
}

async clear() {
const store = await this.getStore()
return store.clear(SignalStore.STORE_NAME)
async clear(): Promise<void> {
const store = await this.getStore('readwrite')
await store.clear!()
await store.transaction.done
}
}

export class SignalStoreSessionStorage implements SignalPersistentStorage {
private readonly storageKey = 'segment_signals_buffer'
private maxBufferSize: number

constructor(settings: StoreSettings) {
this.maxBufferSize = settings.maxBufferSize
}

add(signal: Signal): void {
const signals = this.getAll()
signals.unshift(signal)
if (signals.length > this.maxBufferSize) {
// delete the last one
signals.splice(-1)
}
sessionStorage.setItem(this.storageKey, JSON.stringify(signals))
}

clear(): void {
sessionStorage.removeItem(this.storageKey)
}

getAll(): Signal[] {
const signals = sessionStorage.getItem(this.storageKey)
return signals ? JSON.parse(signals) : []
}
}

Expand Down Expand Up @@ -125,14 +196,33 @@ export class SignalBuffer<
export interface SignalBufferSettingsConfig<
T extends SignalPersistentStorage = SignalPersistentStorage
> {
/**
* Maximum number of signals to store. Only applies if no custom storage implementation is provided.
*/
maxBufferSize?: number
/**
* Choose between sessionStorage and indexDB. Only applies if no custom storage implementation is provided.
* @default 'indexDB'
*/
storageType?: 'session' | 'indexDB'
/**
* Custom storage implementation
* @default SignalStoreIndexDB
*/
signalStorage?: T
}
export const getSignalBuffer = <
T extends SignalPersistentStorage = SignalPersistentStorage
>(
settings: SignalBufferSettingsConfig<T>
) => {
const store = settings.signalStorage ?? new SignalStore(settings)
const settingsWithDefaults: StoreSettings = {
maxBufferSize: 50,
...settings,
}
const store =
settings.signalStorage ?? settings.storageType === 'session'
? new SignalStoreSessionStorage(settingsWithDefaults)
: new SignalStoreIndexDB(settingsWithDefaults)
return new SignalBuffer(store)
}
2 changes: 2 additions & 0 deletions packages/signals/signals/src/core/signals/settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type SignalsSettingsConfig = Pick<
| 'networkSignalsAllowList'
| 'networkSignalsDisallowList'
| 'networkSignalsAllowSameDomain'
| 'signalStorageType'
> & {
signalStorage?: SignalPersistentStorage
processSignal?: string
Expand Down Expand Up @@ -52,6 +53,7 @@ export class SignalGlobalSettings {

this.signalBuffer = {
signalStorage: settings.signalStorage,
storageType: settings.signalStorageType,
maxBufferSize: settings.maxBufferSize,
}
this.ingestClient = {
Expand Down
Loading
Loading