Skip to content

Commit 5603c34

Browse files
Adding flock to JS examples (#85)
* Adding flock - Included flock js clone in examples - Added credentials-template.json as a template for crendentials.json - Added package.json file * Updating package.json file and report stats format * Adding docker and docker-compose files - Adding Dockerfile for the flock node app - Adding docker-compose yaml file for Dgraph Cluster - Adding docker-compose-flock yaml file for flock node app * Minor changes - Updating package.json file to rebuild - Replacing Done message with report status notice when success * Adding docs and images used in docs * Removing restart policy from docker-compose yml files * Fixing the rebuild tweak with a fix - Added .dockerignore file - Updated Dockerfile - Updated package.json file
1 parent 20f139a commit 5603c34

File tree

10 files changed

+415
-0
lines changed

10 files changed

+415
-0
lines changed

examples/flock/.dockerignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.git
2+
*Dockerfile*
3+
*docker-compose*
4+
node_modules

examples/flock/Dockerfile

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Alpine base image with NodeJS (lts)
2+
FROM node:lts-alpine
3+
4+
LABEL maintainer="Prashant Shahi <[email protected]>"
5+
6+
# Setting work directory
7+
WORKDIR /usr/src/app
8+
9+
# Copy the source code of app to docker daemon
10+
COPY . ./
11+
12+
# Install npm dependencies
13+
RUN npm install
14+
15+
# Run the node command
16+
CMD ["node", "index.js"]

examples/flock/README.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Flock
2+
3+
Flock loads real Twitter streams into Dgraph to make use of graph traversals.
4+
5+
This example follows the [Flock using Go](https://github.com/dgraph-io/flock) closely.
6+
7+
Flock has two parts :
8+
- [*Tweet loader*](./index.js) - It connects to [realtime Tweets][tweetsapi] via the
9+
Twitter Developer API and loads a graph model of Twitter into Dgraph via mutations.
10+
- [*Query client*](./client/index.js) - It runs interesting graph queries on the Tweets
11+
data stored in Dgraph.
12+
13+
Here is the graph schema of Flock:
14+
15+
![Schema](./schema.png)
16+
17+
[tweetsapi]: https://developer.twitter.com/en/docs/tweets/sample-realtime/overview/GET_statuse_sample
18+
19+
# Running Flock
20+
21+
## Obtaining Twitter credentials
22+
23+
We need to create a Twitter developer account and an app to be able to fetch a stream
24+
of Tweets using their APIs. Let's start with how to create a Twitter developer account.
25+
26+
- Apply for a Twitter developer account [here](https://developer.twitter.com/en/apply/user)
27+
and follow the instructions. The series of steps would end with your email verification.
28+
- Create a Twitter app from [this link](https://developer.twitter.com/en/apps/create).
29+
All fields are `not` required.
30+
- You'll be redirected to the App details tab after creating the app. Go to the
31+
`Keys and tokens` tab and create new access and secret tokens.
32+
![Twitter Developer account](./twitter-keys.png)
33+
- Create a copy of the credentials template.
34+
```sh
35+
cp credentials-template.json credentials.json
36+
```
37+
- Open the `crendentials.json` file and replace the placeholders with the keys from
38+
the Twitter app's `Keys and token` tab.
39+
40+
---
41+
## Setup
42+
43+
- Clone the repository.
44+
```sh
45+
$ git clone https://github.com/dgraph-io/dgraph-js.git
46+
$ cd dgraph-js/examples/flock
47+
```
48+
49+
- Export the persistent data directory. Since Dgraph is run using Docker containers, it
50+
is nice to mount a directory on the host machine to persist the data across multiple runs.
51+
```sh
52+
$ mkdir ~/dgraph
53+
$ export DATA_DIR=~/dgraph
54+
```
55+
56+
- If you're running Linux, you can add the current user to the `docker` group to use
57+
Docker as a non-root user. The `newgrp` command creates a new terminal session.
58+
It is necessary after the user modification to see the effects.
59+
60+
```
61+
$ sudo usermod -aG docker $USER
62+
$ newgrp docker
63+
```
64+
65+
- Ensure that `credentials.json` with the valid Twitter credentials exist in
66+
the root directory of Flock.
67+
68+
- Start the Dgraph servers and Ratel with Docker Compose. Visit http://localhost:8000
69+
on your browser to view the UI.
70+
71+
```sh
72+
$ docker-compose up
73+
```
74+
75+
- On another terminal, start Flock:
76+
77+
```sh
78+
$ docker-compose -f docker-compose-flock.yml up
79+
```
80+
81+
Flock will begin printing out periodic log messages mentioning its
82+
loading rate. You're good to go if you see the `Commit Rate` higher
83+
than 0/sec, which means data has been successfully committed to
84+
Dgraph.
85+
86+
A few minutes of running Flock is sufficient to get enough data for
87+
some interesting queries. To stop running Flock, press Ctrl+C on the
88+
terminal running Flock.
89+
90+
```sh
91+
$ docker-compose -f docker-compose-flock.yml up
92+
...
93+
<Ctrl+C>
94+
Killing flock ... done
95+
```
96+
97+
---
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"access_token_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
3+
"access_token_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
4+
"consumer_key": "XXXXXXXXXXXXXXXXXXXXXXXXX",
5+
"consumer_secret": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
6+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
version: "3.5"
2+
services:
3+
flock:
4+
build: .
5+
container_name: flock
6+
labels:
7+
cluster: flock-cluster
8+
environment:
9+
- ALPHA_ADDR=localhost:9080
10+
- LOG_INTERVAL_TIME=2000
11+
network_mode: "host"
12+
volumes:
13+
- ./credentials.json:/usr/src/app/credentials.json

examples/flock/docker-compose.yml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
version: "3.2"
2+
services:
3+
zero:
4+
image: dgraph/dgraph:v1.1.0
5+
container_name: zero
6+
labels:
7+
cluster: flock-cluster
8+
ports:
9+
- 5080:5080
10+
- 6080:6080
11+
volumes:
12+
- type: bind
13+
source: $DATA_DIR
14+
target: /dgraph
15+
command: dgraph zero --my=zero:5080
16+
alpha:
17+
container_name: alpha
18+
image: dgraph/dgraph:v1.1.0
19+
labels:
20+
cluster: flock-cluster
21+
ports:
22+
- 8080:8080
23+
- 9080:9080
24+
volumes:
25+
- type: bind
26+
source: $DATA_DIR
27+
target: /dgraph
28+
command: dgraph alpha --my=alpha:7080 --lru_mb=2048 --zero=zero:5080
29+
ratel:
30+
container_name: ratel
31+
image: dgraph/dgraph:v1.1.0
32+
labels:
33+
cluster: flock-cluster
34+
volumes:
35+
- type: volume
36+
source: dgraph
37+
target: /dgraph
38+
volume:
39+
nocopy: true
40+
ports:
41+
- 8000:8000
42+
command: dgraph-ratel
43+
44+
volumes:
45+
dgraph:

examples/flock/index.js

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
const dgraph = require('dgraph-js');
2+
const grpc = require('grpc');
3+
const twitter = require('twitter');
4+
5+
const creds = require('./credentials.json');
6+
const client = new twitter(creds);
7+
8+
const ALPHA_ADDR = process.env.ALPHA_ADDR || "localhost:9080"
9+
const LOG_INTERVAL_TIME = process.env.LOG_INTERVAL_TIME || 2000;
10+
const startStatus = new Date().getTime();
11+
12+
let lastStatus = 0;
13+
let retry = true;
14+
let failureCount = 0;
15+
let totalTweets = 0;
16+
let oldTotalTweets = 0;
17+
let retryCount = 0;
18+
let errorCount = 0;
19+
20+
const dgraphClientStub = new dgraph.DgraphClientStub(ALPHA_ADDR, grpc.credentials.createInsecure());
21+
const dgraphClient = new dgraph.DgraphClient(dgraphClientStub);
22+
23+
async function setSchema() {
24+
const schema = `
25+
type Tweet {
26+
id_str: string
27+
created_at: dateTime
28+
message: string
29+
urls: [string]
30+
hashtags: [string]
31+
author: [User]
32+
mention: [User]
33+
retweet: bool
34+
}
35+
36+
type User {
37+
user_id: string
38+
user_name: string
39+
screen_name: string
40+
description: string
41+
friends_count: int
42+
verified: bool
43+
profile_banner_url: string
44+
profile_image_url: string
45+
}
46+
47+
user_id: string @index(exact) .
48+
user_name: string @index(hash) .
49+
screen_name: string @index(term) .
50+
id_str: string @index(exact) .
51+
created_at: dateTime @index(hour) .
52+
urls: [string] @index(term) .
53+
hashtags: [string] @index(exact) .
54+
mention: [uid] @count @reverse .
55+
author: [uid] @count @reverse .
56+
`;
57+
const op = new dgraph.Operation();
58+
op.setSchema(schema)
59+
await dgraphClient.alter(op);
60+
}
61+
62+
async function upsertData(jsonObj, query) {
63+
try {
64+
const mu = new dgraph.Mutation();
65+
mu.setSetJson(jsonObj);
66+
67+
const req = new dgraph.Request();
68+
req.setMutationsList([mu]);
69+
req.setQuery(query);
70+
req.setCommitNow(true);
71+
72+
await dgraphClient.newTxn().doRequest(req);
73+
} catch (err) {
74+
const errMsg = err.message;
75+
if (errMsg.includes('connection refused')) {
76+
// wait for alpha to restart
77+
console.log('ERROR Connection refused... waiting a bit');
78+
await wait(5000);
79+
} else if (errMsg.includes('already been committed or discarded')) {
80+
failureCount += 1;
81+
} else if (retry && errMsg.includes('Please retry')) {
82+
retryCount += 1;
83+
await wait(100);
84+
retry = false;
85+
await upsertData(jsonObj, query);
86+
} else {
87+
errorCount += 1;
88+
console.log(`ERROR Unable to commit.\n${err}\n`);
89+
}
90+
}
91+
}
92+
93+
async function filterTweet(tweet) {
94+
const userMentions = [];
95+
const usersObject = [];
96+
usersObject[tweet.user.id_str] = 'uid(u)';
97+
tweet.entities.user_mentions.forEach((element, index) => {
98+
let uid;
99+
if (usersObject[element.id_str] != undefined) {
100+
uid = usersObject[element.id_str];
101+
} else {
102+
uid = `uid(m${index+1})`;
103+
usersObject[element.id_str] = uid;
104+
}
105+
userMentions.push({
106+
'uid': uid,
107+
'user_id': element.id_str,
108+
'dgraph.type': 'User',
109+
'user_name': element.name,
110+
'screen_name': element.screen_name,
111+
});
112+
});
113+
const hashtags = [];
114+
tweet.entities.hashtags.forEach((element) => {
115+
hashtags.push(element.text);
116+
});
117+
const author = {
118+
'uid': `uid(u)`,
119+
'user_id': tweet.user.id_str,
120+
'dgraph.type': 'User',
121+
'user_name': tweet.user.name,
122+
'screen_name': tweet.user.screen_name,
123+
'description': tweet.user.description,
124+
'friends_count': tweet.user.friends_count,
125+
'followers_count': tweet.user.followers_count,
126+
'verified': tweet.user.verified,
127+
'profile_banner_url': tweet.user.profile_banner_url,
128+
'profile_image_url': tweet.user.profile_image_url,
129+
};
130+
const userObj = {
131+
'uid': `uid(t)`,
132+
'id_str': tweet.id_str,
133+
'dgraph.type': 'Tweet',
134+
'created_at': new Date(tweet.created_at),
135+
'message': tweet.text,
136+
'urls': tweet.urls,
137+
'hashtags': hashtags,
138+
'mention': userMentions,
139+
'author': author,
140+
};
141+
return userObj;
142+
}
143+
144+
async function buildQuery(tweet) {
145+
const usersObject = [];
146+
const query = [];
147+
148+
query.push(`t as var(func: eq(id_str, "${tweet.id_str}"))`);
149+
query.push(`u as var(func: eq(user_id, "${tweet.author.user_id}"))`);
150+
151+
usersObject[tweet.author.user_id] = 'u';
152+
153+
tweet.mention.forEach((element, index) => {
154+
let name;
155+
if (usersObject[element.user_id] != undefined) {
156+
name = usersObject[element.user_id];
157+
} else {
158+
name = `m${index+1}`;
159+
query.push(`${name} as var(func: eq(user_id, ${element.user_id}))`);
160+
usersObject[element.user_id] = name;
161+
}
162+
});
163+
164+
const finalQuery = `query {${query.join('\n')}}`;
165+
return finalQuery;
166+
}
167+
168+
function reportStats() {
169+
const now = new Date().getTime();
170+
// tslint:disable-next-line no-console
171+
console.log(`STATS Tweets: ${totalTweets}, Failues: ${failureCount}, Retries: ${retryCount}, \
172+
Errors: ${errorCount}, Commit Rate: ${(totalTweets-oldTotalTweets)/(LOG_INTERVAL_TIME/1000)} Total Time: ${now - startStatus} ms`);
173+
oldTotalTweets = totalTweets;
174+
}
175+
176+
async function wait(time) {
177+
return new Promise((resolve) => {
178+
const id = setTimeout(
179+
() => {
180+
clearTimeout(id);
181+
resolve();
182+
},
183+
time,
184+
);
185+
});
186+
}
187+
188+
async function main() {
189+
await setSchema();
190+
setInterval(reportStats, LOG_INTERVAL_TIME);
191+
client.stream('statuses/sample.json', function(stream) {
192+
stream.on('data', async function(tweet) {
193+
totalTweets += 1;
194+
const tweetObj = await filterTweet(tweet);
195+
const queries = await buildQuery(tweetObj);
196+
retry = true;
197+
await upsertData(tweetObj, queries);
198+
});
199+
stream.on('error', function(error) {
200+
console.log(error);
201+
});
202+
});
203+
}
204+
205+
main().then(() => {
206+
console.log(`\nReporting stats every ${LOG_INTERVAL_TIME/1000} seconds\n`)
207+
}).catch((e) => {
208+
console.log(e);
209+
});

0 commit comments

Comments
 (0)