1212import static org .sagebionetworks .bridge .BridgeConstants .API_APP_ID ;
1313import static org .sagebionetworks .bridge .BridgeConstants .API_DEFAULT_PAGE_SIZE ;
1414import static org .sagebionetworks .bridge .BridgeConstants .CANNOT_BE_BLANK ;
15+ import static org .sagebionetworks .bridge .BridgeUtils .COMMA_SPACE_JOINER ;
1516
17+ import java .io .IOException ;
1618import java .net .URL ;
1719import java .util .ArrayList ;
1820import java .util .Date ;
3032import com .amazonaws .services .s3 .model .AmazonS3Exception ;
3133import com .amazonaws .services .s3 .model .GeneratePresignedUrlRequest ;
3234import com .amazonaws .services .s3 .model .ObjectMetadata ;
35+ import com .amazonaws .services .sqs .AmazonSQS ;
36+ import com .amazonaws .services .sqs .model .SendMessageResult ;
3337import com .fasterxml .jackson .core .JsonProcessingException ;
3438import com .fasterxml .jackson .databind .JsonNode ;
39+ import com .fasterxml .jackson .databind .ObjectMapper ;
3540import com .fasterxml .jackson .databind .node .ObjectNode ;
3641import com .google .common .base .Stopwatch ;
3742import com .google .common .base .Strings ;
3843import com .google .common .collect .ImmutableSet ;
3944import org .joda .time .DateTime ;
4045import org .joda .time .DateTimeZone ;
46+
47+ import org .sagebionetworks .bridge .BridgeConstants ;
48+ import org .sagebionetworks .bridge .dao .HealthCodeDao ;
4149import org .sagebionetworks .bridge .exceptions .EntityNotFoundException ;
4250import org .sagebionetworks .bridge .models .PagedResourceList ;
4351import org .sagebionetworks .bridge .models .accounts .Account ;
6371import org .sagebionetworks .bridge .models .apps .App ;
6472import org .sagebionetworks .bridge .models .healthdata .HealthDataRecordEx3 ;
6573import org .sagebionetworks .bridge .models .schedules2 .timelines .TimelineMetadataView ;
74+ import org .sagebionetworks .bridge .models .upload .UploadRedriveList ;
6675import org .sagebionetworks .bridge .models .upload .UploadViewEx3 ;
76+ import org .sagebionetworks .bridge .models .worker .UploadRedriveWorkerRequest ;
77+ import org .sagebionetworks .bridge .models .worker .WorkerRequest ;
78+ import org .sagebionetworks .bridge .s3 .S3Helper ;
6779import org .sagebionetworks .bridge .time .DateUtils ;
6880import org .sagebionetworks .bridge .models .ForwardCursorPagedResourceList ;
6981import org .sagebionetworks .bridge .models .accounts .StudyParticipant ;
@@ -88,9 +100,12 @@ public class UploadService {
88100
    // package-scoped to be available in unit tests
    static final String CONFIG_KEY_UPLOAD_BUCKET = "upload.bucket";
    // Config key for the bucket that holds upload-ID list files for large redrive requests.
    static final String CONFIG_KEY_BACKFILL_BUCKET = "backfill.bucket";
    // Prefix for S3 keys of redrive upload-ID files; the current epoch millis is appended to make each key unique.
    static final String REDRIVE_UPLOAD_S3_KEY_PREFIX = "redrive-upload-id-";
    static final String METADATA_KEY_EVENT_TIMESTAMP = "eventTimestamp";
    static final String METADATA_KEY_INSTANCE_GUID = "instanceGuid";
    static final String METADATA_KEY_STARTED_ON = "startedOn";
    // Service name sent in the SQS worker request so the worker routes the message to the upload redrive handler.
    static final String WORKER_NAME_UPLOAD_REDRIVE = "UploadRedriveWorker";
94109
    // Collaborating services, injected via setters.
    private AccountService accountService;
    private AdherenceService adherenceService;
@@ -103,9 +118,15 @@ public class UploadService {
    private Schedule2Service schedule2Service;
    private StudyService studyService;
    private String uploadBucket;
    // Bucket that holds upload-ID list files for large redrive requests (see redriveLargeAmountOfUploads).
    private String redriveUploadBucket;
    private UploadDao uploadDao;
    private UploadDedupeDao uploadDedupeDao;
    private UploadValidationService uploadValidationService;
    // Used to resolve an upload's app ID from its health code when the upload record has no app ID.
    private HealthCodeDao healthCodeDao;
    // Worker SQS queue URL; resolved lazily from config because SqsInitializer runs after Spring.
    private String workerQueueUrl;
    private AmazonSQS sqsClient;
    private S3Helper s3Helper;
    private BridgeConfig config;
109130
110131 // These parameters can be overriden to facilitate testing.
111132 // By default, we sleep 5 seconds, including right at the start and end. This means on our 7th iteration,
@@ -136,7 +157,9 @@ public final void setExporter3Service(Exporter3Service exporter3Service) {
136157 /** Sets parameters from the specified Bridge config. */
137158 @ Autowired
138159 final void setConfig (BridgeConfig config ) {
160+ this .config = config ;
139161 uploadBucket = config .getProperty (CONFIG_KEY_UPLOAD_BUCKET );
162+ redriveUploadBucket = config .getProperty (CONFIG_KEY_BACKFILL_BUCKET );
140163 }
141164
142165 /**
@@ -189,6 +212,21 @@ final void setUploadValidationService(UploadValidationService uploadValidationSe
189212 this .uploadValidationService = uploadValidationService ;
190213 }
191214
    /** Health code DAO, used to resolve an upload's app ID from its health code during redrive. */
    @Autowired
    final void setHealthCodeDao(HealthCodeDao healthCodeDao) {
        this.healthCodeDao = healthCodeDao;
    }
219+
    /** SQS client, used to send large redrive requests to the worker queue. */
    @Autowired
    final void setSqsClient(AmazonSQS sqsClient) {
        this.sqsClient = sqsClient;
    }
224+
    /** S3 helper, used to write the list of upload IDs to redrive to S3 for the worker to read. */
    // NOTE(review): this setter is public while the other injected setters are package-scoped — confirm
    // the wider visibility is intentional.
    @Resource(name = "s3Helper")
    public final void setS3Helper(S3Helper s3Helper) {
        this.s3Helper = s3Helper;
    }
229+
192230 /**
193231 * Number of iterations while polling for validation status before we time out. This is used primarily by tests to
194232 * reduce the amount of wait time during tests.
@@ -527,6 +565,88 @@ public void uploadComplete(String appId, UploadCompletionClient completedBy, Upl
527565 // Save uploadedOn date and uploadId to related adherence records.
528566 updateAdherenceWithUploadInfo (appId , upload );
529567 }
568+
569+ public void redriveUpload (UploadRedriveList redriveList ) throws IOException {
570+ checkNotNull (redriveList );
571+
572+ if (redriveList .getUploadIds ().isEmpty ()) {
573+ throw new BadRequestException ("No upload ids submitted for redrive" );
574+ }
575+
576+ int redriveSize = redriveList .getUploadIds ().size ();
577+
578+ logger .info ("Redrive uploads" + " with " + redriveSize + " upload ids" );
579+
580+ // Redrive upload.
581+ if (redriveSize <= 10 ) {
582+ redriveSmallAmountOfUploads (redriveList .getUploadIds ());
583+ } else {
584+ redriveLargeAmountOfUploads (redriveList .getUploadIds ());
585+ }
586+ }
587+
588+ private void redriveSmallAmountOfUploads (List <String > uploadIds ) throws JsonProcessingException {
589+ for (String uploadId : uploadIds ) {
590+ Upload upload = getUpload (uploadId );
591+ String appId = upload .getAppId ();
592+ if (appId == null ) {
593+ appId = healthCodeDao .getAppId (upload .getHealthCode ());
594+ }
595+ uploadComplete (appId , UploadCompletionClient .REDRIVE , upload , true );
596+ }
597+
598+ for (String uploadId : uploadIds ) {
599+ UploadValidationStatus validationStatus = pollUploadValidationStatusUntilComplete (uploadId );
600+ if (validationStatus .getStatus () != UploadStatus .SUCCEEDED ) {
601+ logErrorMessage (uploadId , validationStatus );
602+ }
603+ }
604+ }
605+
606+ // For unit test.
607+ protected void logErrorMessage (String uploadId , UploadValidationStatus validationStatus ) {
608+ logger .error ("Redrive failed for uploadId=" + uploadId + ": " + COMMA_SPACE_JOINER .join (
609+ validationStatus .getMessageList ()));
610+ }
611+
612+ private void redriveLargeAmountOfUploads (List <String > uploadIds ) throws IOException {
613+ // set up s3 Key.
614+ String currentTime = String .valueOf (DateUtils .getCurrentDateTime ().getMillis ());
615+ String s3Key = REDRIVE_UPLOAD_S3_KEY_PREFIX + currentTime ;
616+
617+ // Upload file to S3 bucket
618+ s3Helper .writeLinesToS3 (redriveUploadBucket , s3Key , uploadIds );
619+
620+ // write Json message to sqs
621+ // 1. Create request.
622+ UploadRedriveWorkerRequest uploadRedriveWorkerRequest = new UploadRedriveWorkerRequest ();
623+ uploadRedriveWorkerRequest .setS3Key (s3Key );
624+ uploadRedriveWorkerRequest .setS3Bucket (redriveUploadBucket );
625+ uploadRedriveWorkerRequest .setRedriveTypeStr ("upload_id" );
626+
627+ WorkerRequest workerRequest = new WorkerRequest ();
628+ workerRequest .setService (WORKER_NAME_UPLOAD_REDRIVE );
629+ workerRequest .setBody (uploadRedriveWorkerRequest );
630+
631+ // Convert request to JSON.
632+ ObjectMapper objectMapper = BridgeObjectMapper .get ();
633+ String requestJson ;
634+ try {
635+ requestJson = objectMapper .writeValueAsString (workerRequest );
636+ } catch (JsonProcessingException ex ) {
637+ // This should never happen, but catch and re-throw for code hygiene.
638+ throw new BridgeServiceException ("Error creating upload redrive request for S3 file " + s3Key ,
639+ ex );
640+ }
641+
642+ // 2. Send to SQS.
643+ // Note: SqsInitializer runs after Spring, so we need to grab the queue URL dynamically.
644+ workerQueueUrl = config .getProperty (BridgeConstants .CONFIG_KEY_WORKER_SQS_URL );
645+
646+ SendMessageResult sqsResult = sqsClient .sendMessage (workerQueueUrl , requestJson );
647+ logger .info ("Sent redrive upload request for file " + s3Key +
648+ sqsResult .getMessageId ());
649+ }
530650
531651 public void deleteUploadsForHealthCode (String healthCode ) {
532652 checkArgument (isNotBlank (healthCode ));
0 commit comments