Skip to content

Commit 1bf813e

Browse files
committed
use background channel for decode api
1 parent 7c6c19a commit 1bf813e

File tree

8 files changed

+305
-315
lines changed

8 files changed

+305
-315
lines changed

src/dsstats.decode/DecodeService.cs

Lines changed: 28 additions & 312 deletions
Original file line numberDiff line numberDiff line change
@@ -1,343 +1,59 @@
1-
using dsstats.challenge.Services;
21
using dsstats.shared;
32
using Microsoft.Extensions.Options;
4-
using pax.dsstats.parser;
53
using s2protocol.NET;
6-
using System.Collections.Concurrent;
7-
using System.Security.Cryptography;
84
using System.Text.RegularExpressions;
95

106
namespace dsstats.decode;
117

12-
public partial class DecodeService(IOptions<DecodeSettings> decodeSettings,
13-
IHttpClientFactory httpClientFactory,
14-
ILogger<DecodeService> logger)
8+
public partial class DecodeService(
9+
IReplayQueue replayQueue,
10+
IOptions<DecodeSettings> decodeSettings,
11+
ILogger<DecodeService> logger)
1512
{
16-
17-
private readonly SemaphoreSlim ss = new(1, 1);
18-
private readonly SemaphoreSlim ssRaw = new(1, 1);
19-
private readonly SemaphoreSlim fileSemaphore = new SemaphoreSlim(1, 1);
20-
private ReplayDecoder? replayDecoder;
21-
private int queueCount = 0;
22-
private ConcurrentBag<string> excludeReplays = [];
23-
24-
public EventHandler<DecodeEventArgs>? DecodeFinished;
25-
public EventHandler<DecodeRawEventArgs>? DecodeRawFinished;
26-
27-
private async void OnDecodeFinished(DecodeEventArgs e)
28-
{
29-
var httpClient = httpClientFactory.CreateClient("callback");
30-
try
31-
{
32-
var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderesult/{e.Guid}", e.IhReplays);
33-
result.EnsureSuccessStatusCode();
34-
}
35-
catch (Exception ex)
36-
{
37-
logger.LogError("failed reporting decoderesult: {error}", ex.Message);
38-
}
39-
DecodeFinished?.Invoke(this, e);
40-
}
41-
42-
private async void OnDecodeRawFinished(DecodeRawEventArgs e)
43-
{
44-
var httpClient = httpClientFactory.CreateClient("callback");
45-
try
46-
{
47-
var result = await httpClient.PostAsJsonAsync($"/api8/v1/upload/decoderawresult/{e.Guid}", e.ChallengeResponses);
48-
result.EnsureSuccessStatusCode();
49-
}
50-
catch (Exception ex)
51-
{
52-
logger.LogError("failed reporting decoderesult: {error}", ex.Message);
53-
}
54-
DecodeRawFinished?.Invoke(this, e);
55-
}
13+
private readonly IReplayQueue replayQueue = replayQueue;
14+
private readonly DecodeSettings decodeSettings = decodeSettings.Value;
15+
private readonly ILogger<DecodeService> logger = logger;
5616

5717
public async Task<int> SaveReplays(Guid guid, List<IFormFile> files)
5818
{
59-
return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDo);
19+
return await SaveAndQueueFiles(guid, files, inHouse: true);
6020
}
6121

6222
public async Task<int> SaveReplaysRaw(Guid guid, List<IFormFile> files)
6323
{
64-
return await SaveReplays(guid, files, decodeSettings.Value.ReplayFolders.ToDoRaw);
24+
return await SaveAndQueueFiles(guid, files, inHouse: false);
6525
}
6626

