Skip to content

Commit ca88ad5

Browse files
committed
Comments.
1 parent 640ccf6 commit ca88ad5

File tree

4 files changed

+66
-70
lines changed

4 files changed

+66
-70
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public static Mutation createMutationFromBeamRows(
104104
return mutationBuilder.build();
105105
}
106106

107+
private static Timestamp toSpannerTimestamp(Instant instant) {
108+
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
109+
return Timestamp.ofTimeMicroseconds(micros);
110+
}
111+
107112
private static void setBeamValueToKey(
108113
Key.Builder keyBuilder, Schema.FieldType field, String columnName, Row row) {
109114
switch (field.getTypeName()) {
@@ -157,8 +162,7 @@ private static void setBeamValueToKey(
157162
if (instant == null) {
158163
keyBuilder.append((Timestamp) null);
159164
} else {
160-
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
161-
keyBuilder.append(Timestamp.ofTimeMicroseconds(micros));
165+
keyBuilder.append(toSpannerTimestamp(instant));
162166
}
163167
} else {
164168
throw new IllegalArgumentException(
@@ -250,8 +254,7 @@ private static void setBeamValueToMutation(
250254
if (instant == null) {
251255
mutationBuilder.set(columnName).to((Timestamp) null);
252256
} else {
253-
long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
254-
mutationBuilder.set(columnName).to(Timestamp.ofTimeMicroseconds(micros));
257+
mutationBuilder.set(columnName).to(toSpannerTimestamp(instant));
255258
}
256259
} else {
257260
throw new IllegalArgumentException(
@@ -380,11 +383,7 @@ private static void addIterableToMutationBuilder(
380383
StreamSupport.stream(iterable.spliterator(), false)
381384
.map(
382385
instant -> {
383-
Instant javaInstant = (java.time.Instant) instant;
384-
long micros =
385-
javaInstant.getEpochSecond() * 1_000_000L
386-
+ javaInstant.getNano() / 1_000L;
387-
return Timestamp.ofTimeMicroseconds(micros);
386+
return toSpannerTimestamp((java.time.Instant) instant);
388387
})
389388
.collect(toList()));
390389
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,11 @@ private static void addIterableToStructBuilder(
353353
}
354354
}
355355

356+
private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
357+
long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
358+
return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
359+
}
360+
356361
private static @Nullable Object getStructValue(Struct struct, Schema.Field field) {
357362
String column = field.getName();
358363
Type.Code typeCode = struct.getColumnType(column).getCode();
@@ -373,11 +378,7 @@ private static void addIterableToStructBuilder(
373378
if (fieldType.getTypeName().isLogicalType()) {
374379
Schema.@Nullable LogicalType<?, ?> logicalType = fieldType.getLogicalType();
375380
if (logicalType != null && logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
376-
// Convert to java.time.Instant with microsecond precision
377-
long micros =
378-
spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
379-
return java.time.Instant.ofEpochSecond(
380-
micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
381+
return fromSpannerTimestamp(spannerTimestamp);
381382
}
382383
}
383384
// Default DATETIME behavior: convert to Joda DateTime
@@ -434,10 +435,7 @@ private static void addIterableToStructBuilder(
434435
return struct.getTimestampList(column).stream()
435436
.map(
436437
timestamp -> {
437-
long micros =
438-
timestamp.getSeconds() * 1_000_000L + timestamp.getNanos() / 1_000L;
439-
return java.time.Instant.ofEpochSecond(
440-
micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
438+
return fromSpannerTimestamp(timestamp);
441439
})
442440
.collect(toList());
443441
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/StructUtilsTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@
4747
public class StructUtilsTest {
4848
private static final Schema EMPTY_SCHEMA = Schema.builder().build();
4949
private static final Schema INT64_SCHEMA = Schema.builder().addInt64Field("int64").build();
50+
private static final Timestamp TIMESTAMP = Timestamp.ofTimeMicroseconds(1234567890123456L);
51+
private static final Instant INSTANT =
52+
Instant.ofEpochSecond(
53+
1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) * 1_000L);
5054

5155
@Test
5256
public void testStructToBeamRow() {
@@ -299,30 +303,26 @@ public void testStructToBeamRowWithMicrosInstant() {
299303
Schema.FieldType.array(Schema.FieldType.logicalType(new MicrosInstant())))
300304
.build();
301305

302-
Timestamp ts = Timestamp.ofTimeMicroseconds(1234567890123456L);
303306
Struct struct =
304307
Struct.newBuilder()
305308
.set("f_int64")
306309
.to(42L)
307310
.set("f_micros_instant")
308-
.to(ts)
311+
.to(TIMESTAMP)
309312
.set("f_micros_instant_array")
310-
.toTimestampArray(ImmutableList.of(ts, ts))
313+
.toTimestampArray(ImmutableList.of(TIMESTAMP, TIMESTAMP))
311314
.build();
312315

313316
Row result = StructUtils.structToBeamRow(struct, schema);
314317

315318
assertEquals(42L, result.getInt64("f_int64").longValue());
316319

317-
Instant expectedInstant =
318-
Instant.ofEpochSecond(
319-
1234567890123456L / 1_000_000L, (1234567890123456L % 1_000_000L) * 1_000L);
320-
assertEquals(expectedInstant, result.getValue("f_micros_instant"));
320+
assertEquals(INSTANT, result.getValue("f_micros_instant"));
321321

322322
@SuppressWarnings("unchecked")
323323
List<Instant> instants = (List<Instant>) result.getValue("f_micros_instant_array");
324324
assertEquals(2, instants.size());
325-
assertEquals(expectedInstant, instants.get(0));
325+
assertEquals(INSTANT, instants.get(0));
326326
}
327327

328328
private StructType.Field getFieldForTypeCode(String name, TypeCode typeCode) {

sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
DockerContainer = None
5454
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
5555

56+
TIMESTAMPS = [Timestamp.of(1234567890.0 + i) for i in range(1000)]
57+
5658

5759
class SpannerTestKey(NamedTuple):
5860
f_string: str
@@ -72,10 +74,10 @@ class SpannerPartTestRow(NamedTuple):
7274

7375

7476
@pytest.mark.uses_gcp_java_expansion_service
75-
@unittest.skipUnless(
76-
os.environ.get('EXPANSION_JARS'),
77-
"EXPANSION_JARS environment var is not provided, "
78-
"indicating that jars have not been built")
77+
# @unittest.skipUnless(
78+
# os.environ.get('EXPANSION_JARS'),
79+
# "EXPANSION_JARS environment var is not provided, "
80+
# "indicating that jars have not been built")
7981
@unittest.skipIf(spanner is None, 'GCP dependencies are not installed.')
8082
@unittest.skipIf(
8183
DockerContainer is None, 'testcontainers package is not installed.')
@@ -129,15 +131,15 @@ def tearDown(self):
129131
def test_spanner_insert_or_update(self):
130132
self.spanner_helper.insert_values(
131133
self.database_id,
132-
[('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
133-
('or_update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
134+
[('or_update0', 5, False, TIMESTAMPS[1].to_rfc3339()),
135+
('or_update1', 9, False, TIMESTAMPS[0].to_rfc3339())])
134136

135137
def to_row_fn(i):
136138
return SpannerTestRow(
137139
f_int64=i,
138140
f_string=f'or_update{i}',
139141
f_boolean=i % 2 == 0,
140-
f_timestamp=Timestamp.of(1234567890.0 + i))
142+
f_timestamp=TIMESTAMPS[i])
141143

142144
self.run_write_pipeline(3, to_row_fn, SpannerTestRow, SpannerInsertOrUpdate)
143145

@@ -148,15 +150,15 @@ def to_row_fn(i):
148150
self.assertEqual(row[0], f'or_update{i}')
149151
self.assertEqual(row[1], i)
150152
self.assertEqual(row[2], i % 2 == 0)
151-
self.assertIsNotNone(row[3]) # timestamp field
153+
self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
152154

153155
def test_spanner_insert(self):
154156
def to_row_fn(num):
155157
return SpannerTestRow(
156158
f_string=f'insert{num}',
157159
f_int64=num,
158160
f_boolean=None,
159-
f_timestamp=Timestamp.of(1234567890.0 + num))
161+
f_timestamp=TIMESTAMPS[num])
160162

161163
self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
162164

@@ -172,71 +174,68 @@ def compare_row(row):
172174
self.assertEqual(row[0], f'insert{i}')
173175
self.assertEqual(row[1], i)
174176
self.assertIsNone(row[2])
175-
self.assertIsNotNone(row[3])
177+
self.assertEqual(row[3].timestamp_pb(), TIMESTAMPS[i].to_proto())
176178

177179
def test_spanner_replace(self):
178180
self.spanner_helper.insert_values(
179181
self.database_id,
180-
[('replace0', 0, True, Timestamp.of(1234567890.0).to_rfc3339()),
181-
('replace1', 1, False, Timestamp.of(1234567891.0).to_rfc3339())])
182+
[('replace0', 0, True, TIMESTAMPS[10].to_rfc3339()),
183+
('replace1', 1, False, TIMESTAMPS[11].to_rfc3339())])
182184

183185
def to_row_fn(num):
184186
return SpannerPartTestRow(
185187
f_string=f'replace{num}',
186188
f_int64=num + 10,
187-
f_timestamp=Timestamp.of(1234567900.0 + num))
189+
f_timestamp=TIMESTAMPS[num])
188190

189191
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
190-
191192
results = self.spanner_helper.read_data(self.database_id, prefix='replace')
192-
self.assertEqual(len(results), 2)
193-
# In REPLACE, boolean should be NULL but timestamp should be updated
194-
self.assertEqual(results[0][0], 'replace0')
195-
self.assertEqual(results[0][1], 10)
196-
self.assertIsNone(results[0][2]) # boolean replaced with NULL
197-
self.assertIsNotNone(results[0][3]) # timestamp updated
193+
for i in range(len(results)):
194+
results[i][3] = results[i][3].timestamp_pb()
195+
self.assertEqual(
196+
results,
197+
[['replace0', 10, None, TIMESTAMPS[0].to_proto()],
198+
['replace1', 11, None, TIMESTAMPS[1].to_proto()]])
198199

199200
def test_spanner_update(self):
200201
self.spanner_helper.insert_values(
201202
self.database_id,
202-
[('update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
203-
('update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
203+
[('update0', 5, False, TIMESTAMPS[10].to_rfc3339()),
204+
('update1', 9, False, TIMESTAMPS[100].to_rfc3339())])
204205

205206
def to_row_fn(num):
206207
return SpannerPartTestRow(
207208
f_string=f'update{num}',
208209
f_int64=num + 10,
209-
f_timestamp=Timestamp.of(1234567900.0 + num))
210+
f_timestamp=TIMESTAMPS[num])
210211

211212
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
212-
213213
results = self.spanner_helper.read_data(self.database_id, 'update')
214-
self.assertEqual(len(results), 2)
215-
# In UPDATE, boolean preserved but timestamp updated
216-
self.assertEqual(results[0][1], 10)
217-
self.assertEqual(results[0][2], False) # boolean preserved
218-
self.assertIsNotNone(results[0][3]) # timestamp updated
214+
for i in range(len(results)):
215+
results[i][3] = results[i][3].timestamp_pb()
216+
self.assertEqual(
217+
results,
218+
[['update0', 10, False, TIMESTAMPS[0].to_proto()],
219+
['update1', 11, False, TIMESTAMPS[1].to_proto()]])
219220

220221
def test_spanner_delete(self):
221222
self.spanner_helper.insert_values(
222223
self.database_id,
223224
values=[
224-
('delete0', 0, None, Timestamp.of(1234567890.0).to_rfc3339()),
225-
('delete6', 6, False, Timestamp.of(1234567896.0).to_rfc3339()),
226-
('delete20', 20, True, Timestamp.of(1234567910.0).to_rfc3339()),
225+
('delete0', 0, None, TIMESTAMPS[0].to_rfc3339()),
226+
('delete6', 6, False, TIMESTAMPS[0].to_rfc3339()),
227+
('delete20', 20, True, TIMESTAMPS[0].to_rfc3339()),
227228
])
228229

229230
def to_row_fn(num):
230231
return SpannerTestKey(f_string=f'delete{num}')
231232

232233
self.run_write_pipeline(10, to_row_fn, SpannerTestKey, SpannerDelete)
233-
234234
results = self.spanner_helper.read_data(self.database_id, prefix='delete')
235-
self.assertEqual(len(results), 1)
236-
self.assertEqual(results[0][0], 'delete20')
237-
self.assertEqual(results[0][1], 20)
238-
self.assertEqual(results[0][2], True)
239-
self.assertIsNotNone(results[0][3]) # timestamp preserved
235+
for i in range(len(results)):
236+
results[i][3] = results[i][3].timestamp_pb()
237+
self.assertEqual(
238+
results, [['delete20', 20, True, TIMESTAMPS[0].to_proto()]])
240239

241240
def test_spanner_read_query(self):
242241
self.insert_read_values('query_read')
@@ -268,17 +267,17 @@ def run_read_pipeline(self, prefix, table=None, query=None):
268267
f_int64=0,
269268
f_string=f'{prefix}0',
270269
f_boolean=None,
271-
f_timestamp=Timestamp.of(1234567890.0)),
270+
f_timestamp=TIMESTAMPS[0]),
272271
SpannerTestRow(
273272
f_int64=1,
274273
f_string=f'{prefix}1',
275274
f_boolean=True,
276-
f_timestamp=Timestamp.of(1234567891.0)),
275+
f_timestamp=TIMESTAMPS[1]),
277276
SpannerTestRow(
278277
f_int64=2,
279278
f_string=f'{prefix}2',
280279
f_boolean=False,
281-
f_timestamp=Timestamp.of(1234567892.0)),
280+
f_timestamp=TIMESTAMPS[2]),
282281
]))
283282

284283
def run_write_pipeline(
@@ -303,9 +302,9 @@ def insert_read_values(self, prefix):
303302
self.spanner_helper.insert_values(
304303
self.database_id,
305304
values=[
306-
(f'{prefix}0', 0, None, Timestamp.of(1234567890.0).to_rfc3339()),
307-
(f'{prefix}1', 1, True, Timestamp.of(1234567891.0).to_rfc3339()),
308-
(f'{prefix}2', 2, False, Timestamp.of(1234567892.0).to_rfc3339()),
305+
(f'{prefix}0', 0, None, TIMESTAMPS[0].to_rfc3339()),
306+
(f'{prefix}1', 1, True, TIMESTAMPS[1].to_rfc3339()),
307+
(f'{prefix}2', 2, False, TIMESTAMPS[2].to_rfc3339()),
309308
])
310309

311310

0 commit comments

Comments
 (0)