// GraphQL SDL for the basic subscription example: a Notification type, a query listing
// notifications, a mutation adding one, and a `notificationAdded` subscription.
const schema = `
type Notification {
id: ID!
message: String
}
type Query {
notifications: [Notification]
}
type Mutation {
addNotification(message: String): Notification
}
type Subscription {
notificationAdded: Notification
}
`
// In-memory store. `idCount` holds the highest id handed out so far; the seed
// notification below owns the initial value.
let idCount = 1
const notifications = [
  {
    id: idCount,
    message: 'Notification message'
  }
]

const resolvers = {
  Query: {
    // Returns the in-memory list of notifications.
    notifications: () => notifications
  },
  Mutation: {
    /**
     * Stores a new notification and publishes it on the NOTIFICATION_ADDED topic.
     *
     * @param {*} _ - unused root value
     * @param {{ message: string }} args - mutation arguments
     * @param {{ pubsub: object }} context - must expose `pubsub.publish`
     * @returns {Promise<{ id: number, message: string }>} the stored notification
     */
    addNotification: async (_, { message }, { pubsub }) => {
      // Pre-increment: the seed notification already uses the initial counter value,
      // so a post-increment would hand out id 1 a second time.
      const id = ++idCount
      const notification = {
        id,
        message
      }
      notifications.push(notification)
      await pubsub.publish({
        topic: 'NOTIFICATION_ADDED',
        payload: {
          notificationAdded: notification
        }
      })
      return notification
    }
  },
  Subscription: {
    notificationAdded: {
      // You can also subscribe to multiple topics at once using an array like this:
      // pubsub.subscribe(['TOPIC1', 'TOPIC2'])
      subscribe: async (root, args, { pubsub }) =>
        await pubsub.subscribe('NOTIFICATION_ADDED')
    }
  }
}
// Register mercurius with subscriptions switched on.
app.register(mercurius, {
  schema,
  resolvers,
  subscription: true
})

// GraphQL SDL for the filtered subscription example: `notificationAdded` takes an
// optional `contains` argument.
const schema = `
type Notification {
id: ID!
message: String
}
type Query {
notifications: [Notification]
}
type Mutation {
addNotification(message: String): Notification
}
type Subscription {
notificationAdded(contains: String): Notification
}
`
// In-memory store. `idCount` holds the highest id handed out so far; the seed
// notification below owns the initial value.
let idCount = 1
const notifications = [
  {
    id: idCount,
    message: 'Notification message'
  }
]

const { withFilter } = mercurius

const resolvers = {
  Query: {
    // Returns the in-memory list of notifications.
    notifications: () => notifications
  },
  Mutation: {
    /**
     * Stores a new notification and publishes it on the NOTIFICATION_ADDED topic.
     *
     * @param {*} _ - unused root value
     * @param {{ message: string }} args - mutation arguments
     * @param {{ pubsub: object }} context - must expose `pubsub.publish`
     * @returns {Promise<{ id: number, message: string }>} the stored notification
     */
    addNotification: async (_, { message }, { pubsub }) => {
      // Pre-increment: the seed notification already uses the initial counter value,
      // so a post-increment would hand out id 1 a second time.
      const id = ++idCount
      const notification = {
        id,
        message
      }
      notifications.push(notification)
      await pubsub.publish({
        topic: 'NOTIFICATION_ADDED',
        payload: {
          notificationAdded: notification
        }
      })
      return notification
    }
  },
  Subscription: {
    notificationAdded: {
      // withFilter receives the subscribe function and a predicate; the predicate gets
      // each published payload together with the subscription arguments.
      subscribe: withFilter(
        (root, args, { pubsub }) => pubsub.subscribe('NOTIFICATION_ADDED'),
        (payload, { contains }) => {
          // No filter text supplied: accept every payload.
          if (!contains) return true
          // `message` is nullable in the schema, so guard before calling includes().
          const { message } = payload.notificationAdded
          return typeof message === 'string' && message.includes(contains)
        }
      )
    }
  }
}
app.register(mercurius, {
schema,
resolvers,
subscription: true
})

The context for the Subscription includes:

- `app`, the Fastify application
- `reply`, an object that pretends to be a `Reply` object from Fastify without its decorators
- `reply.request`, the real request object from Fastify

