@@ -4929,14 +4929,17 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {
49294929 httpOverCapnpFactory (httpOverCapnpFactory) {}
49304930
49314931 kj::Promise<void > startEvent (StartEventContext context) override {
4932- // TODO(someday): Use cfBlobJson from the connection if there is one, or from RPC params
4933- // if we add that? (Note that if a connection-level cf blob exists, it should take
4934- // priority; we should only accept a cf blob from the client if we have a cfBlobHeader
4935- // configured, which hints that this service trusts the client to provide the cf blob.)
4936-
4932+ // Extract the optional cf blob from the RPC params and pass it along with the
4933+ // service channel to EventDispatcherImpl. The cf blob will be included in
4934+ // SubrequestMetadata when creating the WorkerInterface for HTTP events.
4935+ kj::Maybe<kj::String> cfBlobJson;
4936+ auto params = context.getParams ();
4937+ if (params.hasCfBlobJson ()) {
4938+ cfBlobJson = kj::str (params.getCfBlobJson ());
4939+ }
49374940 context.initResults (capnp::MessageSize{4 , 1 })
4938- .setDispatcher (
4939- kj::heap<EventDispatcherImpl>(httpOverCapnpFactory, service-> startRequest ({} )));
4941+ .setDispatcher (kj::heap<EventDispatcherImpl>(
4942+ httpOverCapnpFactory, kj::addRef (*service), kj::mv (cfBlobJson )));
49404943 return kj::READY_NOW;
49414944 }
49424945
@@ -4946,14 +4949,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {
49464949
49474950 class EventDispatcherImpl final : public rpc::EventDispatcher::Server {
49484951 public:
4949- EventDispatcherImpl (
4950- capnp::HttpOverCapnpFactory& httpOverCapnpFactory, kj::Own<WorkerInterface> worker)
4952+ EventDispatcherImpl (capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
4953+ kj::Own<IoChannelFactory::SubrequestChannel> service,
4954+ kj::Maybe<kj::String> cfBlobJson)
49514955 : httpOverCapnpFactory(httpOverCapnpFactory),
4952- worker (kj::mv(worker)) {}
4956+ service (kj::mv(service)),
4957+ cfBlobJson(kj::mv(cfBlobJson)) {}
49534958
49544959 kj::Promise<void > getHttpService (GetHttpServiceContext context) override {
4960+ // Create WorkerInterface with cf blob metadata (if provided via startEvent).
4961+ IoChannelFactory::SubrequestMetadata metadata;
4962+ KJ_IF_SOME (cf, cfBlobJson) {
4963+ metadata.cfBlobJson = kj::str (cf);
4964+ }
4965+ auto worker = getService ()->startRequest (kj::mv (metadata));
49554966 context.initResults (capnp::MessageSize{4 , 1 })
4956- .setHttp (httpOverCapnpFactory.kjToCapnp (getWorker ( )));
4967+ .setHttp (httpOverCapnpFactory.kjToCapnp (kj::mv (worker )));
49574968 return kj::READY_NOW;
49584969 }
49594970
@@ -5013,15 +5024,22 @@ class Server::WorkerdBootstrapImpl final: public rpc::WorkerdBootstrap::Server {
50135024
50145025 private:
50155026 capnp::HttpOverCapnpFactory& httpOverCapnpFactory;
5016- kj::Maybe<kj::Own<WorkerInterface>> worker;
5027+ kj::Maybe<kj::Own<IoChannelFactory::SubrequestChannel>> service;
5028+ kj::Maybe<kj::String> cfBlobJson;
50175029
5018- kj::Own<WorkerInterface> getWorker () {
5030+ kj::Own<IoChannelFactory::SubrequestChannel> getService () {
50195031 auto result =
5020- kj::mv (KJ_ASSERT_NONNULL (worker , " EventDispatcher can only be used for one request" ));
5021- worker = kj::none;
5032+ kj::mv (KJ_ASSERT_NONNULL (service , " EventDispatcher can only be used for one request" ));
5033+ service = kj::none;
50225034 return result;
50235035 }
50245036
5037+ kj::Own<WorkerInterface> getWorker () {
5038+ // For non-HTTP events (RPC, traces, etc.), create WorkerInterface with
5039+ // empty metadata since there's no HTTP request to extract cf from.
5040+ return getService ()->startRequest ({});
5041+ }
5042+
50255043 [[noreturn]] void throwUnsupported () {
50265044 JSG_FAIL_REQUIRE (Error, " RPC connections don't yet support this event type." );
50275045 }
0 commit comments