
Commit be6fd75

Added documentation for copy.existing
KAFKA-22 / KAFKA-65
1 parent 248b658 commit be6fd75

1 file changed: docs/source.md (+30 −13 lines)
@@ -57,19 +57,22 @@ The following document represents all possible fields that a change stream respo

### Source Connector Configuration Properties

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| connection.uri | The connection URI as supported by the official drivers, e.g. `mongodb://user:pass@localhost/`. | string | mongodb://localhost:27017,localhost:27018,localhost:27019 | A valid connection string | high |
| database | The database to watch. If not set, all databases are watched. | string | "" | | medium |
| collection | The collection in the database to watch. If not set, all collections are watched. | string | "" | | medium |
| publish.full.document.only | Publish only the actual changed document rather than the full change stream document. Automatically sets `change.stream.full.document=updateLookup` so that updated documents are included. | boolean | false | | high |
| pipeline | An inline JSON array of objects describing the pipeline operations to run. Example: `[{"$match": {"operationType": "insert"}}, {"$addFields": {"Kafka": "Rules!"}}]` | string | [] | A valid JSON array | medium |
| collation | The JSON representation of the collation options to use for the change stream. Use `Collation.asDocument().toJson()` to create the specific JSON representation. | string | "" | A valid JSON document representing a collation | high |
| batch.size | The cursor batch size. | int | 0 | [0,...] | medium |
| change.stream.full.document | Determines what to return for update operations when using a change stream. When set to `updateLookup`, the change stream for partial updates will include both a delta describing the changes to the document and a copy of the entire document that was changed from *some time* after the change occurred. | string | "" | An empty string OR [default, updateLookup] | high |
| poll.await.time.ms | The amount of time to wait before checking the change stream for new results. | long | 5000 | [1,...] | low |
| poll.max.batch.size | The maximum number of change stream documents to include in a single batch when polling for new data. Can be used to limit the amount of data buffered internally in the connector. | int | 1000 | [1,...] | low |
| topic.prefix | Prefix to prepend to database and collection names to generate the name of the Kafka topic to publish data to. | string | "" | | low |
| copy.existing | Copy existing data from all the collections being used as the source, then add any subsequent changes. Note that reading all the data during the copy, followed by the subsequent change stream events, may produce duplicated events. During the copy, clients can make changes to the data in MongoDB, which may be represented both by the copying process and the change stream; however, as the change stream events are idempotent, the changes can be applied so that the data is eventually consistent. Renaming a collection during the copying process is not supported. | boolean | false | | medium |
| copy.existing.max.threads | The number of threads to use when performing the data copy. | int | Number of processors | [1,...] | medium |
| copy.existing.queue.size | The maximum size of the queue to use when copying data. | int | 16000 | [1,...] | medium |
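
Putting several of these properties together, here is a minimal sketch of a source connector configuration. The host, database, collection, and pipeline values are illustrative assumptions, not defaults:

```properties
# Minimal sketch of a source connector configuration (illustrative values).
connection.uri=mongodb://localhost:27017
database=test
collection=data

# Publish only the changed document; this automatically sets
# change.stream.full.document=updateLookup so updates include the full document.
publish.full.document.only=true

# Forward only insert events via an inline aggregation pipeline.
pipeline=[{"$match": {"operationType": "insert"}}]
```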
### Custom pipelines
@@ -106,6 +109,20 @@ topic.prefix=mongo
In this case, changes to the 'data' collection in the 'test' database will be published to a topic called 'mongo.test.data'.
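
Spelled out as configuration, a sketch of the naming rule, with the values taken from the example above:

```properties
# Changes to the test.data namespace are published to the Kafka topic
# "mongo.test.data", i.e. <topic.prefix>.<database>.<collection>.
topic.prefix=mongo
database=test
collection=data
```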
### Copy existing data

The MongoDB Kafka Source connector can be configured to copy existing data from namespaces onto their given topics as insert events before broadcasting change stream events.
Note that reading all the data during the copy, followed by the subsequent change stream events, may produce duplicated events. During the copy, clients can make
changes to the data in MongoDB, which may be represented both by the copying process and the change stream. However, as the change stream events are idempotent, the changes can be
applied so that the data is eventually consistent. Renaming a collection during the copying process is not supported.

The following example will copy all collections from the `example` database into topics and then broadcast any changes to the data in those collections:

```properties
database=example
copy.existing=true
```
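
If the initial copy needs tuning, the `copy.existing.max.threads` and `copy.existing.queue.size` properties from the table above can be added. The values below are illustrative assumptions, not recommendations:

```properties
# Sketch: tune the initial copy phase (illustrative values).
database=example
copy.existing=true
# Assumption: a fixed thread count instead of the processor-count default.
copy.existing.max.threads=4
# The documented default queue size, shown explicitly.
copy.existing.queue.size=16000
```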
---
### Next