67-
private async Task<int> SaveReplays(Guid guid, List<IFormFile> files, string folder)
27+
private async Task<int> SaveAndQueueFiles(Guid guid, List<IFormFile> files, bool inHouse)
6828
{
69-
int filesSaved = 0;
70-
71-
try
72-
{
73-
foreach (var formFile in files)
74-
{
75-
if (formFile.Length > 0)
76-
{
77-
string fileHash;
78-
using (var md5 = MD5.Create())
79-
{
80-
using var stream = formFile.OpenReadStream();
81-
fileHash = BitConverter.ToString(md5.ComputeHash(stream)).Replace("-", "").ToLowerInvariant();
82-
}
83-
84-
var destinationFile = Path.Combine(decodeSettings.Value.ReplayFolders.Done, $"{fileHash}.SC2Replay");
85-
var todoFolder = folder;
86-
if (!Directory.Exists(todoFolder))
87-
{
88-
Directory.CreateDirectory(todoFolder);
89-
}
90-
var todoFile = Path.Combine(todoFolder, $"{guid}_{fileHash}.SC2Replay");
91-
92-
if (File.Exists(destinationFile))
93-
{
94-
logger.LogInformation("File {FileName} already exists. Skipping upload.", formFile.FileName);
95-
continue;
96-
}
29+
if (files.Count == 0)
30+
return 0;
9731

98-
try
99-
{
100-
var tmpFile = todoFile + ".tmp";
101-
using (var fileStream = File.Create(tmpFile))
102-
{
103-
await formFile.CopyToAsync(fileStream);
104-
fileStream.Close();
105-
}
106-
File.Move(tmpFile, todoFile);
107-
filesSaved++;
108-
}
109-
catch (Exception ex)
110-
{
111-
logger.LogError(ex, "Error saving file {FileName}.", formFile.FileName);
112-
}
113-
}
114-
else
115-
{
116-
logger.LogWarning("File {FileName} is empty and will be skipped.", formFile.FileName);
117-
}
118-
}
32+
Directory.CreateDirectory(decodeSettings.ReplayFolders.Temp);
11933

120-
if (folder.EndsWith("raw"))
121-
{
122-
_ = DecodeRaw();
123-
}
124-
else
125-
{
126-
_ = Decode();
127-
}
128-
}
129-
catch (Exception ex)
34+
foreach (var formFile in files)
13035
{
131-
logger.LogError(ex, "Unexpected error in SaveReplays.");
132-
return -1;
133-
}
36+
var tempFileName = $"{guid}_{Guid.NewGuid()}.SC2Replay";
37+
var tempPath = Path.Combine(decodeSettings.ReplayFolders.Temp, tempFileName);
13438

135-
logger.LogInformation("{FilesSaved} files saved for GUID {Guid}.", filesSaved, guid);
136-
return filesSaved;
137-
}
39+
// Save the file
40+
using (var stream = new FileStream(tempPath, FileMode.Create))
41+
await formFile.CopyToAsync(stream);
13842

43+
logger.LogInformation("Saved uploaded replay to temp: {path}", tempPath);
13944

