Skip to content

Commit 26d9db4

Browse files
committed
add change stream example
1 parent 7292077 commit 26d9db4

File tree

9 files changed

+200
-0
lines changed

9 files changed

+200
-0
lines changed

examples/change_stream/.formatter.exs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Used by "mix format"
2+
[
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
4+
]

examples/change_stream/README.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Change streams
2+
3+
This project shows a change stream example. You need to set up a replica set, because change streams are only available for replica sets and sharded clusters. For more information see
4+
5+
* https://www.mongodb.com/blog/post/an-introduction-to-change-streams
6+
* https://docs.mongodb.com/manual/changeStreams/
7+
8+
If you creating a new replica set then you need to create the database `db-1` first, before starting the example. Otherwise you will get some errors, because the database which we will observe does not exsists.
9+
10+
The `ChangeStream` module uses a GenServer for observing changes. It spawns a process to consume the documents returned by the change stream cursor:
11+
12+
pid = spawn(fn -> Enum.each(get_cursor(state), fn doc -> new_doc(doc) end) end)
13+
14+
While running this process you will receive some message:
15+
16+
* token: you get a token after a while. You can use this token to reconnect to the change stream without getting old change documents again.
17+
* documents: If data changes, you get a document which describes these changes
18+
19+
Let's start the program with `iex -S mix`:
20+
21+
iex(2)>
22+
16:10:05.018 [info] Connecting change stream
23+
24+
16:10:05.022 [info] Receiving new token nil
25+
26+
iex(3)> Mongo.insert_one(:mongo, "http_errors", %{url: "https://elixir-lang.org"})
27+
{:ok,
28+
%Mongo.InsertOneResult{
29+
acknowledged: true,
30+
inserted_id: #BSON.ObjectId<5d595c42306a5f0d87ab24e7>
31+
}}
32+
iex(4)>
33+
16:10:10.509 [info] Receiving new token %{"_data" => #BSON.Binary<825d595c420000000146645f696400645d595c42306a5f0d87ab24e7005a1004fefbdf8754024c339cd73f510a91db2b04>}
34+
35+
16:10:10.509 [info] Receiving new document %{"coll" => "http_errors", "db" => "db-1"}
36+
37+
16:10:10.509 [info] Got http error for url https://elixir-lang.org
38+
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# This file is responsible for configuring your application
2+
# and its dependencies with the aid of the Mix.Config module.
3+
use Mix.Config
4+
5+
# This configuration is loaded before any dependency and is restricted
6+
# to this project. If another project depends on this project, this
7+
# file won't be loaded nor affect the parent project. For this reason,
8+
# if you want to provide default values for your application for
9+
# third-party users, it should be done in your "mix.exs" file.
10+
11+
# You can configure your application as:
12+
#
13+
# config :change_stream, key: :value
14+
#
15+
# and access this configuration in your application as:
16+
#
17+
# Application.get_env(:change_stream, :key)
18+
#
19+
# You can also configure a third-party app:
20+
#
21+
# config :logger, level: :info
22+
#
23+
24+
# It is also possible to import configuration files, relative to this
25+
# directory. For example, you can emulate configuration per environment
26+
# by uncommenting the line below and defining dev.exs, test.exs and such.
27+
# Configuration from the imported file will override the ones defined
28+
# here (which is why it is important to import them last).
29+
#
30+
# import_config "#{Mix.env()}.exs"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
defmodule ChangeStream do
2+
use GenServer
3+
4+
require Logger
5+
6+
@collection "http_errors"
7+
@me __MODULE__
8+
9+
def start_link() do
10+
GenServer.start_link(__MODULE__, nil, name: @me)
11+
end
12+
13+
def new_token(token) do
14+
GenServer.cast(@me, {:token, token})
15+
end
16+
17+
def new_doc(doc) do
18+
GenServer.cast(@me, {:doc, doc})
19+
end
20+
21+
def init(_) do
22+
state = %{last_resume_token: nil}
23+
Process.send_after(self(), :connect, 3000)
24+
{:ok, state}
25+
end
26+
27+
def handle_info({:DOWN, _, :process, _pid, reason}, state) do
28+
Logger.info("#Cursor process is down: #{inspect reason}")
29+
Process.send_after(self(), :connect, 3000)
30+
{:noreply, state}
31+
end
32+
33+
def handle_info(:connect, state) do
34+
Logger.info("Connecting change stream")
35+
# Span a new process
36+
pid = spawn(fn -> Enum.each(get_cursor(state), fn doc -> new_doc(doc) end) end)
37+
38+
# Monitor the process
39+
Process.monitor(pid)
40+
41+
{:noreply, state}
42+
end
43+
44+
def handle_cast({:doc, doc}, state) do
45+
Logger.info("Receiving new document #{inspect doc["ns"]}")
46+
process_doc(doc)
47+
{:noreply, state}
48+
end
49+
50+
def handle_cast({:token, token}, state) do
51+
Logger.info("Receiving new token #{inspect token}")
52+
{:noreply, %{state | last_resume_token: token}}
53+
end
54+
55+
defp process_doc(%{"fullDocument" => %{"url" => url}, "ns" => %{"coll" => "http_errors", "db" => "db-1"}}) do
56+
Logger.info("Got http error for url #{url}")
57+
end
58+
59+
defp get_cursor(%{last_resume_token: nil}) do
60+
Mongo.watch_collection(:mongo, @collection, [], fn token -> new_token(token) end, max_time: 2_000)
61+
end
62+
defp get_cursor(%{last_resume_token: token}) do
63+
Mongo.watch_collection(:mongo, @collection, [], fn token -> new_token(token) end, max_time: 2_000, resume_after: token)
64+
end
65+
66+
end
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
defmodule ChangeStream.Application do
2+
3+
@moduledoc false
4+
5+
use Application
6+
import Supervisor.Spec
7+
8+
def start(_type, _args) do
9+
10+
children = [
11+
# this should be a replicat set!
12+
worker(Mongo, [[name: :mongo, url: "mongodb://localhost:27027/db-1", pool_size: 3]]),
13+
worker(ChangeStream, [])
14+
]
15+
16+
opts = [strategy: :one_for_one, name: ChangeStream.Supervisor]
17+
Supervisor.start_link(children, opts)
18+
end
19+
end

examples/change_stream/mix.exs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
defmodule ChangeStream.MixProject do
2+
use Mix.Project
3+
4+
def project do
5+
[
6+
app: :change_stream,
7+
version: "0.1.0",
8+
elixir: "~> 1.8",
9+
start_permanent: Mix.env() == :prod,
10+
deps: deps()
11+
]
12+
end
13+
14+
# Run "mix help compile.app" to learn about applications.
15+
def application do
16+
[
17+
extra_applications: [:logger],
18+
mod: {ChangeStream.Application, []}
19+
]
20+
end
21+
22+
# Run "mix help deps" to learn about dependencies.
23+
defp deps do
24+
[
25+
{:mongodb_driver, "~> 0.5"}
26+
]
27+
end
28+
end

examples/change_stream/mix.lock

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
%{
2+
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
3+
"db_connection": {:hex, :db_connection, "2.0.6", "bde2f85d047969c5b5800cb8f4b3ed6316c8cb11487afedac4aa5f93fd39abfa", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
4+
"decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"},
5+
"mongodb_driver": {:hex, :mongodb_driver, "0.5.7", "f29cab9a011f685210c472888ee3fde3573459323f839b67c4d942e067d9eda1", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0.6", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}], "hexpm"},
6+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
defmodule ChangeStreamTest do
2+
use ExUnit.Case
3+
doctest ChangeStream
4+
5+
test "greets the world" do
6+
assert ChangeStream.hello() == :world
7+
end
8+
end
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ExUnit.start()

0 commit comments

Comments
 (0)