Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
7681d7b
feat(salesforce): add sobjectNames field for multi-object selection
lexfrei Nov 25, 2025
f517a51
Merge branch 'main' into feat/salesforce-multi-object-selection
lexfrei Nov 25, 2025
0cf9418
Merge branch 'main' of github.com:open-metadata/OpenMetadata into fea…
keshavmohta09 Jan 27, 2026
019a922
refactor: removed sobjectName field and added a migration for 1.11.8 …
keshavmohta09 Jan 28, 2026
c96eb72
Merge branch 'main' into feat/salesforce-multi-object-selection
keshavmohta09 Jan 28, 2026
461c161
fix: sobjectNames priority comment
keshavmohta09 Jan 28, 2026
f493a07
Merge branch 'feat/salesforce-multi-object-selection' of github.com:l…
keshavmohta09 Jan 28, 2026
a40b24a
refactor: sobjectNames changes in ts files
keshavmohta09 Jan 28, 2026
f557b1a
fix: yaml structure in test_salesforce
keshavmohta09 Jan 29, 2026
9053423
fix: test_salesforce.py - metadata as OpenMetadata object
keshavmohta09 Jan 29, 2026
884cdd5
Merge branch 'main' into feat/salesforce-multi-object-selection
keshavmohta09 Jan 29, 2026
dae747b
fix: added new line in sql migrations
keshavmohta09 Jan 29, 2026
81949f6
Merge branch 'main' into feat/salesforce-multi-object-selection
mohityadav766 Jan 29, 2026
392f257
Merge branch 'main' into feat/salesforce-multi-object-selection
keshavmohta09 Jan 30, 2026
aceeabd
fix: sql migration serviceType
keshavmohta09 Jan 30, 2026
8bf1b10
Merge branch 'feat/salesforce-multi-object-selection' of github.com:l…
keshavmohta09 Jan 30, 2026
fa916a5
Merge branch 'main' of github.com:open-metadata/OpenMetadata into fea…
keshavmohta09 Jan 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion bootstrap/sql/migrations/native/1.11.8/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,26 @@
-- Migrate Salesforce connection from sobjectName (string) to sobjectNames (array)
-- Converts sobjectName to sobjectNames array and removes the old field
UPDATE dbservice_entity
SET
json = JSON_REMOVE (
JSON_SET (
json,
'$.connection.config.sobjectNames',
JSON_ARRAY (
JSON_UNQUOTE (
JSON_EXTRACT (json, '$.connection.config.sobjectName')
)
)
),
'$.connection.config.sobjectName'
)
WHERE
serviceType = 'Salesforce'
AND JSON_TYPE (
JSON_EXTRACT (json, '$.connection.config.sobjectName')
) != 'NULL';

-- Upgrade appliedAt to microsecond precision to match PostgreSQL behavior.
-- Without this, MySQL returns second-precision timestamps which cause spurious
-- diffs in JSON patch operations, leading to deserialization failures.
ALTER TABLE tag_usage MODIFY appliedAt TIMESTAMP(6) NULL DEFAULT CURRENT_TIMESTAMP(6);
ALTER TABLE tag_usage MODIFY appliedAt TIMESTAMP(6) NULL DEFAULT CURRENT_TIMESTAMP(6);
12 changes: 12 additions & 0 deletions bootstrap/sql/migrations/native/1.11.8/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
@@ -1 +1,13 @@
-- Migrate Salesforce connection from sobjectName (string) to sobjectNames (array)
-- Converts sobjectName to sobjectNames array and removes the old field

UPDATE dbservice_entity
SET json = jsonb_set(
json::jsonb #- '{connection,config,sobjectName}',
'{connection,config,sobjectNames}',
jsonb_build_array(json->'connection'->'config'->>'sobjectName')
)::json
WHERE serviceType = 'Salesforce'
AND json->'connection'->'config'->>'sobjectName' IS NOT NULL;

