11package io.seqera.tower.plugin
22
3- import java.net.http.HttpClient
4- import java.net.http.HttpRequest
53import java.time.Duration
64import java.time.Instant
75import java.time.temporal.ChronoUnit
8- import java.util.concurrent.Executors
96
107import com.google.common.cache.Cache
118import com.google.common.cache.CacheBuilder
129import com.google.common.util.concurrent.UncheckedExecutionException
1310import com.google.gson.JsonSyntaxException
1411import groovy.transform.CompileStatic
1512import groovy.util.logging.Slf4j
16- import io.seqera.http.HxClient
17- import io.seqera.http.HxConfig
1813import io.seqera.tower.plugin.exception.BadResponseException
1914import io.seqera.tower.plugin.exception.UnauthorizedException
2015import io.seqera.tower.plugin.exchange.GetLicenseTokenRequest
2116import io.seqera.tower.plugin.exchange.GetLicenseTokenResponse
22- import io.seqera.util.trace.TraceUtils
2317import nextflow.SysEnv
2418import nextflow.exception.AbortOperationException
2519import nextflow.exception.ReportWarningException
@@ -28,8 +22,6 @@ import nextflow.fusion.FusionToken
2822import nextflow.platform.PlatformHelper
2923import nextflow.plugin.Priority
3024import nextflow.serde.gson.GsonEncoder
31- import nextflow.util.RetryConfig
32- import nextflow.util.Threads
3325import org.pf4j.Extension
3426/**
3527 * Environment provider for Platform-specific environment variables.
@@ -45,14 +37,8 @@ class TowerFusionToken implements FusionToken {
4537 // The path relative to the Platform endpoint where license-scoped JWT tokens are obtained
4638 private static final String LICENSE_TOKEN_PATH = ' license/token/'
4739
48- // Server errors that should trigger a retry
49- private static final Set<Integer > SERVER_ERRORS = Set . of(408 , 429 , 500 , 502 , 503 , 504 )
50-
51- // Default connection timeout for HTTP requests
52- private static final Duration DEFAULT_CONNECTION_TIMEOUT = Duration . of(30 , ChronoUnit . SECONDS )
53-
54- // The HttpClient instance used to send requests
55- private HxClient httpClient
40+ // The TowerClient instance used to send requests
41+ private TowerClient client
5642
5743 // Time-to-live for cached tokens
5844 private Duration tokenTTL = Duration . of(1 , ChronoUnit . HOURS )
@@ -84,7 +70,7 @@ class TowerFusionToken implements FusionToken {
8470 this . refreshToken = PlatformHelper . getRefreshToken(config, env)
8571 this . workflowId = env. get(' TOWER_WORKFLOW_ID' )
8672 this . workspaceId = PlatformHelper . getWorkspaceId(config, env)
87- this . httpClient = newDefaultHttpClient(accessToken, refreshToken )
73+ this . client = TowerFactory . client( )
8874 }
8975
9076 protected void validateConfig () {
@@ -168,51 +154,6 @@ class TowerFusionToken implements FusionToken {
168154 * Helper methods
169155 *************************************************************************/
170156
171- /**
172- * Create a new HttpClient instance with default settings
173- * @return The new HttpClient instance
174- */
175- private HxClient newDefaultHttpClient (String accessToken , String refreshToken ) {
176- final refreshUrl = refreshToken ? " $endpoint /oauth/access_token" : null
177- // the client config
178- final config = HxConfig . newBuilder()
179- .bearerToken(accessToken)
180- .refreshToken(refreshToken)
181- .refreshTokenUrl(refreshUrl)
182- .refreshCookiePolicy(CookiePolicy . ACCEPT_ALL )
183- .retryStatusCodes(SERVER_ERRORS )
184- .retryConfig(RetryConfig . config())
185- .build()
186- // the client builder
187- final builder = HxClient . newBuilder()
188- .version(HttpClient.Version . HTTP_1_1 )
189- .connectTimeout(DEFAULT_CONNECTION_TIMEOUT )
190- .config(config)
191- // use virtual threads executor if enabled
192- if ( Threads . useVirtual() ) {
193- builder. executor(Executors . newVirtualThreadPerTaskExecutor())
194- }
195- // build and return the new client
196- return builder. build()
197- }
198-
199- /**
200- * Create a {@link HttpRequest} representing a {@link GetLicenseTokenRequest} object
201- *
202- * @param req The LicenseTokenRequest object
203- * @return The resulting HttpRequest object
204- */
205- private HttpRequest makeHttpRequest (GetLicenseTokenRequest req ) {
206- final gson = new GsonEncoder<GetLicenseTokenRequest > () {}
207- final body = HttpRequest.BodyPublishers . ofString( gson. encode(req) )
208- return HttpRequest . newBuilder()
209- .uri(URI . create(" ${ endpoint} /${ LICENSE_TOKEN_PATH} " ). normalize())
210- .header(' Content-Type' , ' application/json' )
211- .header(' Traceparent' , TraceUtils . rndTrace())
212- .POST (body)
213- .build()
214- }
215-
216157 /**
217158 * Parse a JSON string into a {@link GetLicenseTokenResponse} object
218159 *
@@ -233,28 +174,20 @@ class TowerFusionToken implements FusionToken {
233174 * @return The LicenseTokenResponse object
234175 */
235176 private GetLicenseTokenResponse sendRequest (GetLicenseTokenRequest request ) {
236- final httpReq = makeHttpRequest(request)
237- try {
238- final resp = httpClient. sendAsString(httpReq)
239- if ( log. isTraceEnabled() || resp. statusCode()!= 200 )
240- log. debug " Fusion license request ${ httpReq} ${ request} ; status=${ resp.statusCode()} ; body: ${ resp.body()} "
241- else
242- log. debug " Fusion license request ${ httpReq} ; status=${ resp.statusCode()} "
243- // check ok response
244- if ( resp. statusCode() == 200 ) {
245- final ret = parseLicenseTokenResponse(resp. body())
246- return ret
247- }
248- // check for unauthorized error
249- if ( resp. statusCode() == 401 ) {
250- throw new UnauthorizedException (" Unauthorized [401] - Verify you have provided a Seqera Platform valid access token" )
251- }
252- // unpexted error
253- throw new BadResponseException (" Invalid response: ${ httpReq.method()} ${ httpReq.uri()} [${ resp.statusCode()} ] ${ resp.body()} " )
177+ final url = " ${ client.getEndpoint()} /${ LICENSE_TOKEN_PATH} "
178+ final resp = client. sendHttpMessage(url, request. toMap())
179+
180+ if ( resp. code == 200 ) {
181+ final ret = parseLicenseTokenResponse(resp. message)
182+ return ret
254183 }
255- catch (IOException e) {
256- throw new IllegalStateException (" Unable to send request to '${ httpReq.uri()} ' : ${ e.message} " )
184+
185+ if ( resp. code == 401 ) {
186+ throw new UnauthorizedException (" Unauthorized [401] - Verify you have provided a Seqera Platform valid access token" )
257187 }
188+
189+ throw new BadResponseException (" Invalid response: ${ url} [${ resp.code} ] ${ resp.message} -- ${ resp.cause} " )
190+
258191 }
259192
260193}
0 commit comments