140-
public async Task Decode()
141-
{
142-
Interlocked.Increment(ref queueCount);
143-
await ss.WaitAsync();
144-
ConcurrentDictionary<Guid, ConcurrentBag<IhReplay>> replays = [];
145-
string? error = null;
146-
147-
try
148-
{
149-
var replayPaths = Directory.GetFiles(decodeSettings.Value.ReplayFolders.ToDo, "*SC2Replay")
150-
.Where(f => !File.Exists(f + ".tmp"))
151-
.Where(f => !excludeReplays.Contains(f))
152-
.ToArray();
153-
154-
if (replayPaths.Length == 0)
45+
// Prepare job
46+
var job = new ReplayJob(Guid.NewGuid(), tempPath, "", inHouse);
47+
48+
// Try enqueue
49+
if (!replayQueue.TryEnqueue(job))
15550
{
156-
error = "No replays found.";
157-
return;
51+
logger.LogWarning("Replay queue full — rejecting uploaded replay: {path}", tempPath);
52+
return -1; // API returns 500 or 429 based on your controller
15853
}
159-
160-
if (replayDecoder is null)
161-
{
162-
replayDecoder = new();
163-
}
164-
165-
var options = new ReplayDecoderOptions()
166-
{
167-
Initdata = true,
168-
Details = true,
169-
Metadata = true,
170-
TrackerEvents = true,
171-
};
172-
173-
using var md5 = MD5.Create();
174-
175-
await foreach (var result in
176-
replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options))
177-
{
178-
if (result.Sc2Replay is null)
179-
{
180-
Error(result);
181-
error = "failed decoding replays.";
182-
continue;
183-
}
184-
185-
var metaData = GetMetaData(result.Sc2Replay);
186-
187-
var sc2Replay = Parse.GetDsReplay(result.Sc2Replay);
188-
189-
if (sc2Replay is null)
190-
{
191-
Error(result);
192-
error = "failed decoding replays.";
193-
continue;
194-
}
195-
196-
var replayDto = Parse.GetReplayDto(sc2Replay, md5);
197-
198-
if (replayDto is null)
199-
{
200-
Error(result);
201-
error = "failed decoding replays.";
202-
continue;
203-
}
204-
var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done,
205-
Path.GetFileNameWithoutExtension(result.ReplayPath)[..36] +
206-
"_" +
207-
replayDto.ReplayHash +
208-
Path.GetExtension(result.ReplayPath));
209-
await fileSemaphore.WaitAsync();
210-
try
211-
{
212-
if (!File.Exists(destination))
213-
{
214-
File.Move(result.ReplayPath, destination);
215-
var groupId = GetGroupIdFromFilename(result.ReplayPath);
216-
var ihReplay = new IhReplay() { Replay = replayDto, Metadata = metaData };
217-
replays.AddOrUpdate(groupId, [ihReplay], (k, v) => { v.Add(ihReplay); return v; });
218-
}
219-
}
220-
finally
221-
{
222-
fileSemaphore.Release();
223-
}
224-
}
225-
}
226-
catch (Exception ex)
227-
{
228-
logger.LogError("failed decoding replays: {error}", ex.Message);
229-
error = "failed decoding replays.";
230-
}
231-
finally
232-
{
233-
ss.Release();
234-
foreach (var ent in replays)
235-
{
236-
OnDecodeFinished(new()
237-
{
238-
Guid = ent.Key,
239-
IhReplays = [.. ent.Value],
240-
Error = error,
241-
});
242-
}
243-
Interlocked.Decrement(ref queueCount);
24454
}
245-
}
246-
247-
public async Task DecodeRaw()
248-
{
249-
Interlocked.Increment(ref queueCount);
250-
await ssRaw.WaitAsync();
251-
ConcurrentDictionary<Guid, ConcurrentBag<ChallengeResponse>> challengeResponses = [];
252-
string? error = null;
253-
254-
try
255-
{
256-
var replayPaths = Directory.GetFiles(Path.Combine(decodeSettings.Value.ReplayFolders.ToDoRaw), "*SC2Replay")
257-
.Where(f => !File.Exists(f + ".tmp"))
258-
.Where(f => !excludeReplays.Contains(f))
259-
.ToArray();
26055

261-
if (replayPaths.Length == 0)
262-
{
263-
error = "No replays found.";
264-
return;
265-
}
266-
267-
if (replayDecoder is null)
268-
{
269-
replayDecoder = new();
270-
}
271-
272-
var options = new ReplayDecoderOptions()
273-
{
274-
Initdata = true,
275-
Details = true,
276-
Metadata = true,
277-
TrackerEvents = true,
278-
};
279-
280-
await foreach (var result in
281-
replayDecoder.DecodeParallelWithErrorReport(replayPaths, decodeSettings.Value.Threads, options))
282-
{
283-
if (result.Sc2Replay is null)
284-
{
285-
Error(result);
286-
error = "failed decoding replays.";
287-
continue;
288-
}
289-
var challengeResponse = ChallengeService.GetChallengeResponse(result.Sc2Replay);
290-
291-
var destination = Path.Combine(decodeSettings.Value.ReplayFolders.Done, Path.GetFileName(result.ReplayPath));
292-
await fileSemaphore.WaitAsync();
293-
try
294-
{
295-
if (!File.Exists(destination))
296-
{
297-
File.Move(result.ReplayPath, destination);
298-
var groupId = GetGroupIdFromFilename(result.ReplayPath);
299-
challengeResponses.AddOrUpdate(groupId, [challengeResponse], (k, v) => { v.Add(challengeResponse); return v; });
300-
}
301-
}
302-
finally
303-
{
304-
fileSemaphore.Release();
305-
}
306-
}
307-
}
308-
catch (Exception ex)
309-
{
310-
logger.LogError("failed decoding replays: {error}", ex.Message);
311-
error = "failed decoding replays.";
312-
}
313-
finally
314-
{
315-
ssRaw.Release();
316-
foreach (var ent in challengeResponses)
317-
{
318-
OnDecodeRawFinished(new()
319-
{
320-
Guid = ent.Key,
321-
ChallengeResponses = [.. ent.Value],
322-
Error = error,
323-
});
324-
}
325-
Interlocked.Decrement(ref queueCount);
326-
}
327-
}
328-
329-
private void Error(DecodeParallelResult result)
330-
{
331-
logger.LogError("failed decoding replay: {path}, {error}", result.ReplayPath, result.Exception);
332-
try
333-
{
334-
File.Move(result.ReplayPath, Path.Combine(decodeSettings.Value.ReplayFolders.Error, Path.GetFileName(result.ReplayPath)));
335-
}
336-
catch (Exception ex)
337-
{
338-
logger.LogWarning("failed moving error replay: {error}", ex.Message);
339-
excludeReplays.Add(result.ReplayPath);
340-
}
56+
return replayQueue.QueueLength;
34157
}
34258

34359
public static ReplayMetadata GetMetaData(Sc2Replay replay)

src/dsstats.decode/DecodeSettings.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ public record ReplayFolders
1515
public string ToDoRaw { get; set; } = string.Empty;
1616
public string Done { get; set; } = string.Empty;
1717
public string Error { get; set; } = string.Empty;
18+
public string Temp { get; set; } = string.Empty;
1819
}

src/dsstats.decode/Program.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
builder.Services.AddControllers();
99

1010
builder.Services.Configure<DecodeSettings>(builder.Configuration.GetSection("DecodeSettings"));
11+
builder.Services.AddSingleton<IReplayQueue, ReplayQueue>();
12+
builder.Services.AddHostedService<ReplayDecoderWorker>();
1113
builder.Services.AddSingleton<DecodeService>();
1214

1315
builder.Services.AddHttpClient("callback")

0 commit comments

Comments
 (0)