Skip to content

Commit 5e59650

Browse files
committed
Skeleton for parallel commit
1 parent 8581348 commit 5e59650

File tree

3 files changed

+78
-2
lines changed

3 files changed

+78
-2
lines changed

pg_lake_engine/include/pg_lake/util/s3_writer_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@
2020
extern PGDLLEXPORT void CopyLocalFileToS3(char *localFilePath, char *s3Uri);
2121
extern PGDLLEXPORT void CopyLocalFileToS3WithCleanupOnAbort(char *localFilePath, char *s3Uri);
2222
extern PGDLLEXPORT void CopyLocalManifestFileToS3WithCleanupOnAbort(char *localFilePath, char *s3Uri);
23+
extern PGDLLEXPORT void FinishAllUploads(void);

pg_lake_engine/src/utils/s3_writer_utils.c

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,22 @@
2626
#include "pg_lake/util/rel_utils.h"
2727
#include "utils/builtins.h"
2828

29+
30+
/*
31+
* ScheduledUpload represents an upload to be performed by FinishAllUpload.
32+
*/
33+
typedef struct ScheduledUpload
34+
{
35+
char *localFile;
36+
char *remoteUrl;
37+
} ScheduledUpload;
38+
2939
static char *CopyLocalFileToS3Command(char *localFileUri, char *s3Uri);
40+
static void ScheduleFileUpload(char *localFile, char *remoteUrl);
41+
static void ResetPendingUploads(void);
42+
43+
static MemoryContext UploadContext = NULL;
44+
static List *PendingUploads = NIL;
3045

3146

3247
/*
@@ -39,7 +54,7 @@ CopyLocalFileToS3WithCleanupOnAbort(char *localFilePath, char *s3Uri)
3954
{
4055
InsertInProgressFileRecord(s3Uri);
4156

42-
CopyLocalFileToS3(localFilePath, s3Uri);
57+
ScheduleFileUpload(localFilePath, s3Uri);
4358
}
4459

4560
/*
@@ -55,7 +70,7 @@ CopyLocalManifestFileToS3WithCleanupOnAbort(char *localFilePath, char *s3Uri)
5570

5671
InsertInProgressFileRecordExtended(s3Uri, isPrefix, deferDeletion);
5772

58-
CopyLocalFileToS3(localFilePath, s3Uri);
73+
ScheduleFileUpload(localFilePath, s3Uri);
5974
}
6075

6176
/*
@@ -85,3 +100,59 @@ CopyLocalFileToS3Command(char *localFileUri, char *s3Uri)
85100

86101
return command.data;
87102
}
103+
104+
105+
/*
106+
* ScheduleFileUpload schedules a file for being uploaded into object storage
107+
* when FinishAllUploads is called.
108+
*/
109+
static void
110+
ScheduleFileUpload(char *localFile, char *remoteUrl)
111+
{
112+
if (UploadContext == NULL)
113+
{
114+
UploadContext = AllocSetContextCreate(TopTransactionContext,
115+
"Upload scheduler",
116+
ALLOCSET_DEFAULT_SIZES);
117+
118+
/* reset PendingUploads on abort */
119+
MemoryContextCallback *cb = MemoryContextAllocZero(UploadContext,
120+
sizeof(MemoryContextCallback));
121+
122+
cb->func = (MemoryContextCallbackFunction) ResetPendingUploads;
123+
cb->arg = NULL;
124+
MemoryContextRegisterResetCallback(UploadContext, cb);
125+
}
126+
127+
MemoryContext oldContext = MemoryContextSwitchTo(UploadContext);
128+
129+
ScheduledUpload *upload = palloc0(sizeof(ScheduledUpload));
130+
131+
upload->localFile = pstrdup(localFile);
132+
upload->remoteUrl = pstrdup(remoteUrl);
133+
134+
PendingUploads = lappend(PendingUploads, upload);
135+
136+
MemoryContextSwitchTo(oldContext);
137+
}
138+
139+
140+
/*
141+
* ResetPendingUploads prevents PendingUploads containing dangling pointers
142+
* when the memory context is reset.
143+
*/
144+
static void
145+
ResetPendingUploads(void)
146+
{
147+
PendingUploads = NIL;
148+
}
149+
150+
151+
/*
152+
* FinishAllUpload completes all of the pending uploads in parallel.
153+
*/
154+
void
155+
FinishAllUploads(void)
156+
{
157+
158+
}

pg_lake_table/src/transaction/track_iceberg_metadata_changes.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "pg_lake/transaction/transaction_hooks.h"
3838
#include "pg_lake/util/injection_points.h"
3939
#include "pg_lake/json/json_utils.h"
40+
#include "pg_lake/util/s3_writer_utils.h"
4041
#include "pg_lake/util/url_encode.h"
4142

4243
#define ONE_MB (1 * 1024 * 1024)
@@ -772,6 +773,9 @@ ApplyTrackedIcebergMetadataChanges(void)
772773
}
773774
}
774775

776+
/* now write all the metadata files to object storage in parallel */
777+
FinishAllUploads();
778+
775779
INJECTION_POINT_COMPAT("after-apply-iceberg-changes");
776780

777781
ExternalHeavyAssertsOnIcebergMetadataChange();

0 commit comments

Comments
 (0)