Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,42 +105,46 @@ public async ValueTask StopAsync(Cancel ctx = default)
}

_logger.LogInformation("_reindex updates: '{SourceIndex}' => '{DestinationIndex}'", lexicalWriteAlias, semanticWriteAlias);
var request = PostData.Serializable(new
var request = PostData.String(@"
{
dest = new { index = semanticWriteAlias },
source = new
{
index = lexicalWriteAlias,
size = 100,
query = new
{
range = new
{
last_updated = new { gte = _batchIndexDate.ToString("o") }
""dest"": {
""index"": """ + semanticWriteAlias + @"""
},
""source"": {
""index"": """ + lexicalWriteAlias + @""",
""size"": 100,
""query"": {
""range"": {
""last_updated"": {
""gte"": """ + _batchIndexDate.ToString("o") + @"""
}
}
}
}
});
}");
await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "updates", ctx);

_logger.LogInformation("_reindex deletions: '{SourceIndex}' => '{DestinationIndex}'", lexicalWriteAlias, semanticWriteAlias);
request = PostData.Serializable(new
request = PostData.String(@"
{
dest = new { index = semanticWriteAlias },
script = new { source = "ctx.op = \"delete\"" },
source = new
{
index = lexicalWriteAlias,
size = 100,
query = new
{
range = new
{
batch_index_date = new { lt = _batchIndexDate.ToString("o") }
""dest"": {
""index"": """ + semanticWriteAlias + @"""
},
""script"": {
""source"": ""ctx.op = \""delete\""""
},
""source"": {
""index"": """ + lexicalWriteAlias + @""",
""size"": 100,
""query"": {
""range"": {
""batch_index_date"": {
""lt"": """ + _batchIndexDate.ToString("o") + @"""
}
}
}
}
});
}");
await DoReindex(request, lexicalWriteAlias, semanticWriteAlias, "deletions", ctx);

await DoDeleteByQuery(lexicalWriteAlias, ctx);
Expand All @@ -151,16 +155,16 @@ private async ValueTask DoDeleteByQuery(string lexicalWriteAlias, Cancel ctx)
// delete all documents with batch_index_date < _batchIndexDate
// they weren't part of the current export
_logger.LogInformation("Delete data in '{SourceIndex}' not part of batch date: {Date}", lexicalWriteAlias, _batchIndexDate.ToString("o"));
var request = PostData.Serializable(new
var request = PostData.String(@"
{
query = new
{
range = new
{
batch_index_date = new { lt = _batchIndexDate.ToString("o") }
""query"": {
""range"": {
""batch_index_date"": {
""lt"": """ + _batchIndexDate.ToString("o") + @"""
}
}
}
});
}");
var reindexUrl = $"/{lexicalWriteAlias}/_delete_by_query?wait_for_completion=false";
var deleteOldLexicalDocs = await _transport.PostAsync<DynamicResponse>(reindexUrl, request, ctx);
var taskId = deleteOldLexicalDocs.Body.Get<string>("task");
Expand Down
Loading