
Commit 3f06223

updated types, made rollover policy more clear, updated path copy, feedback

1 parent b2b63e6 · commit 3f06223

7 files changed (+67 / -44 lines)

packages/wrangler/src/__tests__/pipelines.test.ts
Lines changed: 7 additions & 0 deletions

@@ -912,6 +912,7 @@ describe("wrangler pipelines", () => {
   });
 
   describe("pipelines streams create", () => {
+    const { setIsTTY } = useMockIsTTY();
     function mockCreateStreamRequest(expectedRequest: {
       name: string;
       hasSchema?: boolean;
@@ -982,6 +983,12 @@ describe("wrangler pipelines", () => {
     });
 
     it("should create stream with default settings", async () => {
+      setIsTTY(true);
+      mockConfirm({
+        text: "No schema file provided. Create stream without a schema (unstructured JSON)?",
+        result: true,
+      });
+
       const createRequest = mockCreateStreamRequest({
         name: "my_stream",
       });

packages/wrangler/src/pipelines/cli/setup.ts
Lines changed: 34 additions & 27 deletions

@@ -150,6 +150,7 @@ async function setupStreamConfiguration(
 
   setupConfig.streamConfig = {
     name: setupConfig.streamName,
+    format: { type: "json" as const, ...(!schema && { unstructured: true }) },
     http: {
       enabled: httpEnabled,
       authentication: httpAuth,
@@ -321,9 +322,19 @@ async function loadSchemaFromFile(): Promise<SchemaField[]> {
 
     return parsedSchema.fields;
   } catch (error) {
-    throw new UserError(
+    logger.error(
       `Failed to read schema file: ${error instanceof Error ? error.message : String(error)}`
     );
+
+    const retry = await confirm("Would you like to try again?", {
+      defaultValue: true,
+    });
+
+    if (retry) {
+      return loadSchemaFromFile();
+    } else {
+      throw new UserError("Schema file loading cancelled");
+    }
   }
 }
 
@@ -370,9 +381,12 @@ async function setupR2Sink(
     );
   }
 
-  const path = await prompt("File prefix (optional):", {
-    defaultValue: "",
-  });
+  const path = await prompt(
+    "The base prefix in your bucket where data will be written (optional):",
+    {
+      defaultValue: "",
+    }
+  );
 
   const timePartitionPattern = await prompt(
     "Time partition pattern (optional):",
@@ -391,7 +405,6 @@ async function setupR2Sink(
   });
 
   let compression;
-  let targetRowGroupSize;
   if (format === "parquet") {
     compression = await select("Compression:", {
       choices: [
@@ -401,21 +414,20 @@ async function setupR2Sink(
         { title: "zstd", value: "zstd" },
         { title: "lz4", value: "lz4" },
       ],
-      defaultOption: 0,
-      fallbackOption: 0,
-    });
-
-    targetRowGroupSize = await prompt("Target row group size (MB):", {
-      defaultValue: "128",
+      defaultOption: 3,
+      fallbackOption: 3,
     });
   }
 
-  const fileSizeMB = await prompt("Maximum file size (MB):", {
+  const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
     defaultValue: "100",
   });
-  const intervalSeconds = await prompt("Maximum time interval (seconds):", {
-    defaultValue: "300",
-  });
+  const intervalSeconds = await prompt(
+    "Roll file when time reaches (seconds):",
+    {
+      defaultValue: "300",
+    }
+  );
 
   const useOAuth = await confirm(
     "Automatically generate credentials needed to write to your R2 bucket?",
@@ -457,9 +469,6 @@ async function setupR2Sink(
     ...(compression && {
       compression: compression as ParquetFormat["compression"],
     }),
-    ...(targetRowGroupSize && {
-      row_group_bytes: parseInt(targetRowGroupSize) * 1024 * 1024,
-    }),
   };
 }
 
@@ -506,24 +515,22 @@ async function setupDataCatalogSink(setupConfig: SetupConfig): Promise<void> {
     fallbackOption: 0,
   });
 
-  const targetRowGroupSize = await prompt("Target row group size (MB):", {
-    defaultValue: "128",
-  });
-
-  const fileSizeMB = await prompt("Maximum file size (MB):", {
+  const fileSizeMB = await prompt("Roll file when size reaches (MB):", {
     defaultValue: "100",
   });
-  const intervalSeconds = await prompt("Maximum time interval (seconds):", {
-    defaultValue: "300",
-  });
+  const intervalSeconds = await prompt(
+    "Roll file when time reaches (seconds):",
+    {
+      defaultValue: "300",
+    }
+  );
 
   setupConfig.sinkConfig = {
     name: setupConfig.sinkName,
    type: "r2_data_catalog",
     format: {
       type: "parquet",
       compression: compression as ParquetFormat["compression"],
-      row_group_bytes: parseInt(targetRowGroupSize) * 1024 * 1024,
     },
     config: {
       bucket,

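Note on the new catch block in loadSchemaFromFile above: instead of failing immediately, it logs the error, asks the user whether to try again, and retries by calling itself. A minimal self-contained sketch of that retry-by-recursion pattern, using Node's readline/promises in place of wrangler's internal confirm and logger helpers (the loadJsonFile and confirmRetry names, file argument, and prompt wording here are illustrative, not wrangler APIs):

import { readFileSync } from "node:fs";
import { createInterface } from "node:readline/promises";

// Hypothetical stand-in for wrangler's confirm() dialog.
async function confirmRetry(question: string): Promise<boolean> {
  const rl = createInterface({ input: process.stdin, output: process.stdout });
  const answer = (await rl.question(`${question} (Y/n) `)).trim().toLowerCase();
  rl.close();
  return answer === "" || answer === "y" || answer === "yes";
}

// Retry-by-recursion: log the failure, ask once, then either try again or bail out.
async function loadJsonFile(path: string): Promise<unknown> {
  try {
    return JSON.parse(readFileSync(path, "utf8"));
  } catch (error) {
    console.error(
      `Failed to read file: ${error instanceof Error ? error.message : String(error)}`
    );
    const retry = await confirmRetry("Would you like to try again?");
    if (retry) {
      return loadJsonFile(path);
    }
    throw new Error("File loading cancelled");
  }
}

Because the retry is a call back into the same function, the prompt repeats until the read succeeds or the user declines, at which point the cancellation error propagates as before.
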
packages/wrangler/src/pipelines/cli/sinks/create.ts
Lines changed: 10 additions & 10 deletions

@@ -59,23 +59,23 @@ export const pipelinesSinksCreateCommand = createCommand({
       type: "string",
     },
     path: {
-      describe: "Subpath within the bucket",
+      describe: "The base prefix in your bucket where data will be written",
       type: "string",
     },
     partitioning: {
       describe: "Time partition pattern (r2 sinks only)",
       type: "string",
     },
-    "batch-file-size": {
-      describe: "Maximum file size before rolling (e.g., 100MB, 1GB)",
+    "roll-size": {
+      describe: "Roll file when size reaches (e.g., 100MB, 1GB)",
       type: "string",
       default:
         SINK_DEFAULTS.rolling_policy.file_size_bytes === 0
           ? undefined
           : `${SINK_DEFAULTS.rolling_policy.file_size_bytes}`,
     },
-    "batch-interval": {
-      describe: "Time interval before rolling (e.g., 5m, 1h)",
+    "roll-interval": {
+      describe: "Roll file when time reaches (e.g., 5m, 1h)",
       type: "string",
       default: `${SINK_DEFAULTS.rolling_policy.interval_seconds}s`,
     },
@@ -175,25 +175,25 @@ export const pipelinesSinksCreateCommand = createCommand({
       };
     }
 
-    if (args.batchFileSize || args.batchInterval) {
+    if (args.rollSize || args.rollInterval) {
       let file_size_bytes: number =
         SINK_DEFAULTS.rolling_policy.file_size_bytes;
       let interval_seconds: number =
         SINK_DEFAULTS.rolling_policy.interval_seconds;
 
-      if (args.batchFileSize) {
+      if (args.rollSize) {
         // Parse file size (e.g., "100MB" -> bytes)
-        const sizeMatch = args.batchFileSize.match(/^(\d+)(MB|GB)?$/i);
+        const sizeMatch = args.rollSize.match(/^(\d+)(MB|GB)?$/i);
         if (sizeMatch) {
           const size = parseInt(sizeMatch[1]);
           const unit = sizeMatch[2]?.toUpperCase() || "MB";
           file_size_bytes =
             unit === "GB" ? size * 1024 * 1024 * 1024 : size * 1024 * 1024;
         }
       }
-      if (args.batchInterval) {
+      if (args.rollInterval) {
         // Parse interval (e.g., "300s" or "5m" -> seconds)
-        const intervalMatch = args.batchInterval.match(/^(\d+)([sm])?$/i);
+        const intervalMatch = args.rollInterval.match(/^(\d+)([sm])?$/i);
         if (intervalMatch) {
           const interval = parseInt(intervalMatch[1]);
           const unit = intervalMatch[2]?.toLowerCase() || "s";

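The batch-file-size/batch-interval flags are renamed to roll-size/roll-interval, but the value parsing stays the same: sizes match /^(\d+)(MB|GB)?$/i with MB as the implied unit, and intervals match /^(\d+)([sm])?$/i with seconds implied. A small standalone sketch of that conversion, with the parseRollSize and parseRollInterval helper names invented for illustration (they are not wrangler exports, and the minutes-to-seconds step is assumed since it falls just outside the hunk shown):

// Hypothetical helpers mirroring the flag parsing above.

// "100" or "100MB" -> 104_857_600 bytes, "1GB" -> 1_073_741_824; undefined if the value doesn't match.
function parseRollSize(value: string): number | undefined {
  const sizeMatch = value.match(/^(\d+)(MB|GB)?$/i);
  if (!sizeMatch) {
    return undefined;
  }
  const size = parseInt(sizeMatch[1]);
  const unit = sizeMatch[2]?.toUpperCase() || "MB";
  return unit === "GB" ? size * 1024 * 1024 * 1024 : size * 1024 * 1024;
}

// "300" or "300s" -> 300 seconds, "5m" -> 300; undefined if the value doesn't match.
function parseRollInterval(value: string): number | undefined {
  const intervalMatch = value.match(/^(\d+)([sm])?$/i);
  if (!intervalMatch) {
    return undefined;
  }
  const interval = parseInt(intervalMatch[1]);
  const unit = intervalMatch[2]?.toLowerCase() || "s";
  return unit === "m" ? interval * 60 : interval;
}

console.log(parseRollSize("1GB")); // 1073741824
console.log(parseRollInterval("5m")); // 300
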
packages/wrangler/src/pipelines/cli/streams/create.ts
Lines changed: 14 additions & 1 deletion

@@ -1,5 +1,6 @@
 import { readFileSync } from "node:fs";
 import { createCommand } from "../../../core/create-command";
+import { confirm } from "../../../dialogs";
 import { UserError } from "../../../errors";
 import { logger } from "../../../logger";
 import { parseJSON } from "../../../parse";
@@ -70,11 +71,23 @@ export const pipelinesStreamsCreateCommand = createCommand({
           `Failed to read or parse schema file '${args.schemaFile}': ${error instanceof Error ? error.message : String(error)}`
         );
       }
+    } else {
+      // No schema file provided - confirm with user
+      const confirmNoSchema = await confirm(
+        "No schema file provided. Create stream without a schema (unstructured JSON)?",
+        { defaultValue: false }
+      );
+
+      if (!confirmNoSchema) {
+        throw new UserError(
+          "Stream creation cancelled. Please provide a schema file using --schema-file"
+        );
+      }
     }
 
     const streamConfig: CreateStreamRequest = {
       name: streamName,
-      ...(!schema && { format: { type: "json" as const, unstructured: true } }),
+      format: { type: "json" as const, ...(!schema && { unstructured: true }) },
       http: {
         enabled: args.httpEnabled,
         authentication: args.httpAuth,

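The last change in this file is subtle: previously the whole format block was omitted when a schema was supplied, whereas now the request always carries format.type: "json" and only the unstructured flag is conditional. A quick sketch of how that conditional spread evaluates (the schema values below are illustrative):

// With a schema, !schema is false, so the spread contributes nothing:
const schema = { fields: [{ name: "id", type: "string" }] };
const formatWithSchema = { type: "json" as const, ...(!schema && { unstructured: true }) };
// -> { type: "json" }

// Without a schema, the spread adds the flag:
const noSchema = undefined;
const formatWithoutSchema = { type: "json" as const, ...(!noSchema && { unstructured: true }) };
// -> { type: "json", unstructured: true }
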
packages/wrangler/src/pipelines/cli/streams/utils.ts
Lines changed: 0 additions & 2 deletions

@@ -130,10 +130,8 @@ function generateSampleValue(field: SchemaField): SampleValue {
     case "bool":
       return true;
     case "int32":
-    case "u_int32":
       return 42;
     case "int64":
-    case "u_int64":
       return "9223372036854775807"; // Large numbers as strings to avoid JS precision issues
     case "f32":
     case "f64":

packages/wrangler/src/pipelines/defaults.ts
Lines changed: 2 additions & 2 deletions

@@ -3,8 +3,8 @@ import type { ParquetFormat, Sink } from "./types";
 export const SINK_DEFAULTS = {
   format: {
     type: "parquet",
-    compression: "uncompressed",
-    row_group_bytes: 128 * 1024 * 1024,
+    compression: "zstd",
+    row_group_bytes: 1024 * 1024 * 1024,
   } as ParquetFormat,
   rolling_policy: {
     file_size_bytes: 0,

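For scale, the new row-group default works out to 1 GiB versus the previous 128 MiB, and the default compression moves from uncompressed to zstd, which appears to line up with the setup wizard's compression prompt above now defaulting to option 3 (zstd). A quick check of the arithmetic:

// Old vs. new row-group defaults, computed from the literals above.
const oldRowGroupBytes = 128 * 1024 * 1024; //    134_217_728 (128 MiB)
const newRowGroupBytes = 1024 * 1024 * 1024; // 1_073_741_824 (1 GiB)
console.log(newRowGroupBytes / oldRowGroupBytes); // 8 — row groups are 8x larger by default
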
packages/wrangler/src/pipelines/types.ts
Lines changed: 0 additions & 2 deletions

@@ -137,8 +137,6 @@ export type SchemaField = {
   | "bool"
   | "int32"
   | "int64"
-  | "u_int32"
-  | "u_int64"
   | "f32"
   | "f64"
   | "string"
