Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@

package org.opensearch.dataprepper.plugins.source.microsoft_office365;

import io.micrometer.core.instrument.Counter;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
import org.opensearch.dataprepper.plugins.source.source_crawler.exception.SaaSCrawlerException;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.models.AuditLogsResponse;
Expand Down Expand Up @@ -45,20 +43,15 @@
@Named
public class Office365RestClient {
private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/";
private static final String API_CALLS = "apiCalls";

private final RestTemplate restTemplate = new RestTemplate();
private final RetryHandler retryHandler;
private final Office365AuthenticationInterface authConfig;
private final VendorAPIMetricsRecorder metricsRecorder;
private final Counter apiCallsCounter;

public Office365RestClient(final Office365AuthenticationInterface authConfig,
final PluginMetrics pluginMetrics,
final VendorAPIMetricsRecorder metricsRecorder) {
this.authConfig = authConfig;
this.metricsRecorder = metricsRecorder;
this.apiCallsCounter = pluginMetrics.counter(API_CALLS);
this.retryHandler = new RetryHandler(
new DefaultRetryStrategy(),
new DefaultStatusCodeHandler());
Expand All @@ -69,59 +62,66 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
*/
public void startSubscriptions() {
log.info("Starting Office 365 subscriptions for audit logs");
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);

metricsRecorder.recordSubscriptionLatency(() -> {
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);

// TODO: Only start the subscriptions only if the call commented
// out below doesn't return all the audit log types
// Check current subscriptions
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
//
// ResponseEntity<String> listResponse = restTemplate.exchange(
// listUrl,
// HttpMethod.GET,
// new HttpEntity<>(headers),
// String.class
// );
// log.debug("Current subscriptions: {}", listResponse.getBody());
// TODO: Only start the subscriptions only if the call commented
// out below doesn't return all the audit log types
// Check current subscriptions
// final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
// String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
//
// ResponseEntity<String> listResponse = restTemplate.exchange(
// listUrl,
// HttpMethod.GET,
// new HttpEntity<>(headers),
// String.class
// );
// log.debug("Current subscriptions: {}", listResponse.getBody());

// Start subscriptions for each content type
headers.setContentLength(0);
// Start subscriptions for each content type
headers.setContentLength(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation was previously off, this fixes the incorrect indentation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the future you can run this command to fix indentation

./gradlew :data-prepper-plugins:saas-source-plugins:microsoft-office365-source:spotlessApply

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thank you for this command. Just ran it and looks like no changes were made so the new indentation is correct!


for (String contentType : CONTENT_TYPES) {
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
String url = String.format(SUBSCRIPTION_START_URL,
authConfig.getTenantId(),
contentType);
for (String contentType : CONTENT_TYPES) {
final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s";
String url = String.format(SUBSCRIPTION_START_URL,
authConfig.getTenantId(),
contentType);

retryHandler.executeWithRetry(() -> {
try {
headers.setBearerAuth(authConfig.getAccessToken());
apiCallsCounter.increment();
ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.POST,
new HttpEntity<>(headers),
String.class
);
log.debug("Started subscription for {}: {}", contentType, response.getBody());
return response;
} catch (HttpClientErrorException | HttpServerErrorException e) {
if (e.getResponseBodyAsString().contains("AF20024")) {
log.debug("Subscription for {} is already enabled", contentType);
return null;
retryHandler.executeWithRetry(() -> {
try {
headers.setBearerAuth(authConfig.getAccessToken());
metricsRecorder.recordSubscriptionCall();

ResponseEntity<String> response = restTemplate.exchange(
url,
HttpMethod.POST,
new HttpEntity<>(headers),
String.class
);
log.debug("Started subscription for {}: {}", contentType, response.getBody());
return response;
} catch (HttpClientErrorException | HttpServerErrorException e) {
if (e.getResponseBodyAsString().contains("AF20024")) {
log.debug("Subscription for {} is already enabled", contentType);
return null;
}
throw e;
}
throw e;
}
}, authConfig::renewCredentials);
}, authConfig::renewCredentials, metricsRecorder::recordSubscriptionFailure);
}

metricsRecorder.recordSubscriptionSuccess();
return null;
} catch (Exception e) {
metricsRecorder.recordError(e);
log.error(NOISY, "Failed to initialize subscriptions", e);
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
}
} catch (Exception e) {
metricsRecorder.recordError(e);
log.error(NOISY, "Failed to initialize subscriptions", e);
throw new SaaSCrawlerException("Failed to initialize subscriptions: " + e.getMessage(), e, true);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import lombok.Getter;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.RetryHandler;
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultRetryStrategy;
import org.opensearch.dataprepper.plugins.source.source_crawler.utils.retry.DefaultStatusCodeHandler;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class Office365AuthenticationProvider implements Office365AuthenticationI
private final RetryHandler retryHandler;
private final String tenantId;
private final Office365SourceConfig office365SourceConfig;
private final VendorAPIMetricsRecorder metricsRecorder;
private String accessToken;
private final Object lock = new Object();
private final Object accessTokenFetchLock = new Object();
Expand All @@ -53,12 +55,13 @@ public class Office365AuthenticationProvider implements Office365AuthenticationI
@Getter
private Instant expireTime = Instant.ofEpochMilli(0);

public Office365AuthenticationProvider(Office365SourceConfig config) {
public Office365AuthenticationProvider(Office365SourceConfig config, VendorAPIMetricsRecorder metricsRecorder) {
this.tenantId = config.getTenantId();
this.office365SourceConfig = config;
this.retryHandler = new RetryHandler(
new DefaultRetryStrategy(),
new DefaultStatusCodeHandler());
this.metricsRecorder = metricsRecorder;
}

@Override
Expand All @@ -82,22 +85,37 @@ public void renewCredentials() {
HttpEntity<String> entity = new HttpEntity<>(payload, headers);
String tokenEndpoint = String.format(TOKEN_URL, office365SourceConfig.getTenantId());

ResponseEntity<Map> response = retryHandler.executeWithRetry(
() -> restTemplate.postForEntity(tokenEndpoint, entity, Map.class),
() -> {
} // No credential renewal for authentication endpoint
);
try {
// Record authentication latency and execute the request
ResponseEntity<Map> response = metricsRecorder.recordAuthLatency(() ->
retryHandler.executeWithRetry(
() -> restTemplate.postForEntity(tokenEndpoint, entity, Map.class),
() -> {
} // No credential renewal for authentication endpoint
)
);

Map<String, Object> tokenResponse = response.getBody();
Map<String, Object> tokenResponse = response.getBody();

if (tokenResponse == null || tokenResponse.get("access_token") == null) {
throw new IllegalStateException("Invalid token response: missing access_token");
}
if (tokenResponse == null || tokenResponse.get("access_token") == null) {
throw new IllegalStateException("Invalid token response: missing access_token");
}

this.accessToken = (String) tokenResponse.get("access_token");
int expiresIn = (int) tokenResponse.get("expires_in");
this.expireTime = Instant.now().plusSeconds(expiresIn);
log.info("Received new access token. Expires in {} seconds", expiresIn);
this.accessToken = (String) tokenResponse.get("access_token");
int expiresIn = (int) tokenResponse.get("expires_in");
this.expireTime = Instant.now().plusSeconds(expiresIn);

// Record successful authentication
metricsRecorder.recordAuthSuccess();

log.info("Received new access token. Expires in {} seconds", expiresIn);
} catch (Exception e) {
// Record authentication failure and specific error details
metricsRecorder.recordAuthFailure();
metricsRecorder.recordError(e);
log.error("Failed to renew Office 365 credentials", e);
throw e;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,26 @@

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365RestClient;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.Office365SourceConfig;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationInterface;
import org.opensearch.dataprepper.plugins.source.microsoft_office365.auth.Office365AuthenticationProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.metrics.VendorAPIMetricsRecorder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Spring configuration for Microsoft Office 365 RestClient.
* Spring configuration for Microsoft Office 365 components.
*
* This configuration class creates the Office365RestClient with the required dependencies:
* 1. Office365AuthenticationInterface for authentication
* 2. PluginMetrics for unified metrics recording
* This configuration class creates all Office 365 related beans with their dependencies:
* 1. VendorAPIMetricsRecorder for unified metrics across all operations
* 2. Office365AuthenticationProvider with authentication metrics support
* 3. Office365RestClient with VendorAPIMetricsRecorder
*
* The Office365RestClient internally creates VendorAPIMetricsRecorder instances
* for different operation types (GET, SEARCH, AUTH) from the provided PluginMetrics.
* This provides comprehensive metrics coverage for authentication, API calls,
* and subscription management operations.
*/
@Configuration
public class Office365RestClientConfiguration {
public class Office365Configuration {

/**
* Creates VendorAPIMetricsRecorder with unified metrics for all operations.
Expand All @@ -40,19 +43,32 @@ public VendorAPIMetricsRecorder vendorAPIMetricsRecorder(PluginMetrics pluginMet
return new VendorAPIMetricsRecorder(pluginMetrics);
}

/**
* Creates Office365AuthenticationProvider with metrics recording capabilities.
*
* @param config The Office 365 source configuration
* @param metricsRecorder The metrics recorder for authentication operations
* @return Configured Office365AuthenticationProvider with metrics support
*/
@Bean
public Office365AuthenticationProvider office365AuthenticationProvider(
Office365SourceConfig config,
VendorAPIMetricsRecorder metricsRecorder) {
return new Office365AuthenticationProvider(config, metricsRecorder);
}


/**
* Creates Office365RestClient with unified metrics recorder.
*
* @param authConfig The Office 365 authentication provider
* @param pluginMetrics The system plugin metrics instance
* @param vendorAPIMetricsRecorder The unified metrics recorder
* @return Configured Office365RestClient
*/
@Bean
public Office365RestClient office365RestClient(
Office365AuthenticationInterface authConfig,
PluginMetrics pluginMetrics,
VendorAPIMetricsRecorder vendorAPIMetricsRecorder) {
return new Office365RestClient(authConfig, pluginMetrics, vendorAPIMetricsRecorder);
return new Office365RestClient(authConfig, vendorAPIMetricsRecorder);
}
}
Loading
Loading