|
1 | 1 | const std = @import("std"); |
| 2 | +const builtin = @import("builtin"); |
| 3 | +const build_info = @import("build_info"); |
| 4 | + |
| 5 | +const Thread = std.Thread; |
2 | 6 | const Allocator = std.mem.Allocator; |
3 | | -const ArenAallocator = std.heap.ArenaAllocator; |
4 | 7 |
|
5 | | -const Loop = @import("jsruntime").Loop; |
6 | | -const Client = @import("asyncio").Client; |
7 | | -const Event = @import("telemetry.zig").Event; |
| 8 | +const telemetry = @import("telemetry.zig"); |
8 | 9 | const RunMode = @import("../app.zig").RunMode; |
9 | | -const builtin = @import("builtin"); |
10 | | -const build_info = @import("build_info"); |
11 | 10 |
|
12 | 11 | const log = std.log.scoped(.telemetry); |
13 | 12 | const URL = "https://telemetry.lightpanda.io"; |
14 | 13 |
|
15 | 14 | pub const LightPanda = struct { |
16 | | - loop: *Loop, |
17 | 15 | uri: std.Uri, |
| 16 | + pending: List, |
| 17 | + running: bool, |
| 18 | + thread: ?std.Thread, |
18 | 19 | allocator: Allocator, |
19 | | - sending_pool: std.heap.MemoryPool(Sending), |
| 20 | + mutex: std.Thread.Mutex, |
| 21 | + cond: Thread.Condition, |
| 22 | + node_pool: std.heap.MemoryPool(List.Node), |
| 23 | + |
| 24 | + const List = std.DoublyLinkedList(LightPandaEvent); |
20 | 25 |
|
21 | | - pub fn init(allocator: Allocator, loop: *Loop) !LightPanda { |
22 | | - std.debug.print("{s}\n", .{@typeName(Client.IO)}); |
| 26 | + pub fn init(allocator: Allocator) !LightPanda { |
23 | 27 | return .{ |
24 | | - .loop = loop, |
| 28 | + .cond = .{}, |
| 29 | + .mutex = .{}, |
| 30 | + .pending = .{}, |
| 31 | + .thread = null, |
| 32 | + .running = true, |
25 | 33 | .allocator = allocator, |
26 | 34 | .uri = std.Uri.parse(URL) catch unreachable, |
27 | | - .sending_pool = std.heap.MemoryPool(Sending).init(allocator), |
| 35 | + .node_pool = std.heap.MemoryPool(List.Node).init(allocator), |
28 | 36 | }; |
29 | 37 | } |
30 | 38 |
|
31 | 39 | pub fn deinit(self: *LightPanda) void { |
32 | | - self.sending_pool.deinit(); |
| 40 | + if (self.thread) |*thread| { |
| 41 | + self.mutex.lock(); |
| 42 | + self.running = false; |
| 43 | + self.mutex.unlock(); |
| 44 | + self.cond.signal(); |
| 45 | + thread.join(); |
| 46 | + } |
| 47 | + self.node_pool.deinit(); |
33 | 48 | } |
34 | 49 |
|
35 | | - pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: RunMode, event: Event) !void { |
36 | | - var arena = std.heap.ArenaAllocator.init(self.allocator); |
37 | | - errdefer arena.deinit(); |
38 | | - |
39 | | - const resp_header_buffer = try arena.allocator().alloc(u8, 4096); |
40 | | - const body = try std.json.stringifyAlloc(arena.allocator(), .{ |
| 50 | + pub fn send(self: *LightPanda, iid: ?[]const u8, run_mode: RunMode, raw_event: telemetry.Event) !void { |
| 51 | + const event = LightPandaEvent{ |
41 | 52 | .iid = iid, |
42 | | - .driver = if (std.meta.activeTag(event) == .navigate) "cdp" else null, |
| 53 | + .driver = if (std.meta.activeTag(raw_event) == .navigate) "cdp" else null, |
43 | 54 | .mode = run_mode, |
44 | 55 | .os = builtin.os.tag, |
45 | 56 | .arch = builtin.cpu.arch, |
46 | 57 | .version = build_info.git_commit, |
47 | | - .event = std.meta.activeTag(event), |
48 | | - }, .{ .emit_null_optional_fields = false }); |
49 | | - |
50 | | - const sending = try self.sending_pool.create(); |
51 | | - errdefer self.sending_pool.destroy(sending); |
52 | | - |
53 | | - sending.* = .{ |
54 | | - .body = body, |
55 | | - .arena = arena, |
56 | | - .ctx = undefined, |
57 | | - .lightpanda = self, |
58 | | - .request = undefined, |
59 | | - .io = Client.IO.init(self.loop), |
60 | | - .client = .{ .allocator = self.allocator }, |
| 58 | + .event = @tagName(std.meta.activeTag(raw_event)), |
61 | 59 | }; |
62 | | - sending.request = try sending.client.create(.POST, self.uri, .{ |
63 | | - .server_header_buffer = resp_header_buffer, |
64 | | - }); |
65 | | - errdefer sending.request.deinit(); |
66 | | - |
67 | | - sending.ctx = try Client.Ctx.init(&sending.io, &sending.request); |
68 | | - errdefer sending.ctx.deinit(); |
69 | | - |
70 | | - try sending.client.async_open( |
71 | | - .POST, |
72 | | - self.uri, |
73 | | - .{ .server_header_buffer = resp_header_buffer }, |
74 | | - &sending.ctx, |
75 | | - onRequestConnect, |
76 | | - ); |
77 | | - } |
78 | 60 |
|
79 | | - fn handleError(ctx: *Client.Ctx, err: anyerror) anyerror!void { |
80 | | - const sending: *Sending = @fieldParentPtr("ctx", ctx); |
81 | | - const lightpanda = sending.lightpanda; |
82 | | - |
83 | | - sending.deinit(); |
84 | | - lightpanda.sending_pool.destroy(sending); |
85 | | - log.info("request failure: {}", .{err}); |
86 | | - } |
87 | | - |
88 | | - fn onRequestConnect(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { |
89 | | - const sending: *Sending = @fieldParentPtr("ctx", ctx); |
90 | | - res catch |err| return handleError(ctx, err); |
| 61 | + self.mutex.lock(); |
| 62 | + defer self.mutex.unlock(); |
| 63 | + if (self.thread == null) { |
| 64 | + self.thread = try std.Thread.spawn(.{}, run, .{self}); |
| 65 | + } |
91 | 66 |
|
92 | | - ctx.req.transfer_encoding = .{ .content_length = sending.body.len }; |
93 | | - return ctx.req.async_send(ctx, onRequestSend) catch |err| { |
94 | | - return handleError(ctx, err); |
95 | | - }; |
| 67 | + const node = try self.node_pool.create(); |
| 68 | + errdefer self.node_pool.destroy(node); |
| 69 | + node.data = event; |
| 70 | + self.pending.append(node); |
| 71 | + self.cond.signal(); |
96 | 72 | } |
97 | 73 |
|
98 | | - fn onRequestSend(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { |
99 | | - const sending: *Sending = @fieldParentPtr("ctx", ctx); |
100 | | - res catch |err| return handleError(ctx, err); |
| 74 | + fn run(self: *LightPanda) void { |
| 75 | + var arr: std.ArrayListUnmanaged(u8) = .{}; |
| 76 | + var client = std.http.Client{ .allocator = self.allocator }; |
101 | 77 |
|
102 | | - return ctx.req.async_writeAll(sending.body, ctx, onRequestWrite) catch |err| { |
103 | | - return handleError(ctx, err); |
104 | | - }; |
105 | | - } |
106 | | - |
107 | | - fn onRequestWrite(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { |
108 | | - res catch |err| return handleError(ctx, err); |
109 | | - return ctx.req.async_finish(ctx, onRequestFinish) catch |err| { |
110 | | - return handleError(ctx, err); |
111 | | - }; |
112 | | - } |
| 78 | + defer { |
| 79 | + arr.deinit(self.allocator); |
| 80 | + client.deinit(); |
| 81 | + } |
113 | 82 |
|
114 | | - fn onRequestFinish(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { |
115 | | - res catch |err| return handleError(ctx, err); |
116 | | - return ctx.req.async_wait(ctx, onRequestWait) catch |err| { |
117 | | - return handleError(ctx, err); |
118 | | - }; |
| 83 | + self.mutex.lock(); |
| 84 | + while (true) { |
| 85 | + while (self.pending.popFirst()) |node| { |
| 86 | + self.mutex.unlock(); |
| 87 | + self.postEvent(&node.data, &client, &arr) catch |err| { |
| 88 | + log.warn("Telementry reporting error: {}", .{err}); |
| 89 | + }; |
| 90 | + self.mutex.lock(); |
| 91 | + self.node_pool.destroy(node); |
| 92 | + } |
| 93 | + if (self.running == false) { |
| 94 | + return; |
| 95 | + } |
| 96 | + self.cond.wait(&self.mutex); |
| 97 | + } |
119 | 98 | } |
120 | 99 |
|
121 | | - fn onRequestWait(ctx: *Client.Ctx, res: anyerror!void) anyerror!void { |
122 | | - const sending: *Sending = @fieldParentPtr("ctx", ctx); |
123 | | - res catch |err| return handleError(ctx, err); |
| 100 | + fn postEvent(self: *const LightPanda, event: *const LightPandaEvent, client: *std.http.Client, arr: *std.ArrayListUnmanaged(u8)) !void { |
| 101 | + defer arr.clearRetainingCapacity(); |
| 102 | + try std.json.stringify(event, .{ .emit_null_optional_fields = false }, arr.writer(self.allocator)); |
124 | 103 |
|
125 | | - const lightpanda = sending.lightpanda; |
| 104 | + var response_header_buffer: [2048]u8 = undefined; |
126 | 105 |
|
127 | | - defer { |
128 | | - sending.deinit(); |
129 | | - lightpanda.sending_pool.destroy(sending); |
130 | | - } |
131 | | - |
132 | | - if (ctx.req.response.status != .ok) { |
133 | | - log.info("invalid response: {d}", .{@intFromEnum(ctx.req.response.status)}); |
| 106 | + const result = try client.fetch(.{ |
| 107 | + .method = .POST, |
| 108 | + .payload = arr.items, |
| 109 | + .response_storage = .ignore, |
| 110 | + .location = .{ .uri = self.uri }, |
| 111 | + .server_header_buffer = &response_header_buffer, |
| 112 | + }); |
| 113 | + if (result.status != .ok) { |
| 114 | + log.warn("server error status: {}", .{result.status}); |
134 | 115 | } |
135 | 116 | } |
136 | 117 | }; |
137 | 118 |
|
138 | | -const Sending = struct { |
139 | | - io: Client.IO, |
140 | | - ctx: Client.Ctx, |
141 | | - client: Client, |
142 | | - body: []const u8, |
143 | | - request: Client.Request, |
144 | | - lightpanda: *LightPanda, |
145 | | - arena: std.heap.ArenaAllocator, |
146 | | - |
147 | | - pub fn deinit(self: *Sending) void { |
148 | | - self.ctx.deinit(); |
149 | | - self.arena.deinit(); |
150 | | - self.request.deinit(); |
151 | | - self.client.deinit(); |
152 | | - } |
| 119 | +const LightPandaEvent = struct { |
| 120 | + iid: ?[]const u8, |
| 121 | + mode: RunMode, |
| 122 | + driver: ?[]const u8, |
| 123 | + os: std.Target.Os.Tag, |
| 124 | + arch: std.Target.Cpu.Arch, |
| 125 | + version: []const u8, |
| 126 | + event: []const u8, |
153 | 127 | }; |
0 commit comments