Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,48 @@ jobs:
with:
name: asynchronous-search-plugin-mac-${{ matrix.java }}
path: asynchronous-search-artifacts

publish:
if: github.ref == 'refs/heads/searchlake-3.2.0'
needs: [linux-test-docker, windows-build, mac-os-build]
strategy:
matrix:
java: [ 21 ]

runs-on: ubuntu-latest

permissions:
contents: write
packages: write

steps:
- uses: actions/checkout@v4

- name: Set release version
id: release_version
run: |
short_sha=$(git rev-parse --short=7 HEAD)
echo "version=3.2.0.0-$(date -u +'%Y%m%d%H%M%S')-${short_sha}" >> "$GITHUB_OUTPUT"

- name: Set Up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: ${{ matrix.java }}
cache: gradle

- name: Publish snapshots to maven
env:
SEARCHLAKE_GITHUB_ID: PA
SEARCHLAKE_GITHUB_TOKEN: ${{ secrets.GH_RUNNER_MANAGER }}
run: |
./gradlew publishPluginZipPublicationToGitHubPackagesRepository

- name: Create GitHub Release and upload zip
uses: softprops/action-gh-release@v2
with:
tag_name: ${{ steps.release_version.outputs.version }}
name: ${{ steps.release_version.outputs.version }}
target_commitish: ${{ github.sha }}
prerelease: false
files: build/distributions/*.zip
16 changes: 15 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ buildscript {

repositories {
mavenLocal()
maven { url = uri("https://maven-central.storage-download.googleapis.com/maven2/") }
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://central.sonatype.com/repository/maven-snapshots/" }
Expand All @@ -50,6 +51,7 @@ plugins {

repositories {
mavenLocal()
maven { url = uri("https://maven-central.storage-download.googleapis.com/maven2/") }
mavenCentral()
maven { url "https://plugins.gradle.org/m2/" }
maven { url "https://central.sonatype.com/repository/maven-snapshots/" }
Expand Down Expand Up @@ -114,13 +116,25 @@ publishing {
password "$System.env.SONATYPE_PASSWORD"
}
}

maven {
name = "GitHubPackages"
url = uri("https://maven.pkg.github.com/${System.getenv('GITHUB_REPOSITORY')}")
credentials {
username = project.findProperty('searchlake.user') ?: System.getenv('SEARCHLAKE_GITHUB_ID')
password = project.findProperty('searchlake.token') ?: System.getenv('SEARCHLAKE_GITHUB_TOKEN')
}
}
}
publications {
pluginZip(MavenPublication) { publication ->
artifact(tasks.named("jar")) {
classifier = "lib"
}
pom {
name = "opensearch-asynchronous-search"
description = "OpenSearch Asynchronous Search plugin"
groupId = "org.opensearch.plugin"
groupId = "io.searchlake.plugin"
licenses {
license {
name = "The Apache License, Version 2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.search.asynchronous.context.state.AsynchronousSearchState;
import org.opensearch.search.asynchronous.id.AsynchronousSearchId;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchProgressListener;
import org.opensearch.search.asynchronous.response.AsynchronousSearchProgress;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -65,18 +66,27 @@ public AsynchronousSearchContextId getContextId() {

public abstract @Nullable User getUser();

public @Nullable AsynchronousSearchProgress getProgress() {
return null;
}

public boolean isExpired() {
return getExpirationTimeMillis() < currentTimeSupplier.getAsLong();
}

public AsynchronousSearchResponse getAsynchronousSearchResponse() {
AsynchronousSearchProgress progress = getProgress();
if (progress == null && asynchronousSearchProgressListener != null) {
progress = asynchronousSearchProgressListener.progress();
}
return new AsynchronousSearchResponse(
getAsynchronousSearchId(),
getAsynchronousSearchState(),
getStartTimeMillis(),
getExpirationTimeMillis(),
getSearchResponse(),
getSearchError()
getSearchError(),
progress
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.search.asynchronous.id.AsynchronousSearchId;
import org.opensearch.search.asynchronous.id.AsynchronousSearchIdConverter;
import org.opensearch.search.asynchronous.listener.AsynchronousSearchProgressListener;
import org.opensearch.search.asynchronous.response.AsynchronousSearchProgress;
import org.opensearch.common.SetOnce;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.search.SearchProgressActionListener;
Expand All @@ -30,6 +31,7 @@
import java.io.Closeable;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand All @@ -56,6 +58,7 @@ public class AsynchronousSearchActiveContext extends AsynchronousSearchContext i
private final Supplier<Boolean> persistSearchFailureSupplier;
private final AsynchronousSearchContextPermits asynchronousSearchContextPermits;
private final Supplier<SearchResponse> partialResponseSupplier;
private final AtomicReference<AsynchronousSearchProgress> progress;
@Nullable
private final User user;

Expand Down Expand Up @@ -87,6 +90,7 @@ public AsynchronousSearchActiveContext(
: new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId);
this.user = user;
this.persistSearchFailureSupplier = persistSearchFailureSupplier;
this.progress = new AtomicReference<>();
}

public void setTask(SearchTask searchTask) {
Expand All @@ -110,6 +114,7 @@ public void processSearchFailure(Exception e) {
e.getCause().setStackTrace(new StackTraceElement[0]);
}
this.error.set(e);
progress.compareAndSet(null, asynchronousSearchProgressListener.progress());
} finally {
boolean result = completed.compareAndSet(false, true);
assert result : "Process search failure already complete";
Expand All @@ -127,6 +132,7 @@ public void processSearchResponse(SearchResponse response) {
}
}
this.searchResponse.set(response);
progress.compareAndSet(null, asynchronousSearchProgressListener.progress());
} finally {
boolean result = completed.compareAndSet(false, true);
assert result : "Process search response already complete";
Expand All @@ -138,6 +144,11 @@ public SearchResponse getSearchResponse() {
return completed.get() ? searchResponse.get() : partialResponseSupplier.get();
}

@Override
public AsynchronousSearchProgress getProgress() {
return progress.get();
}

@Override
public String getAsynchronousSearchId() {
return asynchronousSearchId.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.search.asynchronous.response.AsynchronousSearchProgress;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -141,6 +142,35 @@ public User getUser() {
return asynchronousSearchPersistenceModel.getUser();
}

@Override
public AsynchronousSearchProgress getProgress() {
if (asynchronousSearchPersistenceModel.getProgress() == null) {
return null;
}
BytesReference bytesReference = BytesReference.fromByteBuffer(
ByteBuffer.wrap(Base64.getUrlDecoder().decode(asynchronousSearchPersistenceModel.getProgress()))
);
try (
NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(
bytesReference.streamInput(),
namedWriteableRegistry
)
) {
wrapperStreamInput.setVersion(wrapperStreamInput.readVersion());
return new AsynchronousSearchProgress(wrapperStreamInput);
} catch (IOException e) {
logger.error(
() -> new ParameterizedMessage(
"Failed to parse search progress for asynchronous search [{}] Progress : [{}] ",
asynchronousSearchId,
asynchronousSearchPersistenceModel.getProgress()
),
e
);
return null;
}
}

@Override
public AsynchronousSearchState getAsynchronousSearchState() {
return AsynchronousSearchState.STORE_RESIDENT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.search.asynchronous.response.AsynchronousSearchProgress;

import java.io.IOException;
import java.util.Base64;
Expand All @@ -27,13 +28,22 @@ public class AsynchronousSearchPersistenceModel {
private final long startTimeMillis;
private final String response;
private final String error;
private final String progress;
private final User user;

public AsynchronousSearchPersistenceModel(long startTimeMillis, long expirationTimeMillis, String response, String error, User user) {
public AsynchronousSearchPersistenceModel(
long startTimeMillis,
long expirationTimeMillis,
String response,
String error,
String progress,
User user
) {
this.startTimeMillis = startTimeMillis;
this.expirationTimeMillis = expirationTimeMillis;
this.response = response;
this.error = error;
this.progress = progress;
this.user = user;
}

Expand All @@ -42,12 +52,14 @@ public AsynchronousSearchPersistenceModel(
long expirationTimeMillis,
SearchResponse response,
Exception error,
AsynchronousSearchProgress progress,
User user
) throws IOException {
this.startTimeMillis = startTimeMillis;
this.expirationTimeMillis = expirationTimeMillis;
this.response = serializeResponse(response);
this.error = serializeError(error);
this.progress = serializeProgress(progress);
this.user = user;
}

Expand All @@ -63,6 +75,18 @@ private String serializeResponse(SearchResponse response) throws IOException {
}
}

private String serializeProgress(AsynchronousSearchProgress progress) throws IOException {
if (progress == null) {
return null;
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeVersion(Version.CURRENT);
progress.writeTo(out);
byte[] bytes = BytesReference.toBytes(out.bytes());
return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
}
}

/**
* Serializes exception in string format
*
Expand Down Expand Up @@ -100,6 +124,10 @@ public String getError() {
return error;
}

public String getProgress() {
return progress;
}

public long getExpirationTimeMillis() {
return expirationTimeMillis;
}
Expand All @@ -122,6 +150,8 @@ public boolean equals(Object o) {
&& ((response == null && other.response == null)
|| (response != null && other.response != null && response.equals(other.response)))
&& ((error == null && other.error == null) || (error != null && other.error != null && error.equals(other.error)))
&& ((progress == null && other.progress == null)
|| (progress != null && other.progress != null && progress.equals(other.progress)))
&& ((user == null && other.user == null) || (user != null && other.user != null && user.equals(other.user)));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public AsynchronousSearchPersistenceModel getAsynchronousSearchPersistenceModel(
asynchronousSearchContext.getExpirationTimeMillis(),
asynchronousSearchContext.getSearchResponse(),
asynchronousSearchContext.getSearchError(),
asynchronousSearchContext.getProgress(),
asynchronousSearchContext.getUser()
);
} catch (IOException e) {
Expand Down
Loading