2929
3030import javax .inject .Named ;
3131import java .time .Instant ;
32+ import java .util .ArrayList ;
33+ import java .util .HashSet ;
3234import java .util .List ;
3335import java .util .Map ;
36+ import java .util .Set ;
3437
3538import static org .opensearch .dataprepper .logging .DataPrepperMarkers .NOISY ;
3639import static org .opensearch .dataprepper .plugins .source .microsoft_office365 .utils .Constants .CONTENT_TYPES ;
4346@ Named
4447public class Office365RestClient {
4548 private static final String MANAGEMENT_API_BASE_URL = "https://manage.office.com/api/v1.0/" ;
49+ private static final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list" ;
50+ private static final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s" ;
51+ private static final String GET_AUDIT_LOGS_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/content?contentType=%s&startTime=%s&endTime=%s" ;
52+
4653 private final RestTemplate restTemplate = new RestTemplate ();
4754 private final RetryHandler retryHandler ;
4855 private final Office365AuthenticationInterface authConfig ;
@@ -57,63 +64,144 @@ public Office365RestClient(final Office365AuthenticationInterface authConfig,
5764 new DefaultStatusCodeHandler ());
5865 }
5966
67+ /**
68+ * Lists current subscriptions for Office 365 audit logs.
69+ *
70+ * @return List of subscription maps containing contentType, status, and webhook information
71+ * @throws SaaSCrawlerException if the operation fails
72+ */
73+ private List <Map <String , Object >> listSubscriptions () {
74+ log .info ("Listing Office 365 subscriptions" );
75+ String listUrl = String .format (SUBSCRIPTION_LIST_URL , authConfig .getTenantId ());
76+
77+ return metricsRecorder .recordListSubscriptionLatency (() -> {
78+ HttpHeaders headers = new HttpHeaders ();
79+ headers .setContentType (MediaType .APPLICATION_JSON );
80+
81+ try {
82+ List <Map <String , Object >> result = retryHandler .executeWithRetry (() -> {
83+ headers .setBearerAuth (authConfig .getAccessToken ());
84+ metricsRecorder .recordListSubscriptionCall ();
85+
86+ ResponseEntity <List <Map <String , Object >>> response = restTemplate .exchange (
87+ listUrl ,
88+ HttpMethod .GET ,
89+ new HttpEntity <>(headers ),
90+ new ParameterizedTypeReference <>() {}
91+ );
92+ log .debug ("Current subscriptions: {}" , response .getBody ());
93+ return response .getBody ();
94+ }, authConfig ::renewCredentials , metricsRecorder ::recordListSubscriptionFailure );
95+
96+ metricsRecorder .recordListSubscriptionSuccess ();
97+ return result ;
98+ } catch (Exception e ) {
99+ metricsRecorder .recordError (e );
100+ log .error (NOISY , "Failed to list subscriptions: {}" , e .getMessage ());
101+ throw new SaaSCrawlerException ("Failed to list subscriptions: " + e .getMessage (), e , true );
102+ }
103+ });
104+ }
105+
106+ /**
107+ * Starts subscriptions for the specified content types.
108+ *
109+ * @param contentTypesToStart List of content types to start subscriptions for
110+ */
111+ private void startSubscriptionsForContentTypes (List <String > contentTypesToStart ) {
112+ log .info ("Starting {} subscription(s)" , contentTypesToStart .size ());
113+ HttpHeaders headers = new HttpHeaders ();
114+ headers .setContentType (MediaType .APPLICATION_JSON );
115+ headers .setContentLength (0 );
116+
117+ for (String contentType : contentTypesToStart ) {
118+ String url = String .format (SUBSCRIPTION_START_URL ,
119+ authConfig .getTenantId (),
120+ contentType );
121+
122+ try {
123+ retryHandler .executeWithRetry (() -> {
124+ headers .setBearerAuth (authConfig .getAccessToken ());
125+ metricsRecorder .recordSubscriptionCall ();
126+
127+ ResponseEntity <String > response = restTemplate .exchange (
128+ url ,
129+ HttpMethod .POST ,
130+ new HttpEntity <>(headers ),
131+ String .class
132+ );
133+ log .info ("Successfully started subscription for {}: {}" , contentType , response .getBody ());
134+ return response ;
135+ }, authConfig ::renewCredentials , metricsRecorder ::recordSubscriptionFailure );
136+ } catch (HttpClientErrorException | HttpServerErrorException e ) {
137+ if (e .getResponseBodyAsString ().contains ("AF20024" )) {
138+ log .debug ("Subscription for {} is already enabled" , contentType );
139+ } else {
140+ metricsRecorder .recordError (e );
141+ throw new SaaSCrawlerException ("Failed to start subscription for " + contentType + ": " + e .getMessage (), e , true );
142+ }
143+ } catch (Exception e ) {
144+ metricsRecorder .recordError (e );
145+ throw new SaaSCrawlerException ("Failed to start subscription for " + contentType + ": " + e .getMessage (), e , true );
146+ }
147+ }
148+
149+ log .info ("Successfully started {} subscription(s)" , contentTypesToStart .size ());
150+ }
151+
60152 /**
61153 * Starts and verifies subscriptions for Office 365 audit logs.
154+ * Only starts subscriptions for content types that are not already enabled.
155+ * If listing subscriptions fails, falls back to starting all content types.
62156 */
63157 public void startSubscriptions () {
64158 log .info ("Starting Office 365 subscriptions for audit logs" );
65159
66160 metricsRecorder .recordSubscriptionLatency (() -> {
67161 try {
68- HttpHeaders headers = new HttpHeaders ();
69- headers .setContentType (MediaType .APPLICATION_JSON );
70-
71- // TODO: Only start the subscriptions only if the call commented
72- // out below doesn't return all the audit log types
73- // Check current subscriptions
74- // final String SUBSCRIPTION_LIST_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/list";
75- // String listUrl = String.format(SUBSCRIPTION_LIST_URL, authConfig.getTenantId());
76- //
77- // ResponseEntity<String> listResponse = restTemplate.exchange(
78- // listUrl,
79- // HttpMethod.GET,
80- // new HttpEntity<>(headers),
81- // String.class
82- // );
83- // log.debug("Current subscriptions: {}", listResponse.getBody());
84-
85- // Start subscriptions for each content type
86- headers .setContentLength (0 );
87-
88- for (String contentType : CONTENT_TYPES ) {
89- final String SUBSCRIPTION_START_URL = MANAGEMENT_API_BASE_URL + "%s/activity/feed/subscriptions/start?contentType=%s" ;
90- String url = String .format (SUBSCRIPTION_START_URL ,
91- authConfig .getTenantId (),
92- contentType );
93-
94- retryHandler .executeWithRetry (() -> {
95- try {
96- headers .setBearerAuth (authConfig .getAccessToken ());
97- metricsRecorder .recordSubscriptionCall ();
98-
99- ResponseEntity <String > response = restTemplate .exchange (
100- url ,
101- HttpMethod .POST ,
102- new HttpEntity <>(headers ),
103- String .class
104- );
105- log .debug ("Started subscription for {}: {}" , contentType , response .getBody ());
106- return response ;
107- } catch (HttpClientErrorException | HttpServerErrorException e ) {
108- if (e .getResponseBodyAsString ().contains ("AF20024" )) {
109- log .debug ("Subscription for {} is already enabled" , contentType );
110- return null ;
111- }
112- throw e ;
162+ List <String > contentTypesToStart = new ArrayList <>();
163+
164+ // Try to get current subscriptions to determine which need to be started
165+ try {
166+ List <Map <String , Object >> currentSubscriptions = listSubscriptions ();
167+
168+ // Determine which content types are already enabled
169+ Set <String > enabledContentTypes = new HashSet <>();
170+ for (Map <String , Object > subscription : currentSubscriptions ) {
171+ String contentType = (String ) subscription .get ("contentType" );
172+ String status = (String ) subscription .get ("status" );
173+
174+ if ("enabled" .equalsIgnoreCase (status )) {
175+ enabledContentTypes .add (contentType );
176+ log .info ("Content type {} is already enabled" , contentType );
177+ }
178+ }
179+
180+ // Identify content types that need to be started
181+ for (String contentType : CONTENT_TYPES ) {
182+ if (!enabledContentTypes .contains (contentType )) {
183+ contentTypesToStart .add (contentType );
184+ log .info ("Content type {} needs to be started" , contentType );
113185 }
114- }, authConfig ::renewCredentials , metricsRecorder ::recordSubscriptionFailure );
186+ }
187+
188+ // If all content types are already enabled, we're done
189+ if (contentTypesToStart .isEmpty ()) {
190+ log .info ("All content types are already enabled. No subscriptions need to be started." );
191+ metricsRecorder .recordSubscriptionSuccess ();
192+ return null ;
193+ }
194+ } catch (Exception e ) {
195+ // If listing subscriptions fails, fall back to starting all content types
196+ log .warn ("Failed to list subscriptions, will attempt to start all content types as fallback: {}" , e .getMessage ());
197+ contentTypesToStart .clear ();
198+ for (String contentType : CONTENT_TYPES ) {
199+ contentTypesToStart .add (contentType );
200+ }
115201 }
116-
202+
203+ // Start subscriptions for the identified content types
204+ startSubscriptionsForContentTypes (contentTypesToStart );
117205 metricsRecorder .recordSubscriptionSuccess ();
118206 return null ;
119207 } catch (Exception e ) {
@@ -138,9 +226,6 @@ public AuditLogsResponse searchAuditLogs(final String contentType,
138226 final Instant startTime ,
139227 final Instant endTime ,
140228 String pageUri ) {
141- final String GET_AUDIT_LOGS_URL = MANAGEMENT_API_BASE_URL +
142- "%s/activity/feed/subscriptions/content?contentType=%s&startTime=%s&endTime=%s" ;
143-
144229 final String url = pageUri != null ? pageUri :
145230 String .format (GET_AUDIT_LOGS_URL ,
146231 authConfig .getTenantId (),
0 commit comments