1010import com .thughari .jobtrackerpro .entity .User ;
1111import com .thughari .jobtrackerpro .interfaces .GeminiService ;
1212import com .thughari .jobtrackerpro .repo .UserRepository ;
13+ import com .thughari .jobtrackerpro .util .CacheEvictService ;
1314import com .thughari .jobtrackerpro .util .UrlParser ;
1415
1516import lombok .extern .slf4j .Slf4j ;
2122import org .springframework .transaction .annotation .Transactional ;
2223
2324import java .math .BigInteger ;
25+ import java .time .Instant ;
26+ import java .time .LocalDateTime ;
27+ import java .time .ZoneOffset ;
2428import java .util .ArrayList ;
2529import java .util .Base64 ;
2630import java .util .List ;
@@ -32,30 +36,29 @@ public class GmailWebhookService {
3236 private final GeminiService geminiService ;
3337 private final JobService jobService ;
3438 private final UserRepository userRepository ;
35- private final CacheManager cacheManager ;
39+ private final CacheEvictService cacheEvictService ;
3640
3741 @ Value ("${spring.security.oauth2.client.registration.google.client-id}" )
3842 private String clientId ;
3943
4044 @ Value ("${spring.security.oauth2.client.registration.google.client-secret}" )
4145 private String clientSecret ;
4246
43- public GmailWebhookService (GeminiService geminiService , JobService jobService , UserRepository userRepository , CacheManager cacheManager ) {
47+ public GmailWebhookService (GeminiService geminiService , JobService jobService , UserRepository userRepository , CacheEvictService cacheEvictService ) {
4448 this .geminiService = geminiService ;
4549 this .jobService = jobService ;
4650 this .userRepository = userRepository ;
47- this .cacheManager = cacheManager ;
51+ this .cacheEvictService = cacheEvictService ;
4852 }
4953
5054 @ Async ("taskExecutor" )
51- @ Transactional
5255 public void processHistorySync (String userEmail ) {
5356 final String email = userEmail .toLowerCase ();
5457
5558 int updatedRows = userRepository .claimSyncLock (email );
56- if (updatedRows == 0 ) return ;
57-
58- evictUserCaches (email );
59+ if (updatedRows == 0 ) return ;
60+
61+ cacheEvictService . evictAllForUser (email );
5962
6063 try {
6164 User user = userRepository .findByEmail (email )
@@ -84,59 +87,19 @@ public void processHistorySync(String userEmail) {
8487
8588 List <EmailBatchItem > batchItems = collectMessages (service , historyResponse .getHistory ());
8689
87- if (!batchItems .isEmpty ()) {log .info ("Ingesting batch of {} emails via Gemini for {}" , batchItems .size (), email );
88-
89- List <List <String >> batchUrlLists = batchItems .stream ()
90- .map (item -> UrlParser .extractAndCleanUrls (item .body ()))
91- .toList ();
92-
93- List <JobDTO > extractedJobs = geminiService .extractJobsFromBatch (batchItems );
94-
95- for (JobDTO job : extractedJobs ) {
96- Integer idx = job .getInputIndex ();
97-
98- if (idx != null && idx >= 0 && idx < batchUrlLists .size ()) {
99- List <String > originalUrls = batchUrlLists .get (idx );
100-
101- if (job .getUrlIndex () != null && job .getUrlIndex () >= 0 && job .getUrlIndex () < originalUrls .size ()) {
102- job .setUrl (originalUrls .get (job .getUrlIndex ()));
103- }
104- else if (job .getUrl () == null || job .getUrl ().isEmpty ()) {
105- job .setUrl (originalUrls .stream ()
106- .filter (u -> {
107- String lower = u .toLowerCase ();
108- return lower .contains ("career" ) ||
109- lower .contains ("job" ) ||
110- lower .contains ("apply" ) ||
111- lower .contains ("/jobs/" ) ||
112- lower .contains ("/careers/" );
113- })
114- .findFirst ().orElse ("" ));
115- }
116- }
117-
118- if (job .getUrl () != null ) {
119- String lower = job .getUrl ().toLowerCase ();
120- if (lower .contains ("unsubscribe" ) ||
121- lower .contains ("privacy" ) ||
122- lower .contains ("help" ) ||
123- lower .contains ("settings" )) {
124- job .setUrl ("" );
125- }
126- }
127-
128- job .setUrlIndex (null );
129- job .setInputIndex (null );
130-
131- jobService .createOrUpdateJob (job , email );
132- }
133- }
134-
90+ if (!batchItems .isEmpty ()) {
91+
92+ List <JobDTO > extractedJobs = geminiService .extractJobsFromBatch (batchItems );
93+
94+ log .info ("Ingesting batch of {} emails via Gemini for {}" , batchItems .size (), email );
95+
96+ jobService .saveBatchResults (email , batchItems , extractedJobs );
97+ }
13598 } catch (Exception e ) {
13699 log .error ("High-Performance Sync failed for {}: " , email , e );
137100 } finally {
138101 userRepository .releaseSyncLock (email );
139- evictUserCaches (email );
102+ cacheEvictService . evictAllForUser (email );
140103 }
141104 }
142105
@@ -151,15 +114,20 @@ private List<EmailBatchItem> collectMessages(Gmail service, List<History> histor
151114 Message m = service .users ().messages ().get ("me" , added .getMessage ().getId ())
152115 .setFormat ("full" ).execute ();
153116
154- String from = "" , subj = "" ;
117+ long millisecondTimestamp = m .getInternalDate ();
118+ LocalDateTime emailDate = LocalDateTime .ofInstant (
119+ Instant .ofEpochMilli (millisecondTimestamp ), ZoneOffset .UTC );
120+
121+ String from = "" , subj = "" , replyTo ="" ;
155122 for (var h : m .getPayload ().getHeaders ()) {
156123 if ("From" .equalsIgnoreCase (h .getName ())) from = h .getValue ();
157124 if ("Subject" .equalsIgnoreCase (h .getName ())) subj = h .getValue ();
125+ if ("Reply-To" .equalsIgnoreCase (h .getName ())) replyTo = h .getValue ();
158126 }
159127
160128 if (!isSystemNoise (subj )) {
161129 String body = extractTextFromBody (m .getPayload ());
162- items .add (new EmailBatchItem (from , subj , body ));
130+ items .add (new EmailBatchItem (from , subj , replyTo , body , emailDate ));
163131 }
164132 } catch (Exception e ) {
165133 log .warn ("Failed to fetch message {}: {}" , added .getMessage ().getId (), e .getMessage ());
@@ -197,12 +165,12 @@ private boolean isSystemNoise(String subject) {
197165 return s .contains ("security alert" ) || s .contains ("sign-in" ) || s .contains ("verification code" );
198166 }
199167
200- private void evictUserCaches (String email ) {
201- Cache userCache = cacheManager .getCache ("users" );
202- Cache entityCache = cacheManager .getCache ("userEntities" );
203- if (userCache != null ) userCache .evict (email );
204- if (entityCache != null ) entityCache .evict (email );
205- }
168+ // private void evictUserCaches(String email) {
169+ // Cache userCache = cacheManager.getCache("users");
170+ // Cache entityCache = cacheManager.getCache("userEntities");
171+ // if (userCache != null) userCache.evict(email);
172+ // if (entityCache != null) entityCache.evict(email);
173+ // }
206174
207175 public String getFreshAccessToken (String refreshToken ) throws Exception {
208176 return new GoogleRefreshTokenRequest (GoogleNetHttpTransport .newTrustedTransport (), GsonFactory .getDefaultInstance (),
0 commit comments