During the connection initialization phase, all content of the `payload` property of the `connection_init` packet
is automatically copied inside `request.headers`. If a `headers` property is specified within `payload`,
that is used instead.
...
const resolvers = {
  Mutation: {
    // Publishes `message` on the topic named after `userId`, then acknowledges with "OK".
    async sendMessage (_, args, context) {
      const { message, userId } = args
      const event = { topic: userId, payload: message }
      await context.pubsub.publish(event)
      return "OK"
    }
  },
  Subscription: {
    receivedMessage: {
      // The subscriber listens on the topic equal to the id of the user stored in the
      // subscription context, so a sendMessage call carrying that id reaches that user.
      subscribe (root, args, context) {
        const { pubsub, user } = context
        return pubsub.subscribe(user.id)
      }
    }
  }
}
app.register(mercurius, {
  schema,
  resolvers,
  subscription: {
    // Add the decoded JWT from the Authorization header to the subscription context.
    context: (_, req) => {
      // Node.js lower-cases incoming HTTP header names, so read the lower-case key first;
      // the capitalised key stays as a fallback for backward compatibility.
      const header = req.headers.authorization || req.headers.Authorization
      if (typeof header !== 'string') {
        throw new Error('Missing Authorization header')
      }
      // Skip the first 7 characters (presumably the "Bearer " prefix) before verifying.
      // NOTE(review): jwt.verify is called with the token only — confirm the `jwt` helper
      // in scope does not need a secret or key argument.
      return { user: jwt.verify(header.slice(7)) }
    }
  }
})
...
const redis = require('mqemitter-redis')
// Emitter created with mqemitter-redis; it is handed to mercurius through the
// `subscription.emitter` option further down.
const emitter = redis({
// NOTE(review): Redis listens on 6379 by default — confirm 6579 is intentional.
port: 6579,
host: '127.0.0.1'
})
// GraphQL SDL for the voting example: a Vote type with aye/no counters, two mutations
// that take a vote id, and a `voteAdded` subscription scoped to a single vote id.
const schema = `
type Vote {
id: ID!
title: String!
ayes: Int
noes: Int
}
type Query {
votes: [Vote]
}
type Mutation {
voteAye(voteId: ID!): Vote
voteNo(voteId: ID!): Vote
}
type Subscription {
voteAdded(voteId: ID!): Vote
}
`
// In-memory list of votes; a vote id is the 1-based position in this array.
const votes = []
const VOTE_ADDED = 'VOTE_ADDED'

// Builds the per-vote topic name shared by the mutations and the subscription.
const voteTopic = (voteId) => `${VOTE_ADDED}_${voteId}`

// Returns the vote addressed by `voteId`, or throws when the id is not an integer
// between 1 and the number of stored votes.
const findVote = (voteId) => {
  const position = Number(voteId)
  if (!Number.isInteger(position) || position < 1 || position > votes.length) {
    throw new Error('Invalid vote id')
  }
  return votes[position - 1]
}

// Increments one counter ('ayes' or 'noes') of a vote and publishes the updated vote.
const castVote = async (voteId, counter, pubsub) => {
  const vote = findVote(voteId)
  vote[counter]++
  await pubsub.publish({
    topic: voteTopic(voteId),
    payload: {
      voteAdded: vote
    }
  })
  return vote
}

const resolvers = {
  Query: {
    // Returns the in-memory list of votes.
    votes: async () => votes
  },
  Mutation: {
    /**
     * Adds one "aye" to the vote and publishes the updated vote.
     * @throws {Error} 'Invalid vote id' when `voteId` does not address a stored vote
     */
    voteAye: async (_, { voteId }, { pubsub }) => castVote(voteId, 'ayes', pubsub),
    /**
     * Adds one "no" to the vote and publishes the updated vote.
     * @throws {Error} 'Invalid vote id' when `voteId` does not address a stored vote
     */
    voteNo: async (_, { voteId }, { pubsub }) => castVote(voteId, 'noes', pubsub)
  },
  Subscription: {
    voteAdded: {
      subscribe: async (root, { voteId }, { pubsub }) => {
        // subscribe only for a vote with a given id
        return await pubsub.subscribe(voteTopic(voteId))
      }
    }
  }
}
app.register(mercurius, {
schema,
resolvers,
subscription: {
emitter,
verifyClient: (info, next) => {
if (info.req.headers['x-fastify-header'] !== 'fastify is awesome !') {
return next(false) // the connection is not allowed
}
next(true) // the connection is allowed
}
}
})

Note that when passing both `pubsub` and `emitter` options, `emitter` will be ignored.
// Minimal pubsub implementation on top of an EventEmitter, exposing the
// subscribe(topic, queue) / publish(event, callback) pair used by this example.
class CustomPubSub {
  constructor () {
    this.emitter = new EventEmitter()
  }

  // Forwards every value emitted on `topic` into `queue` and installs a `close`
  // function on the queue that detaches the forwarding listener again.
  async subscribe (topic, queue) {
    const forward = (value) => {
      queue.push(value)
    }
    this.emitter.on(topic, forward)
    queue.close = () => {
      this.emitter.removeListener(topic, forward)
    }
  }

  // Emits the event payload on the event topic, then invokes `callback`.
  publish (event, callback) {
    const { topic, payload } = event
    this.emitter.emit(topic, payload)
    callback()
  }
}
const pubsub = new CustomPubSub()
app.register(mercurius, {
schema,
resolvers,
subscription: {
pubsub
}
})

Mercurius uses `@fastify/websocket` internally, but you can still use it yourself by registering it before the mercurius plugin. If you do, it is recommended to set an appropriate `options.maxPayload`, like this:
const fastifyWebsocket = require('@fastify/websocket')
// Options for @fastify/websocket; 1048576 is 1024 * 1024.
const websocketPluginOptions = {
  options: {
    maxPayload: 1048576
  }
}

// Answers every incoming websocket message with a fixed greeting.
function greetOnMessage (connection, req) {
  connection.socket.on('message', (message) => {
    connection.socket.send('hi from server')
  })
}

// The websocket plugin is registered before mercurius.
app.register(fastifyWebsocket, websocketPluginOptions)

app.register(mercurius, {
  schema,
  resolvers,
  subscription: true
})

app.get('/', { websocket: true }, greetOnMessage)