Skip to content

Commit 0247ac9

Browse files
authored
feat(dv): hashing and schema processing (#3401)
1 parent 78ec1dc commit 0247ac9

File tree

6 files changed

+499
-0
lines changed

6 files changed

+499
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.dofn;
17+
18+
import com.google.cloud.spanner.BatchClient;
19+
import com.google.cloud.spanner.BatchReadOnlyTransaction;
20+
import com.google.cloud.spanner.DatabaseAdminClient;
21+
import com.google.cloud.spanner.Dialect;
22+
import com.google.cloud.spanner.TimestampBound;
23+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
24+
import com.google.cloud.teleport.v2.spanner.ddl.InformationSchemaScanner;
25+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
26+
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
27+
import org.apache.beam.sdk.transforms.DoFn;
28+
29+
public class ProcessInformationSchemaFn extends DoFn<Void, Ddl> {
30+
31+
private final SpannerConfig spannerConfig;
32+
33+
private transient SpannerAccessor spannerAccessor;
34+
private transient Dialect dialect;
35+
36+
public ProcessInformationSchemaFn(SpannerConfig spannerConfig) {
37+
this.spannerConfig = spannerConfig;
38+
}
39+
40+
@Setup
41+
public void setup() throws Exception {
42+
spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig);
43+
44+
DatabaseAdminClient databaseAdminClient = spannerAccessor.getDatabaseAdminClient();
45+
dialect =
46+
databaseAdminClient
47+
.getDatabase(spannerConfig.getInstanceId().get(), spannerConfig.getDatabaseId().get())
48+
.getDialect();
49+
}
50+
51+
@Teardown
52+
public void teardown() throws Exception {
53+
if (spannerAccessor != null) {
54+
spannerAccessor.close();
55+
}
56+
}
57+
58+
@ProcessElement
59+
public void processElement(ProcessContext c) {
60+
Ddl mainDdl = getInformationSchemaAsDdl(spannerAccessor, dialect);
61+
c.output(mainDdl);
62+
}
63+
64+
private Ddl getInformationSchemaAsDdl(SpannerAccessor accessor, Dialect dialect) {
65+
BatchClient batchClient = accessor.getBatchClient();
66+
BatchReadOnlyTransaction context =
67+
batchClient.batchReadOnlyTransaction(TimestampBound.strong());
68+
InformationSchemaScanner scanner = new InformationSchemaScanner(context, dialect);
69+
return scanner.scan();
70+
}
71+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.dofn;
17+
18+
import com.google.cloud.teleport.v2.dto.ComparisonRecord;
19+
import com.google.cloud.teleport.v2.mapper.ComparisonRecordMapper;
20+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
21+
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper;
22+
import java.util.Objects;
23+
import org.apache.avro.generic.GenericRecord;
24+
import org.apache.beam.sdk.transforms.DoFn;
25+
import org.apache.beam.sdk.transforms.SerializableFunction;
26+
import org.apache.beam.sdk.values.PCollectionView;
27+
28+
/** A {@link DoFn} that converts {@link GenericRecord} to {@link ComparisonRecord}. */
29+
public class SourceHashFn extends DoFn<GenericRecord, ComparisonRecord> {
30+
31+
private final PCollectionView<Ddl> ddlView;
32+
private final SerializableFunction<Ddl, ISchemaMapper> schemaMapperProvider;
33+
34+
private transient ComparisonRecordMapper comparisonRecordMapper;
35+
36+
public SourceHashFn(
37+
PCollectionView<Ddl> ddlView, SerializableFunction<Ddl, ISchemaMapper> schemaMapperProvider) {
38+
this.ddlView = ddlView;
39+
this.schemaMapperProvider = schemaMapperProvider;
40+
}
41+
42+
@ProcessElement
43+
public void processElement(ProcessContext c) {
44+
Ddl ddl = c.sideInput(ddlView);
45+
46+
// lazy initialization of the mapper.
47+
if (comparisonRecordMapper == null) {
48+
comparisonRecordMapper =
49+
new ComparisonRecordMapper(schemaMapperProvider.apply(ddl), null, ddl);
50+
}
51+
52+
ComparisonRecord comparisonRecord =
53+
comparisonRecordMapper.mapFrom(Objects.requireNonNull(c.element()));
54+
if (comparisonRecord != null) {
55+
c.output(comparisonRecord);
56+
}
57+
}
58+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.dofn;
17+
18+
import com.google.cloud.spanner.Struct;
19+
import com.google.cloud.teleport.v2.dto.ComparisonRecord;
20+
import com.google.cloud.teleport.v2.mapper.ComparisonRecordMapper;
21+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
22+
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaMapper;
23+
import java.util.Objects;
24+
import org.apache.beam.sdk.transforms.DoFn;
25+
import org.apache.beam.sdk.transforms.SerializableFunction;
26+
import org.apache.beam.sdk.values.PCollectionView;
27+
28+
public class SpannerHashFn extends DoFn<Struct, ComparisonRecord> {
29+
30+
private final PCollectionView<Ddl> ddlView;
31+
private final SerializableFunction<Ddl, ISchemaMapper> schemaMapperProvider;
32+
33+
private transient ComparisonRecordMapper comparisonRecordMapper;
34+
35+
public SpannerHashFn(
36+
PCollectionView<Ddl> ddlView, SerializableFunction<Ddl, ISchemaMapper> schemaMapperProvider) {
37+
this.ddlView = ddlView;
38+
this.schemaMapperProvider = schemaMapperProvider;
39+
}
40+
41+
@ProcessElement
42+
public void processElement(ProcessContext c) {
43+
Ddl ddl = c.sideInput(ddlView);
44+
// lazy initialization of the mapper.
45+
if (comparisonRecordMapper == null) {
46+
comparisonRecordMapper =
47+
new ComparisonRecordMapper(schemaMapperProvider.apply(ddl), null, ddl);
48+
}
49+
50+
ComparisonRecord comparisonRecord =
51+
comparisonRecordMapper.mapFrom(Objects.requireNonNull(c.element()));
52+
if (comparisonRecord != null) {
53+
c.output(comparisonRecord);
54+
}
55+
}
56+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright (C) 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.dofn;
17+
18+
import static org.mockito.ArgumentMatchers.any;
19+
import static org.mockito.Mockito.mockConstruction;
20+
import static org.mockito.Mockito.mockStatic;
21+
import static org.mockito.Mockito.verify;
22+
import static org.mockito.Mockito.when;
23+
24+
import com.google.cloud.spanner.BatchClient;
25+
import com.google.cloud.spanner.BatchReadOnlyTransaction;
26+
import com.google.cloud.spanner.Database;
27+
import com.google.cloud.spanner.DatabaseAdminClient;
28+
import com.google.cloud.spanner.Dialect;
29+
import com.google.cloud.spanner.TimestampBound;
30+
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
31+
import com.google.cloud.teleport.v2.spanner.ddl.InformationSchemaScanner;
32+
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
33+
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
34+
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
35+
import org.apache.beam.sdk.transforms.DoFn;
36+
import org.junit.After;
37+
import org.junit.Before;
38+
import org.junit.Test;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.JUnit4;
41+
import org.mockito.Mock;
42+
import org.mockito.MockedConstruction;
43+
import org.mockito.MockedStatic;
44+
import org.mockito.MockitoAnnotations;
45+
46+
/** Unit tests for {@link ProcessInformationSchemaFn}. */
47+
@RunWith(JUnit4.class)
48+
public class ProcessInformationSchemaFnTest {
49+
50+
@Mock private SpannerConfig spannerConfig;
51+
@Mock private SpannerAccessor spannerAccessor;
52+
@Mock private DatabaseAdminClient databaseAdminClient;
53+
@Mock private Database database;
54+
@Mock private BatchClient batchClient;
55+
@Mock private BatchReadOnlyTransaction batchReadOnlyTransaction;
56+
@Mock private DoFn<Void, Ddl>.ProcessContext processContext;
57+
@Mock private Ddl ddl;
58+
59+
private MockedStatic<SpannerAccessor> mockedSpannerAccessor;
60+
61+
@Before
62+
public void setUp() {
63+
MockitoAnnotations.openMocks(this);
64+
65+
mockedSpannerAccessor = mockStatic(SpannerAccessor.class);
66+
mockedSpannerAccessor
67+
.when(() -> SpannerAccessor.getOrCreate(spannerConfig))
68+
.thenReturn(spannerAccessor);
69+
70+
when(spannerConfig.getInstanceId()).thenReturn(StaticValueProvider.of("instance"));
71+
when(spannerConfig.getDatabaseId()).thenReturn(StaticValueProvider.of("database"));
72+
73+
when(spannerAccessor.getDatabaseAdminClient()).thenReturn(databaseAdminClient);
74+
75+
when(databaseAdminClient.getDatabase("instance", "database")).thenReturn(database);
76+
77+
when(database.getDialect()).thenReturn(Dialect.GOOGLE_STANDARD_SQL);
78+
79+
when(spannerAccessor.getBatchClient()).thenReturn(batchClient);
80+
when(batchClient.batchReadOnlyTransaction(any(TimestampBound.class)))
81+
.thenReturn(batchReadOnlyTransaction);
82+
}
83+
84+
@After
85+
public void tearDown() {
86+
mockedSpannerAccessor.close();
87+
}
88+
89+
@Test
90+
public void testProcessElement() throws Exception {
91+
ProcessInformationSchemaFn fn = new ProcessInformationSchemaFn(spannerConfig);
92+
93+
fn.setup();
94+
95+
try (MockedConstruction<InformationSchemaScanner> mockedScanner =
96+
mockConstruction(
97+
InformationSchemaScanner.class,
98+
(mock, context) -> {
99+
when(mock.scan()).thenReturn(ddl);
100+
})) {
101+
102+
fn.processElement(processContext);
103+
104+
verify(processContext).output(ddl);
105+
106+
assert (mockedScanner.constructed().size() == 1);
107+
verify(mockedScanner.constructed().get(0)).scan();
108+
}
109+
110+
fn.teardown();
111+
verify(spannerAccessor).close();
112+
}
113+
}

0 commit comments

Comments
 (0)