Skip to content

Commit 024e914

Browse files
Merge branch 'main' into copilot/add-hyperlink-support
2 parents bffaaa7 + 00d6c1d commit 024e914

File tree

11 files changed

+140
-187
lines changed

11 files changed

+140
-187
lines changed

ingestion/src/metadata/data_quality/validations/models.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
class TableParameter(BaseModel):
2121
serviceUrl: Union[str, dict]
2222
path: str
23+
fullyQualifiedName: Optional[str] = None
2324
columns: List[Column]
2425
database_service_type: DatabaseServiceType
2526
privateKey: Optional[CustomSecretStr]

ingestion/src/metadata/data_quality/validations/runtime_param_setter/base_diff_params_setter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def get(
7878
path=self.get_data_diff_table_path(
7979
entity.fullyQualifiedName.root, service.serviceType
8080
),
81+
fullyQualifiedName=entity.fullyQualifiedName.root,
8182
serviceUrl=self.get_data_diff_url(
8283
service,
8384
entity.fullyQualifiedName.root,

ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableDiff.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,12 +257,15 @@ def _run(self) -> TestCaseResult:
257257
if column_diff:
258258
# If there are column differences, we set extra_columns to the common columns for the diff
259259
# Exclude incomparable columns (different data types) from the comparison
260+
# Also exclude key columns since they are handled separately and should not be in extra_columns
260261
common_columns = list(
261262
(
262263
set(column_diff.schemaTable1.schema.keys())
263264
& set(column_diff.schemaTable2.schema.keys())
264265
)
265266
- set(column_diff.changed)
267+
- set(self.runtime_params.table1.key_columns or [])
268+
- set(self.runtime_params.table2.key_columns or [])
266269
)
267270
self.runtime_params.extraColumns = common_columns
268271
self.runtime_params.table1.extra_columns = common_columns
@@ -648,7 +651,8 @@ def get_column_diff(self) -> Optional[ColumnDiffResult]:
648651
changed=changed,
649652
schemaTable1=SchemaDiffResult(
650653
serviceType=self.runtime_params.table1.database_service_type.name,
651-
fullyQualifiedTableName=self.runtime_params.table1.path,
654+
fullyQualifiedTableName=self.runtime_params.table1.fullyQualifiedName
655+
or self.runtime_params.table1.path,
652656
schema={
653657
c.name.root: {
654658
"type": c.dataTypeDisplay,
@@ -659,7 +663,8 @@ def get_column_diff(self) -> Optional[ColumnDiffResult]:
659663
),
660664
schemaTable2=SchemaDiffResult(
661665
serviceType=self.runtime_params.table2.database_service_type.name,
662-
fullyQualifiedTableName=self.runtime_params.table2.path,
666+
fullyQualifiedTableName=self.runtime_params.table2.fullyQualifiedName
667+
or self.runtime_params.table2.path,
663668
schema={
664669
c.name.root: {
665670
"type": c.dataTypeDisplay,

ingestion/tests/integration/data_quality/test_data_diff.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,11 @@ def __init__(self, *args, **kwargs):
234234
TestResultValue(name="changedColumns", value="0"),
235235
TestResultValue(
236236
name="schemaTable1",
237-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
237+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
238238
),
239239
TestResultValue(
240240
name="schemaTable2",
241-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer_without_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
241+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_without_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
242242
),
243243
],
244244
),
@@ -330,11 +330,11 @@ def __init__(self, *args, **kwargs):
330330
TestResultValue(name="changedColumns", value="0"),
331331
TestResultValue(
332332
name="schemaTable1",
333-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
333+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
334334
),
335335
TestResultValue(
336336
name="schemaTable2",
337-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer_different_case_columns' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'First_Name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
337+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'First_Name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
338338
),
339339
],
340340
),
@@ -389,6 +389,14 @@ def test_happy_paths(
389389
workflow_config,
390390
cleanup_fqns,
391391
):
392+
# Replace service name placeholders in expected testResultValue
393+
if parameters.expected.testResultValue:
394+
for result_value in parameters.expected.testResultValue:
395+
if result_value.value:
396+
result_value.value = result_value.value.replace(
397+
"POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
398+
)
399+
392400
metadata = patched_metadata
393401
table1: Table = metadata.get_by_name(
394402
Table,
@@ -506,11 +514,11 @@ def test_happy_paths(
506514
TestResultValue(name="changedColumns", value="1"),
507515
TestResultValue(
508516
name="schemaTable1",
509-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
517+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
510518
),
511519
TestResultValue(
512520
name="schemaTable2",
513-
value="serviceType='Postgres' fullyQualifiedTableName='public.customer_int_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}, 'first_name': {'type': 'integer', 'constraints': 'NULL'}}",
521+
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_int_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}, 'first_name': {'type': 'integer', 'constraints': 'NULL'}}",
514522
),
515523
],
516524
),
@@ -544,6 +552,14 @@ def test_error_paths(
544552
run_workflow,
545553
cleanup_fqns,
546554
):
555+
# Replace service name placeholders in expected testResultValue
556+
if expected.testResultValue:
557+
for result_value in expected.testResultValue:
558+
if result_value.value:
559+
result_value.value = result_value.value.replace(
560+
"POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
561+
)
562+
547563
metadata = patched_metadata
548564
table1 = metadata.get_by_name(
549565
Table,

openmetadata-service/src/main/java/org/openmetadata/service/search/SigV4RequestSigningInterceptor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ private SdkHttpFullRequest buildSdkRequest(HttpRequest request, HttpHost host, b
8787
String headerName = header.getName().toLowerCase();
8888
if (!headerName.equals("host")
8989
&& !headerName.equals("content-length")
90-
&& !headerName.equals("content-type")) {
90+
&& !headerName.equals("content-type")
91+
&& !headerName.equals("transfer-encoding")) {
9192
builder.appendHeader(header.getName(), header.getValue());
9293
}
9394
}

openmetadata-service/src/test/java/org/openmetadata/service/search/SigV4RequestSigningInterceptorTest.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,51 @@ void testInterceptorPreservesExistingHeaders() throws Exception {
147147
assertTrue(request.containsHeader("Authorization"));
148148
}
149149

150+
@Test
151+
void testInterceptorExcludesTransferEncodingFromSignedHeaders() throws Exception {
152+
BasicHttpEntityEnclosingRequest request =
153+
new BasicHttpEntityEnclosingRequest("POST", "/_bulk?refresh=false");
154+
String bulkBody = "{\"index\":{\"_index\":\"test\"}}\n{\"field\":\"value\"}\n";
155+
request.setEntity(new StringEntity(bulkBody, StandardCharsets.UTF_8));
156+
request.addHeader("Transfer-Encoding", "chunked");
157+
HttpContext context = createHttpContext("search.eu-west-3.es.amazonaws.com", 443, "https");
158+
159+
interceptor.process(request, context);
160+
161+
assertTrue(request.containsHeader("Authorization"));
162+
String authHeader = request.getFirstHeader("Authorization").getValue();
163+
String signedHeadersPart = authHeader.substring(authHeader.indexOf("SignedHeaders="));
164+
assertFalse(
165+
signedHeadersPart.toLowerCase().contains("transfer-encoding"),
166+
"transfer-encoding should NOT be in SignedHeaders to avoid AWS SigV4 mismatch errors");
167+
}
168+
169+
@Test
170+
void testBulkRequestWithChunkedEncodingSucceeds() throws Exception {
171+
BasicHttpEntityEnclosingRequest request =
172+
new BasicHttpEntityEnclosingRequest("POST", "/_bulk?refresh=false");
173+
String bulkBody =
174+
"{\"index\":{\"_index\":\"table_search_index\",\"_id\":\"123\"}}\n"
175+
+ "{\"id\":\"123\",\"name\":\"test_table\",\"deleted\":false}\n";
176+
request.setEntity(new StringEntity(bulkBody, StandardCharsets.UTF_8));
177+
request.addHeader("Transfer-Encoding", "chunked");
178+
request.addHeader("Content-Type", "application/x-ndjson");
179+
HttpContext context =
180+
createHttpContext("vpc-saas-test-engg.eu-west-3.es.amazonaws.com", 443, "https");
181+
182+
SigV4RequestSigningInterceptor euInterceptor =
183+
new SigV4RequestSigningInterceptor(credentialsProvider, Region.EU_WEST_3, "es");
184+
euInterceptor.process(request, context);
185+
186+
assertTrue(request.containsHeader("Authorization"));
187+
assertTrue(request.containsHeader("X-Amz-Date"));
188+
String authHeader = request.getFirstHeader("Authorization").getValue();
189+
assertTrue(authHeader.contains("eu-west-3"));
190+
assertFalse(
191+
authHeader.toLowerCase().contains("transfer-encoding"),
192+
"Bulk reindex requests must not sign transfer-encoding header");
193+
}
194+
150195
private HttpContext createHttpContext(String hostname, int port, String scheme) {
151196
HttpContext context = new BasicHttpContext();
152197
context.setAttribute("http.target_host", new HttpHost(hostname, port, scheme));

0 commit comments

Comments
 (0)