Skip to content
This repository was archived by the owner on Sep 22, 2025. It is now read-only.

Commit 9b8ca37

Browse files
Merge pull request #1 from singer-io/master
bringing in updates from the source
2 parents c881005 + e07102f commit 9b8ca37

30 files changed

+3486
-326
lines changed

.circleci/config.yml

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ jobs:
1313
command: [mongod, --replSet, rs0]
1414
steps:
1515
- checkout
16-
- add_ssh_keys
1716
- run:
1817
name: 'Install Dockerize'
1918
command: wget https://github.com/jwilder/dockerize/releases/download/$DOCKERIZE_VERSION/dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && tar -C /usr/local/bin -xzvf dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz && rm dockerize-linux-amd64-$DOCKERIZE_VERSION.tar.gz
@@ -37,7 +36,6 @@ jobs:
3736
- run:
3837
name: 'Setup virtual env'
3938
command: |
40-
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/sandbox tap-tester.env
4139
pyenv local 3.5.6
4240
python3 -mvenv /usr/local/share/virtualenvs/tap-mongodb
4341
source /usr/local/share/virtualenvs/tap-mongodb/bin/activate
@@ -52,32 +50,41 @@ jobs:
5250
name: "Unit Tests"
5351
command: |
5452
source /usr/local/share/virtualenvs/tap-mongodb/bin/activate
55-
pip install pymongo==3.8.0
53+
pip install pymongo==3.12.3
5654
nosetests tests/unittests/
5755
- run:
5856
name: 'Integration Tests'
5957
command: |
6058
source tap-tester.env
59+
mkdir /tmp/${CIRCLE_PROJECT_REPONAME}
60+
export STITCH_CONFIG_DIR=/tmp/${CIRCLE_PROJECT_REPONAME}
6161
source /usr/local/share/virtualenvs/tap-tester/bin/activate
62-
pip install pymongo==3.8.0
62+
pip install pymongo==3.12.3
6363
run-test --tap=tap-mongodb tests
64+
- run:
65+
name: 'Get Curl'
66+
command: |
67+
apt update
68+
apt install -y curl
6469
- slack/notify-on-failure:
6570
only_for_branches: master
71+
- store_artifacts:
72+
path: /tmp/tap-mongodb
6673

6774
workflows:
6875
version: 2
69-
commit:
76+
commit: &commit_jobs
7077
jobs:
7178
- build:
72-
context: circleci-user
79+
context:
80+
- circleci-user
81+
- tier-1-tap-user
7382
build_daily:
83+
<<: *commit_jobs
7484
triggers:
7585
- schedule:
76-
cron: "0 6 * * *"
86+
cron: "0 1 * * *"
7787
filters:
7888
branches:
7989
only:
8090
- master
81-
jobs:
82-
- build:
83-
context: circleci-user

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
11
# Changelog
22

