Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/dm-store/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ description: Helm chart for the HMCTS CDM Document Management APO
apiVersion: v2
name: dm-store
home: https://github.com/hmcts/document-management-store-app
version: 2.3.5
version: 2.3.6
maintainers:
- name: HMCTS Evidence Management Team
email: [email protected]
Expand Down
5 changes: 4 additions & 1 deletion charts/dm-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ java:
CASE_DOCUMENTS_DELETION_NO_OF_ITERATIONS: 1
CASE_DOCUMENTS_DELETION_BATCH_SIZE: 1
CASE_DOCUMENTS_DELETION_SERVICE_NAME: ccd_case_disposer

# mimetype update job
MIME_TYPE_UPDATE_THREAD_LIMIT: 1
MIME_TYPE_UPDATE_NO_OF_ITERATIONS: 1
MIME_TYPE_UPDATE_BATCH_SIZE: 300
blobstorage:
resourceGroup: dm-store-aso-preview-rg
teamName: "CCD"
Expand Down
112 changes: 112 additions & 0 deletions src/main/java/uk/gov/hmcts/dm/config/batch/MimeTypeUpdateTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package uk.gov.hmcts.dm.config.batch;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import uk.gov.hmcts.dm.repository.DocumentContentVersionRepository;
import uk.gov.hmcts.dm.service.DocumentContentVersionService;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* This task periodically checks for Document Content Versions where the mimeTypeUpdated flag is false.
* It will then read the blob from storage, detect the correct MIME type, and update the database record.
*/
@Service
public class MimeTypeUpdateTask implements Runnable {

private static final Logger log = LoggerFactory.getLogger(MimeTypeUpdateTask.class);

private final DocumentContentVersionService documentContentVersionService;
private final DocumentContentVersionRepository documentContentVersionRepository;

@Value("${spring.batch.mimeTypeUpdate.batchSize}")
private int batchSize;

@Value("${spring.batch.mimeTypeUpdate.noOfIterations}")
private int noOfIterations;

@Value("${spring.batch.mimeTypeUpdate.threadLimit}")
private int threadLimit;

public MimeTypeUpdateTask(DocumentContentVersionService documentContentVersionService,
DocumentContentVersionRepository documentContentVersionRepository) {
this.documentContentVersionService = documentContentVersionService;
this.documentContentVersionRepository = documentContentVersionRepository;
}

@Override
public void run() {
log.info("Started MIME Type Update job.");
StopWatch stopWatch = new StopWatch();
stopWatch.start();

try {
log.info("threadLimit: {}, noOfIterations: {}, batchSize: {}", threadLimit, noOfIterations, batchSize);

for (int i = 0; i < noOfIterations; i++) {
if (!getAndUpdateMimeTypes(i)) {
// Stop iterating if a run finds no records to process
log.info("No records found in iteration {}. Stopping job.", i);
break;
}
}

} catch (Exception e) {
log.error("MIME Type Update job failed with Error message: {}", e.getMessage(), e);
} finally {
stopWatch.stop();
log.info("MIME Type Update job finished and took {} ms", stopWatch.getDuration().toMillis());
}
}

private boolean getAndUpdateMimeTypes(int iteration) {
StopWatch iterationStopWatch = new StopWatch();
iterationStopWatch.start();

Pageable pageable = PageRequest.of(0, batchSize);

List<UUID> documentIds = documentContentVersionRepository
.findDocumentContentVersionIdsForMimeTypeUpdate(pageable);

if (CollectionUtils.isEmpty(documentIds)) {
iterationStopWatch.stop();
log.info("Iteration {}: No records found for MIME type update. Total time: {} ms",
iteration, iterationStopWatch.getDuration().toMillis());
return false; // Indicates no records were found
}

log.info("Iteration {}: Found {} records to process for MIME type update.", iteration, documentIds.size());

ExecutorService executorService = Executors.newFixedThreadPool(threadLimit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can update in batch as well similar to

int batchCommitSize = 500; // Define the batch size for committing to the DB

try {
documentIds.forEach(
id -> executorService.submit(() -> documentContentVersionService.updateMimeType(id))
);
} finally {
executorService.shutdown();
}

try {
// Wait for all tasks to complete
executorService.awaitTermination(1, TimeUnit.HOURS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("MIME type update job was interrupted while waiting for tasks to complete.", e);
}

iterationStopWatch.stop();
log.info("Time taken to complete iteration number: {} was : {} ms", iteration,
iterationStopWatch.getDuration().toMillis());
return true; // Indicates records were processed
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.hmcts.dm.repository;

import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
Expand Down Expand Up @@ -29,4 +30,18 @@ void updateContentUriAndContentCheckSum(@Param("id") UUID id,
@Query("select dcv from DocumentContentVersion dcv where dcv.storedDocument.id = :storedDocumentId")
List<DocumentContentVersion> findAllByStoredDocumentId(@Param("storedDocumentId") UUID storedDocumentId);

@Query("""
SELECT dcv.id FROM DocumentContentVersion dcv
WHERE dcv.mimeTypeUpdated = false
ORDER BY dcv.createdOn DESC""")
List<UUID> findDocumentContentVersionIdsForMimeTypeUpdate(Pageable pageable);


@Modifying
@Query("UPDATE DocumentContentVersion d SET d.mimeType = :mimeType, d.mimeTypeUpdated = true where d.id = :id")
void updateMimeType(@Param("id") UUID id, @Param("mimeType") String mimeType);

@Modifying
@Query("UPDATE DocumentContentVersion d SET d.mimeTypeUpdated = true WHERE d.id = :id")
void markMimeTypeUpdated(@Param("id") UUID id);
}
13 changes: 13 additions & 0 deletions src/main/java/uk/gov/hmcts/dm/service/BlobStorageReadService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import uk.gov.hmcts.dm.exception.InvalidRangeRequestException;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Enumeration;
import java.util.Objects;
Expand All @@ -42,6 +43,18 @@ public BlobStorageReadService(BlobContainerClient cloudBlobContainer, ToggleConf
this.toggleConfiguration = toggleConfiguration;
}


/**
* Gets an InputStream for a given document/blob UUID.
* This is useful for internal processing like MIME type detection.
*
* @param documentId The UUID of the document content version.
* @return An InputStream of the blob's content.
*/
public InputStream getInputStream(UUID documentId) {
return blockBlobClient(documentId.toString()).openInputStream();
}

/**
* Streams the content of a document from blob storage. Processes a Range request when possible
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package uk.gov.hmcts.dm.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import uk.gov.hmcts.dm.domain.DocumentContentVersion;
import uk.gov.hmcts.dm.domain.StoredDocument;
Expand All @@ -11,29 +14,58 @@
import java.util.Optional;
import java.util.UUID;

@Transactional
@Service
public class DocumentContentVersionService {

private final DocumentContentVersionRepository documentContentVersionRepository;
private static final Logger log = LoggerFactory.getLogger(DocumentContentVersionService.class);

private final DocumentContentVersionRepository documentContentVersionRepository;
private final StoredDocumentRepository storedDocumentRepository;
private final MimeTypeDetectionService mimeTypeDetectionService; // New dependency
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This java comment can be removed


@Autowired
public DocumentContentVersionService(DocumentContentVersionRepository documentContentVersionRepository,
StoredDocumentRepository storedDocumentRepository) {
StoredDocumentRepository storedDocumentRepository,
MimeTypeDetectionService mimeTypeDetectionService) { // Injected here
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This java comment can be removed

this.documentContentVersionRepository = documentContentVersionRepository;
this.storedDocumentRepository = storedDocumentRepository;
this.mimeTypeDetectionService = mimeTypeDetectionService;
}

public Optional<DocumentContentVersion> findById(UUID id) {
return documentContentVersionRepository.findById(id);
}

@Transactional
public Optional<DocumentContentVersion> findMostRecentDocumentContentVersionByStoredDocumentId(UUID id) {
return storedDocumentRepository
.findByIdAndDeleted(id, false)
.map(StoredDocument::getMostRecentDocumentContentVersion);
.findByIdAndDeleted(id, false)
.map(StoredDocument::getMostRecentDocumentContentVersion);
}

@Transactional(propagation = Propagation.REQUIRES_NEW)
public void updateMimeType(UUID documentVersionId) {
log.info("Processing MIME type update for ID: {}", documentVersionId);

String detectedMimeType = mimeTypeDetectionService.detectMimeType(documentVersionId);

if (detectedMimeType == null) {
log.warn(
"Could not detect MIME type for {}. Marking as processed to prevent retries.",
documentVersionId
);
documentContentVersionRepository.markMimeTypeUpdated(documentVersionId);
return;
}
log.info("Updating MIME type for document {}. New: [{}].",
documentVersionId, detectedMimeType);

documentContentVersionRepository.updateMimeType(documentVersionId, detectedMimeType);

log.info("Updated documentVersion id:{}, mimeType:{}",
documentVersionId,
detectedMimeType
);
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package uk.gov.hmcts.dm.service;

import org.apache.commons.io.input.BoundedInputStream;
import org.apache.tika.Tika;
import org.apache.tika.metadata.Metadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.io.InputStream;
import java.util.UUID;

/**
* Service to detect the MIME type of a document stored in blob storage.
*/
@Service
public class MimeTypeDetectionService {

private static final Logger log = LoggerFactory.getLogger(MimeTypeDetectionService.class);
private static final int MAX_BYTES_TO_READ = 2 * 1024 * 1024; // 2 MB is sufficient for Tika to detect type

private final BlobStorageReadService blobStorageReadService;

public MimeTypeDetectionService(BlobStorageReadService blobStorageReadService) {
this.blobStorageReadService = blobStorageReadService;
}

/**
* Detects the MIME type of a document version by reading the first few bytes from its blob.
*
* @param documentVersionId The UUID of the document version.
* @return The detected MIME type as a String, or null if detection fails.
*/
public String detectMimeType(UUID documentVersionId) {
log.debug("Attempting to detect MIME type for document version ID: {}", documentVersionId);
try (InputStream inputStream = blobStorageReadService.getInputStream(documentVersionId);
BoundedInputStream limitedStream = BoundedInputStream.builder()
.setInputStream(inputStream)
.setMaxCount(MAX_BYTES_TO_READ)
.get()) {

Tika tika = new Tika();
Metadata metadata = new Metadata();
String mimeType = tika.detect(limitedStream, metadata);
log.info("Detected MIME type for {} as: {}", documentVersionId, mimeType);
return mimeType;

} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing a unit test for this catch block

log.error("Failed to read blob stream for MIME type detection on document version {}",
documentVersionId);
return null;
} catch (Exception e) {
log.error("An unexpected error occurred during MIME type detection for document version {}",
documentVersionId);
return null;
}
}
}
7 changes: 4 additions & 3 deletions src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ spring:
noOfIterations: ${CASE_DOCUMENTS_DELETION_NO_OF_ITERATIONS:1}
batchSize: ${CASE_DOCUMENTS_DELETION_BATCH_SIZE:10}
serviceName: ${CASE_DOCUMENTS_DELETION_SERVICE_NAME:ccd_case_disposer}

mimeTypeUpdate:
threadLimit: ${MIME_TYPE_UPDATE_THREAD_LIMIT:1}
noOfIterations: ${MIME_TYPE_UPDATE_NO_OF_ITERATIONS:1}
batchSize: ${MIME_TYPE_UPDATE_BATCH_SIZE:300}
# run cleanup every 1 hour (since a single execution should not run longer than 1 hour)
historicExecutionsRetentionMilliseconds: ${HISTORIC_EXECUTIONS_RETENTION_MILLISECONDS:3600000}
documentMetaDataUpdateMilliseconds: ${DOCUMENT_METADATA_UPDATE_MILLISECONDS:10000}
Expand Down Expand Up @@ -113,7 +116,6 @@ logging:
org.springframework.jdbc.core: ${LOG_LEVEL_SPRING_JDBC:DEBUG}
org.hibernate.SQL: ${LOG_LEVEL_HIBERNATE:DEBUG}
org.hibernate.type.descriptor.sql.BasicBinder: ${LOG_LEVEL_HIBERNATE:DEBUG}

azure:
app_insights_key: ${APPINSIGHTS_INSTRUMENTATIONKEY:true}
application-insights:
Expand Down Expand Up @@ -232,4 +234,3 @@ dbMigration:
# When true, the app will run DB migration on startup.
# Otherwise, it will just check if all migrations have been applied (and fail to start if not).
runOnStartup: ${RUN_DB_MIGRATION_ON_STARTUP:true}

Loading