Skip to content

Commit 510dc32

Browse files
committed
[Issue #1082] fix some details
1 parent 70fa63a commit 510dc32

File tree

8 files changed

+120
-54
lines changed

8 files changed

+120
-54
lines changed

pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterImpl.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,12 +344,16 @@ public PixelsWriter build() throws PixelsWriterException
344344
requireNonNull(this.builderStorage, "storage is not set");
345345
requireNonNull(this.builderFilePath, "file path is not set");
346346

347-
if(this.builderPhysicalWriter == null){
348-
try {
347+
if(this.builderPhysicalWriter == null)
348+
{
349+
try
350+
{
349351
this.builderPhysicalWriter = PhysicalWriterUtil.newPhysicalWriter(
350352
this.builderStorage, this.builderFilePath, this.builderBlockSize, this.builderReplication,
351353
this.builderBlockPadding, this.builderOverwrite);
352-
} catch (IOException e) {
354+
}
355+
catch (IOException e)
356+
{
353357
LOGGER.error("Failed to create PhysicalWriter");
354358
throw new PixelsWriterException(
355359
"Failed to create PixelsWriter due to error of creating PhysicalWriter", e);
@@ -362,8 +366,6 @@ public PixelsWriter build() throws PixelsWriterException
362366
}
363367
}
364368

365-
366-
367369
return new PixelsWriterImpl(
368370
builderSchema,
369371
builderHasHiddenColumn,

pixels-core/src/main/java/io/pixelsdb/pixels/core/PixelsWriterStreamImpl.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -314,10 +314,13 @@ public PixelsWriter build() throws PixelsWriterException
314314
PhysicalWriter fsWriter = this.fsWriter;
315315
if (fsWriter == null)
316316
{
317-
try {
317+
try
318+
{
318319
fsWriter = PhysicalWriterUtil.newPhysicalWriter(
319320
this.builderStorage, this.builderFilePath, null);
320-
} catch (IOException e) {
321+
}
322+
catch (IOException e)
323+
{
321324
LOGGER.error("Failed to create PhysicalWriter");
322325
throw new PixelsWriterException(
323326
"Failed to create PixelsWriter due to error of creating PhysicalWriter", e);
@@ -604,7 +607,8 @@ private void writeRowGroup() throws IOException
604607
columnWriters[i] = newColumnWriter(children.get(i), columnWriterOption);
605608
}
606609
physicalWriter.flush();
607-
} catch (IOException e)
610+
}
611+
catch (IOException e)
608612
{
609613
LOGGER.error(e.getMessage());
610614
throw e;

pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public PhysicalS3QSReader(Storage storage, String path) throws IOException
7474
this.buffer = responseBytes.asByteArrayUnsafe();
7575
this.length = this.buffer.length;
7676
this.position = 0;
77-
} catch (Exception e)
77+
}
78+
catch (Exception e)
7879
{
7980
this.position = 0;
8081
throw new IOException("Failed to read object.", e);

pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/PhysicalS3QSWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,9 @@ public void close() throws IOException
123123
{
124124
this.queue.push(this.pathStr);
125125
}
126-
}catch (IOException e){
126+
}
127+
catch (IOException e)
128+
{
127129
throw e;
128130
}
129131
}

pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QS.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException
136136
mesg.getPartitionNum()+"-"+
137137
(String.valueOf(System.currentTimeMillis()))
138138
);
139-
}catch (SqsException e)
139+
}
140+
catch (SqsException e)
140141
{
141142
//TODO: if name is duplicated in aws try again later
142143
throw new IOException(e);
@@ -146,7 +147,9 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException
146147
queue = openQueue(queueUrl);
147148
PartitionSet.add(mesg.getPartitionNum());
148149
PartitionMap.put(mesg.getPartitionNum(), queue);
149-
} else{
150+
}
151+
else
152+
{
150153
throw new IOException("create new queue failed.");
151154
}
152155
}
@@ -159,7 +162,8 @@ public PhysicalWriter offer(S3QueueMessage mesg) throws IOException
159162
}
160163

