Skip to content

Commit bda3cf7

Browse files
author
Isabella Siu
committed
GODRIVER-194 add ChangeStream examples for docs
Change-Id: Ifd84577c877ef2738f555eb8eadc4cb192fda42c
1 parent 7890aa5 commit bda3cf7

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed

examples/documentation_examples/examples.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ import (
1313
"context"
1414
"io/ioutil"
1515
logger "log"
16+
"sync/atomic"
1617
"testing"
18+
"time"
1719

1820
"github.com/mongodb/mongo-go-driver/bson"
1921
"github.com/mongodb/mongo-go-driver/bson/primitive"
@@ -1934,3 +1936,110 @@ func TransactionsExamples(ctx context.Context, client *mongo.Client) error {
19341936
}
19351937

19361938
// End Transactions Retry Example 3
1939+
1940+
// ChangeStreamExamples contains examples of changestream operations.
1941+
func ChangeStreamExamples(t *testing.T, db *mongo.Database) {
1942+
ctx := context.Background()
1943+
1944+
coll := db.Collection("inventory_changestream")
1945+
1946+
err := coll.Drop(context.Background())
1947+
require.NoError(t, err)
1948+
1949+
_, err = coll.InsertOne(ctx, bson.D{{"x", int32(1)}})
1950+
require.NoError(t, err)
1951+
1952+
var stop int32
1953+
1954+
doInserts := func(coll *mongo.Collection) {
1955+
for atomic.LoadInt32(&stop) == 0 {
1956+
_, err = coll.InsertOne(ctx, bson.D{{"x", 1}})
1957+
time.Sleep(10 * time.Millisecond)
1958+
coll.DeleteOne(ctx, bson.D{{"x", 1}})
1959+
}
1960+
}
1961+
1962+
go doInserts(coll)
1963+
1964+
{
1965+
// Start Changestream Example 1
1966+
1967+
cs, err := coll.Watch(ctx, mongo.Pipeline{})
1968+
require.NoError(t, err)
1969+
defer cs.Close(ctx)
1970+
1971+
ok := cs.Next(ctx)
1972+
next := cs.Current
1973+
1974+
// End Changestream Example 1
1975+
1976+
require.True(t, ok)
1977+
require.NoError(t, err)
1978+
require.NotEqual(t, len(next), 0)
1979+
}
1980+
{
1981+
// Start Changestream Example 2
1982+
1983+
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetFullDocument(options.UpdateLookup))
1984+
require.NoError(t, err)
1985+
defer cs.Close(ctx)
1986+
1987+
ok := cs.Next(ctx)
1988+
next := cs.Current
1989+
1990+
// End Changestream Example 2
1991+
1992+
require.True(t, ok)
1993+
require.NoError(t, err)
1994+
require.NotEqual(t, len(next), 0)
1995+
}
1996+
1997+
{
1998+
original, err := coll.Watch(ctx, mongo.Pipeline{})
1999+
require.NoError(t, err)
2000+
defer original.Close(ctx)
2001+
2002+
ok := original.Next(ctx)
2003+
require.True(t, ok)
2004+
2005+
next := original.Current
2006+
// Start Changestream Example 3
2007+
resumeToken := next.Lookup("_id").Document()
2008+
2009+
cs, err := coll.Watch(ctx, mongo.Pipeline{}, options.ChangeStream().SetResumeAfter(resumeToken))
2010+
require.NoError(t, err)
2011+
defer cs.Close(ctx)
2012+
2013+
ok = cs.Next(ctx)
2014+
result := cs.Current
2015+
2016+
// End Changestream Example 3
2017+
2018+
require.True(t, ok)
2019+
require.NoError(t, err)
2020+
require.NotEqual(t, len(result), 0)
2021+
}
2022+
2023+
{
2024+
// Start Changestream Example 4
2025+
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"$or",
2026+
bson.A{
2027+
bson.D{{"fullDocument.username", "alice"}},
2028+
bson.D{{"operationType", "delete"}}}}},
2029+
}}}
2030+
cs, err := coll.Watch(ctx, pipeline)
2031+
require.NoError(t, err)
2032+
defer cs.Close(ctx)
2033+
2034+
ok := cs.Next(ctx)
2035+
next := cs.Current
2036+
2037+
// End Changestream Example 4
2038+
2039+
require.True(t, ok)
2040+
require.NoError(t, err)
2041+
require.NotEqual(t, len(next), 0)
2042+
}
2043+
2044+
atomic.StoreInt32(&stop, 1)
2045+
}

examples/documentation_examples/examples_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,22 @@ func TestTransactionExamples(t *testing.T) {
6161
require.NoError(t, err)
6262
}
6363

64+
func TestChangeStreamExamples(t *testing.T) {
65+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
66+
defer cancel()
67+
cs := testutil.ConnString(t)
68+
client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(cs.String()))
69+
require.NoError(t, err)
70+
defer client.Disconnect(ctx)
71+
72+
db := client.Database("changestream_examples")
73+
ver, err := getServerVersion(ctx, client)
74+
if err != nil || testutil.CompareVersions(t, ver, "3.6") < 0 || os.Getenv("TOPOLOGY") != "replica_set" {
75+
t.Skip("server does not support changestreams")
76+
}
77+
documentation_examples.ChangeStreamExamples(t, db)
78+
}
79+
6480
func getServerVersion(ctx context.Context, client *mongo.Client) (string, error) {
6581
serverStatus, err := client.Database("admin").RunCommand(
6682
ctx,

0 commit comments

Comments
 (0)