Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Changelog
## 1.13.0 [TODO]
### Changed
- New raw data process endpoint

## 1.12.2 [2025-11-06]
### Fixed
- Scientific notation for Doubles during raw data processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public ResponseEntity<LunaticJsonRawDataModel> getJsonRawData(
@Operation(summary = "Process raw data of a campaign")
@PostMapping(path = "/lunatic-json/process")
@PreAuthorize("hasRole('SCHEDULER')")
@Deprecated(since = "1.13.0")
public ResponseEntity<String> processJsonRawData(
@RequestParam("campaignName") String campaignName,
@RequestParam("questionnaireId") String questionnaireId,
Expand All @@ -180,6 +181,24 @@ public ResponseEntity<String> processJsonRawData(
}
}

@Operation(summary = "Process raw data of a questionnaire")
@PostMapping(path = "/{collectionInstrumentId}/process")
@PreAuthorize("hasRole('SCHEDULER')")
public ResponseEntity<String> processJsonRawData(
@PathVariable String collectionInstrumentId
) {
log.info("Try to process raw JSON datas for questionnaire {}",collectionInstrumentId);
try {
DataProcessResult result = lunaticJsonRawDataApiPort.processRawData(collectionInstrumentId);
return result.formattedDataCount() == 0 ?
ResponseEntity.ok("%d document(s) processed".formatted(result.dataCount()))
: ResponseEntity.ok("%d document(s) processed, including %d FORMATTED after data verification"
.formatted(result.dataCount(), result.formattedDataCount()));
} catch (GenesisException e) {
return ResponseEntity.status(e.getStatus()).body(e.getMessage());
}
}

@Operation(summary = "Get processed data ids from last n hours (default 24h)")
@GetMapping(path = "/lunatic-json/processed/ids")
@PreAuthorize("hasRole('ADMIN')")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ private void processCampaignWithMode(String campaignName, Mode mode, String root
}

//Create context if not exist
if(contextService.getContextByPartitionId(campaignName) == null){
if(contextService.getContextByCollectionInstrumentId(campaignName) == null){
contextService.saveContext(campaignName, false);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package fr.insee.genesis.domain.model.surveyunit.rawdata;

public record DataProcessResult(int dataCount, int formattedDataCount) {
import fr.insee.genesis.exceptions.GenesisError;

import java.util.List;

public record DataProcessResult(int dataCount, int formattedDataCount, List<GenesisError> errors) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void saveKraftwerkExecutionSchedule(String partitionId,
long countSchedules();

DataProcessingContextModel getContext(String interrogationId) throws GenesisException;
DataProcessingContextModel getContextByPartitionId(String partitionId) throws GenesisException;
DataProcessingContextModel getContextByCollectionInstrumentId(String partitionId) throws GenesisException;
List<String> getPartitionIds(boolean withReview);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public interface LunaticJsonRawDataApiPort {
long countResponsesByQuestionnaireId(String campaignId);
Page<LunaticJsonRawDataModel> findRawDataByCampaignIdAndDate(String campaignId, Instant startDt, Instant endDt, Pageable pageable);

@Deprecated(since = "1.13.0")
DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException;

DataProcessResult processRawData(String collectionInstrumentId) throws GenesisException;

Map<String, List<String>> findProcessedIdsgroupedByQuestionnaireSince(LocalDateTime since);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public interface LunaticJsonRawDataPersistencePort {
long countResponsesByQuestionnaireId(String questionnaireId);
List<GroupedInterrogation> findProcessedIdsGroupedByQuestionnaireSince(LocalDateTime since);
List<GroupedInterrogation> findUnprocessedIds();
Set<String> findUnprocessedInterrogationIdsByCollectionInstrumentId(String collectionInstrumentId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public DataProcessingContextModel getContext(String interrogationId) throws Gene
}

@Override
public DataProcessingContextModel getContextByPartitionId(String partitionId){
public DataProcessingContextModel getContextByCollectionInstrumentId(String partitionId){
return DataProcessingContextMapper.INSTANCE.documentToModel(
dataProcessingContextPersistancePort.findByPartitionId(partitionId)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,22 +100,16 @@ public List<LunaticJsonRawDataModel> getRawData(String campaignName, Mode mode,
}

@Override
@Deprecated(since = "1.13.0")
public DataProcessResult processRawData(String campaignName, List<String> interrogationIdList, List<GenesisError> errors) throws GenesisException {
int dataCount=0;
int formattedDataCount=0;
DataProcessingContextModel dataProcessingContext =
dataProcessingContextService.getContextByPartitionId(campaignName);
dataProcessingContextService.getContextByCollectionInstrumentId(campaignName);
List<Mode> modesList = controllerUtils.getModesList(campaignName, null);
for (Mode mode : modesList) {
//Load and save metadata into database, throw exception if none
VariablesMap variablesMap = metadataService.loadAndSaveIfNotExists(campaignName, campaignName, mode, fileUtils,
errors).getVariables();
if (variablesMap == null) {
throw new GenesisException(400,
"Error during metadata parsing for mode %s :%n%s"
.formatted(mode, errors.getLast().getMessage())
);
}
VariablesMap variablesMap = getVariablesMap(campaignName, mode, errors);
int totalBatchs = Math.ceilDiv(interrogationIdList.size() , config.getRawDataProcessingBatchSize());
int batchNumber = 1;
List<String> interrogationIdListForMode = new ArrayList<>(interrogationIdList);
Expand Down Expand Up @@ -156,7 +150,85 @@ public DataProcessResult processRawData(String campaignName, List<String> interr
batchNumber++;
}
}
return new DataProcessResult(dataCount, formattedDataCount);
return new DataProcessResult(dataCount, formattedDataCount, errors);
}

@Override
public DataProcessResult processRawData(String questionnaireId) throws GenesisException {
int dataCount=0;
int formattedDataCount=0;
DataProcessingContextModel dataProcessingContext =
dataProcessingContextService.getContextByCollectionInstrumentId(questionnaireId);
List<GenesisError> errors = new ArrayList<>();

List<Mode> modesList = controllerUtils.getModesList(questionnaireId, null);
for (Mode mode : modesList) {
//Load and save metadata into database, throw exception if none
VariablesMap variablesMap = getVariablesMap(questionnaireId, mode, errors);
Set<String> interrogationIds =
lunaticJsonRawDataPersistencePort.findUnprocessedInterrogationIdsByCollectionInstrumentId(questionnaireId);

int totalBatchs = Math.ceilDiv(interrogationIds.size() , config.getRawDataProcessingBatchSize());
int batchNumber = 1;
List<String> interrogationIdListForMode = new ArrayList<>(interrogationIds);
while(!interrogationIdListForMode.isEmpty()){
log.info("Processing raw data batch {}/{}", batchNumber, totalBatchs);

int maxIndex = Math.min(interrogationIdListForMode.size(), config.getRawDataProcessingBatchSize());
List<SurveyUnitModel> surveyUnitModels = getConvertedSurveyUnits(
questionnaireId,
mode,
interrogationIdListForMode,
maxIndex,
variablesMap
);

//Save converted data
surveyUnitQualityService.verifySurveyUnits(surveyUnitModels, variablesMap);
surveyUnitService.saveSurveyUnits(surveyUnitModels);

//Update process dates
updateProcessDates(surveyUnitModels);

//Increment data count
dataCount += surveyUnitModels.size();
formattedDataCount += surveyUnitModels.stream()
.filter(surveyUnitModel -> surveyUnitModel.getState().equals(DataState.FORMATTED))
.toList()
.size();

//Send processed ids grouped by questionnaire (if review activated)
if(dataProcessingContext != null && dataProcessingContext.isWithReview()) {
sendProcessedIdsToQualityTool(surveyUnitModels);
}

//Remove processed ids from list
interrogationIdListForMode = interrogationIdListForMode.subList(maxIndex, interrogationIdListForMode.size());
batchNumber++;
}
}
return new DataProcessResult(dataCount, formattedDataCount, errors);
}

private VariablesMap getVariablesMap(String questionnaireId, Mode mode, List<GenesisError> errors) throws GenesisException {
VariablesMap variablesMap = metadataService.loadAndSaveIfNotExists(questionnaireId, questionnaireId, mode, fileUtils,
errors).getVariables();
if (variablesMap == null) {
throw new GenesisException(400,
"Error during metadata parsing for mode %s :%n%s"
.formatted(mode, errors.getLast().getMessage())
);
}
return variablesMap;
}

private List<SurveyUnitModel> getConvertedSurveyUnits(String questionnaireId, Mode mode, List<String> interrogationIdListForMode, int maxIndex, VariablesMap variablesMap) {
List<String> interrogationIdToProcess = interrogationIdListForMode.subList(0, maxIndex);
List<LunaticJsonRawDataModel> rawData = getRawData(questionnaireId, mode, interrogationIdToProcess);
return convertRawData(
rawData,
variablesMap
);
}

private void sendProcessedIdsToQualityTool(List<SurveyUnitModel> surveyUnitModels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import fr.insee.genesis.domain.model.surveyunit.rawdata.LunaticJsonRawDataModel;
import fr.insee.genesis.domain.ports.spi.LunaticJsonRawDataPersistencePort;
import fr.insee.genesis.infrastructure.document.rawdata.LunaticJsonRawDataDocument;
import fr.insee.genesis.infrastructure.document.surveyunit.GroupedInterrogationDocument;
import fr.insee.genesis.infrastructure.mappers.GroupedInterrogationDocumentMapper;
import fr.insee.genesis.infrastructure.mappers.LunaticJsonRawDataDocumentMapper;
import fr.insee.genesis.infrastructure.repository.LunaticJsonMongoDBRepository;
Expand Down Expand Up @@ -96,4 +97,14 @@ public List<GroupedInterrogation> findUnprocessedIds() {
return GroupedInterrogationDocumentMapper.INSTANCE.listDocumentToListModel(repository.aggregateRawGroupedWithNullProcessDate());
}

@Override
public Set<String> findUnprocessedInterrogationIdsByCollectionInstrumentId(String collectionInstrumentId) {
Set<String> interrogationIds = new HashSet<>();

repository.aggregateRawGroupedWithNullProcessDate(collectionInstrumentId).forEach(
groupedInterrogationDocument -> interrogationIds.addAll(groupedInterrogationDocument.getInterrogationIds())
);

return interrogationIds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import lombok.Data;
import org.bson.types.ObjectId;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
Expand All @@ -23,6 +24,7 @@ public DataProcessingContextDocument(String partitionId,

@Id
private ObjectId id;
@Indexed
private String partitionId; //ex Survey Name, campaignId
private LocalDateTime lastExecution;
private List<KraftwerkExecutionSchedule> kraftwerkExecutionScheduleList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fr.insee.genesis.infrastructure.document.lunaticmodel;

import lombok.Builder;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.time.LocalDateTime;
Expand All @@ -9,6 +10,7 @@
@Builder
@Document(collection = "lunaticmodels")
public record LunaticModelDocument (
@Indexed
String questionnaireId,
Map<String,Object> lunaticModel,
LocalDateTime recordDate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import fr.insee.bpm.metadata.model.MetadataModel;
import fr.insee.genesis.domain.model.surveyunit.Mode;
import org.springframework.data.mongodb.core.index.CompoundIndex;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

@CompoundIndex(name = "questionnaireId_1_mode_1", def = "{'questionnaireId': 1, 'mode': 1}")
@Document(collection = "questionnaireMetadatas")
public record QuestionnaireMetadataDocument(
@Indexed
String questionnaireId,
Mode mode,
MetadataModel metadataModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public record LunaticJsonRawDataDocument(
@Id
ObjectId id,
String campaignId,
@Indexed
String questionnaireId,
String interrogationId,
String idUE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,22 @@ public interface LunaticJsonMongoDBRepository extends MongoRepository<LunaticJso
"} }"
})
List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate();

@Aggregation(pipeline = {
"{ '$match': {'questionnaireId': ?0 ,'processDate': null } }",
"{ '$group': { " +
"'_id': { " +
"'questionnaireId': '$questionnaireId', " +
"'partitionOrCampaignId': { '$ifNull': ['$partitionId', '$campaignId'] } " +
"}, " +
"'interrogationIds': { '$addToSet': '$interrogationId' } " +
"} }",
"{ '$project': { " +
"'questionnaireId': '$_id.questionnaireId', " +
"'partitionOrCampaignId': '$_id.partitionOrCampaignId', " +
"'interrogationIds': 1, " +
"'_id': 0 " +
"} }"
})
List<GroupedInterrogationDocument> aggregateRawGroupedWithNullProcessDate(String questionnaireId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import fr.insee.genesis.stubs.QuestionnaireMetadataPersistancePortStub;
import fr.insee.genesis.stubs.SurveyUnitPersistencePortStub;
import fr.insee.genesis.stubs.SurveyUnitQualityToolPerretAdapterStub;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.data.web.PagedModel;
Expand All @@ -35,6 +36,7 @@
import java.util.List;
import java.util.Map;

@Slf4j
class RawResponseControllerTest {
private final FileUtils fileUtils = new FileUtils(new ConfigStub());
private final LunaticJsonRawDataPersistanceStub lunaticJsonRawDataPersistanceStub = new LunaticJsonRawDataPersistanceStub();
Expand Down Expand Up @@ -198,6 +200,66 @@ void processJsonRawDataTest(){
.contains(interrogationId);
}

@Test
void processJsonRawDataV2Test(){
//GIVEN
lunaticJsonRawDataPersistanceStub.getMongoStub().clear();
surveyUnitPersistencePortStub.getMongoStub().clear();
surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().clear();
String questionnaireId = "SAMPLETEST-PARADATA-v2";
String interrogationId = "testinterrogationId1";
String idUE = "testIdUE1";
String varName = "AVIS_MAIL";
String varValue = "TEST";
addJsonRawDataDocumentToStub(questionnaireId, questionnaireId, interrogationId, idUE, null, LocalDateTime.now(),varName
, varValue);

dataProcessingContextPersistancePortStub.getMongoStub().add(
DataProcessingContextMapper.INSTANCE.modelToDocument(
DataProcessingContextModel.builder()
.partitionId(questionnaireId)
.kraftwerkExecutionScheduleList(new ArrayList<>())
.withReview(true)
.build()
)
);

//WHEN
ResponseEntity response = rawResponseController.processJsonRawData(questionnaireId);


//THEN
if(!response.getStatusCode().is2xxSuccessful()){
log.error(response.getBody().toString());
}
Assertions.assertThat(response.getStatusCode().is2xxSuccessful()).isTrue();

//Genesis model survey unit created successfully
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub()).isNotNull().isNotEmpty().hasSize(1);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCampaignId()).isEqualTo(questionnaireId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getQuestionnaireId()).isNotNull().isEqualTo(questionnaireId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getMode()).isNotNull().isEqualTo(Mode.WEB);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getInterrogationId()).isEqualTo(interrogationId);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getIdUE()).isEqualTo(idUE);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getFileDate()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getRecordDate()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables()).isNotNull().isNotEmpty().hasSize(1);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst()).isNotNull();
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst().varId()).isNotNull().isEqualTo(varName);
Assertions.assertThat(surveyUnitPersistencePortStub.getMongoStub().getFirst().getCollectedVariables().getFirst().value()).isNotNull().isEqualTo(varValue);

//Process date check
Assertions.assertThat(lunaticJsonRawDataPersistanceStub.getMongoStub().getFirst().processDate()).isNotNull();

//Perret call check
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps())
.hasSize(1);
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().getFirst()).containsKey(questionnaireId);
Assertions.assertThat(surveyUnitQualityToolPerretAdapterStub.getReceivedMaps().getFirst().get(questionnaireId))
.contains(interrogationId);
}


@Test
void getRawResponsesFromJsonBody() {
Expand Down
Loading
Loading