|
1 | 1 | import { createCommand } from "../../../core/create-command"; |
2 | | -import { UserError } from "../../../errors"; |
| 2 | +import { CommandLineArgsError, UserError } from "../../../errors"; |
3 | 3 | import { logger } from "../../../logger"; |
4 | 4 | import { bucketFormatMessage, isValidR2BucketName } from "../../../r2/helpers"; |
5 | 5 | import { requireAuth } from "../../../user"; |
@@ -67,17 +67,14 @@ export const pipelinesSinksCreateCommand = createCommand({ |
67 | 67 | type: "string", |
68 | 68 | }, |
69 | 69 | "roll-size": { |
70 | | - describe: "Roll file when size reaches (e.g., 100MB, 1GB)", |
71 | | - type: "string", |
72 | | - default: |
73 | | - SINK_DEFAULTS.rolling_policy.file_size_bytes === 0 |
74 | | - ? undefined |
75 | | - : `${SINK_DEFAULTS.rolling_policy.file_size_bytes}`, |
| 70 | + describe: "Roll file size in MB", |
| 71 | + type: "number", |
| 72 | + default: SINK_DEFAULTS.rolling_policy.file_size_bytes / (1024 * 1024), |
76 | 73 | }, |
77 | 74 | "roll-interval": { |
78 | | - describe: "Roll file when time reaches (e.g., 5m, 1h)", |
79 | | - type: "string", |
80 | | - default: `${SINK_DEFAULTS.rolling_policy.interval_seconds}s`, |
| 75 | + describe: "Roll file interval in seconds", |
| 76 | + type: "number", |
| 77 | + default: SINK_DEFAULTS.rolling_policy.interval_seconds, |
81 | 78 | }, |
82 | 79 | "access-key-id": { |
83 | 80 | describe: |
@@ -105,32 +102,37 @@ export const pipelinesSinksCreateCommand = createCommand({ |
105 | 102 | type: "string", |
106 | 103 | }, |
107 | 104 | }, |
108 | | - async handler(args, { config }) { |
109 | | - const accountId = await requireAuth(config); |
110 | | - const sinkName = args.sink; |
| 105 | + validateArgs: (args) => { |
111 | 106 | const sinkType = parseSinkType(args.type); |
112 | 107 |
|
113 | 108 | if (!isValidR2BucketName(args.bucket)) { |
114 | | - throw new UserError( |
| 109 | + throw new CommandLineArgsError( |
115 | 110 | `The bucket name "${args.bucket}" is invalid. ${bucketFormatMessage}` |
116 | 111 | ); |
117 | 112 | } |
118 | 113 |
|
119 | 114 | if (sinkType === "r2_data_catalog") { |
120 | 115 | if (!args.namespace) { |
121 | | - throw new UserError( |
| 116 | + throw new CommandLineArgsError( |
122 | 117 | "--namespace is required for r2-data-catalog sinks" |
123 | 118 | ); |
124 | 119 | } |
125 | 120 | if (!args.table) { |
126 | | - throw new UserError("--table is required for r2-data-catalog sinks"); |
| 121 | + throw new CommandLineArgsError( |
| 122 | + "--table is required for r2-data-catalog sinks" |
| 123 | + ); |
127 | 124 | } |
128 | 125 | if (!args.catalogToken) { |
129 | | - throw new UserError( |
| 126 | + throw new CommandLineArgsError( |
130 | 127 | "--catalog-token is required for r2-data-catalog sinks" |
131 | 128 | ); |
132 | 129 | } |
133 | 130 | } |
| 131 | + }, |
| 132 | + async handler(args, { config }) { |
| 133 | + const accountId = await requireAuth(config); |
| 134 | + const sinkName = args.sink; |
| 135 | + const sinkType = parseSinkType(args.type); |
134 | 136 |
|
135 | 137 | const sinkConfig: CreateSinkRequest = { |
136 | 138 | name: sinkName, |
@@ -182,23 +184,10 @@ export const pipelinesSinksCreateCommand = createCommand({ |
182 | 184 | SINK_DEFAULTS.rolling_policy.interval_seconds; |
183 | 185 |
|
184 | 186 | if (args.rollSize) { |
185 | | - // Parse file size (e.g., "100MB" -> bytes) |
186 | | - const sizeMatch = args.rollSize.match(/^(\d+)(MB|GB)?$/i); |
187 | | - if (sizeMatch) { |
188 | | - const size = parseInt(sizeMatch[1]); |
189 | | - const unit = sizeMatch[2]?.toUpperCase() || "MB"; |
190 | | - file_size_bytes = |
191 | | - unit === "GB" ? size * 1024 * 1024 * 1024 : size * 1024 * 1024; |
192 | | - } |
| 187 | + file_size_bytes = args.rollSize * 1024 * 1024; |
193 | 188 | } |
194 | 189 | if (args.rollInterval) { |
195 | | - // Parse interval (e.g., "300s" or "5m" -> seconds) |
196 | | - const intervalMatch = args.rollInterval.match(/^(\d+)([sm])?$/i); |
197 | | - if (intervalMatch) { |
198 | | - const interval = parseInt(intervalMatch[1]); |
199 | | - const unit = intervalMatch[2]?.toLowerCase() || "s"; |
200 | | - interval_seconds = unit === "m" ? interval * 60 : interval; |
201 | | - } |
| 190 | + interval_seconds = args.rollInterval; |
202 | 191 | } |
203 | 192 |
|
204 | 193 | sinkConfig.config.rolling_policy = { |
|
0 commit comments