From a1e7b5664aef74474a1db1b85502ed3873799908 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Mon, 8 Dec 2025 18:17:58 +0800 Subject: [PATCH 1/7] [Issue1082] enhance s3qs to support a message format with more metadata --- .../pixels/core/PixelsWriterStreamImpl.java | 25 ++++--- .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 63 +++++++++++++++- .../pixels/storage/s3qs/S3QueueMessage.java | 75 +++++++++++++++++++ .../pixels/storage/s3qs/TestS3QS.java | 5 +- 4 files changed, 156 insertions(+), 12 deletions(-) create mode 100644 pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java index 984a262723..8b20e2d15d 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java @@ -186,6 +186,7 @@ public static class Builder private EncodingLevel builderEncodingLevel = EncodingLevel.EL0; private boolean builderPartitioned = false; private boolean builderNullsPadding = false; + private PhysicalWriter fsWriter = null; private Optional> builderPartKeyColumnIds = Optional.empty(); // added compared to PixelsWriterImpl @@ -256,6 +257,11 @@ public Builder setNullsPadding(boolean nullsPadding) this.builderNullsPadding = nullsPadding; return this; } + public Builder setPhysicalWriter(PhysicalWriter fsWriter) + { + this.fsWriter = fsWriter; + return this; + } public Builder setEncodingLevel(EncodingLevel encodingLevel) { @@ -305,16 +311,17 @@ public PixelsWriter build() throws PixelsWriterException (this.builderPartKeyColumnIds.isPresent() && !this.builderPartKeyColumnIds.get().isEmpty()), "partition column ids are present while partitioned is false, or vice versa"); - PhysicalWriter fsWriter; - try - { - fsWriter = PhysicalWriterUtil.newPhysicalWriter( - this.builderStorage, this.builderFilePath, null); - } catch (IOException e) + PhysicalWriter fsWriter = this.fsWriter; + if (fsWriter == null) { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + try { + fsWriter = PhysicalWriterUtil.newPhysicalWriter( + this.builderStorage, this.builderFilePath, null); + } catch (IOException e) { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + } } if (fsWriter == null) diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index 06c99a5f53..6bbbb3c6bd 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -32,6 +32,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.HashSet; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; @@ -72,6 +73,10 @@ public class S3Queue implements Closeable private final S3QS s3qs; + private final HashSet producerSet; + + private final HashSet consumerSet; + private final Lock lock = new ReentrantLock(); private boolean closed = false; @@ -81,8 +86,45 @@ public S3Queue(S3QS s3qs, String queueUrl) this.s3qs = s3qs; this.queueUrl = queueUrl; this.sqsClient = this.s3qs.getSqsClient(); + this.producerSet = new HashSet<>(); + this.consumerSet = new HashSet<>(); + } + + public void addProducer(int workerId) + { + if(!this.producerSet.contains(workerId)){ + this.producerSet.add(workerId); + } + } + + public void removeProducer(int workerId) + { + this.producerSet.remove(workerId); + if(this.producerSet.isEmpty()) + { + this.push(""); + } + } + + public void addConsumer(int workerId) + { + if(!this.consumerSet.contains(workerId)) + { + this.consumerSet.add(workerId); + } + } + + public void removeConsumer(int workerId) + { + this.consumerSet.remove(workerId); + if(consumerSet.isEmpty()) + { + // TODO: close queue + } } + + /** * Poll one object path from the SQS queue and create a physical reader for the object. * Calling this method can receive a batch of object paths from SQS using long polling @@ -95,6 +137,7 @@ public S3Queue(S3QS s3qs, String queueUrl) * @throws IOException if fails to create the physical reader for the path */ public PhysicalReader poll(int timeoutSec) throws IOException + //TODO: now we have clear terminated message, if timeout is still necessary? { String s3Path = this.s3PathQueue.poll(); if (s3Path == null) @@ -140,6 +183,13 @@ public PhysicalReader poll(int timeoutSec) throws IOException return PhysicalReaderUtil.newPhysicalReader(this.s3qs, s3Path); } + private boolean assertPartition(String path , int HashPartition){ + //TODO: if there are dynamic hash or multi-level hash? + String[] parts = path.split("/"); + int pathPartition = Integer.parseInt(parts[parts.length-1]); + return HashPartition == pathPartition; + } + protected void push(String objectPath) { SendMessageRequest request = SendMessageRequest.builder() @@ -150,18 +200,27 @@ protected void push(String objectPath) /** * Create a physical writer for an object of the given path. When the object is written * and the physical writer is closed successfully, the object path is sent to SQS. - * @param objectPath the path of the object + * @param body the information from upstream worker * @return the physical writer of the object * @throws IOException if fails to create the physical writer for the path */ - public PhysicalWriter offer(String objectPath) throws IOException + public PhysicalWriter offer(S3QueueMessage body) throws IOException { + String objectPath = getMessageGroup(body); + addProducer(body.getWorkerNum()); + PhysicalS3QSWriter writer = (PhysicalS3QSWriter) PhysicalWriterUtil .newPhysicalWriter(this.s3qs, objectPath, false); writer.setQueue(this); return writer; } + private String getMessageGroup(S3QueueMessage body) throws IOException + { + return body.getObjectPath()+body.getPartitionNum(); + } + + public boolean isClosed() { return closed; diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java new file mode 100644 index 0000000000..a690376039 --- /dev/null +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -0,0 +1,75 @@ +package io.pixelsdb.pixels.storage.s3qs; + +public class S3QueueMessage +{ + private String objectPath; + private int WorkerNum; + private int PartitionNum; + private String bucketName; + private long timestamp; + private String metadata; // 可选:其他元数据 + + // 无参构造函数 + public S3QueueMessage() { + } + + public S3QueueMessage(String objectPath) { + this.objectPath = objectPath; + this.timestamp = System.currentTimeMillis(); + } + + // getter 和 setter 方法(JSON 库需要) + public String getObjectPath() { + return objectPath; + } + + public S3QueueMessage setObjectPath(String objectPath) { + this.objectPath = objectPath; + return this; + } + + public int getWorkerNum() { + return this.WorkerNum; + } + + public S3QueueMessage setWorkerNum(int WorkerNum) { + this.WorkerNum = WorkerNum; + return this; + } + + public int getPartitionNum() { + return this.PartitionNum; + } + + public S3QueueMessage setPartitionNum(int PartitionNum) { + this.PartitionNum = PartitionNum; + return this; + } + + public String getBucketName() { + return bucketName; + } + + public S3QueueMessage setBucketName(String bucketName) { + this.bucketName = bucketName; + return this; + } + + public long getTimestamp() { + return timestamp; + } + + public S3QueueMessage setTimestamp(long timestamp) { + this.timestamp = timestamp; + return this; + } + + public String getMetadata() { + return metadata; + } + + public S3QueueMessage setMetadata(String metadata) { + this.metadata = metadata; + return this; + } +} diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index b3c2d84425..b7b7fb160a 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -80,7 +80,10 @@ public void testWriter() throws IOException, InterruptedException long startTime = System.currentTimeMillis(); for (int i = 0; i < 2; i++) { - try (PhysicalWriter writer = queue.offer("pixels-turbo-intermediate/shuffle/" + i)) + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/" + i + "/") + .setPartitionNum(2); + try (PhysicalWriter writer = queue.offer(body)) { writer.append(buffer); } From 729ced73a7028ce31fe7bfe0cdcc3244efc8a8a0 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Wed, 10 Dec 2025 20:51:27 +0800 Subject: [PATCH 2/7] [Issue #1082] Allows s3qs shuffle to support one SQS queue per partition --- pixels-storage/pixels-storage-s3qs/pom.xml | 4 + .../io/pixelsdb/pixels/storage/s3qs/S3QS.java | 92 ++++++++++++ .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 30 ++-- .../pixels/storage/s3qs/S3QueueMessage.java | 10 ++ .../pixels/storage/s3qs/TestS3QS.java | 131 ++++++++++++++++-- 5 files changed, 245 insertions(+), 22 deletions(-) diff --git a/pixels-storage/pixels-storage-s3qs/pom.xml b/pixels-storage/pixels-storage-s3qs/pom.xml index 12f56e12bf..c8f0433cde 100644 --- a/pixels-storage/pixels-storage-s3qs/pom.xml +++ b/pixels-storage/pixels-storage-s3qs/pom.xml @@ -40,6 +40,10 @@ + + junit + junit + diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index b7c2babe98..3e3fb9896d 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -20,17 +20,24 @@ package io.pixelsdb.pixels.storage.s3qs; import io.pixelsdb.pixels.common.physical.ObjectPath; +import io.pixelsdb.pixels.common.physical.PhysicalWriter; import io.pixelsdb.pixels.storage.s3.AbstractS3; import io.pixelsdb.pixels.storage.s3qs.io.S3QSInputStream; import io.pixelsdb.pixels.storage.s3qs.io.S3QSOutputStream; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; +import software.amazon.awssdk.services.sqs.model.SqsException; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; /** * {@link S3QS} is to write and read the small intermediate files in data shuffling. It is compatible with S3, hence its @@ -49,11 +56,20 @@ public final class S3QS extends AbstractS3 { private static final String SchemePrefix = Scheme.s3qs.name() + "://"; + public final HashSet producerSet; + // maybe can use array to improve + private final HashSet PartitionSet; + private final HashMap PartitionMap; + + private SqsClient sqs; public S3QS() { this.connect(); + this.producerSet = new HashSet<>(); + this.PartitionSet = new HashSet<>(); + this.PartitionMap = new HashMap<>(); } private synchronized void connect() @@ -90,11 +106,71 @@ public String ensureSchemePrefix(String path) throws IOException return SchemePrefix + path; } + //producers in a shuffle offer their message to s3qs. + public PhysicalWriter offer(S3QueueMessage mesg) throws IOException + { + S3Queue queue = null; + + //if current Partition is new one, create a new queue + if(!(this.PartitionSet).contains(mesg.getPartitionNum())) + { + String queueUrl = ""; + try { + queueUrl = createQueue(sqs, + (String.valueOf(System.currentTimeMillis())) + + "-" + mesg.getPartitionNum() + ); + }catch (SqsException e) { + //TODO: if name is duplicated in aws try again later + throw new IOException(e); + } + if(!(queueUrl.isEmpty())) + { + queue = openQueue(queueUrl); + PartitionSet.add(mesg.getPartitionNum()); + PartitionMap.put(mesg.getPartitionNum(), queue); + } else{ + throw new IOException("create new queue failed."); + } + } + else + { + queue = PartitionMap.get(mesg.getPartitionNum()); + if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + } + return queue.offer(mesg); + } + + private static String createQueue(SqsClient sqsClient, String queueName) { + try { + //System.out.println("\nCreate Queue"); + + CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() + .queueName(queueName) + .build(); + + sqsClient.createQueue(createQueueRequest); + + //System.out.println("\nGet queue url"); + + GetQueueUrlResponse getQueueUrlResponse = sqsClient + .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); + return getQueueUrlResponse.queueUrl(); + + } catch (SqsException e) { + System.err.println(e.awsErrorDetails().errorMessage()); + System.exit(1); + } + return ""; + } + public S3Queue openQueue(String queueUrl) { return new S3Queue(this, queueUrl); } + + @Override public DataInputStream open(String path) throws IOException { @@ -149,6 +225,12 @@ public boolean directCopy(String src, String dest) @Override public void close() throws IOException { + for (S3Queue queue : PartitionMap.values()){ + queue.close(); + } + this.producerSet.clear(); + this.PartitionSet.clear(); + this.PartitionMap.clear(); if (this.sqs != null) { this.sqs.close(); @@ -159,6 +241,16 @@ public void close() throws IOException } } + public void flush() throws IOException + { + for (S3Queue queue : PartitionMap.values()){ + queue.close(); + } + this.producerSet.clear(); + this.PartitionSet.clear(); + this.PartitionMap.clear(); + } + public SqsClient getSqsClient() { return sqs; diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index 6bbbb3c6bd..db059efcf7 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -73,9 +73,7 @@ public class S3Queue implements Closeable private final S3QS s3qs; - private final HashSet producerSet; - - private final HashSet consumerSet; + private final HashSetconsumerSet; private final Lock lock = new ReentrantLock(); @@ -86,21 +84,21 @@ public S3Queue(S3QS s3qs, String queueUrl) this.s3qs = s3qs; this.queueUrl = queueUrl; this.sqsClient = this.s3qs.getSqsClient(); - this.producerSet = new HashSet<>(); this.consumerSet = new HashSet<>(); } public void addProducer(int workerId) { - if(!this.producerSet.contains(workerId)){ - this.producerSet.add(workerId); + if(!(this.s3qs.producerSet).contains(workerId)){ + this.s3qs.producerSet.add(workerId); } } + //s3qs is the highest manager of a shuffle, so producerSet is in charge of s3qs. public void removeProducer(int workerId) { - this.producerSet.remove(workerId); - if(this.producerSet.isEmpty()) + this.s3qs.producerSet.remove(workerId); + if(this.s3qs.producerSet.isEmpty()) { this.push(""); } @@ -108,12 +106,13 @@ public void removeProducer(int workerId) public void addConsumer(int workerId) { - if(!this.consumerSet.contains(workerId)) + if(!(this.consumerSet).contains(workerId)) { this.consumerSet.add(workerId); } } + //maybe unnecessary public void removeConsumer(int workerId) { this.consumerSet.remove(workerId); @@ -123,6 +122,7 @@ public void removeConsumer(int workerId) } } + //TODO: Implement DLQ to handle bad message. /** @@ -206,7 +206,8 @@ protected void push(String objectPath) */ public PhysicalWriter offer(S3QueueMessage body) throws IOException { - String objectPath = getMessageGroup(body); + if(endWork(body)) removeProducer(body.getWorkerNum()); + String objectPath = getObjectPath(body); addProducer(body.getWorkerNum()); PhysicalS3QSWriter writer = (PhysicalS3QSWriter) PhysicalWriterUtil @@ -215,9 +216,14 @@ public PhysicalWriter offer(S3QueueMessage body) throws IOException return writer; } - private String getMessageGroup(S3QueueMessage body) throws IOException + private String getObjectPath(S3QueueMessage body) throws IOException + { + return body.getObjectPath()+body.getPartitionNum() + "/"+ String.valueOf(System.currentTimeMillis()) ; + } + + private boolean endWork(S3QueueMessage body) throws IOException { - return body.getObjectPath()+body.getPartitionNum(); + return body.getEndWork(); } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java index a690376039..9c3b46e14b 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -5,6 +5,7 @@ public class S3QueueMessage private String objectPath; private int WorkerNum; private int PartitionNum; + private boolean EndWork; private String bucketName; private long timestamp; private String metadata; // 可选:其他元数据 @@ -46,6 +47,15 @@ public S3QueueMessage setPartitionNum(int PartitionNum) { return this; } + public boolean getEndWork() { + return this.EndWork; + } + + public S3QueueMessage setEndwork(boolean endwork) { + this.EndWork = endwork; + return this; + } + public String getBucketName() { return bucketName; } diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index b7b7fb160a..fecca2fe9b 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -26,9 +26,7 @@ import org.junit.Test; import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; /** * @author hank @@ -70,39 +68,152 @@ public void startReader() throws IOException, InterruptedException this.executor.awaitTermination(100, TimeUnit.HOURS); } +// @Deprecated +// @Test +// public void testWriter() throws IOException, InterruptedException, ExecutionException +// { +// S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); +// S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); +// +// Future future = this.executor.submit(() -> { +// byte[] buffer = new byte[8 * 1024 * 1024]; +// long startTime = System.currentTimeMillis(); +// for (int i = 0; i < 2; i++) +// { +// S3QueueMessage body = new S3QueueMessage() +// .setObjectPath("pixels-turbo-intermediate/shuffle/" + i + "/") +// .setPartitionNum(2) +// .setEndwork(false); +// try (PhysicalWriter writer = queue.offer(body)) +// { +// writer.append(buffer); +// System.out.println("Wrote partition " + i); // 添加日志 +// } +// catch (IOException e) +// { +// System.err.println("Error in iteration " + i + ": " + e.getMessage()); +// throw new RuntimeException(e); +// } +// } +// System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); +// try +// { +// queue.close(); +// } +// catch (IOException e) +// { +// throw new RuntimeException(e); +// } +// }); +// +// try { +// future.get(); // 等待任务完成并获取可能的异常 +// } catch (ExecutionException e) { +// // 打印真实的异常堆栈 +// e.getCause().printStackTrace(); +// throw e; +// } +// +// this.executor.shutdown(); +// this.executor.awaitTermination(100, TimeUnit.HOURS); +// } +//} + @Test - public void testWriter() throws IOException, InterruptedException + public void testWriter() throws IOException, InterruptedException, ExecutionException { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); - S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); - this.executor.submit(() -> { + //S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); + + Future future = this.executor.submit(() -> { byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); for (int i = 0; i < 2; i++) { S3QueueMessage body = new S3QueueMessage() .setObjectPath("pixels-turbo-intermediate/shuffle/" + i + "/") - .setPartitionNum(2); - try (PhysicalWriter writer = queue.offer(body)) + .setPartitionNum(2) + .setEndwork(false); + try (PhysicalWriter writer = s3qs.offer(body)) { writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 } catch (IOException e) { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); throw new RuntimeException(e); } } System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); try { - queue.close(); + s3qs.flush(); } catch (IOException e) { throw new RuntimeException(e); } }); + + try { + future.get(); // 等待任务完成并获取可能的异常 + } catch (ExecutionException e) { + // 打印真实的异常堆栈 + e.getCause().printStackTrace(); + throw e; + } + this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } -} \ No newline at end of file + @Test + public void testMultiSQSWriter() throws IOException, InterruptedException, ExecutionException + { + S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); + + Future future = this.executor.submit(() -> { + byte[] buffer = new byte[8 * 1024 * 1024]; + long startTime = System.currentTimeMillis(); + for (int i = 0; i < 2; i++) + { + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/" + 9 + "/") + .setPartitionNum(9) + .setEndwork(false); + try (PhysicalWriter writer = s3qs.offer(body)) + { + writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 + } + catch (IOException e) + { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); + throw new RuntimeException(e); + } + } + System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + + }); + + try { + future.get(); // 等待任务完成并获取可能的异常 + } catch (ExecutionException e) { + // 打印真实的异常堆栈 + e.getCause().printStackTrace(); + throw e; + } + try + { + s3qs.close(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + this.executor.shutdown(); + this.executor.awaitTermination(100, TimeUnit.HOURS); + } + +} From 2a8ab589072f08eef479a390d323b0a235cdfe96 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Tue, 23 Dec 2025 19:36:46 +0800 Subject: [PATCH 3/7] [Issue #1082] improve consumers control flow --- .../storage/s3qs/PhysicalS3QSWriter.java | 10 +- .../io/pixelsdb/pixels/storage/s3qs/S3QS.java | 113 ++++++++++++++-- .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 125 ++++++++++++------ .../pixels/storage/s3qs/S3QueueMessage.java | 32 ++--- .../s3qs/exception/TaskErrorException.java | 9 ++ .../pixels/storage/s3qs/TestS3QS.java | 78 +++++++++-- 6 files changed, 289 insertions(+), 78 deletions(-) create mode 100644 pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java index 3865963854..2cbc35987a 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java @@ -116,10 +116,14 @@ public long append(byte[] buffer, int offset, int length) throws IOException @Override public void close() throws IOException { - this.out.close(); - if (this.queue != null && !this.queue.isClosed()) + try { - this.queue.push(this.pathStr); + this.out.close(); + if (this.queue != null && !this.queue.isClosed()) { + this.queue.push(this.pathStr); + } + }catch (IOException e){ + throw e; } } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index 3e3fb9896d..958d09637c 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -20,24 +20,26 @@ package io.pixelsdb.pixels.storage.s3qs; import io.pixelsdb.pixels.common.physical.ObjectPath; +import io.pixelsdb.pixels.common.physical.PhysicalReader; import io.pixelsdb.pixels.common.physical.PhysicalWriter; import io.pixelsdb.pixels.storage.s3.AbstractS3; +import io.pixelsdb.pixels.storage.s3qs.exception.TaskErrorException; import io.pixelsdb.pixels.storage.s3qs.io.S3QSInputStream; import io.pixelsdb.pixels.storage.s3qs.io.S3QSOutputStream; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.CreateQueueRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; -import software.amazon.awssdk.services.sqs.model.SqsException; +import software.amazon.awssdk.services.sqs.model.*; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.UnsupportedOperationException; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.Objects; /** * {@link S3QS} is to write and read the small intermediate files in data shuffling. It is compatible with S3, hence its @@ -56,6 +58,7 @@ public final class S3QS extends AbstractS3 { private static final String SchemePrefix = Scheme.s3qs.name() + "://"; + //TODO: need thread safe public final HashSet producerSet; // maybe can use array to improve private final HashSet PartitionSet; @@ -106,6 +109,16 @@ public String ensureSchemePrefix(String path) throws IOException return SchemePrefix + path; } + public void addProducer(int workerId) + { + if(!(this.producerSet).contains(workerId)){ + this.producerSet.add(workerId); + } + } + + + //TODO: GC for files, objects and sqs. + //TODO: can modify the timeout that message keep invisible for other thread //producers in a shuffle offer their message to s3qs. public PhysicalWriter offer(S3QueueMessage mesg) throws IOException { @@ -117,8 +130,8 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException String queueUrl = ""; try { queueUrl = createQueue(sqs, - (String.valueOf(System.currentTimeMillis())) - + "-" + mesg.getPartitionNum() + mesg.getPartitionNum()+"-"+ + (String.valueOf(System.currentTimeMillis())) ); }catch (SqsException e) { //TODO: if name is duplicated in aws try again later @@ -136,7 +149,7 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException else { queue = PartitionMap.get(mesg.getPartitionNum()); - if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + if(queue.getStopInput()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); } return queue.offer(mesg); } @@ -158,10 +171,8 @@ private static String createQueue(SqsClient sqsClient, String queueName) { return getQueueUrlResponse.queueUrl(); } catch (SqsException e) { - System.err.println(e.awsErrorDetails().errorMessage()); - System.exit(1); + throw new RuntimeException("fail to create sqs queue: " + queueName, e); } - return ""; } public S3Queue openQueue(String queueUrl) @@ -170,7 +181,87 @@ public S3Queue openQueue(String queueUrl) } + /** + @return including writer and recipthanle(to sign a mesg in sqs). + 3 situation :closed(exception) / not ready or timeout or endinput(null) / succeed(writer) + */ + public Map.Entry poll(S3QueueMessage mesg, int timeoutSec) throws IOException + { + S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); + if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + if(queue == null) return null; + + //if there is no more input, just wait invisibletime. if timeout, no more message + //issue: dead message maybe only handle twice, I think that's reasonable + //TODO: once a message dead, push it in dead message queue. when delete a message, delete + Map.Entry pair = null; + try{ + if (queue.getStopInput()) { + pair = queue.poll(queue.getInvisibleTime()); + if (pair == null) { // no more message + queue.close(); //logical close, no effect to consumers + return null; //come back later and find queue is closed + } + } else { + pair = queue.poll(timeoutSec); + if (pair == null) return null; //upstream is working, come back later + } + }catch (TaskErrorException e) { + //clean up + } + +// PhysicalReader reader = pair.getValue(); +// if(Objects.equals(reader.getPath(), "")) { +// queue.stopInput(); +// return null; //comeback later. Maybe there are some message failed and return queue +// } + queue.addConsumer(mesg.getWorkerNum()); + return pair; + } + + public int finishWork(S3QueueMessage mesg) throws IOException + { + //DONOT close queue here. we don't know whether downstream workers meet error + //if we check to close queue here, we maybe check many times. That is expensive + // Once queue closed, consumers in the queue will know in the next poll. + String receiptHandle = mesg.getReceiptHandle(); + S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); + // queue close means consumer don't need to listen from it + //if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + + if(queue == null) { + //queue not exist: an error, or a timeout worker + throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); + } + try { + queue.deleteMessage(receiptHandle); + }catch (SqsException e) { + //TODO: log + return 2; + } + + // TODO: when all the consumers exit with some messages staying in DLQ, the work occur an error + // though we can actually close in poll() function, we close here make sure sqs can close safely. + // thus, we can check which part of task failed in sqs. + if(queue.removeConsumer(mesg.getWorkerNum()) && queue.isClosed()) + { + try { + DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() + .queueUrl(queue.getQueueUrl()) + .build(); + + sqs.deleteQueue(deleteQueueRequest); + + } catch (SqsException e) { + // TODO: log + return 1; + } + + } + + return 0; + } @Override public DataInputStream open(String path) throws IOException { @@ -241,7 +332,7 @@ public void close() throws IOException } } - public void flush() throws IOException + public void refresh() throws IOException { for (S3Queue queue : PartitionMap.values()){ queue.close(); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index db059efcf7..33599de1a4 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -24,20 +24,19 @@ import io.pixelsdb.pixels.common.physical.PhysicalWriter; import io.pixelsdb.pixels.common.physical.PhysicalWriterUtil; import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.storage.s3qs.exception.TaskErrorException; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.Message; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.*; import java.io.Closeable; import java.io.IOException; -import java.util.HashSet; -import java.util.Queue; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT; + /** * This is the queue to read from and write to s3+sqs. * It is thread safe. Using multiple threads to poll and offer the queue can improve the data transfer throughput @@ -65,7 +64,7 @@ public class S3Queue implements Closeable } } - private final Queue s3PathQueue = new ConcurrentLinkedQueue<>(); + private final Queue> s3PathQueue = new ConcurrentLinkedQueue<>(); private final String queueUrl; @@ -79,28 +78,43 @@ public class S3Queue implements Closeable private boolean closed = false; - public S3Queue(S3QS s3qs, String queueUrl) + private boolean stopinput = false; + + private int invisibleTime; + + public S3Queue(S3QS s3qs, String queueUrl, int invisibleTime) { this.s3qs = s3qs; this.queueUrl = queueUrl; this.sqsClient = this.s3qs.getSqsClient(); + this.invisibleTime = invisibleTime; this.consumerSet = new HashSet<>(); } + public S3Queue(S3QS s3qs, String queueUrl) + { + + this(s3qs, queueUrl,30); + } - public void addProducer(int workerId) + public String getQueueUrl() { - if(!(this.s3qs.producerSet).contains(workerId)){ - this.s3qs.producerSet.add(workerId); - } + return this.queueUrl; } + public int getInvisibleTime() + { + return this.invisibleTime; + } + + //s3qs is the highest manager of a shuffle, so producerSet is in charge of s3qs. - public void removeProducer(int workerId) + private void removeProducer(int workerId) throws IOException { this.s3qs.producerSet.remove(workerId); if(this.s3qs.producerSet.isEmpty()) { - this.push(""); + this.stopInput(); + //this.push(""); } } @@ -113,13 +127,11 @@ public void addConsumer(int workerId) } //maybe unnecessary - public void removeConsumer(int workerId) + public boolean removeConsumer(int workerId) { this.consumerSet.remove(workerId); - if(consumerSet.isEmpty()) - { - // TODO: close queue - } + // TODO: close queue + return consumerSet.isEmpty(); } //TODO: Implement DLQ to handle bad message. @@ -136,10 +148,10 @@ public void removeConsumer(int workerId) * @return null if the queue is still empty after timeout * @throws IOException if fails to create the physical reader for the path */ - public PhysicalReader poll(int timeoutSec) throws IOException - //TODO: now we have clear terminated message, if timeout is still necessary? + public Map.Entry poll(int timeoutSec) throws IOException, SqsException,TaskErrorException + { - String s3Path = this.s3PathQueue.poll(); + Map.Entry s3Path = this.s3PathQueue.poll(); if (s3Path == null) { if (timeoutSec < 1) @@ -157,19 +169,33 @@ public PhysicalReader poll(int timeoutSec) throws IOException while ((s3Path = this.s3PathQueue.poll()) == null) { ReceiveMessageRequest request = ReceiveMessageRequest.builder() - .queueUrl(queueUrl).maxNumberOfMessages(POLL_BATCH_SIZE).waitTimeSeconds(timeoutSec).build(); + .queueUrl(queueUrl) + .attributeNamesWithStrings("ApproximateReceiveCount") + .maxNumberOfMessages(POLL_BATCH_SIZE) + .waitTimeSeconds(timeoutSec).build(); ReceiveMessageResponse response = sqsClient.receiveMessage(request); if (response.hasMessages()) { for (Message message : response.messages()) { String path = message.body(); - this.s3PathQueue.add(path); + String receiptHandle = message.receiptHandle(); + this.s3PathQueue.add(new AbstractMap.SimpleEntry<>(path, receiptHandle)); + + String countStr = message.attributes().get(APPROXIMATE_RECEIVE_COUNT); + if (countStr == null) { + // 如果没有返回,可能是没有请求该属性或消息不存在该属性 + throw new TaskErrorException("ApproximateReceiveCount not returned"); + } else { + int count = Integer.parseInt(countStr); + // because we can only promise two receipts can be handled + if (count > 2) throw new TaskErrorException("Dead message occurred"); + } } } else { - // the sqs queue is also empty + // the sqs queue is also empty,timeout,invoker handle this situation, which means timeout. return null; } } @@ -179,22 +205,34 @@ public PhysicalReader poll(int timeoutSec) throws IOException this.lock.unlock(); } } - - return PhysicalReaderUtil.newPhysicalReader(this.s3qs, s3Path); + String receiptHandle = s3Path.getValue(); + PhysicalReader reader = PhysicalReaderUtil.newPhysicalReader(this.s3qs, s3Path.getKey()); + return new AbstractMap.SimpleEntry(receiptHandle, reader); } - private boolean assertPartition(String path , int HashPartition){ - //TODO: if there are dynamic hash or multi-level hash? - String[] parts = path.split("/"); - int pathPartition = Integer.parseInt(parts[parts.length-1]); - return HashPartition == pathPartition; + // do not need to delete local queue + public void deleteMessage(String receiptHandle) throws SqsException + { + DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() + .queueUrl(this.getQueueUrl()) + .receiptHandle(receiptHandle) + .build(); + this.sqsClient.deleteMessage(deleteMessageRequest); } - protected void push(String objectPath) + + + protected void push(String objectPath) throws IOException { - SendMessageRequest request = SendMessageRequest.builder() - .queueUrl(queueUrl).messageBody(objectPath).build(); - sqsClient.sendMessage(request); + try + { + SendMessageRequest request = SendMessageRequest.builder() + .queueUrl(queueUrl).messageBody(objectPath).build(); + sqsClient.sendMessage(request); + }catch (Exception e) + { + throw new IOException("sqs: fail to send message.",e); + } } /** @@ -206,13 +244,12 @@ protected void push(String objectPath) */ public PhysicalWriter offer(S3QueueMessage body) throws IOException { - if(endWork(body)) removeProducer(body.getWorkerNum()); + //TODO: same name issue String objectPath = getObjectPath(body); - addProducer(body.getWorkerNum()); - PhysicalS3QSWriter writer = (PhysicalS3QSWriter) PhysicalWriterUtil .newPhysicalWriter(this.s3qs, objectPath, false); writer.setQueue(this); + if(endWork(body)) removeProducer(body.getWorkerNum()); return writer; } @@ -232,6 +269,16 @@ public boolean isClosed() return closed; } + public void stopInput() + { + this.stopinput = true; + } + + public boolean getStopInput() + { + return this.stopinput; + } + @Override public void close() throws IOException { diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java index 9c3b46e14b..f4570a3d51 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -3,12 +3,12 @@ public class S3QueueMessage { private String objectPath; - private int WorkerNum; - private int PartitionNum; - private boolean EndWork; - private String bucketName; - private long timestamp; - private String metadata; // 可选:其他元数据 + private int workerNum = 0; + private int partitionNum = 0; + private boolean endWork = false;//specific to workers, indicating whether his work is finished + private String receiptHandle = "";//specific to consumers, indicating which mesg is consumed. + private long timestamp = System.currentTimeMillis(); + private String metadata = ""; // 无参构造函数 public S3QueueMessage() { @@ -30,38 +30,38 @@ public S3QueueMessage setObjectPath(String objectPath) { } public int getWorkerNum() { - return this.WorkerNum; + return this.workerNum; } public S3QueueMessage setWorkerNum(int WorkerNum) { - this.WorkerNum = WorkerNum; + this.workerNum = WorkerNum; return this; } public int getPartitionNum() { - return this.PartitionNum; + return this.partitionNum; } public S3QueueMessage setPartitionNum(int PartitionNum) { - this.PartitionNum = PartitionNum; + this.partitionNum = PartitionNum; return this; } public boolean getEndWork() { - return this.EndWork; + return this.endWork; } public S3QueueMessage setEndwork(boolean endwork) { - this.EndWork = endwork; + this.endWork = endwork; return this; } - public String getBucketName() { - return bucketName; + public String getReceiptHandle() { + return this.receiptHandle; } - public S3QueueMessage setBucketName(String bucketName) { - this.bucketName = bucketName; + public S3QueueMessage setReceiptHandle(String ReceiptHandle) { + this.receiptHandle = ReceiptHandle; return this; } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java new file mode 100644 index 0000000000..0c3f9fc3af --- /dev/null +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java @@ -0,0 +1,9 @@ +package io.pixelsdb.pixels.storage.s3qs.exception; + +public class TaskErrorException extends Exception +{ + public TaskErrorException(String msg) + { + super(msg); + } +} diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index fecca2fe9b..f45e961a1c 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -26,6 +26,7 @@ import org.junit.Test; import java.io.IOException; +import java.util.HashMap; import java.util.concurrent.*; /** @@ -36,34 +37,92 @@ public class TestS3QS { private final ExecutorService executor = Executors.newFixedThreadPool(2); + @Test - public void startReader() throws IOException, InterruptedException + public void TestReader() throws IOException, InterruptedException, ExecutionException { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); - S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); - this.executor.submit(() -> { + Future future = this.executor.submit(() -> { + byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); + s3qs.addProducer(0); s3qs.addProducer(1); for (int i = 0; i < 2; i++) { - try (PhysicalReader reader = queue.poll(20)) + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/") + .setWorkerNum(i) + .setPartitionNum(99) + .setEndwork(true); + try (PhysicalWriter writer = s3qs.offer(body)) + { + writer.append(buffer); + System.out.println("Wrote partition " + i); // 添加日志 + } + catch (IOException e) { - reader.readFully(8 * 1024 * 1024); + System.err.println("Error in iteration " + i + ": " + e.getMessage()); + throw new RuntimeException(e); + } + } + + System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + + for (int i = 0; i < 3; i++) + { + S3QueueMessage body = new S3QueueMessage() + .setObjectPath("pixels-turbo-intermediate/shuffle/") + .setPartitionNum(99) + .setWorkerNum(2+i); + PhysicalReader reader = null; + try{ + HashMap.Entry pair = s3qs.poll(body,20); + reader = pair.getValue(); + String receiptHandle = pair.getKey(); + body.setReceiptHandle(receiptHandle); + //reader.append(buffer); + System.out.println("read object " + reader.getPath()); // 添加日志 } catch (IOException e) { + System.err.println("Error in iteration " + i + ": " + e.getMessage()); + throw new RuntimeException(e); + } + finally { + if(reader != null){ + try { + reader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + try { + s3qs.finishWork(body); + } catch (IOException e) { throw new RuntimeException(e); } } - System.out.println("read finished in " + (System.currentTimeMillis() - startTime) + " ms"); + + + System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); try { - queue.close(); + s3qs.refresh(); } catch (IOException e) { throw new RuntimeException(e); } }); + + try { + future.get(); // 等待任务完成并获取可能的异常 + } catch (ExecutionException e) { + // 打印真实的异常堆栈 + //e.getCause().printStackTrace(); + throw e; + } + this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } @@ -146,9 +205,10 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce } } System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + try { - s3qs.flush(); + s3qs.refresh(); } catch (IOException e) { @@ -160,7 +220,7 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce future.get(); // 等待任务完成并获取可能的异常 } catch (ExecutionException e) { // 打印真实的异常堆栈 - e.getCause().printStackTrace(); + //e.getCause().printStackTrace(); throw e; } From fb9cfd8f7ba30b2c0591b75273b652950c5b02b6 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Wed, 24 Dec 2025 18:03:47 +0800 Subject: [PATCH 4/7] [Issue #1082] support sqs invisible time out --- .../io/pixelsdb/pixels/storage/s3qs/S3QS.java | 25 ++++++++++++------- .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 2 +- .../pixels/storage/s3qs/TestS3QS.java | 22 ++++++++-------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index 958d09637c..bcad811c70 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -36,10 +36,7 @@ import java.io.IOException; import java.lang.UnsupportedOperationException; import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Objects; +import java.util.*; /** * {@link S3QS} is to write and read the small intermediate files in data shuffling. It is compatible with S3, hence its @@ -63,16 +60,22 @@ public final class S3QS extends AbstractS3 // maybe can use array to improve private final HashSet PartitionSet; private final HashMap PartitionMap; + private final int invisibleTime; private SqsClient sqs; public S3QS() { + this(30); + } + + public S3QS(int invisibleTime){ this.connect(); this.producerSet = new HashSet<>(); this.PartitionSet = new HashSet<>(); this.PartitionMap = new HashMap<>(); + this.invisibleTime = invisibleTime; } private synchronized void connect() @@ -118,7 +121,8 @@ public void addProducer(int workerId) //TODO: GC for files, objects and sqs. - //TODO: can modify the timeout that message keep invisible for other thread + //TODO(DONE): can modify the timeout that message keep invisible for other thread + //TODO: allow independent invisible time config //producers in a shuffle offer their message to s3qs. public PhysicalWriter offer(S3QueueMessage mesg) throws IOException { @@ -129,7 +133,7 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException { String queueUrl = ""; try { - queueUrl = createQueue(sqs, + queueUrl = createQueue(sqs,invisibleTime, mesg.getPartitionNum()+"-"+ (String.valueOf(System.currentTimeMillis())) ); @@ -154,12 +158,15 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException return queue.offer(mesg); } - private static String createQueue(SqsClient sqsClient, String queueName) { + private static String createQueue(SqsClient sqsClient,int invisibleTime, String queueName) { try { //System.out.println("\nCreate Queue"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) + .attributes(Collections.singletonMap( + QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(invisibleTime) + )) .build(); sqsClient.createQueue(createQueueRequest); @@ -193,7 +200,7 @@ public Map.Entry poll(S3QueueMessage mesg, int timeoutSec //if there is no more input, just wait invisibletime. if timeout, no more message //issue: dead message maybe only handle twice, I think that's reasonable - //TODO: once a message dead, push it in dead message queue. when delete a message, delete + //TODO(OUT-OF-DATE): once a message dead, push it in dead message queue. when delete a message, delete Map.Entry pair = null; try{ if (queue.getStopInput()) { @@ -241,7 +248,7 @@ public int finishWork(S3QueueMessage mesg) throws IOException return 2; } - // TODO: when all the consumers exit with some messages staying in DLQ, the work occur an error + // TODO(OUT-OF-DATE): when all the consumers exit with some messages staying in DLQ, the work occur an error // though we can actually close in poll() function, we close here make sure sqs can close safely. // thus, we can check which part of task failed in sqs. if(queue.removeConsumer(mesg.getWorkerNum()) && queue.isClosed()) diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index 33599de1a4..c6f59166ff 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -134,7 +134,7 @@ public boolean removeConsumer(int workerId) return consumerSet.isEmpty(); } - //TODO: Implement DLQ to handle bad message. + //TODO(OUT-OF-DATE): Implement DLQ to handle bad message. /** diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index f45e961a1c..27416d222a 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -74,13 +74,15 @@ public void TestReader() throws IOException, InterruptedException, ExecutionExce .setPartitionNum(99) .setWorkerNum(2+i); PhysicalReader reader = null; + String receiptHandle = null; try{ HashMap.Entry pair = s3qs.poll(body,20); - reader = pair.getValue(); - String receiptHandle = pair.getKey(); - body.setReceiptHandle(receiptHandle); - //reader.append(buffer); - System.out.println("read object " + reader.getPath()); // 添加日志 + if(pair != null) { + reader = pair.getValue(); + receiptHandle = pair.getKey(); + body.setReceiptHandle(receiptHandle); + System.out.println("read object " + reader.getPath()); // 添加日志 + } } catch (IOException e) { @@ -94,13 +96,13 @@ public void TestReader() throws IOException, InterruptedException, ExecutionExce } catch (IOException e) { throw new RuntimeException(e); } + try { + s3qs.finishWork(body); + } catch (IOException e) { + throw new RuntimeException(e); + } } } - try { - s3qs.finishWork(body); - } catch (IOException e) { - throw new RuntimeException(e); - } } From 1502d16e2f9438e719df06b59ae0902b4d5b5a70 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Sun, 28 Dec 2025 16:45:21 +0800 Subject: [PATCH 5/7] [Issue #1082] Optimized the redundancy in the code structure. --- .../java/io/pixelsdb/pixels/storage/s3qs/S3QS.java | 13 +++---------- .../io/pixelsdb/pixels/storage/s3qs/S3Queue.java | 6 +++--- .../pixels/storage/s3qs/S3QueueMessage.java | 2 +- .../io/pixelsdb/pixels/storage/s3qs/TestS3QS.java | 2 +- 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index bcad811c70..e61b216fb5 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -121,8 +121,7 @@ public void addProducer(int workerId) //TODO: GC for files, objects and sqs. - //TODO(DONE): can modify the timeout that message keep invisible for other thread - //TODO: allow independent invisible time config + //TODO: allow separated invisible timeout config //producers in a shuffle offer their message to s3qs. public PhysicalWriter offer(S3QueueMessage mesg) throws IOException { @@ -190,11 +189,12 @@ public S3Queue openQueue(String queueUrl) /** @return including writer and recipthanle(to sign a mesg in sqs). - 3 situation :closed(exception) / not ready or timeout or endinput(null) / succeed(writer) + 3 situation :closed(exception) / not ready or timeout or stopInput(null) / succeed(writer) */ public Map.Entry poll(S3QueueMessage mesg, int timeoutSec) throws IOException { S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); + // queue close means consumer don't need to listen from it if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); if(queue == null) return null; @@ -217,11 +217,6 @@ public Map.Entry poll(S3QueueMessage mesg, int timeoutSec //clean up } -// PhysicalReader reader = pair.getValue(); -// if(Objects.equals(reader.getPath(), "")) { -// queue.stopInput(); -// return null; //comeback later. Maybe there are some message failed and return queue -// } queue.addConsumer(mesg.getWorkerNum()); return pair; } @@ -234,8 +229,6 @@ public int finishWork(S3QueueMessage mesg) throws IOException String receiptHandle = mesg.getReceiptHandle(); S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); - // queue close means consumer don't need to listen from it - //if(queue.isClosed()) throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); if(queue == null) { //queue not exist: an error, or a timeout worker diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index c6f59166ff..b2cbd2e761 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -78,7 +78,7 @@ public class S3Queue implements Closeable private boolean closed = false; - private boolean stopinput = false; + private boolean stopInput = false; private int invisibleTime; @@ -271,12 +271,12 @@ public boolean isClosed() public void stopInput() { - this.stopinput = true; + this.stopInput = true; } public boolean getStopInput() { - return this.stopinput; + return this.stopInput; } @Override diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java index f4570a3d51..b450d573b7 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -19,7 +19,7 @@ public S3QueueMessage(String objectPath) { this.timestamp = System.currentTimeMillis(); } - // getter 和 setter 方法(JSON 库需要) + public String getObjectPath() { return objectPath; } diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index 27416d222a..2f43b38320 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -39,7 +39,7 @@ public class TestS3QS @Test - public void TestReader() throws IOException, InterruptedException, ExecutionException + public void TestWriterAndReader() throws IOException, InterruptedException, ExecutionException { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); Future future = this.executor.submit(() -> { From 70fa63aab61fdeaabceee9624c3063886c524328 Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Wed, 31 Dec 2025 18:35:07 +0800 Subject: [PATCH 6/7] [Issue #1082] Optimized code format --- .../pixels/core/PixelsWriterImpl.java | 44 +++++++----- pixels-storage/pixels-storage-s3qs/pom.xml | 4 -- .../storage/s3qs/PhysicalS3QSWriter.java | 3 +- .../io/pixelsdb/pixels/storage/s3qs/S3QS.java | 14 ++-- .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 2 +- .../pixels/storage/s3qs/S3QueueMessage.java | 7 +- .../s3qs/exception/TaskErrorException.java | 4 ++ .../pixels/storage/s3qs/TestS3QS.java | 67 ++----------------- 8 files changed, 50 insertions(+), 95 deletions(-) diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java index e5cf60b21a..f81e33d1b5 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java @@ -219,6 +219,7 @@ public static class Builder private boolean builderPartitioned = false; private boolean builderNullsPadding = false; private Optional> builderPartKeyColumnIds = Optional.empty(); + private PhysicalWriter builderPhysicalWriter = null; private Builder() { @@ -230,6 +231,12 @@ public Builder setSchema(TypeDescription schema) return this; } + public Builder setPhysicalWriter(PhysicalWriter writer) + { + this.builderPhysicalWriter = requireNonNull(writer); + return this; + } + public Builder setHasHiddenColumn(boolean hasHiddenColumn) { this.builderHasHiddenColumn = hasHiddenColumn; @@ -336,26 +343,27 @@ public PixelsWriter build() throws PixelsWriterException { requireNonNull(this.builderStorage, "storage is not set"); requireNonNull(this.builderFilePath, "file path is not set"); - PhysicalWriter fsWriter = null; - try - { - fsWriter = PhysicalWriterUtil.newPhysicalWriter( - this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication, - this.builderBlockPadding, this.builderOverwrite); - } catch (IOException e) - { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); - } - if (fsWriter == null) - { - LOGGER.error("Failed to create PhysicalWriter"); - throw new PixelsWriterException( - "Failed to create PixelsWriter due to error of creating PhysicalWriter"); + if(this.builderPhysicalWriter == null){ + try { + this.builderPhysicalWriter = PhysicalWriterUtil.newPhysicalWriter( + this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication, + this.builderBlockPadding, this.builderOverwrite); + } catch (IOException e) { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); + } + if (this.builderPhysicalWriter == null) + { + LOGGER.error("Failed to create PhysicalWriter"); + throw new PixelsWriterException( + "Failed to create PixelsWriter due to error of creating PhysicalWriter"); + } } + + return new PixelsWriterImpl( builderSchema, builderHasHiddenColumn, @@ -364,7 +372,7 @@ public PixelsWriter build() throws PixelsWriterException builderCompressionKind, builderCompressionBlockSize, builderTimeZone, - fsWriter, + builderPhysicalWriter, builderEncodingLevel, builderNullsPadding, builderPartitioned, diff --git a/pixels-storage/pixels-storage-s3qs/pom.xml b/pixels-storage/pixels-storage-s3qs/pom.xml index c8f0433cde..12f56e12bf 100644 --- a/pixels-storage/pixels-storage-s3qs/pom.xml +++ b/pixels-storage/pixels-storage-s3qs/pom.xml @@ -40,10 +40,6 @@ - - junit - junit - diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java index 2cbc35987a..7db7bb3309 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java @@ -119,7 +119,8 @@ public void close() throws IOException try { this.out.close(); - if (this.queue != null && !this.queue.isClosed()) { + if (this.queue != null && !this.queue.isClosed()) + { this.queue.push(this.pathStr); } }catch (IOException e){ diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index e61b216fb5..b65254b62e 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -62,7 +62,6 @@ public final class S3QS extends AbstractS3 private final HashMap PartitionMap; private final int invisibleTime; - private SqsClient sqs; public S3QS() @@ -114,12 +113,12 @@ public String ensureSchemePrefix(String path) throws IOException public void addProducer(int workerId) { - if(!(this.producerSet).contains(workerId)){ + if(!(this.producerSet).contains(workerId)) + { this.producerSet.add(workerId); } } - //TODO: GC for files, objects and sqs. //TODO: allow separated invisible timeout config //producers in a shuffle offer their message to s3qs. @@ -131,12 +130,14 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException if(!(this.PartitionSet).contains(mesg.getPartitionNum())) { String queueUrl = ""; - try { + try + { queueUrl = createQueue(sqs,invisibleTime, mesg.getPartitionNum()+"-"+ (String.valueOf(System.currentTimeMillis())) ); - }catch (SqsException e) { + }catch (SqsException e) + { //TODO: if name is duplicated in aws try again later throw new IOException(e); } @@ -159,7 +160,6 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException private static String createQueue(SqsClient sqsClient,int invisibleTime, String queueName) { try { - //System.out.println("\nCreate Queue"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) @@ -170,8 +170,6 @@ private static String createQueue(SqsClient sqsClient,int invisibleTime, String sqsClient.createQueue(createQueueRequest); - //System.out.println("\nGet queue url"); - GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index b2cbd2e761..beac5dc359 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -184,7 +184,7 @@ public Map.Entry poll(int timeoutSec) throws IOException String countStr = message.attributes().get(APPROXIMATE_RECEIVE_COUNT); if (countStr == null) { - // 如果没有返回,可能是没有请求该属性或消息不存在该属性 + // If no value is returned, it may be because the property was not requested or the message does not contain that property. throw new TaskErrorException("ApproximateReceiveCount not returned"); } else { int count = Integer.parseInt(countStr); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java index b450d573b7..2e1e0d9ddd 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -1,5 +1,10 @@ package io.pixelsdb.pixels.storage.s3qs; +/** + * Unified message format for communication between worker and s3qs. + * @author yanhaoting + * @create 2025-12-19 + */ public class S3QueueMessage { private String objectPath; @@ -10,7 +15,6 @@ public class S3QueueMessage private long timestamp = System.currentTimeMillis(); private String metadata = ""; - // 无参构造函数 public S3QueueMessage() { } @@ -19,7 +23,6 @@ public S3QueueMessage(String objectPath) { this.timestamp = System.currentTimeMillis(); } - public String getObjectPath() { return objectPath; } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java index 0c3f9fc3af..a95b4f6fac 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/exception/TaskErrorException.java @@ -1,5 +1,9 @@ package io.pixelsdb.pixels.storage.s3qs.exception; +/** + * @author yanhaoting + * @create 2025-12-19 + */ public class TaskErrorException extends Exception { public TaskErrorException(String msg) diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index 2f43b38320..4ae1d312c3 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -37,7 +37,6 @@ public class TestS3QS { private final ExecutorService executor = Executors.newFixedThreadPool(2); - @Test public void TestWriterAndReader() throws IOException, InterruptedException, ExecutionException { @@ -67,6 +66,7 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + startTime = System.currentTimeMillis(); for (int i = 0; i < 3; i++) { S3QueueMessage body = new S3QueueMessage() @@ -105,8 +105,7 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec } } - - System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); + System.out.println("read finished in " + (System.currentTimeMillis() - startTime) + " ms"); try { s3qs.refresh(); @@ -118,9 +117,8 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec }); try { - future.get(); // 等待任务完成并获取可能的异常 + future.get(); } catch (ExecutionException e) { - // 打印真实的异常堆栈 //e.getCause().printStackTrace(); throw e; } @@ -129,57 +127,6 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec this.executor.awaitTermination(100, TimeUnit.HOURS); } -// @Deprecated -// @Test -// public void testWriter() throws IOException, InterruptedException, ExecutionException -// { -// S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); -// S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); -// -// Future future = this.executor.submit(() -> { -// byte[] buffer = new byte[8 * 1024 * 1024]; -// long startTime = System.currentTimeMillis(); -// for (int i = 0; i < 2; i++) -// { -// S3QueueMessage body = new S3QueueMessage() -// .setObjectPath("pixels-turbo-intermediate/shuffle/" + i + "/") -// .setPartitionNum(2) -// .setEndwork(false); -// try (PhysicalWriter writer = queue.offer(body)) -// { -// writer.append(buffer); -// System.out.println("Wrote partition " + i); // 添加日志 -// } -// catch (IOException e) -// { -// System.err.println("Error in iteration " + i + ": " + e.getMessage()); -// throw new RuntimeException(e); -// } -// } -// System.out.println("write finished in " + (System.currentTimeMillis() - startTime) + " ms"); -// try -// { -// queue.close(); -// } -// catch (IOException e) -// { -// throw new RuntimeException(e); -// } -// }); -// -// try { -// future.get(); // 等待任务完成并获取可能的异常 -// } catch (ExecutionException e) { -// // 打印真实的异常堆栈 -// e.getCause().printStackTrace(); -// throw e; -// } -// -// this.executor.shutdown(); -// this.executor.awaitTermination(100, TimeUnit.HOURS); -// } -//} - @Test public void testWriter() throws IOException, InterruptedException, ExecutionException { @@ -219,9 +166,8 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce }); try { - future.get(); // 等待任务完成并获取可能的异常 + future.get(); } catch (ExecutionException e) { - // 打印真实的异常堆栈 //e.getCause().printStackTrace(); throw e; } @@ -229,6 +175,7 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } + @Test public void testMultiSQSWriter() throws IOException, InterruptedException, ExecutionException { @@ -259,9 +206,8 @@ public void testMultiSQSWriter() throws IOException, InterruptedException, Execu }); try { - future.get(); // 等待任务完成并获取可能的异常 + future.get(); } catch (ExecutionException e) { - // 打印真实的异常堆栈 e.getCause().printStackTrace(); throw e; } @@ -277,5 +223,4 @@ public void testMultiSQSWriter() throws IOException, InterruptedException, Execu this.executor.shutdown(); this.executor.awaitTermination(100, TimeUnit.HOURS); } - } From 510dc320cfccdbf8226300fc440eee56a3ffd64e Mon Sep 17 00:00:00 2001 From: yanhaoting <2773158174@qq.com> Date: Wed, 31 Dec 2025 18:55:27 +0800 Subject: [PATCH 7/7] [Issue #1082] fix some details --- .../pixels/core/PixelsWriterImpl.java | 12 ++-- .../pixels/core/PixelsWriterStreamImpl.java | 10 +++- .../storage/s3qs/PhysicalS3QSReader.java | 3 +- .../storage/s3qs/PhysicalS3QSWriter.java | 4 +- .../io/pixelsdb/pixels/storage/s3qs/S3QS.java | 55 +++++++++++++------ .../pixelsdb/pixels/storage/s3qs/S3Queue.java | 10 +++- .../pixels/storage/s3qs/S3QueueMessage.java | 27 ++++++--- .../pixels/storage/s3qs/TestS3QS.java | 53 ++++++++++++------ 8 files changed, 120 insertions(+), 54 deletions(-) diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java index f81e33d1b5..9b86e55906 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java @@ -344,12 +344,16 @@ public PixelsWriter build() throws PixelsWriterException requireNonNull(this.builderStorage, "storage is not set"); requireNonNull(this.builderFilePath, "file path is not set"); - if(this.builderPhysicalWriter == null){ - try { + if(this.builderPhysicalWriter == null) + { + try + { this.builderPhysicalWriter = PhysicalWriterUtil.newPhysicalWriter( this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication, this.builderBlockPadding, this.builderOverwrite); - } catch (IOException e) { + } + catch (IOException e) + { LOGGER.error("Failed to create PhysicalWriter"); throw new PixelsWriterException( "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); @@ -362,8 +366,6 @@ public PixelsWriter build() throws PixelsWriterException } } - - return new PixelsWriterImpl( builderSchema, builderHasHiddenColumn, diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java index 6074ec2cf7..d20898cce8 100644 --- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java +++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java @@ -314,10 +314,13 @@ public PixelsWriter build() throws PixelsWriterException PhysicalWriter fsWriter = this.fsWriter; if (fsWriter == null) { - try { + try + { fsWriter = PhysicalWriterUtil.newPhysicalWriter( this.builderStorage, this.builderFilePath, null); - } catch (IOException e) { + } + catch (IOException e) + { LOGGER.error("Failed to create PhysicalWriter"); throw new PixelsWriterException( "Failed to create PixelsWriter due to error of creating PhysicalWriter", e); @@ -604,7 +607,8 @@ private void writeRowGroup() throws IOException columnWriters[i] = newColumnWriter(children.get(i), columnWriterOption); } physicalWriter.flush(); - } catch (IOException e) + } + catch (IOException e) { LOGGER.error(e.getMessage()); throw e; diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java index a87da35b25..9a8a4b8dbc 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java @@ -74,7 +74,8 @@ public PhysicalS3QSReader(Storage storage, String path) throws IOException this.buffer = responseBytes.asByteArrayUnsafe(); this.length = this.buffer.length; this.position = 0; - } catch (Exception e) + } + catch (Exception e) { this.position = 0; throw new IOException("Failed to read object.", e); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java index 7db7bb3309..9ba6da62f3 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java @@ -123,7 +123,9 @@ public void close() throws IOException { this.queue.push(this.pathStr); } - }catch (IOException e){ + } + catch (IOException e) + { throw e; } } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java index b65254b62e..26330f1121 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java @@ -136,7 +136,8 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException mesg.getPartitionNum()+"-"+ (String.valueOf(System.currentTimeMillis())) ); - }catch (SqsException e) + } + catch (SqsException e) { //TODO: if name is duplicated in aws try again later throw new IOException(e); @@ -146,7 +147,9 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException queue = openQueue(queueUrl); PartitionSet.add(mesg.getPartitionNum()); PartitionMap.put(mesg.getPartitionNum(), queue); - } else{ + } + else + { throw new IOException("create new queue failed."); } } @@ -159,7 +162,8 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException } private static String createQueue(SqsClient sqsClient,int invisibleTime, String queueName) { - try { + try + { CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) @@ -174,7 +178,9 @@ private static String createQueue(SqsClient sqsClient,int invisibleTime, String .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); return getQueueUrlResponse.queueUrl(); - } catch (SqsException e) { + } + catch (SqsException e) + { throw new RuntimeException("fail to create sqs queue: " + queueName, e); } } @@ -200,18 +206,26 @@ public Map.Entry poll(S3QueueMessage mesg, int timeoutSec //issue: dead message maybe only handle twice, I think that's reasonable //TODO(OUT-OF-DATE): once a message dead, push it in dead message queue. when delete a message, delete Map.Entry pair = null; - try{ - if (queue.getStopInput()) { + try + { + if (queue.getStopInput()) + { pair = queue.poll(queue.getInvisibleTime()); - if (pair == null) { // no more message + if (pair == null) + { + // no more message queue.close(); //logical close, no effect to consumers return null; //come back later and find queue is closed } - } else { + } + else + { pair = queue.poll(timeoutSec); if (pair == null) return null; //upstream is working, come back later } - }catch (TaskErrorException e) { + } + catch (TaskErrorException e) + { //clean up } @@ -228,13 +242,17 @@ public int finishWork(S3QueueMessage mesg) throws IOException S3Queue queue = PartitionMap.get(mesg.getPartitionNum()); - if(queue == null) { + if(queue == null) + { //queue not exist: an error, or a timeout worker throw new IOException("queue " + mesg.getPartitionNum() + " is closed."); } - try { + try + { queue.deleteMessage(receiptHandle); - }catch (SqsException e) { + } + catch (SqsException e) + { //TODO: log return 2; } @@ -244,14 +262,17 @@ public int finishWork(S3QueueMessage mesg) throws IOException // thus, we can check which part of task failed in sqs. if(queue.removeConsumer(mesg.getWorkerNum()) && queue.isClosed()) { - try { + try + { DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queue.getQueueUrl()) .build(); sqs.deleteQueue(deleteQueueRequest); - } catch (SqsException e) { + } + catch (SqsException e) + { // TODO: log return 1; } @@ -314,7 +335,8 @@ public boolean directCopy(String src, String dest) @Override public void close() throws IOException { - for (S3Queue queue : PartitionMap.values()){ + for (S3Queue queue : PartitionMap.values()) + { queue.close(); } this.producerSet.clear(); @@ -332,7 +354,8 @@ public void close() throws IOException public void refresh() throws IOException { - for (S3Queue queue : PartitionMap.values()){ + for (S3Queue queue : PartitionMap.values()) + { queue.close(); } this.producerSet.clear(); diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java index beac5dc359..73dc0bce55 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java @@ -183,10 +183,13 @@ public Map.Entry poll(int timeoutSec) throws IOException this.s3PathQueue.add(new AbstractMap.SimpleEntry<>(path, receiptHandle)); String countStr = message.attributes().get(APPROXIMATE_RECEIVE_COUNT); - if (countStr == null) { + if (countStr == null) + { // If no value is returned, it may be because the property was not requested or the message does not contain that property. throw new TaskErrorException("ApproximateReceiveCount not returned"); - } else { + } + else + { int count = Integer.parseInt(countStr); // because we can only promise two receipts can be handled if (count > 2) throw new TaskErrorException("Dead message occurred"); @@ -229,7 +232,8 @@ protected void push(String objectPath) throws IOException SendMessageRequest request = SendMessageRequest.builder() .queueUrl(queueUrl).messageBody(objectPath).build(); sqsClient.sendMessage(request); - }catch (Exception e) + } + catch (Exception e) { throw new IOException("sqs: fail to send message.",e); } diff --git a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java index 2e1e0d9ddd..60bbe57079 100644 --- a/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java +++ b/pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java @@ -15,10 +15,12 @@ public class S3QueueMessage private long timestamp = System.currentTimeMillis(); private String metadata = ""; - public S3QueueMessage() { + public S3QueueMessage() + { } - public S3QueueMessage(String objectPath) { + public S3QueueMessage(String objectPath) + { this.objectPath = objectPath; this.timestamp = System.currentTimeMillis(); } @@ -27,7 +29,8 @@ public String getObjectPath() { return objectPath; } - public S3QueueMessage setObjectPath(String objectPath) { + public S3QueueMessage setObjectPath(String objectPath) + { this.objectPath = objectPath; return this; } @@ -36,7 +39,8 @@ public int getWorkerNum() { return this.workerNum; } - public S3QueueMessage setWorkerNum(int WorkerNum) { + public S3QueueMessage setWorkerNum(int WorkerNum) + { this.workerNum = WorkerNum; return this; } @@ -45,7 +49,8 @@ public int getPartitionNum() { return this.partitionNum; } - public S3QueueMessage setPartitionNum(int PartitionNum) { + public S3QueueMessage setPartitionNum(int PartitionNum) + { this.partitionNum = PartitionNum; return this; } @@ -54,7 +59,8 @@ public boolean getEndWork() { return this.endWork; } - public S3QueueMessage setEndwork(boolean endwork) { + public S3QueueMessage setEndwork(boolean endwork) + { this.endWork = endwork; return this; } @@ -63,7 +69,8 @@ public String getReceiptHandle() { return this.receiptHandle; } - public S3QueueMessage setReceiptHandle(String ReceiptHandle) { + public S3QueueMessage setReceiptHandle(String ReceiptHandle) + { this.receiptHandle = ReceiptHandle; return this; } @@ -72,7 +79,8 @@ public long getTimestamp() { return timestamp; } - public S3QueueMessage setTimestamp(long timestamp) { + public S3QueueMessage setTimestamp(long timestamp) + { this.timestamp = timestamp; return this; } @@ -81,7 +89,8 @@ public String getMetadata() { return metadata; } - public S3QueueMessage setMetadata(String metadata) { + public S3QueueMessage setMetadata(String metadata) + { this.metadata = metadata; return this; } diff --git a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java index 4ae1d312c3..6fe62f0411 100644 --- a/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java +++ b/pixels-storage/pixels-storage-s3qs/src/test/java/io/pixelsdb/pixels/storage/s3qs/TestS3QS.java @@ -75,9 +75,11 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec .setWorkerNum(2+i); PhysicalReader reader = null; String receiptHandle = null; - try{ + try + { HashMap.Entry pair = s3qs.poll(body,20); - if(pair != null) { + if(pair != null) + { reader = pair.getValue(); receiptHandle = pair.getKey(); body.setReceiptHandle(receiptHandle); @@ -89,16 +91,24 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec System.err.println("Error in iteration " + i + ": " + e.getMessage()); throw new RuntimeException(e); } - finally { - if(reader != null){ - try { + finally + { + if(reader != null) + { + try + { reader.close(); - } catch (IOException e) { + } + catch (IOException e) + { throw new RuntimeException(e); } - try { + try + { s3qs.finishWork(body); - } catch (IOException e) { + } + catch (IOException e) + { throw new RuntimeException(e); } } @@ -116,9 +126,12 @@ public void TestWriterAndReader() throws IOException, InterruptedException, Exec } }); - try { + try + { future.get(); - } catch (ExecutionException e) { + } + catch (ExecutionException e) + { //e.getCause().printStackTrace(); throw e; } @@ -133,7 +146,8 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); //S3Queue queue = s3qs.openQueue("https://sqs.us-east-2.amazonaws.com/970089764833/pixels-shuffle"); - Future future = this.executor.submit(() -> { + Future future = this.executor.submit(() -> + { byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); for (int i = 0; i < 2; i++) @@ -165,9 +179,12 @@ public void testWriter() throws IOException, InterruptedException, ExecutionExce } }); - try { + try + { future.get(); - } catch (ExecutionException e) { + } + catch (ExecutionException e) + { //e.getCause().printStackTrace(); throw e; } @@ -181,7 +198,8 @@ public void testMultiSQSWriter() throws IOException, InterruptedException, Execu { S3QS s3qs = (S3QS) StorageFactory.Instance().getStorage(Storage.Scheme.s3qs); - Future future = this.executor.submit(() -> { + Future future = this.executor.submit(() -> + { byte[] buffer = new byte[8 * 1024 * 1024]; long startTime = System.currentTimeMillis(); for (int i = 0; i < 2; i++) @@ -205,9 +223,12 @@ public void testMultiSQSWriter() throws IOException, InterruptedException, Execu }); - try { + try + { future.get(); - } catch (ExecutionException e) { + } + catch (ExecutionException e) + { e.getCause().printStackTrace(); throw e; }