diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 9f2771cb3f92..b76866af3fcd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -96,7 +96,9 @@ import org.apache.pinot.controller.api.upload.ZKOperator; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.retention.RetentionManager; +import org.apache.pinot.controller.validation.DiskUtilizationChecker; import org.apache.pinot.controller.validation.StorageQuotaChecker; +import org.apache.pinot.controller.validation.UtilizationChecker; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; @@ -126,15 +128,18 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; -@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY), - @Authorization(value = DATABASE)}) +@Api(tags = Constants.SEGMENT_TAG, authorizations = { + @Authorization(value = SWAGGER_AUTHORIZATION_KEY), + @Authorization(value = DATABASE) +}) @SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = { @ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY, description = "The format of the key is ```\"Basic \" or \"Bearer \"```"), @ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE, description = "Database context passed through http header. If no context is provided 'default' database " - + "context will be considered.")})) + + "context will be considered.") +})) @Path("/") public class PinotSegmentUploadDownloadRestletResource { private static final Logger LOGGER = LoggerFactory.getLogger(PinotSegmentUploadDownloadRestletResource.class); @@ -156,6 +161,9 @@ public class PinotSegmentUploadDownloadRestletResource { @Inject AccessControlFactory _accessControlFactory; + @Inject + DiskUtilizationChecker _diskUtilizationChecker; + @GET @Produces(MediaType.APPLICATION_OCTET_STREAM) @Path("/segments/{tableName}/{segmentName}") @@ -403,6 +411,26 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl SegmentValidationUtils.checkStorageQuota(segmentName, segmentSizeInBytes, untarredSegmentSizeInBytes, tableConfig, _storageQuotaChecker); + // Perform resource utilization checks + UtilizationChecker.CheckResult isDiskUtilizationWithinLimits = + _diskUtilizationChecker.isResourceUtilizationWithinLimits(tableNameWithType, + UtilizationChecker.CheckPurpose.OFFLINE_SEGMENT_UPLOAD); + if (isDiskUtilizationWithinLimits == UtilizationChecker.CheckResult.FAIL) { + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, + 1L); + throw new ControllerApplicationException(LOGGER, + String.format("Disk utilization limit exceeded for table: %s, rejecting upload for segment: %s", + tableNameWithType, + segmentName), + Response.Status.FORBIDDEN); + } else if (isDiskUtilizationWithinLimits == UtilizationChecker.CheckResult.UNDETERMINED) { + LOGGER.warn( + "Disk utilization status could not be determined for table: {}. Will allow segment upload to proceed.", + tableNameWithType); + } + _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED, + 0L); + // Encrypt segment String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); Pair encryptionInfo = @@ -580,7 +608,7 @@ private SuccessResponse uploadSegments(String tableName, TableType tableType, Fo try { int entryCount = 0; - for (Map.Entry entry: segmentsMetadataInfoMap.entrySet()) { + for (Map.Entry entry : segmentsMetadataInfoMap.entrySet()) { String segmentName = entry.getKey(); SegmentMetadataInfo segmentMetadataInfo = entry.getValue(); segmentNames.add(segmentName); @@ -1197,7 +1225,7 @@ private static Map createSegmentsMetadataInfoMap(Fo } Map segmentsMetadataInfoMap = new HashMap<>(); - for (File file: segmentsMetadataFiles) { + for (File file : segmentsMetadataFiles) { String fileName = file.getName(); if (fileName.equalsIgnoreCase(SegmentUploadConstants.ALL_SEGMENTS_METADATA_FILENAME)) { try (InputStream inputStream = FileUtils.openInputStream(file)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java index 7b122bb98f14..7fd1fddaeec1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/UtilizationChecker.java @@ -54,7 +54,9 @@ void computeResourceUtilization(BiMap endpointsToInstances, enum CheckPurpose { // REALTIME_INGESTION if the check is performed from the realtime ingestion code path to pause ingestion // TASK_GENERATION if the check is performed from the task generation framework to pause creation of new tasks - REALTIME_INGESTION, TASK_GENERATION + // OFFLINE_SEGMENT_UPLOAD if the check is performed from the offline segment upload code path to reject a segment + upload + REALTIME_INGESTION, TASK_GENERATION, OFFLINE_SEGMENT_UPLOAD } enum CheckResult {