1+ /*
2+ * Copyright 2018 The Hyve
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ *
16+ */
17+
18+ package org .radarbase .connect .rest .fitbit .user ;
19+
20+ import static org .radarbase .connect .rest .converter .PayloadToSourceRecordConverter .MIN_INSTANT ;
21+ import static org .radarbase .connect .rest .fitbit .request .FitbitRequestGenerator .JSON_READER ;
22+
23+ import com .fasterxml .jackson .core .JsonProcessingException ;
24+ import com .fasterxml .jackson .databind .ObjectReader ;
25+ import java .io .IOException ;
26+ import java .net .ProtocolException ;
27+ import java .net .URL ;
28+ import java .time .Duration ;
29+ import java .time .Instant ;
30+ import java .util .HashMap ;
31+ import java .util .HashSet ;
32+ import java .util .Map ;
33+ import java .util .NoSuchElementException ;
34+ import java .util .Set ;
35+ import java .util .concurrent .atomic .AtomicReference ;
36+ import java .util .stream .Collectors ;
37+ import java .util .stream .Stream ;
38+ import okhttp3 .Credentials ;
39+ import okhttp3 .HttpUrl ;
40+ import okhttp3 .MediaType ;
41+ import okhttp3 .OkHttpClient ;
42+ import okhttp3 .Request ;
43+ import okhttp3 .RequestBody ;
44+ import okhttp3 .Response ;
45+ import okhttp3 .ResponseBody ;
46+ import org .apache .kafka .common .config .ConfigException ;
47+ import org .radarbase .connect .rest .RestSourceConnectorConfig ;
48+ import org .radarbase .connect .rest .fitbit .FitbitRestSourceConnectorConfig ;
49+ import org .radarbase .exception .TokenException ;
50+ import org .radarbase .oauth .OAuth2Client ;
51+ import org .slf4j .Logger ;
52+ import org .slf4j .LoggerFactory ;
53+
54+ @ SuppressWarnings ("unused" )
55+ public class ServiceUserRepositoryLegacy implements UserRepository {
56+ private static final Logger logger = LoggerFactory .getLogger (ServiceUserRepositoryLegacy .class );
57+
58+ private static final ObjectReader USER_LIST_READER = JSON_READER .forType (Users .class );
59+ private static final ObjectReader USER_READER = JSON_READER .forType (User .class );
60+ private static final ObjectReader OAUTH_READER = JSON_READER .forType (OAuth2UserCredentials .class );
61+ private static final RequestBody EMPTY_BODY =
62+ RequestBody .create ("" , MediaType .parse ("application/json; charset=utf-8" ));
63+ private static final Duration FETCH_THRESHOLD = Duration .ofMinutes (1L );
64+ private static final Duration CONNECTION_TIMEOUT = Duration .ofSeconds (60 );
65+ private static final Duration CONNECTION_READ_TIMEOUT = Duration .ofSeconds (90 );
66+
67+ private final OkHttpClient client ;
68+ private final Map <String , OAuth2UserCredentials > cachedCredentials ;
69+ private final AtomicReference <Instant > nextFetch = new AtomicReference <>(MIN_INSTANT );
70+
71+ private HttpUrl baseUrl ;
72+ private final HashSet <String > containedUsers ;
73+ private Set <? extends User > timedCachedUsers = new HashSet <>();
74+ private OAuth2Client repositoryClient ;
75+ private String basicCredentials ;
76+
77+ public ServiceUserRepositoryLegacy () {
78+ this .client = new OkHttpClient .Builder ()
79+ .connectTimeout (CONNECTION_TIMEOUT )
80+ .readTimeout (CONNECTION_READ_TIMEOUT )
81+ .build ();
82+ this .cachedCredentials = new HashMap <>();
83+ this .containedUsers = new HashSet <>();
84+ }
85+
86+ @ Override
87+ public User get (String key ) throws IOException {
88+ Request request = requestFor ("users/" + key ).build ();
89+ return makeRequest (request , USER_READER );
90+ }
91+
92+ @ Override
93+ public void initialize (RestSourceConnectorConfig config ) {
94+ FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig ) config ;
95+ this .baseUrl = fitbitConfig .getFitbitUserRepositoryUrl ();
96+ this .containedUsers .addAll (fitbitConfig .getFitbitUsers ());
97+
98+ URL tokenUrl = fitbitConfig .getFitbitUserRepositoryTokenUrl ();
99+ String clientId = fitbitConfig .getFitbitUserRepositoryClientId ();
100+ String clientSecret = fitbitConfig .getFitbitUserRepositoryClientSecret ();
101+
102+ if (tokenUrl != null ) {
103+ if (clientId .isEmpty ()) {
104+ throw new ConfigException ("Client ID for user repository is not set." );
105+ }
106+ this .repositoryClient = new OAuth2Client .Builder ()
107+ .credentials (clientId , clientSecret )
108+ .endpoint (tokenUrl )
109+ .scopes ("SUBJECT.READ MEASUREMENT.CREATE" )
110+ .httpClient (client )
111+ .build ();
112+ } else if (clientId != null ) {
113+ basicCredentials = Credentials .basic (clientId , clientSecret );
114+ }
115+ }
116+
117+ @ Override
118+ public Stream <? extends User > stream () {
119+ if (nextFetch .get ().equals (MIN_INSTANT )) {
120+ try {
121+ applyPendingUpdates ();
122+ } catch (IOException ex ) {
123+ logger .error ("Failed to initially get users from repository" , ex );
124+ }
125+ }
126+ return this .timedCachedUsers .stream ()
127+ .filter (User ::isComplete );
128+ }
129+
130+ @ Override
131+ public String getAccessToken (User user ) throws IOException , UserNotAuthorizedException {
132+ if (!user .isAuthorized ()) {
133+ throw new UserNotAuthorizedException ("User is not authorized" );
134+ }
135+ OAuth2UserCredentials credentials = cachedCredentials .get (user .getId ());
136+ if (credentials != null && !credentials .isAccessTokenExpired ()) {
137+ return credentials .getAccessToken ();
138+ } else {
139+ Request request = requestFor ("users/" + user .getId () + "/token" ).build ();
140+ return requestAccessToken (user , request );
141+ }
142+ }
143+
144+ @ Override
145+ public String refreshAccessToken (User user ) throws IOException , UserNotAuthorizedException {
146+ if (!user .isAuthorized ()) {
147+ throw new UserNotAuthorizedException ("User is not authorized" );
148+ }
149+ Request request = requestFor ("users/" + user .getId () + "/token" )
150+ .post (EMPTY_BODY )
151+ .build ();
152+ return requestAccessToken (user , request );
153+ }
154+
155+ private String requestAccessToken (User user , Request request )
156+ throws UserNotAuthorizedException , IOException {
157+ try {
158+ OAuth2UserCredentials credentials = makeRequest (request , OAUTH_READER );
159+ cachedCredentials .put (user .getId (), credentials );
160+ return credentials .getAccessToken ();
161+ } catch (HttpResponseException ex ) {
162+ if (ex .getStatusCode () == 407 ) {
163+ cachedCredentials .remove (user .getId ());
164+ if (user instanceof LocalUser ) {
165+ ((LocalUser ) user ).setIsAuthorized (false );
166+ }
167+ throw new UserNotAuthorizedException (ex .getMessage ());
168+ }
169+ throw ex ;
170+ }
171+ }
172+
173+ @ Override
174+ public boolean hasPendingUpdates () {
175+ Instant nextFetchTime = nextFetch .get ();
176+ Instant now = Instant .now ();
177+ return now .isAfter (nextFetchTime );
178+ }
179+
180+ @ Override
181+ public void applyPendingUpdates () throws IOException {
182+ logger .info ("Requesting user information from webservice" );
183+ Request request = requestFor ("users?source-type=FitBit" ).build ();
184+ this .timedCachedUsers =
185+ this .<Users >makeRequest (request , USER_LIST_READER ).getUsers ().stream ()
186+ .filter (u -> u .isComplete ()
187+ && (containedUsers .isEmpty ()
188+ || containedUsers .contains (u .getVersionedId ())))
189+ .collect (Collectors .toSet ());
190+ nextFetch .set (Instant .now ().plus (FETCH_THRESHOLD ));
191+ }
192+
193+ private Request .Builder requestFor (String relativeUrl ) throws IOException {
194+ HttpUrl url = baseUrl .resolve (relativeUrl );
195+ if (url == null ) {
196+ throw new IllegalArgumentException ("Relative URL is invalid" );
197+ }
198+ Request .Builder builder = new Request .Builder ().url (url );
199+ String authorization = requestAuthorization ();
200+ if (authorization != null ) {
201+ builder .addHeader ("Authorization" , authorization );
202+ }
203+
204+ return builder ;
205+ }
206+
207+ private String requestAuthorization () throws IOException {
208+ if (repositoryClient != null ) {
209+ try {
210+ return "Bearer " + repositoryClient .getValidToken ().getAccessToken ();
211+ } catch (TokenException ex ) {
212+ throw new IOException (ex );
213+ }
214+ } else if (basicCredentials != null ) {
215+ return basicCredentials ;
216+ } else {
217+ return null ;
218+ }
219+ }
220+
221+ private <T > T makeRequest (Request request , ObjectReader reader ) throws IOException {
222+ logger .info ("Requesting info from {}" , request .url ());
223+ try (Response response = client .newCall (request ).execute ()) {
224+ ResponseBody body = response .body ();
225+
226+ if (response .code () == 404 ) {
227+ throw new NoSuchElementException ("URL " + request .url () + " does not exist" );
228+ } else if (!response .isSuccessful () || body == null ) {
229+ String message = "Failed to make request" ;
230+ if (response .code () > 0 ) {
231+ message += " (HTTP status code " + response .code () + ')' ;
232+ }
233+ if (body != null ) {
234+ message += body .string ();
235+ }
236+ throw new HttpResponseException (message , response .code ());
237+ }
238+ String bodyString = body .string ();
239+ try {
240+ return reader .readValue (bodyString );
241+ } catch (JsonProcessingException ex ) {
242+ logger .error ("Failed to parse JSON: {}\n {}" , ex , bodyString );
243+ throw ex ;
244+ }
245+ } catch (ProtocolException ex ) {
246+ throw new IOException ("Failed to make request to user repository" , ex );
247+ }
248+ }
249+ }
0 commit comments