99
1010package org .opensearch .dataprepper .plugins .source .microsoft_office365 ;
1111
12- import io .micrometer .core .instrument .Counter ;
1312import lombok .extern .slf4j .Slf4j ;
14- import org .opensearch .dataprepper .metrics .PluginMetrics ;
1513import org .opensearch .dataprepper .plugins .source .microsoft_office365 .auth .Office365AuthenticationInterface ;
1614import org .opensearch .dataprepper .plugins .source .source_crawler .exception .SaaSCrawlerException ;
1715import org .opensearch .dataprepper .plugins .source .microsoft_office365 .models .AuditLogsResponse ;
1816import org .opensearch .dataprepper .plugins .source .source_crawler .metrics .VendorAPIMetricsRecorder ;
17+ import org .opensearch .dataprepper .plugins .source .microsoft_office365 .metrics .Office365MetricsRecorder ;
1918import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .RetryHandler ;
2019import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .DefaultRetryStrategy ;
2120import org .opensearch .dataprepper .plugins .source .source_crawler .utils .retry .DefaultStatusCodeHandler ;
4544@ Named
4645public class Office365RestClient {
4746 private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/" ;
48- private static final String API_CALLS = "apiCalls" ;
49-
5047 private final RestTemplate restTemplate = new RestTemplate ();
5148 private final RetryHandler retryHandler ;
5249 private final Office365AuthenticationInterface authConfig ;
5350 private final VendorAPIMetricsRecorder metricsRecorder ;
54- private final Counter apiCallsCounter ;
51+ private final Office365MetricsRecorder office365MetricsRecorder ;
5552
5653 public Office365RestClient (final Office365AuthenticationInterface authConfig ,
57- final PluginMetrics pluginMetrics ,
58- final VendorAPIMetricsRecorder metricsRecorder ) {
54+ final VendorAPIMetricsRecorder metricsRecorder ,
55+ final Office365MetricsRecorder office365MetricsRecorder ) {
5956 this .authConfig = authConfig ;
6057 this .metricsRecorder = metricsRecorder ;
61- this .apiCallsCounter = pluginMetrics . counter ( API_CALLS ) ;
58+ this .office365MetricsRecorder = office365MetricsRecorder ;
6259 this .retryHandler = new RetryHandler (
6360 new DefaultRetryStrategy (),
6461 new DefaultStatusCodeHandler ());
@@ -69,59 +66,66 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
6966 */
7067 public void startSubscriptions () {
7168 log .info ("Starting Office 365 subscriptions for audit logs" );
72- try {
73- HttpHeaders headers = new HttpHeaders ();
74- headers .setContentType (MediaType .APPLICATION_JSON );
69+
70+ office365MetricsRecorder .recordStartSubscriptionLatency (() -> {
71+ try {
72+ HttpHeaders headers = new HttpHeaders ();
73+ headers .setContentType (MediaType .APPLICATION_JSON );
7574
76- // TODO: Only start the subscriptions only if the call commented
77- // out below doesn't return all the audit log types
78- // Check current subscriptions
79- // final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
80- // String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
75+ // TODO: Only start the subscriptions only if the call commented
76+ // out below doesn't return all the audit log types
77+ // Check current subscriptions
78+ // final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
79+ // String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
8180//
82- // ResponseEntity<String> listResponse = restTemplate.exchange(
83- // listUrl,
84- // HttpMethod.GET,
85- // new HttpEntity<>(headers),
86- // String.class
87- // );
88- // log.debug("Current subscriptions: {}", listResponse.getBody());
81+ // ResponseEntity<String> listResponse = restTemplate.exchange(
82+ // listUrl,
83+ // HttpMethod.GET,
84+ // new HttpEntity<>(headers),
85+ // String.class
86+ // );
87+ // log.debug("Current subscriptions: {}", listResponse.getBody());
8988
90- // Start subscriptions for each content type
91- headers .setContentLength (0 );
89+ // Start subscriptions for each content type
90+ headers .setContentLength (0 );
9291
93- for (String contentType : CONTENT_TYPES ) {
94- final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s" ;
95- String url = String .format (SUBSCRIPTION_START_URL ,
96- authConfig .getTenantId (),
97- contentType );
92+ for (String contentType : CONTENT_TYPES ) {
93+ final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s" ;
94+ String url = String .format (SUBSCRIPTION_START_URL ,
95+ authConfig .getTenantId (),
96+ contentType );
9897
99- retryHandler .executeWithRetry (() -> {
100- try {
101- headers .setBearerAuth (authConfig .getAccessToken ());
102- apiCallsCounter .increment ();
103- ResponseEntity <String > response = restTemplate .exchange (
104- url ,
105- HttpMethod .POST ,
106- new HttpEntity <>(headers ),
107- String .class
108- );
109- log .debug ("Started subscription for {}: {}" , contentType , response .getBody ());
110- return response ;
111- } catch (HttpClientErrorException | HttpServerErrorException e ) {
112- if (e .getResponseBodyAsString ().contains ("AF20024" )) {
113- log .debug ("Subscription for {} is already enabled" , contentType );
114- return null ;
98+ retryHandler .executeWithRetry (() -> {
99+ try {
100+ headers .setBearerAuth (authConfig .getAccessToken ());
101+ office365MetricsRecorder .recordStartSubscriptionCall ();
102+
103+ ResponseEntity <String > response = restTemplate .exchange (
104+ url ,
105+ HttpMethod .POST ,
106+ new HttpEntity <>(headers ),
107+ String .class
108+ );
109+ log .debug ("Started subscription for {}: {}" , contentType , response .getBody ());
110+ return response ;
111+ } catch (HttpClientErrorException | HttpServerErrorException e ) {
112+ if (e .getResponseBodyAsString ().contains ("AF20024" )) {
113+ log .debug ("Subscription for {} is already enabled" , contentType );
114+ return null ;
115+ }
116+ throw e ;
115117 }
116- throw e ;
117- }
118- }, authConfig ::renewCredentials );
118+ }, authConfig ::renewCredentials , office365MetricsRecorder ::recordStartSubscriptionFailure );
119+ }
120+
121+ office365MetricsRecorder .recordStartSubscriptionSuccess ();
122+ return null ;
123+ } catch (Exception e ) {
124+ metricsRecorder .recordError (e );
125+ log .error (NOISY , "Failed to initialize subscriptions" , e );
126+ throw new SaaSCrawlerException ("Failed to initialize subscriptions: " + e .getMessage (), e , true );
119127 }
120- } catch (Exception e ) {
121- metricsRecorder .recordError (e );
122- log .error (NOISY , "Failed to initialize subscriptions" , e );
123- throw new SaaSCrawlerException ("Failed to initialize subscriptions: " + e .getMessage (), e , true );
124- }
128+ });
125129 }
126130
127131 /**
0 commit comments