Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1390,7 +1390,13 @@ export const queues: NavMenuConstant = {
{
name: 'Getting Started',
url: undefined,
items: [{ name: 'Quickstart', url: '/guides/queues/quickstart' }],
items: [
{ name: 'Quickstart', url: '/guides/queues/quickstart' },
{
name: 'Consuming Messages with Edge Functions',
url: '/guides/queues/consuming-messages-with-edge-functions',
},
],
},
{
name: 'References',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
---
title: Consuming Supabase Queue Messages with Edge Functions
subtitle: 'Learn how to consume Supabase Queue messages server-side with a Supabase Edge Function'
---

This guide helps you read & process queue messages server-side with a Supabase Edge Function. Read [Queues API Reference](/docs/guides/queues/api) for more details on our API.

## Concepts

Supabase Queues is a pull-based Message Queue consisting of three main components: Queues, Messages, and Queue Types. You should already be familiar with the [Queues Quickstart](/docs/guides/queues/quickstart).

### Consuming messages in an Edge Function

This is a Supabase Edge Function that reads 5 messages off the queue, processes each of them, and deletes each message when it is done.

```tsx
import 'jsr:@supabase/functions-js/edge-runtime.d.ts'
import { createClient } from 'npm:@supabase/supabase-js@2'

const supabaseUrl = 'supabaseURL'
const supabaseKey = 'supabaseKey'

const supabase = createClient(supabaseUrl, supabaseKey)
const queueName = 'your_queue_name'

// Type definition for queue messages
interface QueueMessage {
msg_id: bigint
read_ct: number
vt: string
enqueued_at: string
message: any
}

async function processMessage(message: QueueMessage) {
//
// Do whatever logic you need to with the message content
//
// Delete the message from the queue
const { error: deleteError } = await supabase.schema('pgmq_public').rpc('delete', {
queue_name: queueName,
msg_id: message.msg_id,
})

if (deleteError) {
console.error(`Failed to delete message ${message.msg_id}:`, deleteError)
} else {
console.log(`Message ${message.msg_id} deleted from queue`)
}
}

Deno.serve(async (req) => {
const { data: messages, error } = await supabase.schema('pgmq_public').rpc('read', {
queue_name: queueName,
sleep_seconds: 0, // Don't wait if queue is empty
n: 5, // Read 5 messages off the queue
})

if (error) {
console.error(`Error reading from ${queueName} queue:`, error)
return new Response(JSON.stringify({ error: error.message }), {
status: 500,
headers: { 'Content-Type': 'application/json' },
})
}

if (!messages || messages.length === 0) {
console.log('No messages in workflow_messages queue')
return new Response(JSON.stringify({ message: 'No messages in queue' }), {
status: 200,
headers: { 'Content-Type': 'application/json' },
})
}

console.log(`Found ${messages.length} messages to process`)

// Process each message that was read off the queue
for (const message of messages) {
try {
await processMessage(message as QueueMessage)
} catch (error) {
console.error(`Error processing message ${message.msg_id}:`, error)
}
}

// Return immediately while background processing continues
return new Response(
JSON.stringify({
message: `Processing ${messages.length} messages in background`,
count: messages.length,
}),
{
status: 200,
headers: { 'Content-Type': 'application/json' },
}
)
})
```

Every time this Edge Function is run it:

1. Read 5 messages off the queue
2. Call the `processMessage` function
3. At the end of `processMessage`, the message is deleted from the queue
4. If `processMessage` throws an error, the error is logged. In this case, the message is still in the queue, so the next time this Edge Function runs it reads the message again.

You might find this kind of setup handy to run with [Supabase Cron](/docs/guides/cron). You can set up Cron so that every N number of minutes or seconds, the Edge Function will run and process a number of messages off the queue.

Similarly, you can invoke the Edge Function on command at any given time with [`supabase.functions.invoke`](/docs/guides/functions/quickstart-dashboard#usage).
4 changes: 2 additions & 2 deletions apps/docs/spec/supabase_js_v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1197,14 +1197,14 @@ functions:
isSpotlight: false
code: |
```js
const { error } = await supabase.auth.signOut('local')
const { error } = await supabase.auth.signOut({ scope: 'local' })
```
- id: sign-out-other-sessions
name: Sign out (other sessions)
isSpotlight: false
code: |
```js
const { error } = await supabase.auth.signOut('others')
const { error } = await supabase.auth.signOut({ scope: 'others' })
```
- id: verify-otp
title: 'verifyOtp()'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function DiskSizeField({
const mainDiskUsed = Math.round(((diskUtil?.metrics.fs_used_bytes ?? 0) / GB) * 100) / 100

return (
<div className="grid @xl:grid-cols-12 gap-5">
<div id="disk-size" className="grid @xl:grid-cols-12 gap-5">
<div className="col-span-4">
<FormField_Shadcn_
name="totalSize"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export const BillingSettings = () => {

<ScaffoldDivider />

{org?.plan.id !== 'free' && (
{org && org.plan.id !== 'free' && (
<ScaffoldContainer id="breakdown">
<BillingBreakdown />
</ScaffoldContainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ export const ChartBlock = ({
{
projectRef: ref as string,
attribute: attribute as ProjectDailyStatsAttribute,
startDate,
endDate,
interval: interval as AnalyticsInterval,
databaseIdentifier,
startDate: dayjs(startDate).format('YYYY-MM-DD'),
endDate: dayjs(endDate).format('YYYY-MM-DD'),
},
{ enabled: provider === 'daily-stats' }
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { LogChartHandler } from 'components/ui/Charts/LogChartHandler'
import Link from 'next/link'
import { useRef, useState } from 'react'

import { LogChartHandler } from 'components/ui/Charts/LogChartHandler'
import { ReportConfig } from 'data/reports/v2/reports.types'
import { Button, Card, cn } from 'ui'

export function ReportChartUpsell({
Expand Down Expand Up @@ -78,7 +77,6 @@ export function ReportChartUpsell({
label={''}
startDate={startDate}
endDate={endDate}
interval={'1d'}
data={demoData as any}
isLoading={false}
highlightedValue={0}
Expand Down
20 changes: 18 additions & 2 deletions apps/studio/components/interfaces/Storage/StorageMenu.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ import Link from 'next/link'
import { useRouter } from 'next/router'
import { useState } from 'react'

import { useParams } from 'common'
import { useFlag, useParams } from 'common'
import { CreateBucketModal } from 'components/interfaces/Storage/CreateBucketModal'
import ShimmeringLoader from 'components/ui/ShimmeringLoader'
import { useProjectStorageConfigQuery } from 'data/config/project-storage-config-query'
import { useBucketsQuery } from 'data/storage/buckets-query'
import { useSelectedProjectQuery } from 'hooks/misc/useSelectedProject'
import { IS_PLATFORM } from 'lib/constants'
import { useStorageExplorerStateSnapshot } from 'state/storage-explorer'
import { Alert_Shadcn_, AlertDescription_Shadcn_, AlertTitle_Shadcn_, Menu } from 'ui'
import { InfoTooltip } from 'ui-patterns/info-tooltip'
import {
InnerSideBarEmptyPanel,
InnerSideBarFilters,
Expand All @@ -24,6 +26,15 @@ export const StorageMenu = () => {
const { ref, bucketId } = useParams()
const { data: projectDetails } = useSelectedProjectQuery()
const snap = useStorageExplorerStateSnapshot()

const showMigrationCallout = useFlag('storageMigrationCallout')
const { data: config } = useProjectStorageConfigQuery(
{ projectRef: ref },
{ enabled: showMigrationCallout }
)
const isListV2UpgradeAvailable =
!!config && !config.capabilities.list_v2 && config.external.upstreamTarget === 'main'

const isBranch = projectDetails?.parent_project_ref !== undefined

const [searchText, setSearchText] = useState<string>('')
Expand Down Expand Up @@ -159,7 +170,12 @@ export const StorageMenu = () => {
{IS_PLATFORM && (
<Link href={`/project/${ref}/storage/settings`}>
<Menu.Item rounded active={page === 'settings'}>
<p className="truncate">Settings</p>
<div className="flex items-center gap-x-2">
<p className="truncate">Settings</p>
{isListV2UpgradeAvailable && (
<InfoTooltip side="right">Upgrade available</InfoTooltip>
)}
</div>
</Menu.Item>
</Link>
)}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import dayjs from 'dayjs'
import { toast } from 'sonner'

import { useParams } from 'common'
import { InlineLink } from 'components/ui/InlineLink'
import { useProjectStorageConfigUpdateUpdateMutation } from 'data/config/project-storage-config-update-mutation'
import { useState } from 'react'
import {
Button,
Dialog,
DialogContent,
DialogDescription,
DialogFooter,
DialogHeader,
DialogSection,
DialogSectionSeparator,
DialogTitle,
DialogTrigger,
} from 'ui'
import { Admonition, TimestampInfo } from 'ui-patterns'

// [Joshen] Will be decided by Storage team, temp setting to 15th December 2025 UTC (3 months buffer)
const MIGRATION_DEADLINE = '2025-12-15T00:00:00'

export const StorageListV2MigrationCallout = () => {
const deadline = dayjs(MIGRATION_DEADLINE).utc(true)
const currentDate = dayjs.utc()
const remainingMonths = Math.ceil(deadline.diff(currentDate, 'months', true))

return (
<Admonition
type={remainingMonths <= 1 ? 'warning' : 'note'}
title="A new version of Storage is available for your project"
>
<p className="!leading-normal prose max-w-full text-sm !mb-0">
Get access to the List-V2 endpoint for improved performance and the ability to enable
Analytics buckets to your storage system
</p>
{remainingMonths <= 1 && (
<p className="!leading-normal prose max-w-full text-sm">
Your project's Storage will be automatically upgraded by{' '}
<TimestampInfo
displayAs="utc"
utcTimestamp={MIGRATION_DEADLINE}
className="text-sm text-foreground"
labelFormat="DD MMM YYYY HH:mm (UTC)"
/>{' '}
if the upgrade is not completed by then.
</p>
)}
<div className="flex items-center gap-x-2 mt-3">
<StorageListV2MigrationDialog />
</div>
</Admonition>
)
}

export const StorageListV2MigratingCallout = () => {
return (
<Admonition type="note" title="Project's storage is currently upgrading">
<p className="!leading-normal prose max-w-full text-sm !mb-0">
This notice will be closed once the upgrade has been completed - hang tight!
</p>
</Admonition>
)
}

const StorageListV2MigrationDialog = () => {
const { ref } = useParams()

const [open, setOpen] = useState(false)

const { mutate: updateStorageConfig, isLoading: isUpdating } =
useProjectStorageConfigUpdateUpdateMutation({
onSuccess: () => {
toast.success(`Project's storage will be upgraded shortly!`)
setOpen(false)
},
})

const onConfirmUpgrade = () => {
if (!ref) return console.error('Project ref is required')
updateStorageConfig({ projectRef: ref, external: { upstreamTarget: 'canary' } })
}

return (
<Dialog open={open} onOpenChange={setOpen}>
<DialogTrigger asChild>
<Button type="primary">Upgrade Storage</Button>
</DialogTrigger>
<DialogContent>
<DialogHeader>
<DialogTitle>Upgrade your project's Storage</DialogTitle>
<DialogDescription>
Get access to Analytics buckets and an improved list method
</DialogDescription>
</DialogHeader>

<DialogSectionSeparator />

<Admonition
type="warning"
className="mb-0 rounded-none border-x-0 border-t-0"
title="Migration required to optimise the database schema for upgrade"
description="We recommend running the update during periods of lower activity, although minimal to no disruption is expected."
/>

<DialogSection className="flex flex-col gap-y-2">
<p className="text-sm">
Depending on the number of objects in your Storage, the migration can take up to 24
hours to finish.
</p>

<p className="text-sm">
The upgrade will increase your disk size to about 15 - 25% and IOPS will be used to
create new efficient indexes as well as denormalising tables.
</p>

<p className="text-sm">
Ensure that your database instance has not{' '}
<InlineLink href={`/project/${ref}/settings/compute-and-disk#disk-size`}>
scaled disk
</InlineLink>{' '}
within the last 6h and you have at least 60%{' '}
<InlineLink href={`/project/${ref}/settings/infrastructure#infrastructure-activity`}>
CPU capacity
</InlineLink>{' '}
before proceeding.
</p>
</DialogSection>

<DialogFooter>
<Button type="default" disabled={isUpdating} onClick={() => setOpen(false)}>
Cancel
</Button>
<Button type="primary" loading={isUpdating} onClick={() => onConfirmUpgrade()}>
Upgrade now
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
)
}
Loading
Loading