
Commit a4a8182

pedro93 and iprentic authored
feat(cli): Adds ability to upload recipes to DataHub's UI (#8317)
Co-authored-by: Indy Prentice <[email protected]>
1 parent 9f791a3 commit a4a8182

File tree: 9 files changed, +248 −10 lines


datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourcesResolver.java

Lines changed: 5 additions & 1 deletion

````diff
@@ -4,6 +4,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.datahub.graphql.QueryContext;
 import com.linkedin.datahub.graphql.exception.AuthorizationException;
+import com.linkedin.datahub.graphql.generated.FacetFilterInput;
 import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
 import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult;
 import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
@@ -20,6 +21,7 @@
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
@@ -51,14 +53,16 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
     final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
     final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
     final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
+    final List<FacetFilterInput> filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters();
 
     return CompletableFuture.supplyAsync(() -> {
       try {
         // First, get all ingestion sources Urns.
         final SearchResult gmsResult = _entityClient.search(
             Constants.INGESTION_SOURCE_ENTITY_NAME,
             query,
-            Collections.emptyMap(),
+            buildFilter(filters, Collections.emptyList()),
+            null,
             start,
             count,
             context.getAuthentication(),
````

datahub-graphql-core/src/main/resources/ingestion.graphql

Lines changed: 5 additions & 0 deletions

````diff
@@ -428,6 +428,11 @@ input ListIngestionSourcesInput {
   An optional search query
   """
   query: String
+
+  """
+  Optional Facet filters to apply to the result set
+  """
+  filters: [FacetFilterInput!]
 }
 
 """
````

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/ingest/source/ListIngestionSourceResolverTest.java

Lines changed: 3 additions & 3 deletions

````diff
@@ -19,7 +19,6 @@
 import com.linkedin.metadata.search.SearchResult;
 import com.linkedin.r2.RemoteInvocationException;
 import graphql.schema.DataFetchingEnvironment;
-import java.util.Collections;
 import java.util.HashSet;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -30,7 +29,7 @@
 
 public class ListIngestionSourceResolverTest {
 
-  private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null);
+  private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null);
 
   @Test
   public void testGetSuccess() throws Exception {
@@ -44,7 +43,8 @@ public void testGetSuccess() throws Exception {
     Mockito.when(mockClient.search(
         Mockito.eq(Constants.INGESTION_SOURCE_ENTITY_NAME),
         Mockito.eq(""),
-        Mockito.eq(Collections.emptyMap()),
+        Mockito.any(),
+        Mockito.any(),
         Mockito.eq(0),
         Mockito.eq(20),
         Mockito.any(Authentication.class),
````

docs/cli.md

Lines changed: 21 additions & 5 deletions

````diff
@@ -92,13 +92,29 @@ Source specific crawlers are provided by plugins and might sometimes need additi
 Usage: datahub [datahub-options] ingest [command-options]
 
 Command Options:
-  -c / --config            Config file in .toml or .yaml format
-  -n / --dry-run           Perform a dry run of the ingestion, essentially skipping writing to sink
-  --preview                Perform limited ingestion from the source to the sink to get a quick preview
-  --preview-workunits      The number of workunits to produce for preview
-  --strict-warnings        If enabled, ingestion runs with warnings will yield a non-zero error code
+  -c / --config             Config file in .toml or .yaml format
+  -n / --dry-run            Perform a dry run of the ingestion, essentially skipping writing to sink
+  --preview                 Perform limited ingestion from the source to the sink to get a quick preview
+  --preview-workunits       The number of workunits to produce for preview
+  --strict-warnings         If enabled, ingestion runs with warnings will yield a non-zero error code
+  --test-source-connection  When set, ingestion will only test the source connection details from the recipe
 ```
 
+#### ingest deploy
+
+The `ingest deploy` command instructs the CLI to upload an ingestion recipe to DataHub, to be run by DataHub's [UI Ingestion](./ui-ingestion.md).
+This command can also schedule the ingestion while uploading it, and can update existing sources.
+
+To schedule a recipe called "test" to run at 5 AM every day, London time, with the recipe configured in a local `recipe.yaml` file:
+````shell
+datahub ingest deploy --name "test" --schedule "0 5 * * *" --time-zone "Europe/London" -c recipe.yaml
+````
+
+To update an existing recipe, use the `--urn` parameter to specify the id of the recipe to update.
+
+**Note:** Updating a recipe replaces the existing options with those specified in the CLI command,
+i.e. not specifying a schedule in the CLI update command will remove the schedule from the recipe being updated.
+
 ### init
 
 The init command is used to tell `datahub` about where your DataHub instance is located. The CLI will point to localhost DataHub by default.
````
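
The note above about replacement semantics follows directly from how the new `deploy` command builds its GraphQL input (see the `ingest_cli.py` diff further down): the complete input object, schedule included, is sent on every call. A minimal Python sketch of the effect, with a hypothetical urn:

```python
# Sketch of why omitting --schedule on an update clears an existing schedule:
# ingest_cli.py always sends the full input, and `interval` falls back to
# None when --schedule is absent. The urn below is hypothetical.
schedule = None  # --schedule was not passed on the command line
variables = {
    "urn": "urn:li:dataHubIngestionSource:test",
    "name": "test",
    "schedule": {"interval": schedule, "timezone": "Europe/London"},
    # ... recipe, executorId, version built exactly as in the diff ...
}
# updateIngestionSource stores this input as-is, so the old cron is dropped.
```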

docs/ui-ingestion.md

Lines changed: 42 additions & 0 deletions

````diff
@@ -28,6 +28,9 @@ your first **Ingestion Source**.
 
 ### Creating an Ingestion Source
 
+<Tabs>
+<TabItem value="ui" label="UI" default>
+
 Before ingesting any metadata, you need to create a new Ingestion Source. Start by clicking **+ Create new source**.
 
 ![](./imgs/create-new-ingestion-source-button.png)
@@ -151,6 +154,45 @@ _Pinning the CLI version to version `0.8.23.2`_
 
 Once you're happy with your changes, simply click 'Done' to save.
 
+</TabItem>
+<TabItem value="cli" label="CLI">
+
+You can upload and even update recipes using the CLI, as described in the [CLI documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
+An example execution would look something like:
+
+```bash
+datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml
+```
+
+This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, its `urn` must be passed as a parameter.
+DataHub supports multiple recipes with the same name, so the urn is used for unique identification.
+
+</TabItem>
+<TabItem value="graphql" label="GraphQL">
+
+Create ingestion sources using [DataHub's GraphQL API](./api/graphql/overview.md) with the **createIngestionSource** mutation endpoint.
+```graphql
+mutation {
+  createIngestionSource(input: {
+    name: "My Test Ingestion Source",
+    type: "mysql",
+    description: "My ingestion source description",
+    schedule: {interval: "*/5 * * * *", timezone: "UTC"},
+    config: {
+      recipe: "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}",
+      version: "0.8.18",
+      executorId: "mytestexecutor",
+    }
+  })
+}
+```
+
+To update sources, use the `updateIngestionSource` endpoint. It is almost identical to the create endpoint, additionally requiring the `urn` of the source being updated.
+
+**Note**: The recipe string must have its double quotes escaped, as shown above.
+
+</TabItem>
+</Tabs>
 
 ### Running an Ingestion Source
````
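
On the escaping note in the GraphQL tab: the CLI path in this commit produces the recipe string with `json.dumps`, and a second dump yields the quote-escaped form needed when the recipe is inlined into the mutation text rather than passed as a variable. A minimal sketch using only the standard library:

```python
import json

# A toy recipe; the real one comes from recipe.yaml.
recipe = {"source": {"type": "mysql", "config": {"include_tables": True}}}

# Passed as a GraphQL *variable* (what `datahub ingest deploy` does), one
# dump suffices:
print(json.dumps(recipe))
# Inlined into the mutation text itself, the quotes must also be escaped,
# which a second dump produces:
print(json.dumps(json.dumps(recipe)))
```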

metadata-ingestion/README.md

Lines changed: 13 additions & 0 deletions

````diff
@@ -161,6 +161,19 @@ reporting:
   report_recipe: false
 ```
 
+#### Deploying and scheduling ingestion to the UI
+
+The `deploy` subcommand of the `ingest` command tree allows users to upload their recipes and schedule them on the server.
+
+```shell
+datahub ingest deploy -n <user friendly name for ingestion> -c recipe.yaml
+```
+
+By default, no schedule is applied unless explicitly configured with the `--schedule` parameter. The timezone is inferred from the system clock and can be overridden with the `--time-zone` flag.
+```shell
+datahub ingest deploy -n test --schedule "0 * * * *" --time-zone "Europe/London" -c recipe.yaml
+```
+
 ## Transformations
 
 If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.
````
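
The system-timezone default mentioned above comes from `tzlocal`, whose type stubs (`types-tzlocal`) are added in `setup.py` below. A quick sketch of what the default resolves to:

```python
# Sketch: the CLI's default --time-zone value. tzlocal reports the IANA name
# of the system timezone; the printed value depends on the machine.
import tzlocal

print(tzlocal.get_localzone_name())  # e.g. "Europe/London"
```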

metadata-ingestion/setup.py

Lines changed: 1 addition & 0 deletions

````diff
@@ -415,6 +415,7 @@ def get_long_description():
     "types-termcolor>=1.0.0",
     "types-Deprecated",
     "types-protobuf>=4.21.0.1",
+    "types-tzlocal",
 }
 
````

metadata-ingestion/src/datahub/cli/ingest_cli.py

Lines changed: 153 additions & 0 deletions

````diff
@@ -4,11 +4,13 @@
 import logging
 import os
 import sys
+import textwrap
 from datetime import datetime
 from typing import Optional
 
 import click
 import click_spinner
+import tzlocal
 from click_default_group import DefaultGroup
 from tabulate import tabulate
 
@@ -21,6 +23,7 @@
     post_rollback_endpoint,
 )
 from datahub.configuration.config_loader import load_config_file
+from datahub.ingestion.graph.client import get_default_graph
 from datahub.ingestion.run.connection import ConnectionManager
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.telemetry import telemetry
@@ -198,6 +201,156 @@ async def run_ingestion_and_check_upgrade() -> int:
     # don't raise SystemExit if there's no error
 
 
+@ingest.command()
+@upgrade.check_upgrade
+@telemetry.with_telemetry()
+@click.option(
+    "-n",
+    "--name",
+    type=str,
+    help="Recipe Name",
+    required=True,
+)
+@click.option(
+    "-c",
+    "--config",
+    type=click.Path(dir_okay=False),
+    help="Config file in .toml or .yaml format.",
+    required=True,
+)
+@click.option(
+    "--urn",
+    type=str,
+    help="Urn of recipe to update",
+    required=False,
+)
+@click.option(
+    "--executor-id",
+    type=str,
+    default="default",
+    help="Executor id to route execution requests to. Do not use this unless you have configured a custom executor.",
+    required=False,
+)
+@click.option(
+    "--cli-version",
+    type=str,
+    help="Provide a custom CLI version to use for ingestion. By default will use server default.",
+    required=False,
+    default=None,
+)
+@click.option(
+    "--schedule",
+    type=str,
+    help="Cron definition for schedule. If none is provided, ingestion recipe will not be scheduled",
+    required=False,
+    default=None,
+)
+@click.option(
+    "--time-zone",
+    type=str,
+    help=f"Timezone for the schedule. By default uses the timezone of the current system: {tzlocal.get_localzone_name()}.",
+    required=False,
+    default=tzlocal.get_localzone_name(),
+)
+def deploy(
+    name: str,
+    config: str,
+    urn: str,
+    executor_id: str,
+    cli_version: str,
+    schedule: str,
+    time_zone: str,
+) -> None:
+    """
+    Deploy an ingestion recipe to your DataHub instance.
+
+    The urn of the ingestion source will be based on the name parameter in the format:
+    urn:li:dataHubIngestionSource:<name>
+    """
+
+    datahub_graph = get_default_graph()
+
+    # Keep ${VAR} references unresolved; the server-side executor resolves them.
+    pipeline_config = load_config_file(
+        config,
+        allow_stdin=True,
+        resolve_env_vars=False,
+    )
+
+    graphql_query: str
+
+    # The full input is sent on both create and update, so an omitted
+    # --schedule clears any previously configured schedule on update.
+    variables: dict = {
+        "urn": urn,
+        "name": name,
+        "type": pipeline_config["source"]["type"],
+        "schedule": {"interval": schedule, "timezone": time_zone},
+        "recipe": json.dumps(pipeline_config),
+        "executorId": executor_id,
+        "version": cli_version,
+    }
+
+    if urn:
+        if not datahub_graph.exists(urn):
+            logger.error(f"Could not find recipe for provided urn: {urn}")
+            sys.exit(1)
+        logger.info("Found recipe URN, will update recipe.")
+
+        graphql_query = textwrap.dedent(
+            """
+            mutation updateIngestionSource(
+                $urn: String!,
+                $name: String!,
+                $type: String!,
+                $schedule: UpdateIngestionSourceScheduleInput,
+                $recipe: String!,
+                $executorId: String!,
+                $version: String) {
+
+                updateIngestionSource(urn: $urn, input: {
+                    name: $name,
+                    type: $type,
+                    schedule: $schedule,
+                    config: {
+                        recipe: $recipe,
+                        executorId: $executorId,
+                        version: $version,
+                    }
+                })
+            }
+            """
+        )
+    else:
+        logger.info("No URN specified, will create a new recipe.")
+        graphql_query = textwrap.dedent(
+            """
+            mutation createIngestionSource(
+                $name: String!,
+                $type: String!,
+                $schedule: UpdateIngestionSourceScheduleInput,
+                $recipe: String!,
+                $executorId: String!,
+                $version: String) {
+
+                createIngestionSource(input: {
+                    name: $name,
+                    type: $type,
+                    schedule: $schedule,
+                    config: {
+                        recipe: $recipe,
+                        executorId: $executorId,
+                        version: $version,
+                    }
+                })
+            }
+            """
+        )
+
+    response = datahub_graph.execute_graphql(graphql_query, variables=variables)
+
+    click.echo(
+        f"✅ Successfully wrote data ingestion source metadata for recipe {name}:"
+    )
+    click.echo(response)
+
+
 def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
     connection_report = None
     try:
````
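
For a quick local check of the new command, one option (not part of this commit) is Click's in-process test runner. This assumes a reachable DataHub instance already configured for the CLI and a `recipe.yaml` on disk:

```python
# Hedged sketch: exercising `datahub ingest deploy` in-process. Assumes the
# CLI is configured (e.g. via `datahub init`) and recipe.yaml exists.
from click.testing import CliRunner

from datahub.cli.ingest_cli import deploy

runner = CliRunner()
result = runner.invoke(
    deploy,
    ["--name", "test", "--schedule", "0 5 * * *",
     "--time-zone", "Europe/London", "-c", "recipe.yaml"],
)
print(result.exit_code, result.output)
```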

metadata-ingestion/src/datahub/configuration/config_loader.py

Lines changed: 5 additions & 1 deletion

````diff
@@ -72,6 +72,7 @@ def load_config_file(
     squirrel_original_config: bool = False,
     squirrel_field: str = "__orig_config",
     allow_stdin: bool = False,
+    resolve_env_vars: bool = True,
 ) -> dict:
     config_mech: ConfigurationMechanism
     if allow_stdin and config_file == "-":
@@ -104,7 +105,10 @@
 
     config_fp = io.StringIO(raw_config_file)
     raw_config = config_mech.load_config(config_fp)
-    config = resolve_env_variables(raw_config)
+    if resolve_env_vars:
+        config = resolve_env_variables(raw_config)
+    else:
+        config = raw_config
     if squirrel_original_config:
         config[squirrel_field] = raw_config
     return config
````
