-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathtest_lowcode_connectors.py
More file actions
36 lines (31 loc) · 1.05 KB
/
test_lowcode_connectors.py
File metadata and controls
36 lines (31 loc) · 1.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations
from pathlib import Path
import sys
import pytest
from airbyte import get_source
from airbyte._util.meta import is_windows
UNIT_TEST_DB_PATH: Path = Path(".cache") / "unit_tests" / "test_db.duckdb"
@pytest.mark.parametrize(
"connector_name, config",
[
("source-pokeapi", {"pokemon_name": "ditto"}),
],
)
@pytest.mark.xfail(condition=is_windows(), reason="Test expected to fail on Windows.")
@pytest.mark.skipif(
sys.version_info >= (3, 12),
reason="Test fails in Python 3.12 as pokeAPI interface is blocked for bots/CI runners",
)
def test_nocode_execution(connector_name: str, config: dict) -> None:
source = get_source(
name=connector_name,
config=config,
source_manifest=True,
)
source.check()
source.select_all_streams()
source.read()
for name, records in source.read().streams.items():
assert name
assert len(records) > 0, f"No records were returned from the '{name}' stream."