Skip to content

Conversation

@vladaliii
Copy link
Contributor

Related to #1082
This PR is the first step to enable shuffle support in S3QS. S3QS can now support message passing between workers in different groups, as well as pipeline termination signals.

Key improvements:

Defined the message format provided by workers, enabling identification of worker IDs and partition numbers.
Since S3QS is used exclusively for shuffle tasks, it can serve as the task manager connecting the upstream and downstream of the shuffle, providing communication channels.

@@ -0,0 +1,9 @@
package io.pixelsdb.pixels.storage.s3qs.exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a license header.

@@ -0,0 +1,85 @@
package io.pixelsdb.pixels.storage.s3qs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

License header.

this.builderPhysicalWriter = PhysicalWriterUtil.newPhysicalWriter(
this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication,
this.builderBlockPadding, this.builderOverwrite);
} catch (IOException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch on a new line, { on a new line.

LOGGER.error("Failed to create PhysicalWriter");
throw new PixelsWriterException(
"Failed to create PixelsWriter due to error of creating PhysicalWriter", e);
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{ in a new line.

{
this.queue.push(this.pathStr);
}
}catch (IOException e){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new lines.

private long timestamp = System.currentTimeMillis();
private String metadata = "";

public S3QueueMessage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{ in a new line.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants