Commit d4e83e7

Started vstream example/doc
Signed-off-by: Rohit Nayak <[email protected]>
1 parent fe9d4e7 commit d4e83e7

File tree

6 files changed: +120 -162 lines changed

README.md

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-# Notes on VReplication
+# A Rough Guide to Vitess VReplication

## Table Of Contents
1. [Introduction](#)
@@ -13,7 +13,7 @@
1. [Change Data Capture](./use-cases/change-data-capture.md)
1. [Command Reference](./commands/commands.md)
1. [TBD: Performance and Scalability](./misc/scalability.md)
-1. [Internals](./misc/internals.md)
+1. [Internals](./internals/internals.md)
1. [Frequently Asked Questions](./misc/faq.md)

---

examples/vstream/vstream_client.go

Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
package main

import (
	"context"
	"fmt"
	"io"
	"log"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

func main() {
	//vstreamCurrent()
	vstreamCopy()
}

// Stream from the current GTID position of both shards of the customer keyspace.
var vgtidCurrentCustomerAllShards = &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "customer",
		Shard:    "-80",
		Gtid:     "current",
	}, {
		Keyspace: "customer",
		Shard:    "80-",
		Gtid:     "current",
	}},
}

// An empty Gtid requests a VStream Copy: a bulk copy of the matching tables
// followed by streaming from the position at which the copy started.
var vgtidCopyCustomer = &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "customer",
		Shard:    "80-",
		Gtid:     "",
	}},
}

// vstreamCurrent streams events for all tables from the current position.
func vstreamCurrent() {
	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match: "/.*/",
		}},
	}
	startVStream(vgtidCurrentCustomerAllShards, filter)
}

// vstreamCopy copies the customer tables, then streams subsequent changes.
func vstreamCopy() {
	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			Match: "/customer/",
		}},
	}
	startVStream(vgtidCopyCustomer, filter)
}

func startVStream(vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter) {
	ctx := context.Background()
	conn, err := vtgateconn.Dial(ctx, "localhost:15991")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	reader, err := conn.VStream(ctx, topodatapb.TabletType_MASTER, vgtid, filter)
	if err != nil {
		log.Fatal(err)
	}
	for {
		events, err := reader.Recv()
		switch err {
		case nil:
			for i, ev := range events {
				fmt.Printf("%d:%v\n", i, ev)
			}
		case io.EOF:
			fmt.Println("stream ended")
			return
		default:
			log.Printf("remote error: %v", err)
			return
		}
	}
}
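Each element returned by Recv() is a VEvent proto. As a hypothetical extension of the example above (not part of this file), a consumer could dispatch on the event types defined in binlogdata.proto instead of printing everything:

	// Sketch: type-aware handling of the events received above. FIELD events
	// carry table schemas, ROW events the changed rows, and VGTID events the
	// position to checkpoint so a restarted client can resume the stream.
	for _, ev := range events {
		switch ev.Type {
		case binlogdatapb.VEventType_FIELD:
			fmt.Printf("schema: %v\n", ev.FieldEvent)
		case binlogdatapb.VEventType_ROW:
			fmt.Printf("rows: %v\n", ev.RowEvent)
		case binlogdatapb.VEventType_VGTID:
			fmt.Printf("position: %v\n", ev.Vgtid)
		}
	}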

internals/internals.md

Lines changed: 3 additions & 157 deletions
@@ -1,159 +1,5 @@
# VReplication Internals

-![VReplication Flow](../images/VReplicationFlow.png)
-
-### Introduction
-
-The diagram above outlines how a VReplication workflow is performed. VReplication can be asked to start
-from a specific GTID or from the start. When starting from a GTID, the _replication_ mode is used,
-in which it streams events from the binlog.
-
-#### Full table copy
-
-When starting from the beginning, the simple streaming done by _replication_ can create an avalanche of events
-(think 10s of millions of rows). To speed things up, a _copy/catchup_ mode is initiated first: data in the tables
-is copied over in a consistent manner using bulk inserts. Once enough data has been copied that we are close
-to the current position (i.e. replication lag is low), the stream switches over to (and stays in) the _replication_ mode.
-
-While we may have multiple database sources in a workflow, each vstream has just one source and one target. The source
-is always a vttablet (and hence one mysql instance). The target could be another vttablet (resharding) or
-a streaming grpc response (vstream api clients).
-
-#### Transformation and Filtering
-
-Note that in all steps the data selected from the source will only be from the list of tables specified
-(via Match). Furthermore, if a Filter is specified for a table, it will be applied before the data is sent
-to the target. Columns may also be transformed in the Filter's select clause.
-
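To make this concrete, here is a minimal Go sketch of a rule that combines a Match with a transforming Filter, using the binlogdatapb.Rule type from examples/vstream/vstream_client.go; the table and column names are hypothetical:

	// Hypothetical rule: stream only the customer table, select two of its
	// columns, and transform one of them in the Filter's select clause.
	rule := &binlogdatapb.Rule{
		Match:  "customer",
		Filter: "select customer_id, lower(email) as email from customer",
	}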
-#### Source and Sink
-
-Each stream has two parts. The target initiates streaming by making grpc calls to the source tablet. The source
-produces the data, either by connecting to mysql as a replica or by running sql queries, and streams it to the target.
-The target takes appropriate action: in the case of resharding it converts the events into CRUD statements and
-applies them to the target database; in the case of vstream clients the events are forwarded by vtgate to the client.
-
-Note that the target always pulls the data. This avoids the buffer overruns that could occur if the source pushed
-the data, since (especially in sharding) the application of events can be substantially cpu intensive, particularly
-in the case of bulk inserts.
-
-#### State management
-
-State is maintained in two tables:
-
-\_vt.vreplication
-\_vt.copy_state
-
-### Modes, in detail
-
-#### Replicate
-
-This is the easiest mode to understand. The source stream simply mimics a mysql replica and processes events as
-they are received. Events (after filtering and transformation) are sent to the target. Replication runs continuously,
-with short sleeps when there are no more events to source.
-
-#### Initialize
-
-Initialize is called at the start of the copy phase. For each table to be copied, an entry is created in \_vt.copy_state
-with a zero primary key. As each table copy is completed, the related entry is deleted, and when there are no more
-entries for this workflow the copy phase is considered complete and the workflow moves into the Replication mode.
-
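A sketch of the bookkeeping this implies for a workflow copying two tables, written as the SQL such a workflow might issue; the \_vt.copy_state column names here are assumptions for illustration, not taken from this doc:

	// Hypothetical bookkeeping for workflow id 1 copying tables t1 and t2
	// (column names assumed for illustration).
	var initQueries = []string{
		"insert into _vt.copy_state(vrepl_id, table_name, lastpk) values (1, 't1', '')",
		"insert into _vt.copy_state(vrepl_id, table_name, lastpk) values (1, 't2', '')",
	}

	// As chunks are copied, lastpk advances; when a table completes, its row is
	// deleted. No rows left for the workflow means the copy phase is done.
	var tableDoneQuery = "delete from _vt.copy_state where vrepl_id = 1 and table_name = 't1'"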
-#### Copy
-
-Copy works on one table at a time. The source selects a set of rows from the table, with primary keys higher
-than the ones copied so far, using a consistent snapshot. This results in a stream of rows being sent to the
-target, which generates a bulk insert of these rows.
-
-However, there are a couple of factors which complicate our story:
-
-* Each copy selects all rows up to the current position of the binlog.
-* Since transactions continue to be applied (presuming the database is online), the gtid positions keep moving forward.
-
-Consider this example.
-
-We have two tables t1 and t2, each with 20 rows, and we copy 10 rows at a time.
-(Queries are not exact but simplified for readability.)
-
-If we follow the naive approach we get:
-
-T1: select * from t1 where pk > 0 limit 10. GTID: 100, Last PK 10
-
-    send rows to target
-
-T2: select * from t1 where pk > 10 limit 10. GTID: 110, Last PK 20
-
-    send rows to target
-
-Gotcha: 10 new transactions have occurred since T1. Some of these may have modified the rows
-returned by the query at T1. Hence if we just send the rows from T2 (which covers only pks 11 to 20),
-we will have an inconsistent state on the target: the updates to rows with pks between 1 and 10 will be missing.
-
-This means we first need to stream the events from GTID 100 to 110 for pks 1 to 10,
-and only then do the second select:
-
-T1: select * from t1 where pk > 0 limit 10. GTID: 100, Last PK 10
-
-    send rows to target
-
-T2: replicate from 100 to current position (110 in the previous example),
-
-    only pass events for pks 1 to 10
-
-T3: select * from t1 where pk > 10 limit 10. GTID: 112, Last PK 20
-
-    send rows to target
-
-Another gotcha! At T3, when we selected the pks from 11 to 20, the gtid position had moved further, because of
-transactions applied between T2 and T3. So if we just applied the rows from T3 we would still have an
-inconsistent state, if transactions 111 and 112 affected the rows with pks 1 to 10.
-
-This leads us to the following flow:
-
-T1: select * from t1 where pk > 0 limit 10. GTID: 100, Last PK 10
-
-    send rows to target
-
-T2: replicate from 100 to current position (110 in the previous example),
-
-    only pass events for pks 1 to 10
-
-T3: select * from t1 where pk > 10 limit 10. GTID: 112, Last PK 20
-
-T4: replicate from 111 to 112,
-
-    only pass events for pks 1 to 10
-
-T5: send rows for pks 11 to 20 to the target
-
-This flow actually works!
-
-T1 can take a long time (due to the bulk inserts). T3 (which is just a snapshot) is quick. So the position can
-diverge much more at T2 than at T4. Hence the step at T2 is called Catchup and the step at T4 is called Fast Forward.
-
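The per-table loop implied by this flow, as a Go-flavored sketch; the helper names are hypothetical, not the actual functions in the vreplication code:

	// Hypothetical per-table copy loop (helper names invented for illustration).
	for !tableDone {
		// Catchup: replay binlog events for already-copied pks until the
		// replication lag is small; note the position reached.
		pos := replicateUntilCaughtUp(copiedPK)

		// Take a consistent snapshot of the next chunk of rows, noting the
		// GTID position of the snapshot.
		rows, snapshotPos, lastPK := selectNextChunk(copiedPK, 10)

		// Fast forward: replay events from the catchup position to the
		// snapshot position, still only for already-copied pks.
		replicateTo(pos, snapshotPos, copiedPK)

		// Only now is it safe to bulk-insert the new chunk on the target.
		bulkInsert(rows)
		copiedPK = lastPK
		tableDone = len(rows) == 0
	}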
-#### Catchup
-
-As detailed above, the catchup phase runs between two copy phases. During the copy phase the gtid position
-can move significantly ahead, so we replicate until we come close to the current position, i.e. until the
-replication lag is small. At that point we call Copy again.
-
-#### Fast forward
-
-During the copy phase we first take a snapshot. Then we fast forward: we run another replicate from the gtid
-position where we stopped the Catchup to the position of the snapshot.
-
-Finally, once we have finished copying all the tables, we proceed to replicate until our job is done: for example,
-until we have resharded and switched over the reads and writes to the new shards, or until the vstream client
-closes its connection.
-
-# VStream Internals
-
-![VStream Design](../images/VStream.svg)
+## Table Of Contents
+1. [VReplication Internals](./vreplication.md)
+1. [VStream API](./vstream-api.md)

internals/vstream-api.md

Lines changed: 20 additions & 0 deletions
@@ -1,4 +1,24 @@
# VStream API Internals

+The VStream API is a grpc API provided by VTGate. It lets you stream events for one or more
+tables in Vitess in a shard-aware and customizable way. You can think of it as a filtered binlog streamer.
+
+Here is a high-level design of how VStream works:

![VStream Design](../images/VStream.svg)
+
+VStreams can work in one of two modes:
+
+1. You specify a GTID position to stream from. *current* is also a valid position; it maps
+to the latest GTID. Note that if the data you request belongs to multiple shards, each
+shard has its own GTID position.
+
+2. You stream from the start (i.e. from the "first" GTID).
+This mode is referred to as VStream Copy.
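The two modes differ only in the Gtid value of each ShardGtid; a sketch using the same protos as examples/vstream/vstream_client.go above:

	// Mode 1: stream from a known position. "current" means the latest GTID;
	// an explicit position such as "MySQL56/<uuid>:1-42" should also work.
	resume := &binlogdatapb.ShardGtid{Keyspace: "customer", Shard: "-80", Gtid: "current"}

	// Mode 2: VStream Copy. An empty Gtid asks for a full copy of the
	// matching tables before switching to streaming.
	fromStart := &binlogdatapb.ShardGtid{Keyspace: "customer", Shard: "-80", Gtid: ""}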
+
+Let's look at a few examples of VStreams to get a better understanding.
+
+1. Streaming a set of tables
+
+TBD

todos.md

Lines changed: 0 additions & 2 deletions
@@ -6,8 +6,6 @@

Next:

-* Go over VReplication Internals
-
* Write VStream API internals (example will be the change data capture use case)

* Related schema tables

use-cases/change-data-capture.md

Lines changed: 15 additions & 1 deletion
@@ -1 +1,15 @@
-# Streaming data from Vitess for Change Data Capture
+# Streaming data from Vitess for Change Data Capture
+
+The VStream API can be used to stream events from your database cluster. We will
+go through an example of using the VStream API. A more detailed reference is available
+at [VStream API Internals](../internals/vstream-api.md).
+
+The VStream API is accessible over grpc. Our example is in golang.
+
+We will reuse the local example to create a Vitess cluster to stream from.
+
+From the vitess root directory, run test/local_example.sh _after_ commenting
+out the last line of the script, which tears down the installation.
+
+TBD
