Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@
import org.apache.pinot.controller.api.upload.ZKOperator;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.retention.RetentionManager;
import org.apache.pinot.controller.validation.DiskUtilizationChecker;
import org.apache.pinot.controller.validation.StorageQuotaChecker;
import org.apache.pinot.controller.validation.UtilizationChecker;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
Expand Down Expand Up @@ -126,15 +128,18 @@
import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;


@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY),
@Authorization(value = DATABASE)})
@Api(tags = Constants.SEGMENT_TAG, authorizations = {
@Authorization(value = SWAGGER_AUTHORIZATION_KEY),
@Authorization(value = DATABASE)
Comment on lines +131 to +133
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

})
@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = {
@ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic <token>\" or \"Bearer <token>\"```"),
@ApiKeyAuthDefinition(name = DATABASE, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
description = "Database context passed through http header. If no context is provided 'default' database "
+ "context will be considered.")}))
+ "context will be considered.")
}))
@Path("/")
public class PinotSegmentUploadDownloadRestletResource {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotSegmentUploadDownloadRestletResource.class);
Expand All @@ -156,6 +161,9 @@ public class PinotSegmentUploadDownloadRestletResource {
@Inject
AccessControlFactory _accessControlFactory;

@Inject
DiskUtilizationChecker _diskUtilizationChecker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we plug in ResourceUtilizationManager instead? We want to run all utilization checkers

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I described in more detail why I went with this approach in #17557

We'd like a way to enable/disable each resource utilization checker individually, instead of tables being automatically opted in whenever more checkers are added in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can decide which checker to include when initializing the controller, but all the checkers should be applied. This is the only way to make resource checkers pluggable. In OSS code, it might not have access to the custom checkers


@GET
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/segments/{tableName}/{segmentName}")
Expand Down Expand Up @@ -403,6 +411,26 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
SegmentValidationUtils.checkStorageQuota(segmentName, segmentSizeInBytes, untarredSegmentSizeInBytes, tableConfig,
_storageQuotaChecker);

// Perform resource utilization checks
UtilizationChecker.CheckResult isDiskUtilizationWithinLimits =
_diskUtilizationChecker.isResourceUtilizationWithinLimits(tableNameWithType,
UtilizationChecker.CheckPurpose.OFFLINE_SEGMENT_UPLOAD);
if (isDiskUtilizationWithinLimits == UtilizationChecker.CheckResult.FAIL) {
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
1L);
throw new ControllerApplicationException(LOGGER,
String.format("Disk utilization limit exceeded for table: %s, rejecting upload for segment: %s",
tableNameWithType,
segmentName),
Response.Status.FORBIDDEN);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Went with FORBIDDEN since the storage quota checker uses it.

} else if (isDiskUtilizationWithinLimits == UtilizationChecker.CheckResult.UNDETERMINED) {
LOGGER.warn(
"Disk utilization status could not be determined for table: {}. Will allow segment upload to proceed.",
tableNameWithType);
}
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, ControllerGauge.RESOURCE_UTILIZATION_LIMIT_EXCEEDED,
0L);

// Encrypt segment
String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName();
Pair<String, File> encryptionInfo =
Expand Down Expand Up @@ -580,7 +608,7 @@ private SuccessResponse uploadSegments(String tableName, TableType tableType, Fo

try {
int entryCount = 0;
for (Map.Entry<String, SegmentMetadataInfo> entry: segmentsMetadataInfoMap.entrySet()) {
for (Map.Entry<String, SegmentMetadataInfo> entry : segmentsMetadataInfoMap.entrySet()) {
String segmentName = entry.getKey();
SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
segmentNames.add(segmentName);
Expand Down Expand Up @@ -1197,7 +1225,7 @@ private static Map<String, SegmentMetadataInfo> createSegmentsMetadataInfoMap(Fo
}

Map<String, SegmentMetadataInfo> segmentsMetadataInfoMap = new HashMap<>();
for (File file: segmentsMetadataFiles) {
for (File file : segmentsMetadataFiles) {
String fileName = file.getName();
if (fileName.equalsIgnoreCase(SegmentUploadConstants.ALL_SEGMENTS_METADATA_FILENAME)) {
try (InputStream inputStream = FileUtils.openInputStream(file)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ void computeResourceUtilization(BiMap<String, String> endpointsToInstances,
/**
 * Identifies the code path from which a resource utilization check is invoked, so a checker can
 * tailor its decision to the calling context.
 */
enum CheckPurpose {
  /** Check performed from the realtime ingestion code path to pause ingestion. */
  REALTIME_INGESTION,
  /** Check performed from the task generation framework to pause creation of new tasks. */
  TASK_GENERATION,
  /** Check performed from the offline segment upload code path to reject a segment upload. */
  OFFLINE_SEGMENT_UPLOAD
}

enum CheckResult {
Expand Down