diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java index e5cf60b21a..9b86e55906 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java @@ -219,6 +219,7 @@ public static class Builder private boolean builderPartitioned = false; private boolean builderNullsPadding = false; private Optional> builderPartKeyColumnIds = Optional.empty(); + private PhysicalWriter builderPhysicalWriter = null; private Builder() { @@ -230,6 +231,12 @@ public Builder setSchema(TypeDescription schema) return this; } + public Builder setPhysicalWriter(PhysicalWriter writer) + { + this.builderPhysicalWriter = requireNonNull(writer); + return this; + } + public Builder setHasHiddenColumn(boolean hasHiddenColumn) { this.builderHasHiddenColumn = hasHiddenColumn; @@ -336,24 +343,27 @@ public PixelsWriter build() throws PixelsWriterException { requireNonNull(this.builderStorage, "storage is not set"); requireNonNull(this.builderFilePath, "file path is not set"); - PhysicalWriter fsWriter = null; - try - { - fsWriter = PhysicalWriterUtil.newPhysicalWriter( - this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication, - this.builderBlockPadding, this.builderOverwrite); - } catch (IOException e) - { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); - } - if (fsWriter == null) + if(this.builderPhysicalWriter == null) { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter"); + try + { + this.builderPhysicalWriter = PhysicalWriterUtil.newPhysicalWriter( + this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication, + this.builderBlockPadding, this.builderOverwrite); + } + catch (IOException e) + { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + } + if (this.builderPhysicalWriter == null) + { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter"); + } } return new PixelsWriterImpl( @@ -364,7 +374,7 @@ public PixelsWriter build() throws PixelsWriterException builderCompressionKind, builderCompressionBlockSize, builderTimeZone, - fsWriter, + builderPhysicalWriter, builderEncodingLevel, builderNullsPadding, builderPartitioned, diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java index f737378191..d20898cce8 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java @@ -186,6 +186,7 @@ public static class Builder private EncodingLevel builderEncodingLevel = EncodingLevel.EL0; private boolean builderPartitioned = false; private boolean builderNullsPadding = false; + private PhysicalWriter fsWriter = null; private Optional> builderPartKeyColumnIds = Optional.empty(); // added compared to PixelsWriterImpl @@ -256,6 +257,11 @@ public Builder setNullsPadding(boolean nullsPadding) this.builderNullsPadding = nullsPadding; return this; } + public Builder setPhysicalWriter(PhysicalWriter fsWriter) + { + this.fsWriter = fsWriter; + return this; + } public Builder setEncodingLevel(EncodingLevel encodingLevel) { @@ -305,16 +311,20 @@ public PixelsWriter build() throws PixelsWriterException (this.builderPartKeyColumnIds.isPresent() && !this.builderPartKeyColumnIds.get().isEmpty()), "partition column ids are present while partitioned is false, or vice versa"); - PhysicalWriter fsWriter; - try - { - fsWriter = PhysicalWriterUtil.newPhysicalWriter( - this.builderStorage, this.builderFilePath, null); - } catch (IOException e) + PhysicalWriter fsWriter = this.fsWriter; + if (fsWriter == null) { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + try + { + fsWriter = PhysicalWriterUtil.newPhysicalWriter( + this.builderStorage, this.builderFilePath, null); + } + catch (IOException e) + { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + } } if (fsWriter == null) @@ -597,7 +607,8 @@ private void writeRowGroup() throws IOException columnWriters[i] = newColumnWriter(children.get(i), columnWriterOption); } physicalWriter.flush(); - } catch (IOException e) + } + catch (IOException e) { LOGGER.error(e.getMessage()); throw e; diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java index a87da35b25..9a8a4b8dbc 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java @@ -74,7 +74,8 @@ public PhysicalS3QSReader(Storage storage, String path) throws IOException this.buffer = responseBytes.asByteArrayUnsafe(); this.length = this.buffer.length; this.position = 0; - } catch (Exception e) + } + catch (Exception e) { this.position = 0; throw new IOException("Failed to read object.", e); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java index 3865963854..9ba6da62f3 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java @@ -116,10 +116,17 @@ public long append(byte[] buffer, int offset, int length) throws IOException @Override public void close() throws IOException { - this.out.close(); - if (this.queue != null && !this.queue.isClosed()) + try { - this.queue.push(this.pathStr); + this.out.close(); + if (this.queue != null && !this.queue.isClosed()) + { + this.queue.push(this.pathStr); + } + } + catch (IOException e) + { + throw e; } } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index b7c2babe98..26330f1121 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -20,17 +20,23 @@ package io.pixelsdb.pixels.storage.s3qs; import io.pixelsdb.pixels.common.physical.ObjectPath; +import io.pixelsdb.pixels.common.physical.PhysicalReader; +import io.pixelsdb.pixels.common.physical.PhysicalWriter; import io.pixelsdb.pixels.storage.s3.AbstractS3; +import io.pixelsdb.pixels.storage.s3qs.exception.TaskErrorException; import io.pixelsdb.pixels.storage.s3qs.io.S3QSInputStream; import io.pixelsdb.pixels.storage.s3qs.io.S3QSOutputStream; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.*; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.UnsupportedOperationException; import java.time.Duration; +import java.util.*; /** * {@link S3QS} is to write and read the small intermediate files in data shuffling. It is compatible with S3, hence its @@ -49,11 +55,26 @@ public final class S3QS extends AbstractS3 { private static final String SchemePrefix = Scheme.s3qs.name() + "://"; + //TODO: need thread safe + public final HashSet producerSet; + // maybe can use array to improve + private final HashSet PartitionSet; + private final HashMap PartitionMap; + private final int invisibleTime; + private SqsClient sqs; public S3QS() { + this(30); + } + + public S3QS(int invisibleTime){ this.connect(); + this.producerSet = new HashSet<>(); + this.PartitionSet = new HashSet<>(); + this.PartitionMap = new HashMap<>(); + this.invisibleTime = invisibleTime; } private synchronized void connect() @@ -90,11 +111,176 @@ public String ensureSchemePrefix(String path) throws IOException return SchemePrefix + path; } + public void addProducer(int workerId) + { + if(!(this.producerSet).contains(workerId)) + { + this.producerSet.add(workerId); + } + } + + //TODO: GC for files, objects and sqs. + //TODO: allow separated invisible timeout config + //producers in a shuffle offer their message to s3qs. + public PhysicalWriter offer(S3QueueMessage mesg) throws IOException + { + S3Queue queue = null; + + //if current Partition is new one, create a new queue + if(!(this.PartitionSet).contains(mesg.getPartitionNum())) + { + String queueUrl = ""; + try + { + queueUrl = createQueue(sqs,invisibleTime, + mesg.getPartitionNum()+"-"+ + (String.valueOf(System.currentTimeMillis())) + ); + } + catch (SqsException e) + { + //TODO: if name is duplicated in aws try again later + throw new IOException(e); + } + if(!(queueUrl.isEmpty())) + { + queue = openQueue(queueUrl); + PartitionSet.add(mesg.getPartitionNum()); + PartitionMap.put(mesg.getPartitionNum(), queue); + } + else + { + throw new IOException("create new queue failed."); + } + } + else + { + queue = PartitionMap.get(mesg.getPartitionNum()); + if(queue.getStopInput()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + } + return queue.offer(mesg); + } + + private static String createQueue(SqsClient sqsClient,int invisibleTime, String queueName) { + try + { + + CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() + .queueName(queueName) + .attributes(Collections.singletonMap( + QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(invisibleTime) + )) + .build(); + + sqsClient.createQueue(createQueueRequest); + + GetQueueUrlResponse getQueueUrlResponse = sqsClient + .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); + return getQueueUrlResponse.queueUrl(); + + } + catch (SqsException e) + { + throw new RuntimeException("fail to create sqs queue: " + queueName, e); + } + } + public S3Queue openQueue(String queueUrl) { return new S3Queue(this, queueUrl); } + + /** + @return including writer and recipthanle(to sign a mesg in sqs). + 3 situation :closed(exception) / not ready or timeout or stopInput(null) / succeed(writer) + */ + public Map.Entry poll(S3QueueMessage mesg, int timeoutSec) throws IOException + { + S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); + // queue close means consumer don't need to listen from it + if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + if(queue == null) return null; + + //if there is no more input, just wait invisibletime. if timeout, no more message + //issue: dead message maybe only handle twice, I think that's reasonable + //TODO(OUT-OF-DATE): once a message dead, push it in dead message queue. when delete a message, delete + Map.Entry pair = null; + try + { + if (queue.getStopInput()) + { + pair = queue.poll(queue.getInvisibleTime()); + if (pair == null) + { + // no more message + queue.close(); //logical close, no effect to consumers + return null; //come back later and find queue is closed + } + } + else + { + pair = queue.poll(timeoutSec); + if (pair == null) return null; //upstream is working, come back later + } + } + catch (TaskErrorException e) + { + //clean up + } + + queue.addConsumer(mesg.getWorkerNum()); + return pair; + } + + public int finishWork(S3QueueMessage mesg) throws IOException + { + //DONOT close queue here. we don't know whether downstream workers meet error + //if we check to close queue here, we maybe check many times. That is expensive + // Once queue closed, consumers in the queue will know in the next poll. + String receiptHandle = mesg.getReceiptHandle(); + S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); + + + if(queue == null) + { + //queue not exist: an error, or a timeout worker + throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + } + try + { + queue.deleteMessage(receiptHandle); + } + catch (SqsException e) + { + //TODO: log + return 2; + } + + // TODO(OUT-OF-DATE): when all the consumers exit with some messages staying in DLQ, the work occur an error + // though we can actually close in poll() function, we close here make sure sqs can close safely. + // thus, we can check which part of task failed in sqs. + if(queue.removeConsumer(mesg.getWorkerNum()) && queue.isClosed()) + { + try + { + DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() + .queueUrl(queue.getQueueUrl()) + .build(); + + sqs.deleteQueue(deleteQueueRequest); + + } + catch (SqsException e) + { + // TODO: log + return 1; + } + + } + + return 0; + } @Override public DataInputStream open(String path) throws IOException { @@ -149,6 +335,13 @@ public boolean directCopy(String src, String dest) @Override public void close() throws IOException { + for (S3Queue queue : PartitionMap.values()) + { + queue.close(); + } + this.producerSet.clear(); + this.PartitionSet.clear(); + this.PartitionMap.clear(); if (this.sqs != null) { this.sqs.close(); @@ -159,6 +352,17 @@ public void close() throws IOException } } + public void refresh() throws IOException + { + for (S3Queue queue : PartitionMap.values()) + { + queue.close(); + } + this.producerSet.clear(); + this.PartitionSet.clear(); + this.PartitionMap.clear(); + } + public SqsClient getSqsClient() { return sqs; diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index 06c99a5f53..73dc0bce55 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -24,19 +24,19 @@ import io.pixelsdb.pixels.common.physical.PhysicalWriter; import io.pixelsdb.pixels.common.physical.PhysicalWriterUtil; import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.storage.s3qs.exception.TaskErrorException; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.*; import java.io.Closeable; import java.io.IOException; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT; + /** * This is the queue to read from and write to s3+sqs. * It is thread safe. Using multiple threads to poll and offer the queue can improve the data transfer throughput @@ -64,7 +64,7 @@ public class S3Queue implements Closeable } } - private final Queue s3PathQueue = new ConcurrentLinkedQueue<>(); + private final Queue> s3PathQueue = new ConcurrentLinkedQueue<>(); private final String queueUrl; @@ -72,17 +72,71 @@ public class S3Queue implements Closeable private final S3QS s3qs; + private final HashSetconsumerSet; + private final Lock lock = new ReentrantLock(); private boolean closed = false; - public S3Queue(S3QS s3qs, String queueUrl) + private boolean stopInput = false; + + private int invisibleTime; + + public S3Queue(S3QS s3qs, String queueUrl, int invisibleTime) { this.s3qs = s3qs; this.queueUrl = queueUrl; this.sqsClient = this.s3qs.getSqsClient(); + this.invisibleTime = invisibleTime; + this.consumerSet = new HashSet<>(); + } + public S3Queue(S3QS s3qs, String queueUrl) + { + + this(s3qs, queueUrl,30); + } + + public String getQueueUrl() + { + return this.queueUrl; + } + + public int getInvisibleTime() + { + return this.invisibleTime; + } + + + //s3qs is the highest manager of a shuffle, so producerSet is in charge of s3qs. + private void removeProducer(int workerId) throws IOException + { + this.s3qs.producerSet.remove(workerId); + if(this.s3qs.producerSet.isEmpty()) + { + this.stopInput(); + //this.push(""); + } + } + + public void addConsumer(int workerId) + { + if(!(this.consumerSet).contains(workerId)) + { + this.consumerSet.add(workerId); + } + } + + //maybe unnecessary + public boolean removeConsumer(int workerId) + { + this.consumerSet.remove(workerId); + // TODO: close queue + return consumerSet.isEmpty(); } + //TODO(OUT-OF-DATE): Implement DLQ to handle bad message. + + /** * Poll one object path from the SQS queue and create a physical reader for the object. * Calling this method can receive a batch of object paths from SQS using long polling @@ -94,9 +148,10 @@ public S3Queue(S3QS s3qs, String queueUrl) * @return null if the queue is still empty after timeout * @throws IOException if fails to create the physical reader for the path */ - public PhysicalReader poll(int timeoutSec) throws IOException + public Map.Entry poll(int timeoutSec) throws IOException, SqsException,TaskErrorException + { - String s3Path = this.s3PathQueue.poll(); + Map.Entry s3Path = this.s3PathQueue.poll(); if (s3Path == null) { if (timeoutSec < 1) @@ -114,19 +169,36 @@ public PhysicalReader poll(int timeoutSec) throws IOException while ((s3Path = this.s3PathQueue.poll()) == null) { ReceiveMessageRequest request = ReceiveMessageRequest.builder() - .queueUrl(queueUrl).maxNumberOfMessages(POLL_BATCH_SIZE).waitTimeSeconds(timeoutSec).build(); + .queueUrl(queueUrl) + .attributeNamesWithStrings("ApproximateReceiveCount") + .maxNumberOfMessages(POLL_BATCH_SIZE) + .waitTimeSeconds(timeoutSec).build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request); if (response.hasMessages()) { for (Message message : response.messages()) { String path = message.body(); - this.s3PathQueue.add(path); + String receiptHandle = message.receiptHandle(); + this.s3PathQueue.add(new AbstractMap.SimpleEntry<>(path, receiptHandle)); + + String countStr = message.attributes().get(APPROXIMATE_RECEIVE_COUNT); + if (countStr == null) + { + // If no value is returned, it may be because the property was not requested or the message does not contain that property. + throw new TaskErrorException("ApproximateReceiveCount not returned"); + } + else + { + int count = Integer.parseInt(countStr); + // because we can only promise two receipts can be handled + if (count > 2) throw new TaskErrorException("Dead message occurred"); + } } } else { - // the sqs queue is also empty + // the sqs queue is also empty,timeout,invoker handle this situation, which means timeout. return null; } } @@ -136,37 +208,81 @@ public PhysicalReader poll(int timeoutSec) throws IOException this.lock.unlock(); } } + String receiptHandle = s3Path.getValue(); + PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(this.s3qs, s3Path.getKey()); + return new AbstractMap.SimpleEntry(receiptHandle, reader); + } - return PhysicalReaderUtil.newPhysicalReader(this.s3qs, s3Path); + // do not need to delete local queue + public void deleteMessage(String receiptHandle) throws SqsException + { + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() + .queueUrl(this.getQueueUrl()) + .receiptHandle(receiptHandle) + .build(); + this.sqsClient.deleteMessage(deleteMessageRequest); } - protected void push(String objectPath) + + + protected void push(String objectPath) throws IOException { - SendMessageRequest request = SendMessageRequest.builder() - .queueUrl(queueUrl).messageBody(objectPath).build(); - sqsClient.sendMessage(request); + try + { + SendMessageRequest request = SendMessageRequest.builder() + .queueUrl(queueUrl).messageBody(objectPath).build(); + sqsClient.sendMessage(request); + } + catch (Exception e) + { + throw new IOException("sqs: fail to send message.",e); + } } /** * Create a physical writer for an object of the given path. When the object is written * and the physical writer is closed successfully, the object path is sent to SQS. - * @param objectPath the path of the object + * @param body the information from upstream worker * @return the physical writer of the object * @throws IOException if fails to create the physical writer for the path */ - public PhysicalWriter offer(String objectPath) throws IOException + public PhysicalWriter offer(S3QueueMessage body) throws IOException { + //TODO: same name issue + String objectPath = getObjectPath(body); PhysicalS3QSWriter writer = (PhysicalS3QSWriter) PhysicalWriterUtil .newPhysicalWriter(this.s3qs, objectPath, false); writer.setQueue(this); + if(endWork(body)) removeProducer(body.getWorkerNum()); return writer; } + private String getObjectPath(S3QueueMessage body) throws IOException + { + return body.getObjectPath()+body.getPartitionNum() + "/"+ String.valueOf(System.currentTimeMillis()) ; + } + + private boolean endWork(S3QueueMessage body) throws IOException + { + return body.getEndWork(); + } + + public boolean isClosed() { return closed; } + public void stopInput() + { + this.stopInput = true; + } + + public boolean getStopInput() + { + return this.stopInput; + } + @Override public void close() throws IOException { diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java new file mode 100644 index 0000000000..60bbe57079 --- /dev/null +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -0,0 +1,97 @@ +package io.pixelsdb.pixels.storage.s3qs; + +/** + * Unified message format for communication between worker and s3qs. + * @author yanhaoting + * @create 2025-12-19 + */ +public class S3QueueMessage +{ + private String objectPath; + private int workerNum = 0; + private int partitionNum = 0; + private boolean endWork = false;//specific to workers, indicating whether his work is finished + private String receiptHandle = "";//specific to consumers, indicating which mesg is consumed. + private long timestamp = System.currentTimeMillis(); + private String metadata = ""; + + public S3QueueMessage() + { + } + + public S3QueueMessage(String objectPath) + { + this.objectPath = objectPath; + this.timestamp = System.currentTimeMillis(); + } + + public String getObjectPath() { + return objectPath; + } + + public S3QueueMessage setObjectPath(String objectPath) + { + this.objectPath = objectPath; + return this; + } + + public int getWorkerNum() { + return this.workerNum; + } + + public S3QueueMessage setWorkerNum(int WorkerNum) + { + this.workerNum = WorkerNum; + return this; + } + + public int getPartitionNum() { + return this.partitionNum; + } + + public S3QueueMessage setPartitionNum(int PartitionNum) + { + this.partitionNum = PartitionNum; + return this; + } + + public boolean getEndWork() { + return this.endWork; + } + + public S3QueueMessage setEndwork(boolean endwork) + { + this.endWork = endwork; + return this; + } + + public String getReceiptHandle() { + return this.receiptHandle; + } + + public S3QueueMessage setReceiptHandle(String ReceiptHandle) + { + this.receiptHandle = ReceiptHandle; + return this; + } + + public long getTimestamp() { + return timestamp; + } + + public S3QueueMessage setTimestamp(long timestamp) + { + this.timestamp = timestamp; + return this; + } + + public String getMetadata() { + return metadata; + } + + public S3QueueMessage setMetadata(String metadata) + { + this.metadata = metadata; + return this; + } +} diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java new file mode 100644 index 0000000000..a95b4f6fac --- /dev/null +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java @@ -0,0 +1,13 @@ +package io.pixelsdb.pixels.storage.s3qs.exception; + +/** + * @author yanhaoting + * @create 2025-12-19 + */ +public class TaskErrorException extends Exception +{ + public TaskErrorException(String msg) + { + super(msg); + } +} diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index b3c2d84425..6fe62f0411 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -26,9 +26,8 @@ import org.junit.Test; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.concurrent.*; /** * @author hank @@ -39,67 +38,210 @@ public class TestS3QS private final ExecutorService executor = Executors.newFixedThreadPool(2); @Test - public void startReader() throws IOException, InterruptedException + public void TestWriterAndReader() throws IOException, InterruptedException, ExecutionException { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); - S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); - this.executor.submit(() -> { + Future future = this.executor.submit(() -> { + byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); + s3qs.addProducer(0); s3qs.addProducer(1); for (int i = 0; i < 2; i++) { - try (PhysicalReader reader = queue.poll(20)) + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/") + .setWorkerNum(i) + .setPartitionNum(99) + .setEndwork(true); + try (PhysicalWriter writer = s3qs.offer(body)) + { + writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 + } + catch (IOException e) + { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); + throw new RuntimeException(e); + } + } + + System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + + startTime = System.currentTimeMillis(); + for (int i = 0; i < 3; i++) + { + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/") + .setPartitionNum(99) + .setWorkerNum(2+i); + PhysicalReader reader = null; + String receiptHandle = null; + try { - reader.readFully(8 * 1024 * 1024); + HashMap.Entry pair = s3qs.poll(body,20); + if(pair != null) + { + reader = pair.getValue(); + receiptHandle = pair.getKey(); + body.setReceiptHandle(receiptHandle); + System.out.println("read object " + reader.getPath()); // 添加日志 + } } catch (IOException e) { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); throw new RuntimeException(e); } + finally + { + if(reader != null) + { + try + { + reader.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + try + { + s3qs.finishWork(body); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } } + System.out.println("read finished in " + (System.currentTimeMillis() - startTime) + " ms"); try { - queue.close(); + s3qs.refresh(); } catch (IOException e) { throw new RuntimeException(e); } }); + + try + { + future.get(); + } + catch (ExecutionException e) + { + //e.getCause().printStackTrace(); + throw e; + } + this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } @Test - public void testWriter() throws IOException, InterruptedException + public void testWriter() throws IOException, InterruptedException, ExecutionException { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); - S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); - this.executor.submit(() -> { + //S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); + + Future future = this.executor.submit(() -> + { byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); for (int i = 0; i < 2; i++) { - try (PhysicalWriter writer = queue.offer("pixels-turbo-intermediate/shuffle/" + i)) + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/" + i + "/") + .setPartitionNum(2) + .setEndwork(false); + try (PhysicalWriter writer = s3qs.offer(body)) { writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 } catch (IOException e) { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); throw new RuntimeException(e); } } System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + try { - queue.close(); + s3qs.refresh(); } catch (IOException e) { throw new RuntimeException(e); } }); + + try + { + future.get(); + } + catch (ExecutionException e) + { + //e.getCause().printStackTrace(); + throw e; + } + + this.executor.shutdown(); + this.executor.awaitTermination(100, TimeUnit.HOURS); + } + + @Test + public void testMultiSQSWriter() throws IOException, InterruptedException, ExecutionException + { + S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); + + Future future = this.executor.submit(() -> + { + byte[] buffer = new byte[8 * 1024 * 1024]; + long startTime = System.currentTimeMillis(); + for (int i = 0; i < 2; i++) + { + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/" + 9 + "/") + .setPartitionNum(9) + .setEndwork(false); + try (PhysicalWriter writer = s3qs.offer(body)) + { + writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 + } + catch (IOException e) + { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); + throw new RuntimeException(e); + } + } + System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + + }); + + try + { + future.get(); + } + catch (ExecutionException e) + { + e.getCause().printStackTrace(); + throw e; + } + try + { + s3qs.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } -} \ No newline at end of file +}