Skip to content

Commit f4033b9

Browse files
Add support for firebase firestore database.
- adds specific support for data structure for covid collab in firestore
1 parent 05cd1ca commit f4033b9

File tree

8 files changed

+828
-0
lines changed

8 files changed

+828
-0
lines changed

kafka-connect-fitbit-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ dependencies {
66

77
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
88
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion
9+
implementation 'com.google.firebase:firebase-admin:6.12.2'
910

1011
// Included in connector runtime
1112
compileOnly group: 'org.apache.kafka', name: 'connect-api', version: kafkaVersion
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
package org.radarbase.connect.rest.fitbit.user.firebase;
2+
3+
import com.google.cloud.firestore.CollectionReference;
4+
import com.google.cloud.firestore.DocumentChange;
5+
import com.google.cloud.firestore.DocumentSnapshot;
6+
import com.google.cloud.firestore.EventListener;
7+
import com.google.cloud.firestore.FirestoreException;
8+
import com.google.cloud.firestore.ListenerRegistration;
9+
import com.google.cloud.firestore.QuerySnapshot;
10+
import java.io.IOException;
11+
import java.util.HashMap;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.stream.Stream;
15+
import javax.ws.rs.NotAuthorizedException;
16+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
17+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
18+
import org.radarbase.connect.rest.fitbit.user.User;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* The User repository that supports the covid-collab application (https://covid-collab.org/). The
24+
* data is stored in Firebase Firestore. This user repository reads data from this source and
25+
* creates user objects. To ease the creation of objects a new {@link User} {@link FirebaseUser} is
26+
* created.
27+
*
28+
* <p>Structure in Firestore is :- 1) Fitbit Collection -> User Document(uuid) -> Fitbit Details. 2)
29+
* Users Collection -> User Document(uuid) -> User Details.
30+
*
31+
* <p>See {@link FirebaseFitbitAuthDetails} for the keys present in Ftibit Details for each User.
32+
* See {@link FirebaseUserDetails} for the keys present in User Details for each User.
33+
*/
34+
public class CovidCollabFirebaseUserRepository extends FirebaseUserRepository {
35+
36+
protected static final String FITBIT_TOKEN_ENDPOINT = "https://api.fitbit.com/oauth2/token";
37+
private static final Logger logger =
38+
LoggerFactory.getLogger(CovidCollabFirebaseUserRepository.class);
39+
40+
private Map<String, FirebaseUser> cachedUsers = new HashMap<>();
41+
private CollectionReference userCollection;
42+
private CollectionReference fitbitCollection;
43+
private FitbitTokenService fitbitTokenService;
44+
private List<String> allowedUsers;
45+
private ListenerRegistration fitbitCollectionListenerRegistration;
46+
private boolean hasPendingUpdates = true;
47+
48+
@Override
49+
public User get(String key) throws IOException {
50+
return this.cachedUsers.getOrDefault(key, createUser(key));
51+
}
52+
53+
@Override
54+
public Stream<? extends User> stream() {
55+
return cachedUsers.values().stream();
56+
}
57+
58+
@Override
59+
public String getAccessToken(User user) throws IOException, NotAuthorizedException {
60+
FitbitOAuth2UserCredentials credentials =
61+
cachedUsers.get(user.getId()).getFitbitAuthDetails().getOauth2Credentials();
62+
if (credentials == null || credentials.isAccessTokenExpired()) {
63+
return refreshAccessToken(user);
64+
}
65+
return credentials.getAccessToken();
66+
}
67+
68+
@Override
69+
public String refreshAccessToken(User user) throws IOException, NotAuthorizedException {
70+
FirebaseFitbitAuthDetails authDetails = cachedUsers.get(user.getId()).getFitbitAuthDetails();
71+
72+
logger.debug("Refreshing token for User: {}", cachedUsers.get(user.getId()));
73+
if (!authDetails.getOauth2Credentials().hasRefreshToken()) {
74+
logger.error("No refresh Token present");
75+
throw new NotAuthorizedException("The user does not contain a refresh token");
76+
}
77+
78+
// Make call to fitbit to get new refresh and access token.
79+
logger.info("Requesting to refreshToken.");
80+
FitbitOAuth2UserCredentials userCredentials =
81+
fitbitTokenService.refreshToken(authDetails.getOauth2Credentials().getRefreshToken());
82+
logger.debug("Token Refreshed.");
83+
84+
if (userCredentials.hasRefreshToken() && userCredentials.getAccessToken() != null) {
85+
authDetails.setOauth2Credentials(userCredentials);
86+
updateDocument(fitbitCollection.document(user.getId()), authDetails);
87+
this.cachedUsers.get(user.getId()).setFitbitAuthDetails(authDetails);
88+
return userCredentials.getAccessToken();
89+
} else {
90+
throw new IOException("There was a problem refreshing the token.");
91+
}
92+
}
93+
94+
@Override
95+
public boolean hasPendingUpdates() {
96+
return this.hasPendingUpdates;
97+
}
98+
99+
@Override
100+
public void applyPendingUpdates() throws IOException {
101+
if (this.hasPendingUpdates()) {
102+
this.hasPendingUpdates = false;
103+
} else {
104+
throw new IOException(
105+
"No pending updates available. Try calling this method only when updates are available");
106+
}
107+
}
108+
109+
@Override
110+
public void initialize(RestSourceConnectorConfig config) {
111+
super.initialize(config);
112+
113+
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
114+
this.fitbitCollection =
115+
getFirestore().collection(fitbitConfig.getFitbitUserRepositoryFirestoreFitbitCollection());
116+
this.userCollection =
117+
getFirestore().collection(fitbitConfig.getFitbitUserRepositoryFirestoreUserCollection());
118+
119+
this.fitbitTokenService =
120+
new FitbitTokenService(
121+
fitbitConfig.getFitbitClient(),
122+
fitbitConfig.getFitbitClientSecret(),
123+
FITBIT_TOKEN_ENDPOINT);
124+
125+
/**
126+
* Currently, we only listen for the fitbit collection, as it contains most information while
127+
* the user collection only contains project Id which is not supposed to change. The user
128+
* document is pulled every time the corresponding fitbit document is pulled, so it will be
129+
* sufficiently upto date. Moreover, not every document in the user collection will have linked
130+
* the fitbit. In the future, we might listen to user collection too if required.
131+
*/
132+
if (this.fitbitCollectionListenerRegistration == null) {
133+
this.fitbitCollectionListenerRegistration = initListener(fitbitCollection, this::onEvent);
134+
logger.info("Added listener to Fitbit collection for real-time updates.");
135+
}
136+
137+
this.allowedUsers = fitbitConfig.getFitbitUsers();
138+
}
139+
140+
private ListenerRegistration initListener(
141+
CollectionReference collectionReference, EventListener<QuerySnapshot> eventListener) {
142+
return collectionReference.addSnapshotListener(eventListener);
143+
}
144+
145+
protected FirebaseUser createUser(String uuid) throws IOException {
146+
DocumentSnapshot fitbitDocumentSnapshot = getDocument(uuid, fitbitCollection);
147+
DocumentSnapshot userDocumentSnapshot = getDocument(uuid, userCollection);
148+
149+
return createUser(userDocumentSnapshot, fitbitDocumentSnapshot);
150+
}
151+
152+
protected FirebaseUser createUser(
153+
DocumentSnapshot userSnapshot, DocumentSnapshot fitbitSnapshot) {
154+
// Get the fitbit document for the user which contains Auth Info
155+
FirebaseFitbitAuthDetails authDetails =
156+
fitbitSnapshot.toObject(FirebaseFitbitAuthDetails.class);
157+
// Get the user document for the user which contains User Details
158+
FirebaseUserDetails userDetails = userSnapshot.toObject(FirebaseUserDetails.class);
159+
160+
logger.debug("Auth details: {}", authDetails);
161+
logger.debug("User Details: {}", userDetails);
162+
163+
// if auth details are not available, skip this user.
164+
if (authDetails == null || authDetails.getOauth2Credentials() == null) {
165+
logger.warn(
166+
"The auth details for user {} in the database are not valid. Skipping...",
167+
fitbitSnapshot.getId());
168+
return null;
169+
}
170+
171+
// If no user details found, create one with default project.
172+
if (userDetails == null) {
173+
userDetails = new FirebaseUserDetails();
174+
}
175+
176+
FirebaseUser user = new FirebaseUser();
177+
user.setUuid(fitbitSnapshot.getId());
178+
user.setUserId(fitbitSnapshot.getId());
179+
user.setFitbitAuthDetails(authDetails);
180+
user.setFirebaseUserDetails(userDetails);
181+
return user;
182+
}
183+
184+
private void updateUser(DocumentSnapshot fitbitDocumentSnapshot) {
185+
try {
186+
FirebaseUser user =
187+
createUser(
188+
getDocument(fitbitDocumentSnapshot.getId(), userCollection), fitbitDocumentSnapshot);
189+
logger.debug("User to be updated: {}", user);
190+
if (user != null
191+
&& user.isComplete()
192+
&& (allowedUsers.isEmpty() || allowedUsers.contains(user.getId()))) {
193+
FirebaseUser user1 = this.cachedUsers.put(fitbitDocumentSnapshot.getId(), user);
194+
if (user1 == null) {
195+
logger.info("Created new User: {}", fitbitDocumentSnapshot.getId());
196+
} else {
197+
logger.info("Updated existing user: {}", user1);
198+
logger.debug("Updated user is: {}", user);
199+
}
200+
this.hasPendingUpdates = true;
201+
} else {
202+
removeUser(fitbitDocumentSnapshot);
203+
}
204+
} catch (IOException e) {
205+
logger.error(
206+
"The update of the user {} was not possible.", fitbitDocumentSnapshot.getId(), e);
207+
}
208+
}
209+
210+
private void removeUser(DocumentSnapshot documentSnapshot) {
211+
FirebaseUser user = this.cachedUsers.remove(documentSnapshot.getId());
212+
if (user != null) {
213+
logger.info("Removed User: {}:", user);
214+
this.hasPendingUpdates = true;
215+
}
216+
}
217+
218+
private void onEvent(QuerySnapshot snapshots, FirestoreException e) {
219+
if (e != null) {
220+
logger.warn("Listen for updates failed: " + e);
221+
return;
222+
}
223+
224+
logger.debug(
225+
"OnEvent Called: {}, {}",
226+
snapshots.getDocumentChanges().size(),
227+
snapshots.getDocuments().size());
228+
for (DocumentChange dc : snapshots.getDocumentChanges()) {
229+
logger.debug("Type: {}", dc.getType());
230+
switch (dc.getType()) {
231+
case ADDED:
232+
case MODIFIED:
233+
this.updateUser(dc.getDocument());
234+
break;
235+
case REMOVED:
236+
this.removeUser(dc.getDocument());
237+
default:
238+
break;
239+
}
240+
}
241+
}
242+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package org.radarbase.connect.rest.fitbit.user.firebase;
2+
3+
import com.google.cloud.firestore.annotation.Exclude;
4+
import com.google.cloud.firestore.annotation.IgnoreExtraProperties;
5+
import com.google.cloud.firestore.annotation.PropertyName;
6+
import java.time.Instant;
7+
import java.util.Date;
8+
import java.util.UUID;
9+
10+
/**
11+
* POJO corresponding to the Fitbit Auth details document for a user in Firestore. Currently,
12+
* consists of OAuth 2 details, sourceId and date range for data collection.
13+
*/
14+
@IgnoreExtraProperties
15+
public class FirebaseFitbitAuthDetails {
16+
17+
protected static final String DEFAULT_SOURCE_ID = "fitbit";
18+
private String sourceId = getDefaultSourceId();
19+
private Date startDate;
20+
private Date endDate;
21+
private String version;
22+
23+
private FitbitOAuth2UserCredentials oauth2Credentials;
24+
25+
public FirebaseFitbitAuthDetails() {
26+
this.oauth2Credentials = new FitbitOAuth2UserCredentials();
27+
this.startDate = Date.from(Instant.parse("2017-01-01T00:00:00Z"));
28+
this.endDate = Date.from(Instant.parse("9999-12-31T23:59:59.999Z"));
29+
this.version = null;
30+
}
31+
32+
@Exclude
33+
protected static String getDefaultSourceId() {
34+
return DEFAULT_SOURCE_ID + "-" + UUID.randomUUID();
35+
}
36+
37+
@PropertyName("version")
38+
public String getVersion() {
39+
return version;
40+
}
41+
42+
@PropertyName("version")
43+
public void setVersion(String version) {
44+
if (version != null && !version.trim().isEmpty()) {
45+
this.version = version;
46+
}
47+
}
48+
49+
@PropertyName("start_date")
50+
public Date getStartDate() {
51+
return startDate;
52+
}
53+
54+
@PropertyName("start_date")
55+
public void setStartDate(Date startDate) {
56+
if (startDate != null) {
57+
this.startDate = startDate;
58+
}
59+
}
60+
61+
@PropertyName("end_date")
62+
public Date getEndDate() {
63+
return endDate;
64+
}
65+
66+
@PropertyName("end_date")
67+
public void setEndDate(Date endDate) {
68+
if (endDate != null) {
69+
this.endDate = endDate;
70+
}
71+
}
72+
73+
@PropertyName("source_id")
74+
public String getSourceId() {
75+
return sourceId;
76+
}
77+
78+
@PropertyName("source_id")
79+
public void setSourceId(String sourceId) {
80+
if (sourceId != null && !sourceId.trim().isEmpty()) {
81+
this.sourceId = sourceId;
82+
}
83+
}
84+
85+
@PropertyName("oauth2")
86+
public FitbitOAuth2UserCredentials getOauth2Credentials() {
87+
return oauth2Credentials;
88+
}
89+
90+
@PropertyName("oauth2")
91+
public void setOauth2Credentials(FitbitOAuth2UserCredentials oauth2Credentials) {
92+
this.oauth2Credentials = oauth2Credentials;
93+
}
94+
}

0 commit comments

Comments
 (0)