|
17 | 17 | */ |
18 | 18 | package org.apache.beam.sdk.io.gcp.bigquery; |
19 | 19 |
|
20 | | -import static org.junit.Assert.assertEquals; |
21 | | - |
22 | 20 | import com.google.api.services.bigquery.model.TableFieldSchema; |
23 | 21 | import com.google.api.services.bigquery.model.TableRow; |
24 | 22 | import com.google.api.services.bigquery.model.TableSchema; |
| 23 | +import com.google.cloud.bigquery.storage.v1.DataFormat; |
25 | 24 | import java.security.SecureRandom; |
26 | 25 | import java.util.List; |
27 | 26 | import org.apache.beam.sdk.Pipeline; |
28 | | -import org.apache.beam.sdk.coders.SerializableCoder; |
29 | 27 | import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; |
30 | 28 | import org.apache.beam.sdk.io.gcp.testing.BigqueryClient; |
31 | 29 | import org.apache.beam.sdk.testing.PAssert; |
32 | 30 | import org.apache.beam.sdk.testing.TestPipeline; |
33 | 31 | import org.apache.beam.sdk.transforms.Create; |
34 | | -import org.apache.beam.sdk.transforms.MapElements; |
35 | | -import org.apache.beam.sdk.transforms.SerializableFunction; |
36 | | -import org.apache.beam.sdk.transforms.SimpleFunction; |
37 | 32 | import org.apache.beam.sdk.values.PCollection; |
38 | 33 | import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; |
39 | 34 | import org.junit.AfterClass; |
|
43 | 38 | import org.junit.runners.JUnit4; |
44 | 39 |
|
45 | 40 | /** |
46 | | - * Integration tests for BigQuery TIMESTAMP with picosecond precision (precision=12). |
47 | | - * Tests write using Storage Write API and read back using readTableRows. |
 | 41 | + * Integration tests for BigQuery TIMESTAMP with picosecond precision (precision=12). Tests write |
 | 42 | + * using the Storage Write API and read back over the Storage Read API (DIRECT_READ) in both |
 | | + * AVRO and ARROW formats. |
48 | 43 | */ |
49 | 44 | @RunWith(JUnit4.class) |
50 | 45 | public class BigQueryTimestampPicosIT { |
51 | 46 |
|
52 | 47 | private static String project; |
 | 48 | +  // Dataset name is unique per run: timestamp plus a random suffix avoids collisions. |
53 | 49 | private static final String DATASET_ID = |
54 | | - "bq_timestamp_picos_it_" |
55 | | - + System.currentTimeMillis() |
56 | | - + "_" |
57 | | - + new SecureRandom().nextInt(32); |
| 50 | + "bq_timestamp_picos_it_" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32); |
58 | 51 |
|
59 | 52 | private static TestBigQueryOptions bqOptions; |
60 | 53 | private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryTimestampPicosIT"); |
61 | 54 |
|
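 | | +  // ISO-8601 inputs carrying the full 12 fractional digits (picoseconds), including epoch |
 | | +  // and min/max date boundary values. |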
| 55 | + private static final List<TableRow> ISO_PICOS_TABLEROWS = |
| 56 | + ImmutableList.of( |
| 57 | + new TableRow().set("ts_picos", "2024-01-15T10:30:45.123456789012Z"), |
| 58 | + new TableRow().set("ts_picos", "2024-01-15T10:30:45.000000000001Z"), |
| 59 | + new TableRow().set("ts_picos", "0001-01-01T10:30:45.999999999999Z"), |
| 60 | + new TableRow().set("ts_picos", "1970-01-01T00:00:00.000000000001Z"), |
| 61 | + new TableRow().set("ts_picos", "9999-12-31T23:59:59.999999999999Z")); |
| 62 | + |
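 | | +  // Civil "UTC"-suffixed inputs with 9 fractional digits (nanoseconds), including the |
 | | +  // int64 nanosecond min/max instants. |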
| 63 | + private static final List<TableRow> UTC_NANOS_TABLEROWS = |
| 64 | + ImmutableList.of( |
| 65 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.123456789 UTC"), |
| 66 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.011111111 UTC"), |
| 67 | + new TableRow().set("ts_picos", "2262-04-11 23:47:16.854775807 UTC"), |
| 68 | + new TableRow().set("ts_picos", "1970-01-01 00:00:00.001111111 UTC"), |
| 69 | + new TableRow().set("ts_picos", "1677-09-21 00:12:43.145224192 UTC")); |
| 70 | + |
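 | | +  // "UTC"-suffixed inputs with 6 fractional digits (microseconds). |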
| 71 | + private static final List<TableRow> UTC_MICROS_TABLEROWS = |
| 72 | + ImmutableList.of( |
| 73 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.123456 UTC"), |
| 74 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.011111 UTC"), |
| 75 | + new TableRow().set("ts_picos", "0001-01-01 10:30:45.999999 UTC"), |
| 76 | + new TableRow().set("ts_picos", "1970-01-01 00:00:00.001111 UTC"), |
| 77 | + new TableRow().set("ts_picos", "9999-12-31 23:59:59.999999 UTC")); |
| 78 | + |
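 | | +  // Millisecond-precision rows; used as the expected output of the Arrow MICROS read. |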
| 79 | + private static final List<TableRow> UTC_MILLIS_TABLEROWS = |
| 80 | + ImmutableList.of( |
| 81 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.123 UTC"), |
| 82 | + new TableRow().set("ts_picos", "2024-01-15 10:30:45.011 UTC"), |
| 83 | + new TableRow().set("ts_picos", "0001-01-01 10:30:45.999 UTC"), |
| 84 | + new TableRow().set("ts_picos", "1970-01-01 00:00:00.001 UTC"), |
| 85 | + new TableRow().set("ts_picos", "9999-12-31 23:59:59.999 UTC")); |
| 86 | + |
62 | 87 | @BeforeClass |
63 | 88 | public static void setup() throws Exception { |
64 | 89 | bqOptions = TestPipeline.testingPipelineOptions().as(TestBigQueryOptions.class); |
65 | 90 | project = bqOptions.as(GcpOptions.class).getProject(); |
66 | 91 | // Create dataset for all test cases |
67 | | - BQ_CLIENT.createNewDataset(project, DATASET_ID, null, bqOptions.getBigQueryLocation()); |
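 | | +    // Region pinned to us-central1; picosecond TIMESTAMP support may only be available in |
 | | +    // select locations while in preview (assumption). |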
| 92 | + BQ_CLIENT.createNewDataset(project, DATASET_ID, null, "us-central1"); |
68 | 93 | } |
69 | 94 |
|
70 | 95 | @AfterClass |
71 | 96 | public static void cleanup() { |
72 | 97 | BQ_CLIENT.deleteDataset(project, DATASET_ID); |
73 | 98 | } |
74 | 99 |
|
75 | | - private void configureStorageWriteApi() { |
76 | | - bqOptions.setUseStorageWriteApi(true); |
77 | | - bqOptions.setNumStorageWriteApiStreams(1); |
78 | | - bqOptions.setStorageWriteApiTriggeringFrequencySec(1); |
79 | | - } |
80 | | - |
81 | 100 | /** Schema with a TIMESTAMP field having picosecond precision (12 fractional digits). */ |
82 | 101 | private TableSchema timestampPicosSchema() { |
83 | 102 | return new TableSchema() |
84 | | - .setFields( |
85 | | - ImmutableList.of( |
86 | | - new TableFieldSchema().setName("id").setType("INTEGER"), |
87 | | - new TableFieldSchema() |
88 | | - .setName("ts_picos") |
89 | | - .setType("TIMESTAMP") |
90 | | - .setPrecision(12L))); // Picosecond precision |
| 103 | + .setFields( |
| 104 | + ImmutableList.of( |
| 105 | + new TableFieldSchema() |
| 106 | + .setName("ts_picos") |
| 107 | + .setType("TIMESTAMP") |
| 108 | + .setTimestampPrecision(12L))); // Picosecond precision |
91 | 109 | } |
92 | 110 |
|
93 | | - @Test |
94 | | - public void testWriteAndReadTimestampPicos() throws Exception { |
95 | | - configureStorageWriteApi(); |
96 | | - String tableName = "timestamp_picos_" + System.currentTimeMillis(); |
97 | | - String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, tableName); |
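 | | +  /** |
 | | +   * Writes {@code inputRows} to a fresh table via the Storage Write API, then reads the table |
 | | +   * back with DIRECT_READ in the given {@code format} at the given {@code precision}, asserting |
 | | +   * the results match {@code expectedRows}. |
 | | +   */ |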
| 111 | + private void runTimestampTest( |
| 112 | + TimestampPrecision precision, |
| 113 | + DataFormat format, |
| 114 | + List<TableRow> inputRows, |
| 115 | + List<TableRow> expectedRows) { |
98 | 116 |
|
99 | | - // Test data: timestamps with 12 fractional digits |
100 | | - List<TableRow> inputRows = |
101 | | - ImmutableList.of( |
102 | | - new TableRow().set("id", 1).set("ts_picos", "2024-01-15T10:30:45.123456789012Z"), |
103 | | - new TableRow().set("id", 2).set("ts_picos", "2024-01-15T10:30:45.000000000001Z"), |
104 | | - new TableRow().set("id", 3).set("ts_picos", "2024-01-15T10:30:45.999999999999Z"), |
105 | | - new TableRow().set("id", 4).set("ts_picos", "1970-01-01T00:00:00.000000000001Z"), |
106 | | - new TableRow().set("id", 5).set("ts_picos", "9999-12-31T23:59:59.999999999999Z")); |
| 117 | + String tableName = String.format("ts_%s_%s_%d", precision, format, System.currentTimeMillis()); |
| 118 | + String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, tableName); |
107 | 119 |
|
108 | | - // ========== WRITE PIPELINE ========== |
 | 120 | +    // Write (always writes at full PICOS precision) |
109 | 121 | Pipeline writePipeline = Pipeline.create(bqOptions); |
110 | 122 | writePipeline |
111 | | - .apply("CreateInput", Create.of(inputRows)) |
112 | | - .apply( |
113 | | - "WriteToBQ", |
114 | | - BigQueryIO.writeTableRows() |
115 | | - .to(tableSpec) |
116 | | - .withSchema(timestampPicosSchema()) |
117 | | - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) |
118 | | - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); |
| 123 | + .apply("CreateInput", Create.of(inputRows)) |
| 124 | + .apply( |
| 125 | + "WriteToBQ", |
| 126 | + BigQueryIO.writeTableRows() |
| 127 | + .to(tableSpec) |
| 128 | + .withSchema(timestampPicosSchema()) |
| 129 | + .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API) |
| 130 | + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) |
| 131 | + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); |
119 | 132 | writePipeline.run().waitUntilFinish(); |
120 | 133 |
|
121 | | - // ========== READ PIPELINE ========== |
| 134 | + // Read & Verify |
122 | 135 | Pipeline readPipeline = Pipeline.create(bqOptions); |
123 | | - PCollection<TableRow> readRows = |
124 | | - readPipeline.apply( |
125 | | - "ReadFromBQ", |
126 | | - BigQueryIO.readTableRows() |
127 | | - .from(tableSpec)); |
128 | | - |
129 | | - // Extract timestamp values and verify |
130 | | - PCollection<String> timestamps = |
131 | | - readRows.apply( |
132 | | - "ExtractTimestamps", |
133 | | - MapElements.via( |
134 | | - new SimpleFunction<TableRow, String>() { |
135 | | - @Override |
136 | | - public String apply(TableRow row) { |
137 | | - // BigQuery returns timestamps - extract and format |
138 | | - return row.get("id") + ":" + row.get("ts_picos"); |
139 | | - } |
140 | | - })); |
141 | | - |
142 | | - // Verify all rows were written and read correctly |
143 | | - PAssert.that(timestamps) |
144 | | - .satisfies( |
145 | | - (SerializableFunction<Iterable<String>, Void>) |
146 | | - rows -> { |
147 | | - int count = 0; |
148 | | - for (String row : rows) { |
149 | | - count++; |
150 | | - } |
151 | | - assertEquals("Expected 5 rows", 5, count); |
152 | | - return null; |
153 | | - }); |
| 136 | + |
| 137 | + PCollection<TableRow> readTableRows = |
| 138 | + readPipeline.apply( |
| 139 | + "ReadTableRows", |
| 140 | + BigQueryIO.readTableRows() |
| 141 | + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) |
| 142 | + .withFormat(format) |
| 143 | + .withDirectReadPicosTimestampPrecision(precision) |
| 144 | + .from(tableSpec)); |
| 145 | + PCollection<TableRow> readTableRowsWithSchema = |
| 146 | + readPipeline.apply( |
| 147 | + "ReadTableRowsWithSchema", |
 | 148 | +            BigQueryIO.readTableRowsWithSchema() |
| 149 | + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) |
| 150 | + .withFormat(format) |
| 151 | + .withDirectReadPicosTimestampPrecision(precision) |
| 152 | + .from(tableSpec)); |
| 153 | + PCollection<TableRow> readTableRowsWithQuery = |
| 154 | + readPipeline.apply( |
 | 155 | +            "ReadTableRowsFromQuery", |
| 156 | + BigQueryIO.readTableRows() |
| 157 | + .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ) |
 | | +            // Backtick-quote the table path; project IDs may contain dashes. |
 | 158 | +            .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, DATASET_ID, tableName)) |
| 159 | + .usingStandardSql() |
| 160 | + .withFormat(format) |
| 161 | + .withDirectReadPicosTimestampPrecision(precision)); |
| 162 | + |
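 | | +    // All three read paths should surface identical rows at the requested precision. |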
| 163 | + PAssert.that(readTableRows).containsInAnyOrder(expectedRows); |
| 164 | + PAssert.that(readTableRowsWithSchema).containsInAnyOrder(expectedRows); |
| 165 | + PAssert.that(readTableRowsWithQuery).containsInAnyOrder(expectedRows); |
154 | 166 |
|
155 | 167 | readPipeline.run().waitUntilFinish(); |
156 | 168 | } |
157 | 169 |
|
158 | 170 | @Test |
159 | | - public void testWriteAndReadTimestampPicos_roundTrip() throws Exception { |
160 | | - configureStorageWriteApi(); |
161 | | - String tableName = "timestamp_picos_roundtrip_" + System.currentTimeMillis(); |
162 | | - String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, tableName); |
163 | | - |
164 | | - // Single test value to verify exact round-trip |
165 | | - String inputTimestamp = "2024-06-15T12:34:56.123456789012Z"; |
166 | | - TableRow inputRow = new TableRow().set("id", 1).set("ts_picos", inputTimestamp); |
167 | | - |
168 | | - // ========== WRITE ========== |
169 | | - Pipeline writePipeline = Pipeline.create(bqOptions); |
170 | | - writePipeline |
171 | | - .apply("CreateInput", Create.of(ImmutableList.of(inputRow))) |
172 | | - .apply( |
173 | | - "WriteToBQ", |
174 | | - BigQueryIO.writeTableRows() |
175 | | - .to(tableSpec) |
176 | | - .withSchema(timestampPicosSchema()) |
177 | | - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) |
178 | | - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); |
179 | | - writePipeline.run().waitUntilFinish(); |
180 | | - |
181 | | - // ========== READ AND VERIFY ========== |
182 | | - Pipeline readPipeline = Pipeline.create(bqOptions); |
183 | | - PCollection<TableRow> readRows = |
184 | | - readPipeline.apply("ReadFromBQ", BigQueryIO.readTableRows().from(tableSpec)); |
185 | | - |
186 | | - // Verify the timestamp matches exactly |
187 | | - PAssert.thatSingleton( |
188 | | - readRows.apply( |
189 | | - MapElements.via( |
190 | | - new SimpleFunction<TableRow, String>() { |
191 | | - @Override |
192 | | - public String apply(TableRow row) { |
193 | | - // The timestamp should be returned in ISO format |
194 | | - return (String) row.get("ts_picos"); |
195 | | - } |
196 | | - }))) |
197 | | - .isEqualTo(inputTimestamp); |
198 | | - |
199 | | - readPipeline.run().waitUntilFinish(); |
| 171 | + public void testPicos_Avro_roundTrip() { |
| 172 | + // Expect exact match of input (12 digits) |
| 173 | + runTimestampTest( |
| 174 | + TimestampPrecision.PICOS, DataFormat.AVRO, ISO_PICOS_TABLEROWS, ISO_PICOS_TABLEROWS); |
200 | 175 | } |
201 | 176 |
|
202 | 177 | @Test |
203 | | - public void testWriteTimestampPicos_utcFormat() throws Exception { |
204 | | - configureStorageWriteApi(); |
205 | | - String tableName = "timestamp_picos_utc_" + System.currentTimeMillis(); |
206 | | - String tableSpec = String.format("%s:%s.%s", project, DATASET_ID, tableName); |
| 178 | + public void testPicos_Arrow_roundTrip() { |
| 179 | + runTimestampTest( |
| 180 | + TimestampPrecision.PICOS, DataFormat.ARROW, ISO_PICOS_TABLEROWS, ISO_PICOS_TABLEROWS); |
| 181 | + } |
207 | 182 |
|
208 | | - // Test with UTC format (space separator, "UTC" suffix) - should still work for nano precision |
209 | | - List<TableRow> inputRows = |
210 | | - ImmutableList.of( |
211 | | - new TableRow().set("id", 1).set("ts_picos", "2024-01-15 10:30:45.123456789 UTC")); |
| 183 | + @Test |
| 184 | + public void testNanos_Avro_roundTrip() { |
| 185 | + runTimestampTest( |
| 186 | + TimestampPrecision.NANOS, DataFormat.AVRO, UTC_NANOS_TABLEROWS, UTC_NANOS_TABLEROWS); |
| 187 | + } |
212 | 188 |
|
213 | | - Pipeline writePipeline = Pipeline.create(bqOptions); |
214 | | - writePipeline |
215 | | - .apply("CreateInput", Create.of(inputRows)) |
216 | | - .apply( |
217 | | - "WriteToBQ", |
218 | | - BigQueryIO.writeTableRows() |
219 | | - .to(tableSpec) |
220 | | - .withSchema(timestampPicosSchema()) |
221 | | - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) |
222 | | - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); |
223 | | - writePipeline.run().waitUntilFinish(); |
| 189 | + @Test |
| 190 | + public void testNanos_Arrow_roundTrip() { |
| 191 | + runTimestampTest( |
| 192 | + TimestampPrecision.NANOS, DataFormat.ARROW, UTC_NANOS_TABLEROWS, UTC_NANOS_TABLEROWS); |
| 193 | + } |
224 | 194 |
|
225 | | - // Verify by reading back |
226 | | - Pipeline readPipeline = Pipeline.create(bqOptions); |
227 | | - PCollection<TableRow> readRows = |
228 | | - readPipeline.apply("ReadFromBQ", BigQueryIO.readTableRows().from(tableSpec)); |
229 | | - |
230 | | - PAssert.thatSingleton(readRows) |
231 | | - .satisfies( |
232 | | - row -> { |
233 | | - assertEquals(1L, ((Number) row.get("id")).longValue()); |
234 | | - // Timestamp should be present (format may vary) |
235 | | - String ts = (String) row.get("ts_picos"); |
236 | | - assert ts != null && !ts.isEmpty(); |
237 | | - return null; |
238 | | - }); |
| 195 | + @Test |
| 196 | + public void testMicros_Avro_roundTrip() { |
| 197 | + runTimestampTest( |
| 198 | + TimestampPrecision.MICROS, DataFormat.AVRO, UTC_MICROS_TABLEROWS, UTC_MICROS_TABLEROWS); |
| 199 | + } |
239 | 200 |
|
240 | | - readPipeline.run().waitUntilFinish(); |
| 201 | + @Test |
| 202 | + public void testMicros_Arrow_roundTrip() { |
| 203 | + runTimestampTest( |
| 204 | + TimestampPrecision.MICROS, DataFormat.ARROW, UTC_MICROS_TABLEROWS, UTC_MILLIS_TABLEROWS); |
241 | 205 | } |
242 | 206 | } |