Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Changelog
## 1.13.0 [TODO]
### Changed
- New raw data process endpoint

## 1.12.2 [2025-11-06]
### Fixed
- Scientific notation for Doubles during raw data processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public ResponseEntity<LunaticJsonRawDataModel> getJsonRawData(
@Operation(summary = "Process raw data of a campaign")
@PostMapping(path = "/lunatic-json/process")
@PreAuthorize("hasRole('SCHEDULER')")
@Deprecated(since = "1.13.0")
public ResponseEntity<String> processJsonRawData(
@RequestParam("campaignName") String campaignName,
@RequestParam("questionnaireId") String questionnaireId,
Expand All @@ -180,6 +181,24 @@ public ResponseEntity<String> processJsonRawData(
}
}

@Operation(summary = "Process raw data of a questionnaire")
@PostMapping(path = "/lunatic-json/{questionnaireId}/process")
@PreAuthorize("hasRole('SCHEDULER')")
public ResponseEntity<String> processJsonRawData(
@PathVariable String questionnaireId
) {
log.info("Try to process raw JSON datas for questionnaire {}",questionnaireId);
try {
DataProcessResult result = lunaticJsonRawDataApiPort.processRawData(questionnaireId);
return result.formattedDataCount() == 0 ?
ResponseEntity.ok("%d document(s) processed".formatted(result.dataCount()))
: ResponseEntity.ok("%d document(s) processed, including %d FORMATTED after data verification"
.formatted(result.dataCount(), result.formattedDataCount()));
} catch (GenesisException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
}

@Operation(summary = "Get processed data ids from last n hours (default 24h)")
@GetMapping(path = "/lunatic-json/processed/ids")
@PreAuthorize("hasRole('ADMIN')")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package fr.insee.genesis.domain.model.surveyunit.rawdata;

public record DataProcessResult(int dataCount, int formattedDataCount) {
import fr.insee.genesis.exceptions.GenesisError;

import java.util.List;

public record DataProcessResult(int dataCount, int formattedDataCount, List<GenesisError> errors) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public interface LunaticJsonRawDataApiPort {
long countResponsesByQuestionnaireId(String campaignId);
Page<LunaticJsonRawDataModel> findRawDataByCampaignIdAndDate(String campaignId, Instant startDt, Instant endDt, Pageable pageable);

@Deprecated(since = "1.13.0")
DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException;

DataProcessResult processRawData(String questionnaireId) throws GenesisException;

Map<String, List<String>> findProcessedIdsgroupedByQuestionnaireSince(LocalDateTime since);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public interface LunaticJsonRawDataPersistencePort {
long countResponsesByQuestionnaireId(String questionnaireId);
List<GroupedInterrogation> findProcessedIdsGroupedByQuestionnaireSince(LocalDateTime since);
List<GroupedInterrogation> findUnprocessedIds();
Set<String> findUnprocessedInterrogationIdsByQuestionnaire(String questionnaireId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public List<LunaticJsonRawDataModel> getRawData(String campaignName, Mode mode,
}

@Override
@Deprecated(since = "1.13.0")
public DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException {
int dataCount=0;
int formattedDataCount=0;
Expand Down Expand Up @@ -156,7 +157,73 @@ public DataProcessResult processRawData(String campaignName, List<String> interr
batchNumber++;
}
}
return new DataProcessResult(dataCount, formattedDataCount);
return new DataProcessResult(dataCount, formattedDataCount, errors);
}

@Override
public DataProcessResult processRawData(String questionnaireId) throws GenesisException {
int dataCount=0;
int formattedDataCount=0;
DataProcessingContextModel dataProcessingContext =
dataProcessingContextService.getContextByPartitionId(questionnaireId);
List<GenesisError> errors = new ArrayList<>();

List<Mode> modesList = controllerUtils.getModesList(questionnaireId, null);
for (Mode mode : modesList) {
//Load and save metadata into database, throw exception if none
VariablesMap variablesMap = metadataService.loadAndSaveIfNotExists(questionnaireId, questionnaireId, mode, fileUtils,
errors).getVariables();
if (variablesMap == null) {
throw new GenesisException(400,
"Error during metadata parsing for mode %s :%n%s"
.formatted(mode, errors.getLast().getMessage())
);
}
Set<String> interrogationIds =
lunaticJsonRawDataPersistencePort.findUnprocessedInterrogationIdsByQuestionnaire(questionnaireId);


int totalBatchs = Math.ceilDiv(interrogationIds.size() , config.getRawDataProcessingBatchSize());
int batchNumber = 1;
List<String> interrogationIdListForMode = new ArrayList<>(interrogationIds);
while(!interrogationIdListForMode.isEmpty()){
log.info("Processing raw data batch {}/{}", batchNumber, totalBatchs);
int maxIndex = Math.min(interrogationIdListForMode.size(), config.getRawDataProcessingBatchSize());
List<String> interrogationIdToProcess = interrogationIdListForMode.subList(0, maxIndex);

List<LunaticJsonRawDataModel> rawData = getRawData(questionnaireId, mode, interrogationIdToProcess);

List<SurveyUnitModel> surveyUnitModels = convertRawData(
rawData,
variablesMap
);

//Save converted data
surveyUnitQualityService.verifySurveyUnits(surveyUnitModels, variablesMap);
surveyUnitService.saveSurveyUnits(surveyUnitModels);

//Update process dates
updateProcessDates(surveyUnitModels);

//Increment data count
dataCount += surveyUnitModels.size();
formattedDataCount += surveyUnitModels.stream()
.filter(surveyUnitModel -> surveyUnitModel.getState().equals(DataState.FORMATTED))
.toList()
.size();

//Send processed ids grouped by questionnaire (if review activated)
if(dataProcessingContext != null && dataProcessingContext.isWithReview()) {
sendProcessedIdsToQualityTool(surveyUnitModels);
}

//Remove processed ids from list
interrogationIdListForMode = interrogationIdListForMode.subList(maxIndex, interrogationIdListForMode.size());

batchNumber++;
}
}
return new DataProcessResult(dataCount, formattedDataCount, errors);
}

private void sendProcessedIdsToQualityTool(List<SurveyUnitModel> surveyUnitModels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import fr.insee.genesis.domain.model.surveyunit.rawdata.LunaticJsonRawDataModel;
import fr.insee.genesis.domain.ports.spi.LunaticJsonRawDataPersistencePort;
import fr.insee.genesis.infrastructure.document.rawdata.LunaticJsonRawDataDocument;
import fr.insee.genesis.infrastructure.document.surveyunit.GroupedInterrogationDocument;
import fr.insee.genesis.infrastructure.mappers.GroupedInterrogationDocumentMapper;
import fr.insee.genesis.infrastructure.mappers.LunaticJsonRawDataDocumentMapper;
import fr.insee.genesis.infrastructure.repository.LunaticJsonMongoDBRepository;
Expand Down Expand Up @@ -96,4 +97,14 @@ public List<GroupedInterrogation> findUnprocessedIds() {
return GroupedInterrogationDocumentMapper.INSTANCE.listDocumentToListModel(repository.aggregateRawGroupedWithNullProcessDate());
}

@Override
public Set<String> findUnprocessedInterrogationIdsByQuestionnaire(String questionnaireId) {
Set<String> interrogationIds = new HashSet<>();

repository.aggregateRawGroupedWithNullProcessDate(questionnaireId).forEach(
groupedInterrogationDocument -> interrogationIds.addAll(groupedInterrogationDocument.getInterrogationIds())
);

return interrogationIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
Expand All @@ -23,6 +24,7 @@ public DataProcessingContextDocument(String partitionId,

@Id
private ObjectId id;
@Indexed
private String partitionId; //ex Survey Name, campaignId
private LocalDateTime lastExecution;
private List<KraftwerkExecutionSchedule> kraftwerkExecutionScheduleList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fr.insee.genesis.infrastructure.document.lunaticmodel;

import lombok.Builder;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
Expand All @@ -9,6 +10,7 @@
@Builder
@Document(collection = "lunaticmodels")
public record LunaticModelDocument (
@Indexed
String questionnaireId,
Map<String,Object> lunaticModel,
LocalDateTime recordDate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.genesis.domain.model.surveyunit.Mode;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

@CompoundIndex(name = "questionnaireId_1_mode_1", def = "{'questionnaireId': 1, 'mode': 1}")
@Document(collection = "questionnaireMetadatas")
public record QuestionnaireMetadataDocument(
@Indexed
String questionnaireId,
Mode mode,
MetadataModel metadataModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public record LunaticJsonRawDataDocument(
@Id
ObjectId id,
String campaignId,
@Indexed
String questionnaireId,
String interrogationId,
String idUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,22 @@ public interface LunaticJsonMongoDBRepository extends MongoRepository<LunaticJso
"} }"
})
List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate();

@Aggregation(pipeline = {
"{ '$match': {'questionnaireId': ?0 ,'processDate': null } }",
"{ '$group': { " +
"'_id': { " +
"'questionnaireId': '$questionnaireId', " +
"'partitionOrCampaignId': { '$ifNull': ['$partitionId', '$campaignId'] } " +
"}, " +
"'interrogationIds': { '$addToSet': '$interrogationId' } " +
"} }",
"{ '$project': { " +
"'questionnaireId': '$_id.questionnaireId', " +
"'partitionOrCampaignId': '$_id.partitionOrCampaignId', " +
"'interrogationIds': 1, " +
"'_id': 0 " +
"} }"
})
List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate(String questionnaireId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import fr.insee.genesis.stubs.QuestionnaireMetadataPersistancePortStub;
import fr.insee.genesis.stubs.SurveyUnitPersistencePortStub;
import fr.insee.genesis.stubs.SurveyUnitQualityToolPerretAdapterStub;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.data.web.PagedModel;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;

@Slf4j
class RawResponseControllerTest {
private final FileUtils fileUtils = new FileUtils(new ConfigStub());
private final LunaticJsonRawDataPersistanceStub lunaticJsonRawDataPersistanceStub = new LunaticJsonRawDataPersistanceStub();
Expand Down Expand Up @@ -198,6 +200,66 @@ void processJsonRawDataTest(){
.contains(interrogationId);
}

@Test
void processJsonRawDataV2Test(){
//GIVEN
lunaticJsonRawDataPersistanceStub.getMongoStub().clear();
surveyUnitPersistencePortStub.getMongoStub().clear();
surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().clear();
String questionnaireId = "SAMPLETEST-PARADATA-v2";
String interrogationId = "testinterrogationId1";
String idUE = "testIdUE1";
String varName = "AVIS_MAIL";
String varValue = "TEST";
addJsonRawDataDocumentToStub(questionnaireId, questionnaireId, interrogationId, idUE, null, LocalDateTime.now(),varName
, varValue);

dataProcessingContextPersistancePortStub.getMongoStub().add(
DataProcessingContextMapper.INSTANCE.modelToDocument(
DataProcessingContextModel.builder()
.partitionId(questionnaireId)
.kraftwerkExecutionScheduleList(new ArrayList<>())
.withReview(true)
.build()
)
);

//WHEN
ResponseEntity response = rawResponseController.processJsonRawData(questionnaireId);


//THEN
if(!response.getStatusCode().is2xxSuccessful()){
log.error(response.getBody().toString());
}
Assertions.assertThat(response.getStatusCode().is2xxSuccessful()).isTrue();

//Genesis model survey unit created successfully
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub()).isNotNull().isNotEmpty().hasSize(1);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCampaignId()).isEqualTo(questionnaireId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getQuestionnaireId()).isNotNull().isEqualTo(questionnaireId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getMode()).isNotNull().isEqualTo(Mode.WEB);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getInterrogationId()).isEqualTo(interrogationId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getIdUE()).isEqualTo(idUE);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getFileDate()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getRecordDate()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables()).isNotNull().isNotEmpty().hasSize(1);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst().varId()).isNotNull().isEqualTo(varName);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst().value()).isNotNull().isEqualTo(varValue);

//Process date check
Assertions.assertThat(lunaticJsonRawDataPersistanceStub.getMongoStub().getFirst().processDate()).isNotNull();

//Perret call check
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps())
.hasSize(1);
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().getFirst()).containsKey(questionnaireId);
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().getFirst().get(questionnaireId))
.contains(interrogationId);
}


@Test
void getRawResponsesFromJsonBody() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.time.LocalDateTime;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -136,6 +137,17 @@ public List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate
return result;
}

@Override
public List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate(String questionnaireId) {
GroupedInterrogationDocument groupedInterrogationDocument = new GroupedInterrogationDocument();
groupedInterrogationDocument.setQuestionnaireId(questionnaireId);
groupedInterrogationDocument.setPartitionOrCampaignId(questionnaireId);
groupedInterrogationDocument.setInterrogationIds(new ArrayList<>());
documents.stream().filter(doc -> doc.questionnaireId().equals(questionnaireId) && doc.processDate() == null).toList()
.forEach(doc -> groupedInterrogationDocument.getInterrogationIds().add(doc.interrogationId()));
return Collections.singletonList(groupedInterrogationDocument);
}

// Implémentations vides requises par MongoRepository
@Override public <S extends LunaticJsonRawDataDocument> S save(S entity) { return null; }
@Override public Optional<LunaticJsonRawDataDocument> findById(String s) { return Optional.empty(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,15 @@ public List<GroupedInterrogation> findUnprocessedIds() {
return GroupedInterrogationDocumentMapper.INSTANCE.listDocumentToListModel(result);
}

@Override
public Set<String> findUnprocessedInterrogationIdsByQuestionnaire(String questionnaireId) {
List<LunaticJsonRawDataDocument> unprocessedDocuments =
mongoStub.stream().filter(
lunaticJsonDataDocument -> lunaticJsonDataDocument.processDate() == null
&& lunaticJsonDataDocument.questionnaireId().equals(questionnaireId)
).toList();
Set<String> interrogationIds = new HashSet<>();
unprocessedDocuments.forEach(doc -> interrogationIds.add(doc.interrogationId()));
return interrogationIds;
}
}
Loading