3+
## 2.1.2
4+
* Update pymongo to v3.12.3 [#81](https://github.com/singer-io/tap-mongodb/pull/81)
5+
6+
## 2.1.1
7+
* Fix bug in oplog bookmarking where the bookmark would not advance due to fencepost querying finding a single record [#80](https://github.com/singer-io/tap-mongodb/pull/80)
8+
9+
## 2.1.0
10+
* Optimize oplog extractions to only query for the selected tables [#78](https://github.com/singer-io/tap-mongodb/pull/78)
11+
312
## 2.0.1
413
* Modify `get_databases` function to return a unique list of databases [#58](https://github.com/singer-io/tap-mongodb/pull/58)
514

@@ -78,7 +87,7 @@
7887

7988
## 0.1.0
8089
* Added key-based incremental sync [commit](https://github.com/singer-io/tap-mongodb/commit/b618b11d91e111680f70b402c6e94c9bf40c7b8f)
81-
90+
8291
## 0.0.5
8392
* Fixed bug in oplog projections [commit](https://github.com/singer-io/tap-mongodb/commit/b400836678440499d4a15fb7d5b0a40a13e3342e)
8493

bin/test-db

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def start_container(name):
3434

3535
ip_addr = get_ip_addr(name)
3636
CONFIGURE_COMMAND = """
37-
docker run --rm mongo mongo --host {} \test -u {} -p {} --authenticationDatabase admin --eval {}
37+
docker run --rm mongo mongo --host {} test -u {} -p {} --authenticationDatabase admin --eval {}
3838
""".format(ip_addr,
3939
os.getenv('TAP_MONGODB_USER'),
4040
os.getenv('TAP_MONGODB_PASSWORD'),

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,15 @@
33
from setuptools import setup
44

55
setup(name='tap-mongodb',
6-
version='2.0.1',
6+
version='2.1.2',
77
description='Singer.io tap for extracting data from MongoDB',
88
author='Stitch',
99
url='https://singer.io',
1010
classifiers=['Programming Language :: Python :: 3 :: Only'],
1111
py_modules=['tap_mongodb'],
1212
install_requires=[
1313
'singer-python==5.8.0',
14-
'pymongo==3.8.0',
14+
'pymongo==3.12.3',
1515
'tzlocal==2.0.0',
1616
'terminaltables==3.1.0',
1717
],

tap_mongodb/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ def sync_stream(client, stream, state):
146146
LOGGER.info("Clearing state because Oplog has aged out")
147147
state.get('bookmarks', {}).pop(tap_stream_id)
148148

149+
collection_oplog_ts = oplog.get_latest_ts(client)
150+
149151
# make sure initial full table sync has been completed
150152
if not singer.get_bookmark(state, tap_stream_id, 'initial_full_table_complete'):
151153
msg = 'Must complete full table sync before starting oplog replication for %s'
@@ -154,12 +156,11 @@ def sync_stream(client, stream, state):
154156
# only mark current ts in oplog on first sync so tap has a
155157
# starting point after the full table sync
156158
if singer.get_bookmark(state, tap_stream_id, 'version') is None:
157-
collection_oplog_ts = oplog.get_latest_ts(client)
158159
oplog.update_bookmarks(state, tap_stream_id, collection_oplog_ts)
159160

160161
full_table.sync_collection(client, stream, state, stream_projection)
161162

162-
oplog.sync_collection(client, stream, state, stream_projection)
163+
oplog.sync_collection(client, stream, state, stream_projection, collection_oplog_ts)
163164

164165
elif replication_method == 'FULL_TABLE':
165166
full_table.sync_collection(client, stream, state, stream_projection)

tap_mongodb/sync_strategies/full_table.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,4 @@ def sync_collection(client, stream, state, projection):
157157

158158
singer.write_message(activate_version_message)
159159

160-
LOGGER.info('Syncd {} records for {}'.format(rows_saved, tap_stream_id))
160+
LOGGER.info('Synced {} records for {}'.format(rows_saved, tap_stream_id))

tap_mongodb/sync_strategies/incremental.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,4 @@ def sync_collection(client, stream, state, projection):
127127

128128
singer.write_message(activate_version_message)
129129

130-
LOGGER.info('Syncd %s records for %s', rows_saved, tap_stream_id)
130+
LOGGER.info('Synced %s records for %s', rows_saved, tap_stream_id)

tap_mongodb/sync_strategies/oplog.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#!/usr/bin/env python3
2-
import copy
32
import time
43

54
import pymongo
@@ -105,7 +104,7 @@ def flush_buffer(client, update_buffer, stream_projection, db_name, collection_n
105104

106105

107106
# pylint: disable=too-many-locals, too-many-branches, too-many-statements
108-
def sync_collection(client, stream, state, stream_projection):
107+
def sync_collection(client, stream, state, stream_projection, max_oplog_ts=None):
109108
tap_stream_id = stream['tap_stream_id']
110109
LOGGER.info('Starting oplog sync for %s', tap_stream_id)
111110

@@ -130,7 +129,8 @@ def sync_collection(client, stream, state, stream_projection):
130129
start_time = time.time()
131130

132131
oplog_query = {
133-
'ts': {'$gte': oplog_ts}
132+
'ts': {'$gte': oplog_ts},
133+
'ns': {'$eq' : '{}.{}'.format(database_name, collection_name)}
134134
}
135135

136136
projection = transform_projection(stream_projection)
@@ -160,13 +160,6 @@ def sync_collection(client, stream, state, stream_projection):
160160
raise common.MongoAssertionException(
161161
"Mongo is not honoring the sort ascending param")
162162

163-
if row.get('ns') != '{}.{}'.format(database_name, collection_name):
164-
if row.get('ts'):
165-
state = update_bookmarks(state,
166-
tap_stream_id,
167-
row['ts'])
168-
continue
169-
170163
row_op = row['op']
171164

172165
if row_op == 'i':
@@ -240,7 +233,7 @@ def sync_collection(client, stream, state, stream_projection):
240233
update_buffer = set()
241234

242235
# write state
243-
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))
236+
singer.write_message(singer.StateMessage(value=state))
244237

245238
# flush buffer if finished with oplog
246239
for buffered_row in flush_buffer(client,
@@ -257,6 +250,18 @@ def sync_collection(client, stream, state, stream_projection):
257250
singer.write_message(record_message)
258251
rows_saved += 1
259252

253+
254+
# Compare the current bookmark with the max_oplog_ts and write the max
255+
bookmarked_ts = timestamp.Timestamp(state.get('bookmarks', {}).get(tap_stream_id, {}).get('oplog_ts_time'),
256+
state.get('bookmarks', {}).get(tap_stream_id, {}).get('oplog_ts_inc'))
257+
258+
actual_max_ts = max(bookmarked_ts, max_oplog_ts)
259+
260+
state = update_bookmarks(state,
261+
tap_stream_id,
262+
actual_max_ts)
263+
singer.write_message(singer.StateMessage(value=state))
264+
260265
common.COUNTS[tap_stream_id] += rows_saved
261266
common.TIMES[tap_stream_id] += time.time()-start_time
262-
LOGGER.info('Syncd %s records for %s', rows_saved, tap_stream_id)
267+
LOGGER.info('Synced %s records for %s', rows_saved, tap_stream_id)

tests/mongodb_common.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,30 @@
1+
import os
2+
import pymongo
3+
4+
5+
def ensure_environment_variables_set():
6+
missing_envs = [x for x in ['TAP_MONGODB_HOST',
7+
'TAP_MONGODB_USER',
8+
'TAP_MONGODB_PASSWORD',
9+
'TAP_MONGODB_PORT',
10+
'TAP_MONGODB_DBNAME'] if os.getenv(x) is None]
11+
if len(missing_envs) != 0:
12+
raise Exception(f"set environment variables: {missing_envs}")
13+
14+
##########################################################################
15+
### Database Interactions
16+
##########################################################################
17+
18+
def get_test_connection():
19+
username = os.getenv('TAP_MONGODB_USER')
20+
password = os.getenv('TAP_MONGODB_PASSWORD')
21+
host= os.getenv('TAP_MONGODB_HOST')
22+
auth_source = os.getenv('TAP_MONGODB_DBNAME')
23+
port = os.getenv('TAP_MONGODB_PORT')
24+
ssl = False
25+
conn = pymongo.MongoClient(host=host, username=username, password=password, port=27017, authSource=auth_source, ssl=ssl)
26+
return conn
27+
128
def drop_all_collections(client):
229
############# Drop all dbs/collections #############
330
for db_name in client.list_database_names():

0 commit comments

Comments
 (0)