Skip to content

Commit c577689

Browse files
committed
Add support for Pub/Sub topic references in Encore.ts
1 parent 2c9a7fa commit c577689

File tree

7 files changed

+200
-30
lines changed

7 files changed

+200
-30
lines changed

docs/ts/primitives/pubsub.md

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,37 @@ async function example() {
202202
await cartEvents.publish({shoppingCartID: 2, event: "item_added"});
203203
}
204204
```
205+
206+
## Topic references
207+
208+
Encore uses static analysis to determine which services are accessing each Pub/Sub topic,
209+
and what operations each service is performing.
210+
211+
That information is used for features such as rendering architecture diagrams, and is used by Encore Cloud to provision infrastructure correctly and configure IAM permissions.
212+
213+
This means `Topic` objects can't be passed around however you like,
214+
as it makes static analysis impossible in many cases. To simplify your workflow, given these restrictions,
215+
Encore supports defining a "reference" to a topic that can be passed around any way you want.
216+
217+
### Using topic references
218+
219+
Define a topic reference by calling `topic.ref<DesiredPermissions>()` from within a service, where `DesiredPermissions` is one of the pre-defined permission types defined in the `encore.dev/pubsub` module.
220+
221+
This means you're effectively pre-declaring the permissions you need, and only the methods that
222+
are allowed by those permissions are available on the returned reference object.
223+
224+
For example, to get a reference to a topic that can publish messages:
225+
226+
```typescript
227+
import { Publisher } from "encore.dev/pubsub";
228+
const ref = cartEvents.ref<Publisher>();
229+
230+
// You can now freely pass around `ref`, and you can use
231+
// `ref.publish()` just like you would `cartEvents.publish()`.
232+
```
233+
234+
To ensure Encore still is aware of which permissions each service needs, the call to `topic.ref`
235+
must be made from within a service, so that Encore knows which service to associate the permissions with.
236+
237+
Currently, the only permission type is `Publisher`, which allows publishing events to the topic.
238+
We plan to add more permission types in the future.

runtimes/js/encore.dev/pubsub/mod.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ export type { TopicConfig, DeliveryGuarantee } from "./topic";
44
export { Subscription } from "./subscription";
55
export type { SubscriptionConfig } from "./subscription";
66

7+
export type { TopicPerms, Publisher } from "./refs";
8+
79
/**
810
* Attribute represents a field on a message that should be sent
911
* as an attribute in a PubSub message, rather than in the message
@@ -47,8 +49,8 @@ export type Attribute<T extends string | number | boolean> =
4749
export type AttributesOf<T extends object> = keyof {
4850
[Key in keyof // for (const Key in T)
4951
T as Extract<T[Key], allBrandedTypes> extends never // if (typeof T[Key] !== oneof(allBrandedTypes))
50-
? never // drop the key
51-
: Key]: never; // else keep the key
52+
? never // drop the key
53+
: Key]: never; // else keep the key
5254
};
5355

5456
/**
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
export abstract class TopicPerms {
2+
private topicPerms(): void {}
3+
}
4+
5+
export abstract class Publisher<Msg extends object> extends TopicPerms {
6+
abstract publish(msg: Msg): Promise<string>;
7+
}

runtimes/js/encore.dev/pubsub/topic.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
import { getCurrentRequest } from "../internal/reqtrack/mod";
22
import type { AttributesOf } from "./mod";
33
import * as runtime from "../internal/runtime/mod";
4+
import { Publisher, TopicPerms } from "./refs";
45

56
/**
67
* A topic is a resource to which you can publish messages
78
* to be delivered to subscribers of that topic.
89
*/
9-
export class Topic<Msg extends object> {
10+
export class Topic<Msg extends object>
11+
extends TopicPerms
12+
implements Publisher<Msg>
13+
{
1014
public readonly name: string;
1115
public readonly cfg: TopicConfig<Msg>;
1216
private impl: runtime.PubSubTopic;
1317

1418
constructor(name: string, cfg: TopicConfig<Msg>) {
19+
super();
1520
this.name = name;
1621
this.cfg = cfg;
1722
this.impl = runtime.RT.pubsubTopic(name);
@@ -21,6 +26,10 @@ export class Topic<Msg extends object> {
2126
const source = getCurrentRequest();
2227
return this.impl.publish(msg, source);
2328
}
29+
30+
public ref<P extends TopicPerms>(): P {
31+
return this as unknown as P;
32+
}
2433
}
2534

2635
/**

tsparser/src/legacymeta/mod.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::parser::parser::{ParseContext, ParseResult, Service};
1010
use crate::parser::resourceparser::bind::{Bind, BindKind};
1111
use crate::parser::resources::apis::{authhandler, gateway};
1212
use crate::parser::resources::infra::cron::CronJobSchedule;
13+
use crate::parser::resources::infra::pubsub_topic::TopicOperation;
1314
use crate::parser::resources::infra::{cron, objects, pubsub_subscription, pubsub_topic, sqldb};
1415
use crate::parser::resources::Resource;
1516
use crate::parser::types::validation;
@@ -453,26 +454,28 @@ impl MetaBuilder<'_> {
453454
let mut bucket_perms = HashMap::new();
454455
for u in &self.parse.usages {
455456
match u {
456-
Usage::PublishTopic(publish) => {
457-
let svc =
458-
self.service_for_range(&publish.range)
459-
.ok_or(publish.range.parse_err(
460-
"unable to determine which service this 'publish' call is within",
461-
))?;
462-
463-
// Add the publisher if it hasn't already been seen.
464-
let key = (svc.name.clone(), publish.topic.name.clone());
465-
if seen_publishers.insert(key) {
466-
let service_name = svc.name.clone();
467-
468-
let idx = topic_by_name
469-
.get(&publish.topic.name)
470-
.ok_or(publish.range.parse_err("could not resolve topic"))?
471-
.to_owned();
472-
let topic = &mut self.data.pubsub_topics[idx];
473-
topic
474-
.publishers
475-
.push(v1::pub_sub_topic::Publisher { service_name });
457+
Usage::Topic(access) => {
458+
if access.ops.contains(&TopicOperation::Publish) {
459+
let svc =
460+
self.service_for_range(&access.range)
461+
.ok_or(access.range.parse_err(
462+
"cannot determine which service is accessing this topic",
463+
))?;
464+
465+
// Add the publisher if it hasn't already been seen.
466+
let key = (svc.name.clone(), access.topic.name.clone());
467+
if seen_publishers.insert(key) {
468+
let service_name = svc.name.clone();
469+
470+
let idx = topic_by_name
471+
.get(&access.topic.name)
472+
.ok_or(access.range.parse_err("could not resolve topic"))?
473+
.to_owned();
474+
let topic = &mut self.data.pubsub_topics[idx];
475+
topic
476+
.publishers
477+
.push(v1::pub_sub_topic::Publisher { service_name });
478+
}
476479
}
477480
}
478481
Usage::AccessDatabase(access) => {

tsparser/src/parser/resources/infra/pubsub_topic.rs

Lines changed: 121 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::ops::Deref;
2+
13
use litparser_derive::LitParser;
24
use swc_common::sync::Lrc;
35
use swc_ecma_ast as ast;
@@ -13,8 +15,8 @@ use crate::parser::resources::parseutil::{
1315
ReferenceParser, TrackedNames,
1416
};
1517
use crate::parser::resources::Resource;
16-
use crate::parser::types::Type;
17-
use crate::parser::usageparser::{ResolveUsageData, Usage, UsageExprKind};
18+
use crate::parser::types::{Generic, Type};
19+
use crate::parser::usageparser::{MethodCall, ResolveUsageData, Usage, UsageExprKind};
1820
use crate::parser::Range;
1921
use crate::span_err::ErrReporter;
2022

@@ -128,18 +130,38 @@ impl ReferenceParser for PubSubTopicDefinition {
128130
}
129131

130132
#[derive(Debug)]
131-
pub struct PublishUsage {
133+
pub struct TopicUsage {
132134
pub range: Range,
133135
pub topic: Lrc<Topic>,
136+
pub ops: Vec<TopicOperation>,
134137
}
135138

136139
pub fn resolve_topic_usage(data: &ResolveUsageData, topic: Lrc<Topic>) -> Option<Usage> {
137140
match &data.expr.kind {
138-
UsageExprKind::MethodCall(method) => {
139-
if method.method.as_ref() == "publish" {
140-
Some(Usage::PublishTopic(PublishUsage {
141+
UsageExprKind::MethodCall(call) => {
142+
if call.method.as_ref() == "ref" {
143+
let Some(type_args) = call.call.type_args.as_deref() else {
144+
call.call
145+
.span
146+
.err("expected a type argument in call to Topic.ref");
147+
return None;
148+
};
149+
150+
let Some(type_arg) = type_args.params.first() else {
151+
call.call
152+
.span
153+
.err("expected a type argument in call to Topic.ref");
154+
return None;
155+
};
156+
157+
return parse_topic_ref(data, topic, call, type_arg);
158+
}
159+
160+
if call.method.as_ref() == "publish" {
161+
Some(Usage::Topic(TopicUsage {
141162
range: data.expr.range,
142163
topic,
164+
ops: vec![TopicOperation::Publish],
143165
}))
144166
} else {
145167
None
@@ -155,3 +177,96 @@ pub fn resolve_topic_usage(data: &ResolveUsageData, topic: Lrc<Topic>) -> Option
155177
}
156178
}
157179
}
180+
181+
fn parse_topic_ref(
182+
data: &ResolveUsageData,
183+
topic: Lrc<Topic>,
184+
_call: &MethodCall,
185+
type_arg: &ast::TsType,
186+
) -> Option<Usage> {
187+
fn process_type(
188+
data: &ResolveUsageData,
189+
sp: &swc_common::Span,
190+
t: &Type,
191+
depth: usize,
192+
) -> Option<Vec<TopicOperation>> {
193+
if depth > 10 {
194+
// Prevent infinite recursion.
195+
return None;
196+
}
197+
198+
match t {
199+
Type::Named(named) => {
200+
let ops = match named.obj.name.as_deref() {
201+
Some("Publisher") => vec![TopicOperation::Publish],
202+
_ => {
203+
let underlying = data.type_checker.resolve_obj_type(&named.obj);
204+
return process_type(data, sp, &underlying, depth + 1);
205+
}
206+
};
207+
208+
Some(ops)
209+
}
210+
211+
Type::Class(cls) => {
212+
let ops = cls
213+
.methods
214+
.iter()
215+
.filter_map(|method| {
216+
let op = match method.as_str() {
217+
"publish" => TopicOperation::Publish,
218+
_ => {
219+
// Ignore other methods.
220+
return None;
221+
}
222+
};
223+
224+
Some(op)
225+
})
226+
.collect();
227+
Some(ops)
228+
}
229+
230+
Type::Generic(Generic::Intersection(int)) => {
231+
let mut result = Vec::new();
232+
for t in &[&int.x, &int.y] {
233+
if let Some(ops) = process_type(data, sp, t, depth + 1) {
234+
result.extend(ops);
235+
}
236+
}
237+
238+
if result.is_empty() {
239+
None
240+
} else {
241+
Some(result)
242+
}
243+
}
244+
245+
_ => {
246+
sp.err(&format!("unsupported topic permission type {t:#?}"));
247+
None
248+
}
249+
}
250+
}
251+
252+
let typ = data
253+
.type_checker
254+
.resolve_type(data.module.clone(), type_arg);
255+
256+
if let Some(ops) = process_type(data, &typ.span(), typ.deref(), 0) {
257+
Some(Usage::Topic(TopicUsage {
258+
range: data.expr.range,
259+
topic,
260+
ops,
261+
}))
262+
} else {
263+
typ.err("no topic permissions found in type argument");
264+
None
265+
}
266+
}
267+
268+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
269+
pub enum TopicOperation {
270+
/// Publishing messages to the topic.
271+
Publish,
272+
}

tsparser/src/parser/usageparser/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ impl<'a> UsageResolver<'a> {
237237
#[derive(Debug)]
238238
pub enum Usage {
239239
CallEndpoint(apis::api::CallEndpointUsage),
240-
PublishTopic(infra::pubsub_topic::PublishUsage),
240+
Topic(infra::pubsub_topic::TopicUsage),
241241
AccessDatabase(infra::sqldb::AccessDatabaseUsage),
242242
Bucket(infra::objects::BucketUsage),
243243
}

0 commit comments

Comments
 (0)