Skip to content

Commit 008fa38

Browse files
author
Kyle Allan
authored
Added functions for get_selected_streams (#100)
* Added functions for get_selected_streams * refactor functions onto class; pylint
1 parent 4f507e8 commit 008fa38

File tree

3 files changed

+73
-5
lines changed

3 files changed

+73
-5
lines changed

singer/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@
5555
resolve_schema_references
5656
)
5757

58-
from singer.catalog import Catalog
58+
from singer.catalog import (
59+
Catalog,
60+
CatalogEntry
61+
)
5962
from singer.schema import Schema
6063

6164
from singer.bookmarks import (

singer/catalog.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
'''Provides an object model for a Singer Catalog.'''
2-
32
import json
43
import sys
54

6-
from singer.schema import Schema
5+
from . import metadata as metadata_module
6+
from .bookmarks import get_currently_syncing
7+
from .logger import get_logger
8+
from .schema import Schema
9+
10+
LOGGER = get_logger()
11+
712

813
# pylint: disable=too-many-instance-attributes
914
class CatalogEntry():
@@ -33,7 +38,9 @@ def __eq__(self, other):
3338
return self.__dict__ == other.__dict__
3439

3540
def is_selected(self):
36-
return self.schema.selected # pylint: disable=no-member
41+
mdata = metadata_module.to_map(self.metadata)
42+
# pylint: disable=no-member
43+
return self.schema.selected or metadata_module.get(mdata, (), 'selected')
3744

3845
def to_dict(self):
3946
result = {}
@@ -116,3 +123,27 @@ def get_stream(self, tap_stream_id):
116123
if stream.tap_stream_id == tap_stream_id:
117124
return stream
118125
return None
126+
127+
def _shuffle_streams(self, state):
128+
currently_syncing = get_currently_syncing(state)
129+
130+
if currently_syncing is None:
131+
return self.streams
132+
133+
matching_index = 0
134+
for i, catalog_entry in enumerate(self.streams):
135+
if catalog_entry.tap_stream_id == currently_syncing:
136+
matching_index = i
137+
break
138+
top_half = self.streams[matching_index:]
139+
bottom_half = self.streams[:matching_index]
140+
return top_half + bottom_half
141+
142+
143+
def get_selected_streams(self, state):
144+
for stream in self._shuffle_streams(state):
145+
if not stream.is_selected():
146+
LOGGER.info('Skipping stream: %s', stream.tap_stream_id)
147+
continue
148+
149+
yield stream

tests/test_catalog.py

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,40 @@
33
from singer.schema import Schema
44
from singer.catalog import Catalog, CatalogEntry
55

6+
class TestGetSelectedStreams(unittest.TestCase):
7+
def test_one_selected_stream(self):
8+
selected_entry = CatalogEntry(tap_stream_id='a',
9+
schema=Schema(),
10+
metadata=[{'metadata':
11+
{'selected': True},
12+
'breadcrumb': []}])
13+
catalog = Catalog(
14+
[selected_entry,
15+
CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]),
16+
CatalogEntry(tap_stream_id='c',schema=Schema(),metadata=[])])
17+
state = {}
18+
selected_streams = catalog.get_selected_streams(state)
19+
self.assertEquals([e for e in selected_streams],[selected_entry])
20+
21+
def test_resumes_currently_syncing_stream(self):
22+
selected_entry_a = CatalogEntry(tap_stream_id='a',
23+
schema=Schema(),
24+
metadata=[{'metadata':
25+
{'selected': True},
26+
'breadcrumb': []}])
27+
selected_entry_c = CatalogEntry(tap_stream_id='c',
28+
schema=Schema(),
29+
metadata=[{'metadata':
30+
{'selected': True},
31+
'breadcrumb': []}])
32+
catalog = Catalog(
33+
[selected_entry_a,
34+
CatalogEntry(tap_stream_id='b',schema=Schema(),metadata=[]),
35+
selected_entry_c])
36+
state = {'currently_syncing': 'c'}
37+
selected_streams = catalog.get_selected_streams(state)
38+
self.assertEquals([e for e in selected_streams][0],selected_entry_c)
39+
640
class TestToDictAndFromDict(unittest.TestCase):
741

842
dict_form = {
@@ -89,7 +123,7 @@ def test_from_dict(self):
89123

90124
def test_to_dict(self):
91125
self.assertEqual(self.dict_form, self.obj_form.to_dict())
92-
126+
93127

94128
class TestGetStream(unittest.TestCase):
95129
def test(self):

0 commit comments

Comments
 (0)