Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
2e854e4
Write model parts async
davidkyle Aug 7, 2024
125c822
Update docs/changelog/111684.yaml
davidkyle Aug 7, 2024
be39082
Pass a listener to import
davidkyle Aug 9, 2024
5310a08
Ref counting WIP
davidkyle Aug 9, 2024
3a57e14
use ref counting listener
davidkyle Aug 14, 2024
56f5e1c
tidying
davidkyle Aug 14, 2024
9f98dbe
add tests
davidkyle Aug 14, 2024
e434499
Merge branch 'main' into background-download-write
elasticmachine Aug 14, 2024
9d9a0e6
Add download threadpool
davidkyle Aug 19, 2024
669909b
less blocking
davidkyle Aug 19, 2024
fcc66b4
tidy up
davidkyle Aug 19, 2024
1921812
more tests
davidkyle Aug 21, 2024
63bdff6
Merge branch 'main' into background-download-write
elasticmachine Aug 21, 2024
84aec83
remove unused
davidkyle Aug 22, 2024
c4ea146
fix the tests
davidkyle Aug 22, 2024
413a3ab
5 in flight requests
davidkyle Aug 22, 2024
70ffe07
use another threadpool for writes
davidkyle Aug 27, 2024
573d5af
Revert "use another threadpool for writes"
davidkyle Aug 29, 2024
658b903
use range request
davidkyle Sep 3, 2024
98d8020
Merge branch 'main' into background-download-write
davidkyle Sep 9, 2024
1d9f5b5
Use multiple connections
davidkyle Sep 9, 2024
d718a92
Merge branch 'main' into background-download-write
elasticmachine Sep 10, 2024
0395c76
Tidy comments
davidkyle Sep 10, 2024
d37cd3a
short threads
davidkyle Sep 10, 2024
6e88b09
Merge branch 'main' into background-download-write
elasticmachine Sep 11, 2024
73f0f6b
Enable markSupported in SlicedInputStream (#112563)
kingherc Sep 11, 2024
830c26f
Make two SubReaderWrapper implementations singletons (#112596)
original-brownbear Sep 11, 2024
02a9750
Two speedups to IndexNameExpressionResolver (#112486)
original-brownbear Sep 11, 2024
7f66489
Fix failing LangMustacheClientYamlTestSuiteIT yamlRestTestV7CompatTes…
cbuescher Sep 11, 2024
ae94a40
Fix trappy timeouts in downsample action (#112734)
DaveCTurner Sep 11, 2024
06a361a
Mute org.elasticsearch.smoketest.DocsClientYamlTestSuiteIT test {yaml…
elasticsearchmachine Sep 11, 2024
802ee00
Introduce repository integrity verification API (#112348)
DaveCTurner Sep 11, 2024
75c0fb9
JSON parse failures should be 4xx codes (#112703)
benwtrent Sep 11, 2024
07329d7
Handle null exception message in `TestCluster#wipe` (#112741)
DaveCTurner Sep 11, 2024
eabea6f
Add TaskManager to pluginServices (#112687)
parkertimmins Sep 11, 2024
051f504
Introduce data stream options and failure store configuration classes…
gmarouli Sep 11, 2024
a148619
Support widening of numeric types in union-types (#112610)
craigtaverner Sep 11, 2024
b9b62db
add CDR related data streams to kibana_system priviliges (#112655)
maxcold Sep 11, 2024
1a05488
Bump Elasticsearch version to 9.0.0 (#112570)
mark-vieira Sep 11, 2024
ca2b144
ESQL: Compute support for filtering ungrouped aggs (#112717)
nik9000 Sep 11, 2024
bceeced
Bump Elasticsearch to a minimum of JDK 21 (#112252)
ChrisHegarty Sep 11, 2024
a211d80
Mute org.elasticsearch.repositories.blobstore.testkit.integrity.Repos…
elasticsearchmachine Sep 11, 2024
cef6f0b
[DOCS] Augment installation warnings (#112756)
lcawl Sep 11, 2024
196728e
(Doc+) CAT Nodes default columns (#112715)
stefnestor Sep 11, 2024
c4932f2
(Doc+) Terminating Exit Codes (#112530)
stefnestor Sep 11, 2024
2b426f7
Fix verifyVersions task (#112765)
mark-vieira Sep 11, 2024
7b17077
(Doc+) Inference Pipeline ignores Mapping Analyzers (#112522)
stefnestor Sep 11, 2024
18a48c7
Estimate segment field usages (#112760)
dnhatn Sep 11, 2024
e0044a5
Use a dedicated test executor in MockTransportService (#112748)
ywangd Sep 12, 2024
2046b29
Mute org.elasticsearch.repositories.blobstore.testkit.integrity.Repos…
elasticsearchmachine Sep 12, 2024
9f5b528
Mute org.elasticsearch.script.StatsSummaryTests testEqualsAndHashCode…
elasticsearchmachine Sep 12, 2024
6fdb78c
Do not throw in task enqueued by CancellableRunner (#112780)
ywangd Sep 12, 2024
ca30b69
[Test] Account for auto-repairing for shard gen file (#112778)
ywangd Sep 12, 2024
1bbb739
Introduce test utils for ingest pipelines (#112733)
DaveCTurner Sep 12, 2024
6ef94ac
Deduplicate BucketOrder when deserializing (#112707)
iverase Sep 12, 2024
3fadf53
Block use of current version feature in yaml tests (#112737)
thecoop Sep 12, 2024
b801949
ci(bump automation): bump ubi9 for ironbank (#112298)
v1v Sep 12, 2024
c3c4aa5
Two empty mappings now are created equally (#107936)
piergm Sep 12, 2024
af1ba75
Cleanup shutdown module bwc in v9 (#112793)
DaveCTurner Sep 12, 2024
a14f529
Update last few references in yaml tests from ROOT locale to ENGLISH …
thecoop Sep 12, 2024
7aa98ef
Remove adaptive allocations feature flag (#112798)
jan-elastic Sep 12, 2024
352dd89
address comments
davidkyle Sep 12, 2024
a2c3c5e
Merge branch 'main' into background-download-write
elasticmachine Sep 12, 2024
d0fbe17
fix recovery from failure
davidkyle Sep 12, 2024
81ce3f1
Merge branch 'main' into background-download-write
elasticmachine Sep 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/111684.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111684
summary: Write downloaded model parts async
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
Expand All @@ -27,6 +29,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
import java.util.concurrent.Semaphore;

import static org.elasticsearch.core.Strings.format;

Expand All @@ -35,26 +38,31 @@
*/
class ModelImporter {
private static final int DEFAULT_CHUNK_SIZE = 1024 * 1024; // 1MB
private static final int MAX_IN_FLIGHT_REQUESTS = 5;
private static final Logger logger = LogManager.getLogger(ModelImporter.class);
private final Client client;
private final String modelId;
private final ModelPackageConfig config;
private final ModelDownloadTask task;
private final Semaphore requestLimiter;

/**
 * Creates an importer for one model package.
 *
 * @param client        client stored for later use by this importer; not null-checked here
 * @param modelId       id of the model being imported; must not be {@code null}
 * @param packageConfig package configuration for the model; must not be {@code null}
 * @param task          download task associated with this import; must not be {@code null}
 * @throws NullPointerException if {@code modelId}, {@code packageConfig} or {@code task} is {@code null}
 */
ModelImporter(Client client, String modelId, ModelPackageConfig packageConfig, ModelDownloadTask task) {
this.client = client;
this.modelId = Objects.requireNonNull(modelId);
this.config = Objects.requireNonNull(packageConfig);
this.task = Objects.requireNonNull(task);
// One permit per allowed in-flight request (MAX_IN_FLIGHT_REQUESTS permits in total).
this.requestLimiter = new Semaphore(MAX_IN_FLIGHT_REQUESTS);
}

public void doImport() throws URISyntaxException, IOException, ElasticsearchStatusException {
public void doImport() throws URISyntaxException, IOException, ElasticsearchStatusException, InterruptedException {
long size = config.getSize();

var releasingListener = ActionListener.<AcknowledgedResponse>wrap(r -> requestLimiter.release(), e -> requestLimiter.release());

// Upload the other artefacts of the model first so that the model definition is written last;
// a simple search can then be used to check whether the download is complete
if (Strings.isNullOrEmpty(config.getVocabularyFile()) == false) {
uploadVocabulary();
uploadVocabulary(releasingListener);

logger.debug(() -> format("[%s] imported model vocabulary [%s]", modelId, config.getVocabularyFile()));
}
Expand Down Expand Up @@ -84,7 +92,7 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat
true
);

executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, modelPartRequest);
executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, modelPartRequest, releasingListener);
}

// get the last part, this time verify the checksum and size
Expand Down Expand Up @@ -119,11 +127,13 @@ public void doImport() throws URISyntaxException, IOException, ElasticsearchStat
true
);

executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, finalModelPartRequest);
executeRequestIfNotCancelled(PutTrainedModelDefinitionPartAction.INSTANCE, finalModelPartRequest, releasingListener);
logger.info("waiting for finish");
requestLimiter.acquire(MAX_IN_FLIGHT_REQUESTS); // cannot acquire until all inflight requests have completed
logger.debug(format("finished importing model [%s] using [%d] parts", modelId, totalParts));
}

private void uploadVocabulary() throws URISyntaxException {
private void uploadVocabulary(ActionListener<AcknowledgedResponse> listener) throws URISyntaxException, InterruptedException {
ModelLoaderUtils.VocabularyParts vocabularyParts = ModelLoaderUtils.loadVocabulary(
ModelLoaderUtils.resolvePackageLocation(config.getModelRepository(), config.getVocabularyFile())
);
Expand All @@ -136,17 +146,19 @@ private void uploadVocabulary() throws URISyntaxException {
true
);

executeRequestIfNotCancelled(PutTrainedModelVocabularyAction.INSTANCE, request);
executeRequestIfNotCancelled(PutTrainedModelVocabularyAction.INSTANCE, request, listener);
}

private <Request extends ActionRequest, Response extends ActionResponse> void executeRequestIfNotCancelled(
ActionType<Response> action,
Request request
) {
Request request,
ActionListener<Response> listener
) throws InterruptedException {
if (task.isCancelled()) {
throw new TaskCancelledException(format("task cancelled with reason [%s]", task.getReasonCancelled()));
}

client.execute(action, request).actionGet();
requestLimiter.acquire();
client.execute(action, request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ static void importModel(
} else {
listener.onResponse(AcknowledgedResponse.TRUE);
}

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ private void assertNotificationAndOnFailure(
private ModelImporter createUploader(Exception exception) throws URISyntaxException, IOException {
ModelImporter uploader = mock(ModelImporter.class);
if (exception != null) {
doThrow(exception).when(uploader).doImport();
try {
doThrow(exception).when(uploader).doImport();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

return uploader;
Expand Down