161164
private static String createQueue(SqsClient sqsClient,int invisibleTime, String queueName) {
162-
try {
165+
try
166+
{
163167

164168
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
165169
.queueName(queueName)
@@ -174,7 +178,9 @@ private static String createQueue(SqsClient sqsClient,int invisibleTime, String
174178
.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build());
175179
return getQueueUrlResponse.queueUrl();
176180

177-
} catch (SqsException e) {
181+
}
182+
catch (SqsException e)
183+
{
178184
throw new RuntimeException("fail to create sqs queue: " + queueName, e);
179185
}
180186
}
@@ -200,18 +206,26 @@ public Map.Entry<String,PhysicalReader> poll(S3QueueMessage mesg, int timeoutSec
200206
//issue: dead message maybe only handle twice, I think that's reasonable
201207
//TODO(OUT-OF-DATE): once a message dead, push it in dead message queue. when delete a message, delete
202208
Map.Entry<String,PhysicalReader> pair = null;
203-
try{
204-
if (queue.getStopInput()) {
209+
try
210+
{
211+
if (queue.getStopInput())
212+
{
205213
pair = queue.poll(queue.getInvisibleTime());
206-
if (pair == null) { // no more message
214+
if (pair == null)
215+
{
216+
// no more message
207217
queue.close(); //logical close, no effect to consumers
208218
return null; //come back later and find queue is closed
209219
}
210-
} else {
220+
}
221+
else
222+
{
211223
pair = queue.poll(timeoutSec);
212224
if (pair == null) return null; //upstream is working, come back later
213225
}
214-
}catch (TaskErrorException e) {
226+
}
227+
catch (TaskErrorException e)
228+
{
215229
//clean up
216230
}
217231

@@ -228,13 +242,17 @@ public int finishWork(S3QueueMessage mesg) throws IOException
228242
S3Queue queue = PartitionMap.get(mesg.getPartitionNum());
229243

230244

231-
if(queue == null) {
245+
if(queue == null)
246+
{
232247
//queue not exist: an error, or a timeout worker
233248
throw new IOException("queue " + mesg.getPartitionNum() + " is closed.");
234249
}
235-
try {
250+
try
251+
{
236252
queue.deleteMessage(receiptHandle);
237-
}catch (SqsException e) {
253+
}
254+
catch (SqsException e)
255+
{
238256
//TODO: log
239257
return 2;
240258
}
@@ -244,14 +262,17 @@ public int finishWork(S3QueueMessage mesg) throws IOException
244262
// thus, we can check which part of task failed in sqs.
245263
if(queue.removeConsumer(mesg.getWorkerNum()) && queue.isClosed())
246264
{
247-
try {
265+
try
266+
{
248267
DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder()
249268
.queueUrl(queue.getQueueUrl())
250269
.build();
251270

252271
sqs.deleteQueue(deleteQueueRequest);
253272

254-
} catch (SqsException e) {
273+
}
274+
catch (SqsException e)
275+
{
255276
// TODO: log
256277
return 1;
257278
}
@@ -314,7 +335,8 @@ public boolean directCopy(String src, String dest)
314335
@Override
315336
public void close() throws IOException
316337
{
317-
for (S3Queue queue : PartitionMap.values()){
338+
for (S3Queue queue : PartitionMap.values())
339+
{
318340
queue.close();
319341
}
320342
this.producerSet.clear();
@@ -332,7 +354,8 @@ public void close() throws IOException
332354

333355
public void refresh() throws IOException
334356
{
335-
for (S3Queue queue : PartitionMap.values()){
357+
for (S3Queue queue : PartitionMap.values())
358+
{
336359
queue.close();
337360
}
338361
this.producerSet.clear();

pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3Queue.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,13 @@ public Map.Entry<String,PhysicalReader> poll(int timeoutSec) throws IOException
183183
this.s3PathQueue.add(new AbstractMap.SimpleEntry<>(path, receiptHandle));
184184

185185
String countStr = message.attributes().get(APPROXIMATE_RECEIVE_COUNT);
186-
if (countStr == null) {
186+
if (countStr == null)
187+
{
187188
// If no value is returned, it may be because the property was not requested or the message does not contain that property.
188189
throw new TaskErrorException("ApproximateReceiveCount not returned");
189-
} else {
190+
}
191+
else
192+
{
190193
int count = Integer.parseInt(countStr);
191194
// because we can only promise two receipts can be handled
192195
if (count > 2) throw new TaskErrorException("Dead message occurred");
@@ -229,7 +232,8 @@ protected void push(String objectPath) throws IOException
229232
SendMessageRequest request = SendMessageRequest.builder()
230233
.queueUrl(queueUrl).messageBody(objectPath).build();
231234
sqsClient.sendMessage(request);
232-
}catch (Exception e)
235+
}
236+
catch (Exception e)
233237
{
234238
throw new IOException("sqs: fail to send message.",e);
235239
}

pixels-storage/pixels-storage-s3qs/src/main/java/io/pixelsdb/pixels/storage/s3qs/S3QueueMessage.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,12 @@ public class S3QueueMessage
1515
private long timestamp = System.currentTimeMillis();
1616
private String metadata = "";
1717

18-
public S3QueueMessage() {
18+
public S3QueueMessage()
19+
{
1920
}
2021

21-
public S3QueueMessage(String objectPath) {
22+
public S3QueueMessage(String objectPath)
23+
{
2224
this.objectPath = objectPath;
2325
this.timestamp = System.currentTimeMillis();
2426
}
@@ -27,7 +29,8 @@ public String getObjectPath() {
2729
return objectPath;
2830
}
2931

30-
public S3QueueMessage setObjectPath(String objectPath) {
32+
public S3QueueMessage setObjectPath(String objectPath)
33+
{
3134
this.objectPath = objectPath;
3235
return this;
3336
}
@@ -36,7 +39,8 @@ public int getWorkerNum() {
3639
return this.workerNum;
3740
}
3841

39-
public S3QueueMessage setWorkerNum(int WorkerNum) {
42+
public S3QueueMessage setWorkerNum(int WorkerNum)
43+
{
4044
this.workerNum = WorkerNum;
4145
return this;
4246
}
@@ -45,7 +49,8 @@ public int getPartitionNum() {
4549
return this.partitionNum;
4650
}
4751

48-
public S3QueueMessage setPartitionNum(int PartitionNum) {
52+
public S3QueueMessage setPartitionNum(int PartitionNum)
53+
{
4954
this.partitionNum = PartitionNum;
5055
return this;
5156
}
@@ -54,7 +59,8 @@ public boolean getEndWork() {
5459
return this.endWork;
5560
}
5661

57-
public S3QueueMessage setEndwork(boolean endwork) {
62+
public S3QueueMessage setEndwork(boolean endwork)
63+
{
5864
this.endWork = endwork;
5965
return this;
6066
}
@@ -63,7 +69,8 @@ public String getReceiptHandle() {
6369
return this.receiptHandle;
6470
}
6571

66-
public S3QueueMessage setReceiptHandle(String ReceiptHandle) {
72+
public S3QueueMessage setReceiptHandle(String ReceiptHandle)
73+
{
6774
this.receiptHandle = ReceiptHandle;
6875
return this;
6976
}
@@ -72,7 +79,8 @@ public long getTimestamp() {
7279
return timestamp;
7380
}
7481

75-
public S3QueueMessage setTimestamp(long timestamp) {
82+
public S3QueueMessage setTimestamp(long timestamp)
83+
{
7684
this.timestamp = timestamp;
7785
return this;
7886
}
@@ -81,7 +89,8 @@ public String getMetadata() {
8189
return metadata;
8290
}
8391

84-
public S3QueueMessage setMetadata(String metadata) {
92+
public S3QueueMessage setMetadata(String metadata)
93+
{
8594
this.metadata = metadata;
8695
return this;
8796
}

0 commit comments

Comments
 (0)