Skip to content

Commit c0b5a8c

Browse files
Merge pull request #331 from InseeFr/devNewProcess
New raw data process endpoint by questionnaireId only
2 parents 48ebe05 + 65e5d1f commit c0b5a8c

18 files changed

+239
-14
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# Changelog
2+
## 1.13.0 [TODO]
3+
### Changed
4+
- New raw data process endpoint
5+
26
## 1.12.2 [2025-11-06]
37
### Fixed
48
- Scientific notation for Doubles during raw data processing

src/main/java/fr/insee/genesis/controller/rest/responses/RawResponseController.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public ResponseEntity<LunaticJsonRawDataModel> getJsonRawData(
161161
@Operation(summary = "Process raw data of a campaign")
162162
@PostMapping(path = "/lunatic-json/process")
163163
@PreAuthorize("hasRole('SCHEDULER')")
164+
@Deprecated(since = "1.13.0")
164165
public ResponseEntity<String> processJsonRawData(
165166
@RequestParam("campaignName") String campaignName,
166167
@RequestParam("questionnaireId") String questionnaireId,
@@ -180,6 +181,24 @@ public ResponseEntity<String> processJsonRawData(
180181
}
181182
}
182183

184+
@Operation(summary = "Process raw data of a questionnaire")
185+
@PostMapping(path = "/{collectionInstrumentId}/process")
186+
@PreAuthorize("hasRole('SCHEDULER')")
187+
public ResponseEntity<String> processJsonRawData(
188+
@PathVariable String collectionInstrumentId
189+
) {
190+
log.info("Try to process raw JSON datas for questionnaire {}",collectionInstrumentId);
191+
try {
192+
DataProcessResult result = lunaticJsonRawDataApiPort.processRawData(collectionInstrumentId);
193+
return result.formattedDataCount() == 0 ?
194+
ResponseEntity.ok("%d document(s) processed".formatted(result.dataCount()))
195+
: ResponseEntity.ok("%d document(s) processed, including %d FORMATTED after data verification"
196+
.formatted(result.dataCount(), result.formattedDataCount()));
197+
} catch (GenesisException e) {
198+
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
199+
}
200+
}
201+
183202
@Operation(summary = "Get processed data ids from last n hours (default 24h)")
184203
@GetMapping(path = "/lunatic-json/processed/ids")
185204
@PreAuthorize("hasRole('ADMIN')")

src/main/java/fr/insee/genesis/controller/rest/responses/ResponseController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ private void processCampaignWithMode(String campaignName, Mode mode, String root
466466
}
467467

468468
//Create context if not exist
469-
if(contextService.getContextByPartitionId(campaignName) == null){
469+
if(contextService.getContextByCollectionInstrumentId(campaignName) == null){
470470
contextService.saveContext(campaignName, false);
471471
}
472472

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
package fr.insee.genesis.domain.model.surveyunit.rawdata;
22

3-
public record DataProcessResult(int dataCount, int formattedDataCount) {
3+
import fr.insee.genesis.exceptions.GenesisError;
4+
5+
import java.util.List;
6+
7+
/**
 * Result of a raw data processing run.
 *
 * @param dataCount          number of survey unit models saved during the run
 * @param formattedDataCount number of those models whose state is FORMATTED
 * @param errors             errors collected while loading metadata for the run
 */
public record DataProcessResult(int dataCount, int formattedDataCount, List<GenesisError> errors) {
48
}

src/main/java/fr/insee/genesis/domain/ports/api/DataProcessingContextApiPort.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ void saveKraftwerkExecutionSchedule(String partitionId,
3131
long countSchedules();
3232

3333
DataProcessingContextModel getContext(String interrogationId) throws GenesisException;
34-
DataProcessingContextModel getContextByPartitionId(String partitionId) throws GenesisException;
34+
DataProcessingContextModel getContextByCollectionInstrumentId(String partitionId) throws GenesisException;
3535
List<String> getPartitionIds(boolean withReview);
3636

3737
/**

src/main/java/fr/insee/genesis/domain/ports/api/LunaticJsonRawDataApiPort.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ public interface LunaticJsonRawDataApiPort {
2828
long countResponsesByQuestionnaireId(String campaignId);
2929
Page<LunaticJsonRawDataModel> findRawDataByCampaignIdAndDate(String campaignId, Instant startDt, Instant endDt, Pageable pageable);
3030

31+
@Deprecated(since = "1.13.0")
3132
DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException;
33+
34+
DataProcessResult processRawData(String collectionInstrumentId) throws GenesisException;
35+
3236
Map<String, List<String>> findProcessedIdsgroupedByQuestionnaireSince(LocalDateTime since);
3337
}

src/main/java/fr/insee/genesis/domain/ports/spi/LunaticJsonRawDataPersistencePort.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,5 @@ public interface LunaticJsonRawDataPersistencePort {
2222
long countResponsesByQuestionnaireId(String questionnaireId);
2323
List<GroupedInterrogation> findProcessedIdsGroupedByQuestionnaireSince(LocalDateTime since);
2424
List<GroupedInterrogation> findUnprocessedIds();
25+
Set<String> findUnprocessedInterrogationIdsByCollectionInstrumentId(String collectionInstrumentId);
2526
}

src/main/java/fr/insee/genesis/domain/service/context/DataProcessingContextService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public DataProcessingContextModel getContext(String interrogationId) throws Gene
168168
}
169169

170170
@Override
171-
public DataProcessingContextModel getContextByPartitionId(String partitionId){
171+
public DataProcessingContextModel getContextByCollectionInstrumentId(String partitionId){
172172
return DataProcessingContextMapper.INSTANCE.documentToModel(
173173
dataProcessingContextPersistancePort.findByPartitionId(partitionId)
174174
);

src/main/java/fr/insee/genesis/domain/service/rawdata/LunaticJsonRawDataService.java

Lines changed: 82 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,22 +100,16 @@ public List<LunaticJsonRawDataModel> getRawData(String campaignName, Mode mode,
100100
}
101101

102102
@Override
103+
@Deprecated(since = "1.13.0")
103104
public DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException {
104105
int dataCount=0;
105106
int formattedDataCount=0;
106107
DataProcessingContextModel dataProcessingContext =
107-
dataProcessingContextService.getContextByPartitionId(campaignName);
108+
dataProcessingContextService.getContextByCollectionInstrumentId(campaignName);
108109
List<Mode> modesList = controllerUtils.getModesList(campaignName, null);
109110
for (Mode mode : modesList) {
110111
//Load and save metadata into database, throw exception if none
111-
VariablesMap variablesMap = metadataService.loadAndSaveIfNotExists(campaignName, campaignName, mode, fileUtils,
112-
errors).getVariables();
113-
if (variablesMap == null) {
114-
throw new GenesisException(400,
115-
"Error during metadata parsing for mode %s :%n%s"
116-
.formatted(mode, errors.getLast().getMessage())
117-
);
118-
}
112+
VariablesMap variablesMap = getVariablesMap(campaignName, mode, errors);
119113
int totalBatchs = Math.ceilDiv(interrogationIdList.size() , config.getRawDataProcessingBatchSize());
120114
int batchNumber = 1;
121115
List<String> interrogationIdListForMode = new ArrayList<>(interrogationIdList);
@@ -156,7 +150,85 @@ public DataProcessResult processRawData(String campaignName, List<String> interr
156150
batchNumber++;
157151
}
158152
}
159-
return new DataProcessResult(dataCount, formattedDataCount);
153+
return new DataProcessResult(dataCount, formattedDataCount, errors);
154+
}
155+
156+
@Override
157+
public DataProcessResult processRawData(String questionnaireId) throws GenesisException {
158+
int dataCount=0;
159+
int formattedDataCount=0;
160+
DataProcessingContextModel dataProcessingContext =
161+
dataProcessingContextService.getContextByCollectionInstrumentId(questionnaireId);
162+
List<GenesisError> errors = new ArrayList<>();
163+
164+
List<Mode> modesList = controllerUtils.getModesList(questionnaireId, null);
165+
for (Mode mode : modesList) {
166+
//Load and save metadata into database, throw exception if none
167+
VariablesMap variablesMap = getVariablesMap(questionnaireId, mode, errors);
168+
Set<String> interrogationIds =
169+
lunaticJsonRawDataPersistencePort.findUnprocessedInterrogationIdsByCollectionInstrumentId(questionnaireId);
170+
171+
int totalBatchs = Math.ceilDiv(interrogationIds.size() , config.getRawDataProcessingBatchSize());
172+
int batchNumber = 1;
173+
List<String> interrogationIdListForMode = new ArrayList<>(interrogationIds);
174+
while(!interrogationIdListForMode.isEmpty()){
175+
log.info("Processing raw data batch {}/{}", batchNumber, totalBatchs);
176+
177+
int maxIndex = Math.min(interrogationIdListForMode.size(), config.getRawDataProcessingBatchSize());
178+
List<SurveyUnitModel> surveyUnitModels = getConvertedSurveyUnits(
179+
questionnaireId,
180+
mode,
181+
interrogationIdListForMode,
182+
maxIndex,
183+
variablesMap
184+
);
185+
186+
//Save converted data
187+
surveyUnitQualityService.verifySurveyUnits(surveyUnitModels, variablesMap);
188+
surveyUnitService.saveSurveyUnits(surveyUnitModels);
189+
190+
//Update process dates
191+
updateProcessDates(surveyUnitModels);
192+
193+
//Increment data count
194+
dataCount += surveyUnitModels.size();
195+
formattedDataCount += surveyUnitModels.stream()
196+
.filter(surveyUnitModel -> surveyUnitModel.getState().equals(DataState.FORMATTED))
197+
.toList()
198+
.size();
199+
200+
//Send processed ids grouped by questionnaire (if review activated)
201+
if(dataProcessingContext != null && dataProcessingContext.isWithReview()) {
202+
sendProcessedIdsToQualityTool(surveyUnitModels);
203+
}
204+
205+
//Remove processed ids from list
206+
interrogationIdListForMode = interrogationIdListForMode.subList(maxIndex, interrogationIdListForMode.size());
207+
batchNumber++;
208+
}
209+
}
210+
return new DataProcessResult(dataCount, formattedDataCount, errors);
211+
}
212+
213+
private VariablesMap getVariablesMap(String questionnaireId, Mode mode, List<GenesisError> errors) throws GenesisException {
214+
VariablesMap variablesMap = metadataService.loadAndSaveIfNotExists(questionnaireId, questionnaireId, mode, fileUtils,
215+
errors).getVariables();
216+
if (variablesMap == null) {
217+
throw new GenesisException(400,
218+
"Error during metadata parsing for mode %s :%n%s"
219+
.formatted(mode, errors.getLast().getMessage())
220+
);
221+
}
222+
return variablesMap;
223+
}
224+
225+
private List<SurveyUnitModel> getConvertedSurveyUnits(String questionnaireId, Mode mode, List<String> interrogationIdListForMode, int maxIndex, VariablesMap variablesMap) {
226+
List<String> interrogationIdToProcess = interrogationIdListForMode.subList(0, maxIndex);
227+
List<LunaticJsonRawDataModel> rawData = getRawData(questionnaireId, mode, interrogationIdToProcess);
228+
return convertRawData(
229+
rawData,
230+
variablesMap
231+
);
160232
}
161233

162234
private void sendProcessedIdsToQualityTool(List<SurveyUnitModel> surveyUnitModels) {

src/main/java/fr/insee/genesis/infrastructure/adapter/LunaticJsonRawDataMongoAdapter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import fr.insee.genesis.domain.model.surveyunit.rawdata.LunaticJsonRawDataModel;
77
import fr.insee.genesis.domain.ports.spi.LunaticJsonRawDataPersistencePort;
88
import fr.insee.genesis.infrastructure.document.rawdata.LunaticJsonRawDataDocument;
9+
import fr.insee.genesis.infrastructure.document.surveyunit.GroupedInterrogationDocument;
910
import fr.insee.genesis.infrastructure.mappers.GroupedInterrogationDocumentMapper;
1011
import fr.insee.genesis.infrastructure.mappers.LunaticJsonRawDataDocumentMapper;
1112
import fr.insee.genesis.infrastructure.repository.LunaticJsonMongoDBRepository;
@@ -96,4 +97,14 @@ public List<GroupedInterrogation> findUnprocessedIds() {
9697
return GroupedInterrogationDocumentMapper.INSTANCE.listDocumentToListModel(repository.aggregateRawGroupedWithNullProcessDate());
9798
}
9899

100+
@Override
101+
public Set<String> findUnprocessedInterrogationIdsByCollectionInstrumentId(String collectionInstrumentId) {
102+
Set<String> interrogationIds = new HashSet<>();
103+
104+
repository.aggregateRawGroupedWithNullProcessDate(collectionInstrumentId).forEach(
105+
groupedInterrogationDocument -> interrogationIds.addAll(groupedInterrogationDocument.getInterrogationIds())
106+
);
107+
108+
return interrogationIds;
109+
}
99110
}

0 commit comments

Comments
 (0)