-- No changes needed for PostgreSQL - TIMESTAMP already has microsecond precision.
7 changes: 6 additions & 1 deletion ingestion/src/metadata/examples/workflows/salesforce.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ source:
password: password
securityToken: securityToken
organizationId: organizationId
sobjectName: sobjectName
# Use sobjectNames for multiple objects
# sobjectNames:
# - Contact
# - Account
# - Lead
# - Opportunity
# sslConfig:
# caCertificate: |
# -----BEGIN CERTIFICATE-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def __init__(self, config, metadata: OpenMetadata):
self.config.sourceConfig.config
)
self.metadata = metadata
self.service_connection = self.config.serviceConnection.root.config
self.service_connection: SalesforceConnection = (
self.config.serviceConnection.root.config
)
self.ssl_manager: SSLManager = check_ssl_and_init(self.service_connection)
if self.ssl_manager:
self.service_connection = self.ssl_manager.setup_ssl(
Expand Down Expand Up @@ -160,43 +162,51 @@ def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
Fetches them up using the context information and
the inspector set when preparing the db.

Priority:
1. sobjectNames (array) - if specified, iterate over these
2. All objects from describe()

tableFilterPattern is applied in ALL cases.

:return: tables or views, depending on config
"""
schema_name = self.context.get().database_schema

try:
if self.service_connection.sobjectName:
table_name = self.standardize_table_name(
schema_name, self.service_connection.sobjectName
)
yield table_name, TableType.Regular
if self.service_connection.sobjectNames:
object_names = list(self.service_connection.sobjectNames)

else:
for salesforce_object in self.client.describe()["sobjects"]:
table_name = salesforce_object["name"]
table_name = self.standardize_table_name(schema_name, table_name)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
table_name=table_name,
object_names = [
salesforce_object["name"]
for salesforce_object in self.client.describe()["sobjects"]
]

for table_name in object_names:
table_name = self.standardize_table_name(schema_name, table_name)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
table_name=table_name,
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
(
table_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else table_name
),
):
self.status.filter(
table_fqn,
"Table Filtered Out",
)
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern,
(
table_fqn
if self.config.sourceConfig.config.useFqnForFiltering
else table_name
),
):
self.status.filter(
table_fqn,
"Table Filtered Out",
)
continue
continue

yield table_name, TableType.Regular
yield table_name, TableType.Regular
except Exception as exc:
self.status.failed(
StackTraceError(
Expand Down
2 changes: 1 addition & 1 deletion ingestion/tests/unit/test_source_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ def test_salesforce():
"username": "username",
"password": "password",
"securityToken": "securityToken",
"sobjectName": "sobjectName",
"sobjectNames": ["sobjectName"],
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
Expand Down
182 changes: 174 additions & 8 deletions ingestion/tests/unit/topology/database/test_salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.salesforce.metadata import SalesforceSource

mock_salesforce_config = {
Expand All @@ -46,7 +47,6 @@
"username": "username",
"password": "password",
"securityToken": "securityToken",
"sobjectName": "sobjectName",
}
},
"sourceConfig": {
Expand Down Expand Up @@ -80,7 +80,38 @@
"consumerKey": "test_consumer_key",
"consumerSecret": "test_consumer_secret",
"salesforceDomain": "login",
"sobjectName": "sobjectName",
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
}
},
},
"sink": {
"type": "metadata-rest",
"config": {},
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "salesforce"},
}
},
}

mock_salesforce_multi_objects_config = {
"source": {
"type": "salesforce",
"serviceName": "local_salesforce_multi",
"serviceConnection": {
"config": {
"type": "Salesforce",
"username": "username",
"password": "password",
"securityToken": "securityToken",
"sobjectNames": ["Contact", "Account", "Lead"],
}
},
"sourceConfig": {
Expand Down Expand Up @@ -473,16 +504,18 @@ def __init__(self, methodName, salesforce, test_connection) -> None:
self.config = OpenMetadataWorkflowConfig.model_validate(mock_salesforce_config)
self.salesforce_source = SalesforceSource.create(
mock_salesforce_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
OpenMetadata(config=self.config.workflowConfig.openMetadataServerConfig),
)

self.salesforce_source.context.get().__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE
self.salesforce_source.context.get().__dict__["database"] = MOCK_DATABASE
] = MOCK_DATABASE_SERVICE.name.root
self.salesforce_source.context.get().__dict__[
"database"
] = MOCK_DATABASE.name.root
self.salesforce_source.context.get().__dict__[
"database_schema"
] = MOCK_DATABASE_SCHEMA
] = MOCK_DATABASE_SCHEMA.name.root

@patch(
"metadata.ingestion.source.database.salesforce.metadata.SalesforceSource.get_table_column_description"
Expand Down Expand Up @@ -512,7 +545,7 @@ def test_oauth_connection(self, salesforce, test_connection) -> None:
)
self.salesforce_source = SalesforceSource.create(
mock_salesforce_oauth_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
OpenMetadata(config=self.config.workflowConfig.openMetadataServerConfig),
)
self.assertTrue(
self.salesforce_source.config.serviceConnection.root.config.consumerKey
Expand Down Expand Up @@ -553,8 +586,141 @@ def test_check_ssl(self, salesforce, test_connection) -> None:
self.config = OpenMetadataWorkflowConfig.model_validate(mock_salesforce_config)
self.salesforce_source = SalesforceSource.create(
mock_salesforce_config["source"],
self.config.workflowConfig.openMetadataServerConfig,
OpenMetadata(config=self.config.workflowConfig.openMetadataServerConfig),
)
self.assertTrue(self.salesforce_source.ssl_manager.ca_file_path)
self.assertTrue(self.salesforce_source.ssl_manager.cert_file_path)
self.assertTrue(self.salesforce_source.ssl_manager.key_file_path)

@patch(
"metadata.ingestion.source.database.salesforce.metadata.SalesforceSource.test_connection"
)
@patch("simple_salesforce.api.Salesforce")
def test_sobject_names_config(self, salesforce, test_connection) -> None:
"""Test that sobjectNames array is properly parsed from config"""
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.model_validate(
mock_salesforce_multi_objects_config
)
salesforce_source = SalesforceSource.create(
mock_salesforce_multi_objects_config["source"],
OpenMetadata(config=self.config.workflowConfig.openMetadataServerConfig),
)
self.assertEqual(
salesforce_source.service_connection.sobjectNames,
["Contact", "Account", "Lead"],
)

@patch(
"metadata.ingestion.source.database.salesforce.metadata.SalesforceSource.test_connection"
)
@patch("simple_salesforce.api.Salesforce")
def test_ingestion_with_sobject_names_list(
self, salesforce, test_connection
) -> None:
"""Test that sobjectNames list correctly filters which objects to ingest"""
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.model_validate(
mock_salesforce_multi_objects_config
)
salesforce_source = SalesforceSource.create(
mock_salesforce_multi_objects_config["source"],
OpenMetadata(config=config.workflowConfig.openMetadataServerConfig),
)
salesforce_source.context.get().__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE.name.root
salesforce_source.context.get().__dict__["database"] = MOCK_DATABASE.name.root
salesforce_source.context.get().__dict__[
"database_schema"
] = MOCK_DATABASE_SCHEMA.name.root

# Mock describe to return many objects
salesforce_source.client.describe = lambda: {
"sobjects": [
{"name": "Contact"},
{"name": "Account"},
{"name": "Lead"},
{"name": "Opportunity"},
{"name": "Case"},
]
}

# Get tables - should only return the ones in sobjectNames
tables = list(salesforce_source.get_tables_name_and_type())

# Should only get Contact, Account, Lead (from config)
table_names = [table[0] for table in tables]
self.assertEqual(len(table_names), 3)
self.assertIn("Contact", table_names)
self.assertIn("Account", table_names)
self.assertIn("Lead", table_names)

self.assertNotIn("Opportunity", table_names)
self.assertNotIn("Case", table_names)

@patch(
"metadata.ingestion.source.database.salesforce.metadata.SalesforceSource.test_connection"
)
@patch("simple_salesforce.api.Salesforce")
def test_ingestion_without_sobject_names(self, salesforce, test_connection) -> None:
"""Test that without sobjectNames, all objects from describe are ingested"""
test_connection.return_value = False
# Use config without sobjectNames
config_without_filters = {
"source": {
"type": "salesforce",
"serviceName": "local_salesforce_all",
"serviceConnection": {
"config": {
"type": "Salesforce",
"username": "username",
"password": "password",
"securityToken": "securityToken",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {"jwtToken": "salesforce"},
}
},
}

config = OpenMetadataWorkflowConfig.model_validate(config_without_filters)
salesforce_source = SalesforceSource.create(
config_without_filters["source"],
OpenMetadata(config=config.workflowConfig.openMetadataServerConfig),
)
salesforce_source.context.get().__dict__[
"database_service"
] = MOCK_DATABASE_SERVICE.name.root
salesforce_source.context.get().__dict__["database"] = MOCK_DATABASE.name.root
salesforce_source.context.get().__dict__[
"database_schema"
] = MOCK_DATABASE_SCHEMA.name.root

# Mock describe to return specific objects
salesforce_source.client.describe = lambda: {
"sobjects": [
{"name": "Contact"},
{"name": "Account"},
{"name": "Lead"},
{"name": "Opportunity"},
]
}

# Get tables - should return all objects from describe
tables = list(salesforce_source.get_tables_name_and_type())

# Should get all 4 objects
table_names = [table[0] for table in tables]
self.assertEqual(len(table_names), 4)
self.assertIn("Contact", table_names)
self.assertIn("Account", table_names)
self.assertIn("Lead", table_names)
self.assertIn("Opportunity", table_names)
Loading
Loading