Skip to content

Commit 97db8e9

Browse files
committed
use async-client for telemetry
1 parent 03421d0 commit 97db8e9

File tree

6 files changed

+204
-112
lines changed

6 files changed

+204
-112
lines changed

src/app.zig

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const std = @import("std");
22

3+
const Loop = @import("jsruntime").Loop;
34
const Allocator = std.mem.Allocator;
45
const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
56

@@ -8,8 +9,8 @@ const Telemetry = @import("telemetry/telemetry.zig").Telemetry;
89
pub const App = struct {
910
telemetry: Telemetry,
1011

11-
pub fn init(allocator: Allocator) !App {
12-
const telemetry = Telemetry.init(allocator);
12+
pub fn init(allocator: Allocator, loop: *Loop) !App {
13+
const telemetry = Telemetry.init(allocator, loop);
1314
errdefer telemetry.deinit();
1415

1516
return .{

src/main.zig

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,6 @@ pub fn main() !void {
5454
_ = gpa.detectLeaks();
5555
};
5656

57-
var app = try @import("app.zig").App.init(alloc);
58-
defer app.deinit();
59-
6057
var args_arena = std.heap.ArenaAllocator.init(alloc);
6158
defer args_arena.deinit();
6259
const args = try parseArgs(args_arena.allocator());
@@ -68,7 +65,6 @@ pub fn main() !void {
6865
return std.process.cleanExit();
6966
},
7067
.serve => |opts| {
71-
app.telemetry.record(.{ .run = .{ .mode = .serve, .version = version } });
7268
const address = std.net.Address.parseIp4(opts.host, opts.port) catch |err| {
7369
log.err("address (host:port) {any}\n", .{err});
7470
return args.printUsageAndExit(false);
@@ -77,24 +73,31 @@ pub fn main() !void {
7773
var loop = try jsruntime.Loop.init(alloc);
7874
defer loop.deinit();
7975

76+
var app = try @import("app.zig").App.init(alloc, &loop);
77+
defer app.deinit();
78+
app.telemetry.record(.{ .run = .{ .mode = .serve, .version = version } });
79+
8080
const timeout = std.time.ns_per_s * @as(u64, opts.timeout);
81-
server.run(alloc, address, timeout, &loop) catch |err| {
81+
server.run(alloc, address, timeout, &loop, &app) catch |err| {
8282
log.err("Server error", .{});
8383
return err;
8484
};
8585
},
8686
.fetch => |opts| {
87-
app.telemetry.record(.{ .run = .{ .mode = .fetch, .version = version } });
8887
log.debug("Fetch mode: url {s}, dump {any}", .{ opts.url, opts.dump });
8988

90-
// vm
91-
const vm = jsruntime.VM.init();
92-
defer vm.deinit();
93-
9489
// loop
9590
var loop = try jsruntime.Loop.init(alloc);
9691
defer loop.deinit();
9792

93+
var app = try @import("app.zig").App.init(alloc, &loop);
94+
defer app.deinit();
95+
app.telemetry.record(.{ .run = .{ .mode = .fetch, .version = version } });
96+
97+
// vm
98+
const vm = jsruntime.VM.init();
99+
defer vm.deinit();
100+
98101
// browser
99102
var browser = Browser.init(alloc, &loop);
100103
defer browser.deinit();

src/server.zig

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ const CloseError = jsruntime.IO.CloseError;
3434
const CancelError = jsruntime.IO.CancelOneError;
3535
const TimeoutError = jsruntime.IO.TimeoutError;
3636

37+
const App = @import("app.zig").App;
3738
const CDP = @import("cdp/cdp.zig").CDP;
3839

3940
const TimeoutCheck = std.time.ns_per_ms * 100;
@@ -50,6 +51,7 @@ const MAX_MESSAGE_SIZE = 256 * 1024 + 14;
5051
pub const Client = ClientT(*Server, CDP);
5152

5253
const Server = struct {
54+
app: *App,
5355
allocator: Allocator,
5456
loop: *jsruntime.Loop,
5557
current_client_id: usize = 0,
@@ -1028,6 +1030,7 @@ pub fn run(
10281030
address: net.Address,
10291031
timeout: u64,
10301032
loop: *jsruntime.Loop,
1033+
app: *App,
10311034
) !void {
10321035
// create socket
10331036
const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK;
@@ -1053,6 +1056,7 @@ pub fn run(
10531056
const json_version_response = try buildJSONVersionResponse(allocator, address);
10541057

10551058
var server = Server{
1059+
.app = app,
10561060
.loop = loop,
10571061
.timeout = timeout,
10581062
.listener = listener,

src/telemetry/lightpanda.zig

Lines changed: 158 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,176 @@ const std = @import("std");
22
const Allocator = std.mem.Allocator;
33
const ArenAallocator = std.heap.ArenaAllocator;
44

5-
const Event = @import("telemetry.zig").Event;
5+
const Loop = @import("jsruntime").Loop;
6+
const Client = @import("asyncio").Client;
7+
68
const log = std.log.scoped(.telemetry);
79

8-
const URL = "https://lightpanda.io/browser-stats";
10+
const URL = "https://stats.lightpanda.io";
911

10-
pub const Lightpanda = struct {
12+
pub const LightPanda = struct {
1113
uri: std.Uri,
12-
arena: ArenAallocator,
13-
client: std.http.Client,
14-
headers: [1]std.http.Header,
14+
io: Client.IO,
15+
client: Client,
16+
allocator: Allocator,
17+
sending_pool: std.heap.MemoryPool(Sending),
18+
client_context_pool: std.heap.MemoryPool(Client.Ctx),
1519

16-
pub fn init(allocator: Allocator) !Lightpanda {
20+
pub fn init(allocator: Allocator, loop: *Loop) !LightPanda {
1721
return .{
22+
.allocator = allocator,
23+
.io = Client.IO.init(loop),
1824
.client = .{ .allocator = allocator },
19-
.arena = std.heap.ArenaAllocator.init(allocator),
2025
.uri = std.Uri.parse(URL) catch unreachable,
21-
.headers = [1]std.http.Header{
22-
.{ .name = "Content-Type", .value = "application/json" },
23-
},
26+
.sending_pool = std.heap.MemoryPool(Sending).init(allocator),
27+
.client_context_pool = std.heap.MemoryPool(Client.Ctx).init(allocator),
2428
};
2529
}
2630

27-
pub fn deinit(self: *Lightpanda) void {
28-
self.arena.deinit();
31+
pub fn deinit(self: *LightPanda) void {
2932
self.client.deinit();
33+
self.sending_pool.deinit();
34+
self.client_context_pool.deinit();
3035
}
3136

32-
pub fn send(self: *Lightpanda, iid: ?[]const u8, eid: []const u8, events: []Event) !void {
33-
_ = self;
34-
_ = iid;
35-
_ = eid;
36-
_ = events;
37-
// defer _ = self.arena.reset(.{ .retain_capacity = {} });
38-
// const body = try std.json.stringifyAlloc(self.arena.allocator(), PlausibleEvent{ .event = event }, .{});
39-
40-
// var server_headers: [2048]u8 = undefined;
41-
// var req = try self.client.open(.POST, self.uri, .{
42-
// .redirect_behavior = .not_allowed,
43-
// .extra_headers = &self.headers,
44-
// .server_header_buffer = &server_headers,
45-
// });
46-
// req.transfer_encoding = .{ .content_length = body.len };
47-
// try req.send();
48-
49-
// try req.writeAll(body);
50-
// try req.finish();
51-
// try req.wait();
52-
53-
// const status = req.response.status;
54-
// if (status != .accepted) {
55-
// log.warn("telemetry '{s}' event error: {d}", .{ @tagName(event), @intFromEnum(status) });
56-
// } else {
57-
// log.warn("telemetry '{s}' sent", .{@tagName(event)});
58-
// }
37+
pub fn send(self: *LightPanda, iid: ?[]const u8, eid: []const u8, event: anytype) !void {
38+
var arena = std.heap.ArenaAllocator.init(self.allocator);
39+
errdefer arena.deinit();
40+
41+
const resp_header_buffer = try arena.allocator().alloc(u8, 4096);
42+
const body = try std.json.stringifyAlloc(arena.allocator(), .{
43+
.iid = iid,
44+
.eid = eid,
45+
.event = event,
46+
}, .{});
47+
48+
const sending = try self.sending_pool.create();
49+
errdefer self.sending_pool.destroy(sending);
50+
51+
sending.* = .{
52+
.body = body,
53+
.arena = arena,
54+
.lightpanda = self,
55+
.request = try self.client.create(.POST, self.uri, .{
56+
.server_header_buffer = resp_header_buffer,
57+
}),
58+
};
59+
errdefer sending.request.deinit();
60+
61+
const ctx = try self.client_context_pool.create();
62+
errdefer self.client_context_pool.destroy(ctx);
63+
64+
ctx.* = try Client.Ctx.init(&self.io, &sending.request);
65+
ctx.userData = sending;
66+
67+
try self.client.async_open(
68+
.POST,
69+
self.uri,
70+
.{ .server_header_buffer = resp_header_buffer },
71+
ctx,
72+
onRequestConnect,
73+
);
74+
}
75+
76+
fn handleError(self: *LightPanda, ctx: *Client.Ctx, err: anyerror) anyerror!void {
77+
ctx.deinit();
78+
self.client_context_pool.destroy(ctx);
79+
80+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
81+
sending.deinit();
82+
self.sending_pool.destroy(sending);
83+
log.info("request failure: {}", .{err});
84+
}
85+
86+
fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
87+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
88+
res catch |err| return sending.lightpanda.handleError(ctx, err);
89+
90+
ctx.req.transfer_encoding = .{ .content_length = sending.body.len };
91+
return ctx.req.async_send(ctx, onRequestSend) catch |err| {
92+
return sending.lightpanda.handleError(ctx, err);
93+
};
94+
}
95+
96+
fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
97+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
98+
res catch |err| return sending.lightpanda.handleError(ctx, err);
99+
100+
return ctx.req.async_writeAll(sending.body, ctx, onRequestWrite) catch |err| {
101+
return sending.lightpanda.handleError(ctx, err);
102+
};
103+
}
104+
105+
fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
106+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
107+
res catch |err| return sending.lightpanda.handleError(ctx, err);
108+
return ctx.req.async_finish(ctx, onRequestFinish) catch |err| {
109+
return sending.lightpanda.handleError(ctx, err);
110+
};
111+
}
112+
113+
fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
114+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
115+
res catch |err| return sending.lightpanda.handleError(ctx, err);
116+
return ctx.req.async_wait(ctx, onRequestWait) catch |err| {
117+
return sending.lightpanda.handleError(ctx, err);
118+
};
119+
}
120+
121+
fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) anyerror!void {
122+
var sending: *Sending = @ptrCast(@alignCast(ctx.userData));
123+
res catch |err| return sending.lightpanda.handleError(ctx, err);
124+
125+
const lightpanda = sending.lightpanda;
126+
127+
defer {
128+
ctx.deinit();
129+
lightpanda.client_context_pool.destroy(ctx);
130+
131+
sending.deinit();
132+
lightpanda.sending_pool.destroy(sending);
133+
}
134+
135+
var buffer: [2048]u8 = undefined;
136+
const reader = ctx.req.reader();
137+
while (true) {
138+
const n = reader.read(&buffer) catch 0;
139+
if (n == 0) {
140+
break;
141+
}
142+
}
143+
if (ctx.req.response.status != .ok) {
144+
log.info("invalid response: {d}", .{@intFromEnum(ctx.req.response.status)});
145+
}
146+
}
147+
};
148+
149+
const Sending = struct {
150+
body: []const u8,
151+
request: Client.Request,
152+
lightpanda: *LightPanda,
153+
arena: std.heap.ArenaAllocator,
154+
155+
pub fn deinit(self: *Sending) void {
156+
self.arena.deinit();
157+
self.request.deinit();
59158
}
60159
};
61160

62-
// wraps a telemetry event so that we can serialize it to plausible's event endpoint
63-
// const PlausibleEvent = struct {
64-
// event: Event,
161+
// // wraps a telemetry event so that we can serialize it to plausible's event endpoint
162+
// const EventWrap = struct {
163+
// iid: ?[]const u8,
164+
// eid: []const u8,
165+
// event: *const Event,
65166

66-
// pub fn jsonStringify(self: PlausibleEvent, jws: anytype) !void {
167+
// pub fn jsonStringify(self: *const EventWrap, jws: anytype) !void {
67168
// try jws.beginObject();
68-
// try jws.objectField("name");
69-
// try jws.write(@tagName(self.event));
70-
// try jws.objectField("url");
71-
// try jws.write(EVENT_URL);
72-
// try jws.objectField("domain");
73-
// try jws.write(DOMAIN_KEY);
169+
// try jws.objectField("iid");
170+
// try jws.write(self.iid);
171+
// try jws.objectField("eid");
172+
// try jws.write(self.eid);
173+
// try jws.objectField("event");
174+
// try jws.write(@tagName(self.event.*));
74175
// try jws.objectField("props");
75176
// switch (self.event) {
76177
// inline else => |props| try jws.write(props),
@@ -80,11 +181,15 @@ pub const Lightpanda = struct {
80181
// };
81182

82183
// const testing = std.testing;
83-
// test "plausible: json event" {
84-
// const json = try std.json.stringifyAlloc(testing.allocator, PlausibleEvent{ .event = .{ .run = .{ .mode = .serve, .version = "over 9000!" } } }, .{});
184+
// test "telemetry: lightpanda json event" {
185+
// const json = try std.json.stringifyAlloc(testing.allocator, EventWrap{
186+
// .iid = "1234",
187+
// .eid = "abc!",
188+
// .event = .{ .run = .{ .mode = .serve, .version = "over 9000!" } }
189+
// }, .{});
85190
// defer testing.allocator.free(json);
86191

87192
// try testing.expectEqualStrings(
88-
// \\{"name":"run","url":"https://lightpanda.io/browser-stats","domain":"localhost","props":{"version":"over 9000!","mode":"serve"}}
193+
// \\{"event":"run","iid""1234","eid":"abc!","props":{"version":"over 9000!","mode":"serve"}}
89194
// , json);
90195
// }

0 commit comments

Comments
 (0)