Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 0717160

Browse files
jpignatamikedeck
authored andcommitted
Add Data Processing workshop website
This workshop is based upon the material originally built for the Wild Rydes serverless workshops. Instead of using GitHub to render Markdown files, this version uses a small amount of glue code to generate a static HTML site that can be hosted on Amazon S3. This commit brings these materials back into the repo. This allows the maintainers the options of leveraging the static site generator I put together or including the content I built in another content system.
1 parent 4fa90fc commit 0717160

File tree

67 files changed

+4013
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+4013
-0
lines changed

DataProcessing/client/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
consumer
2+
producer
3+
client.tar

DataProcessing/client/Makefile

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
.PHONY: all clean
2+
3+
all: consumer producer
4+
tar -cvf client.tar producer consumer
5+
6+
consumer:
7+
go build consumer.go
8+
9+
producer:
10+
go build producer.go
11+
12+
clean:
13+
rm client.tar producer consumer

DataProcessing/client/consumer.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"flag"
7+
"fmt"
8+
"os"
9+
"sync"
10+
"time"
11+
12+
"github.com/aws/aws-sdk-go/aws"
13+
"github.com/aws/aws-sdk-go/aws/session"
14+
"github.com/aws/aws-sdk-go/service/kinesis"
15+
)
16+
17+
func main() {
18+
stream := flag.String("stream", "wildrydes", "Stream Name")
19+
flag.Parse()
20+
21+
sess := session.Must(
22+
session.NewSessionWithOptions(
23+
session.Options{
24+
SharedConfigState: session.SharedConfigEnable,
25+
},
26+
),
27+
)
28+
29+
pollShards(kinesis.New(sess), stream)
30+
}
31+
32+
func pollShards(client *kinesis.Kinesis, stream *string) {
33+
var wg sync.WaitGroup
34+
35+
streamDescription, err := client.DescribeStream(
36+
&kinesis.DescribeStreamInput{
37+
StreamName: stream,
38+
},
39+
)
40+
41+
if err != nil {
42+
fmt.Println(err)
43+
os.Exit(1)
44+
}
45+
46+
for _, shard := range streamDescription.StreamDescription.Shards {
47+
go getRecords(client, stream, shard.ShardId)
48+
wg.Add(1)
49+
}
50+
51+
wg.Wait()
52+
}
53+
54+
func getRecords(client *kinesis.Kinesis, stream *string, shardID *string) {
55+
shardIteratorRes, err := client.GetShardIterator(
56+
&kinesis.GetShardIteratorInput{
57+
StreamName: stream,
58+
ShardId: shardID,
59+
ShardIteratorType: aws.String("LATEST"),
60+
},
61+
)
62+
63+
if err != nil {
64+
fmt.Println(err)
65+
os.Exit(1)
66+
}
67+
68+
shardIterator := shardIteratorRes.ShardIterator
69+
ticker := time.NewTicker(time.Second)
70+
71+
for range ticker.C {
72+
records, err := client.GetRecords(
73+
&kinesis.GetRecordsInput{
74+
ShardIterator: shardIterator,
75+
},
76+
)
77+
78+
if err != nil {
79+
fmt.Println(err)
80+
os.Exit(1)
81+
}
82+
83+
for _, record := range records.Records {
84+
var prettyJSON bytes.Buffer
85+
86+
if err := json.Indent(&prettyJSON, record.Data, "", " "); err != nil {
87+
fmt.Println(err)
88+
os.Exit(1)
89+
}
90+
91+
fmt.Print(string(prettyJSON.Bytes()))
92+
}
93+
94+
shardIterator = records.NextShardIterator
95+
}
96+
}

DataProcessing/client/producer.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"flag"
6+
"fmt"
7+
"math"
8+
"math/rand"
9+
"os"
10+
"time"
11+
12+
"github.com/aws/aws-sdk-go/aws/session"
13+
"github.com/aws/aws-sdk-go/service/kinesis"
14+
)
15+
16+
const (
17+
latitudeStart = 40.749167
18+
longitudeStart = -73.985184
19+
latitudeOffset = 110540
20+
longitudeOffset = 111320
21+
pointsStart = 150
22+
minDistance = 29.0
23+
maxDistance = 31.0
24+
)
25+
26+
type unicornStatus struct {
27+
Distance float64
28+
HealthPoints int
29+
Latitude float64
30+
Longitude float64
31+
MagicPoints int
32+
Name *string
33+
StatusTime string
34+
}
35+
36+
func main() {
37+
name := flag.String("name", "Shadowfax", "Unicorn Name")
38+
stream := flag.String("stream", "wildrydes", "Stream Name")
39+
40+
flag.Parse()
41+
42+
sess := session.Must(
43+
session.NewSessionWithOptions(
44+
session.Options{
45+
SharedConfigState: session.SharedConfigEnable,
46+
},
47+
),
48+
)
49+
50+
simulateUnicorn(kinesis.New(sess), name, stream)
51+
}
52+
53+
func simulateUnicorn(client *kinesis.Kinesis, name, stream *string) {
54+
rand.Seed(time.Now().UnixNano())
55+
56+
magicPoints := pointsStart
57+
healthPoints := pointsStart
58+
latitude := latitudeStart
59+
longitude := longitudeStart
60+
bearing := rand.Float64() * math.Pi * 2
61+
ticker := time.NewTicker(time.Second)
62+
63+
for range ticker.C {
64+
magicPoints = nextPoints(magicPoints)
65+
healthPoints = nextPoints(healthPoints)
66+
distance := float64(rand.Intn(maxDistance-minDistance)+minDistance) + rand.Float64()
67+
latitude, longitude = nextLocation(latitude, longitude, bearing, distance)
68+
status, _ := json.Marshal(
69+
&unicornStatus{
70+
Distance: distance,
71+
HealthPoints: healthPoints,
72+
Latitude: latitude,
73+
Longitude: longitude,
74+
MagicPoints: magicPoints,
75+
Name: name,
76+
StatusTime: time.Now().Format("2006-01-02 15:04:05.000"),
77+
},
78+
)
79+
putRecordInput := &kinesis.PutRecordInput{
80+
Data: append([]byte(status), "\n"...),
81+
PartitionKey: name,
82+
StreamName: stream,
83+
}
84+
85+
if _, err := client.PutRecord(putRecordInput); err != nil {
86+
fmt.Println(err)
87+
os.Exit(1)
88+
}
89+
90+
fmt.Print(".")
91+
}
92+
}
93+
94+
func nextLocation(latitude, longitude, bearing float64, distance float64) (nextLatitude, nextLongitude float64) {
95+
nextLatitude = latitude + float64(distance)*math.Sin(bearing)/latitudeOffset
96+
nextLongitude = longitude + float64(distance)*math.Cos(bearing)/(longitudeOffset*math.Cos(math.Pi*latitude/180))
97+
98+
return
99+
}
100+
101+
func nextPoints(points int) int {
102+
y := rand.Intn(2)
103+
104+
if rand.Int()%2 == 0 {
105+
y = y * -1
106+
}
107+
108+
return points + y
109+
}

DataProcessing/guide/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
.DS_Store

DataProcessing/guide/Dockerfile

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
FROM ruby:2.3-alpine
2+
3+
RUN apk --no-cache add make gcc libc-dev libffi-dev
4+
5+
ADD https://github.com/jgm/pandoc/releases/download/2.1/pandoc-2.1-linux.tar.gz /tmp
6+
7+
WORKDIR /tmp
8+
9+
RUN tar -zxvf pandoc-2.1-linux.tar.gz && \
10+
mv pandoc-2.1/bin/pandoc /usr/local/bin && \
11+
rm -rf pandoc-2.1
12+
13+
COPY Gemfile Gemfile.lock /tmp/
14+
15+
RUN bundle
16+
17+
VOLUME /guide
18+
19+
WORKDIR /guide
20+
21+
CMD ["rake", "listen"]

DataProcessing/guide/Gemfile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
source "https://rubygems.org"
2+
3+
gem "haml"
4+
gem "listen"
5+
gem "rake"
6+
gem "pandoc-ruby"
7+
gem "gemoji-parser"

DataProcessing/guide/Gemfile.lock

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
GEM
2+
remote: https://rubygems.org/
3+
specs:
4+
ffi (1.9.18)
5+
gemoji (3.0.0)
6+
gemoji-parser (1.3.1)
7+
gemoji (>= 2.1.0)
8+
haml (5.0.3)
9+
temple (>= 0.8.0)
10+
tilt
11+
listen (3.0.8)
12+
rb-fsevent (~> 0.9, >= 0.9.4)
13+
rb-inotify (~> 0.9, >= 0.9.7)
14+
pandoc-ruby (2.0.1)
15+
rake (12.1.0)
16+
rb-fsevent (0.9.8)
17+
rb-inotify (0.9.8)
18+
ffi (>= 0.5.0)
19+
temple (0.8.0)
20+
tilt (2.0.8)
21+
22+
PLATFORMS
23+
ruby
24+
25+
DEPENDENCIES
26+
gemoji-parser
27+
haml
28+
listen
29+
pandoc-ruby
30+
rake
31+
32+
BUNDLED WITH
33+
1.13.6

DataProcessing/guide/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Serverless Data Processing on AWS
2+
3+
## Development
4+
5+
### Building content on file changes
6+
7+
- docker build -t guide .
8+
- docker run -ti -v $PWD:/guide guide

DataProcessing/guide/Rakefile

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
require "haml"
2+
require "listen"
3+
require "pandoc-ruby"
4+
require "time"
5+
require "gemoji-parser"
6+
7+
require "./helpers"
8+
9+
LAYOUT = "content/layout.haml"
10+
11+
task default: :build
12+
13+
task :build do
14+
build
15+
end
16+
17+
task :listen do
18+
listener = Listen.to(".", "content/", only: /\.md|\.haml/) do |modified, added, removed|
19+
paths = [modified, added, removed].flatten.map { |path|
20+
ContentFile.new(path).basename
21+
}.join(",")
22+
23+
puts "Building site -- ts=#{Time.now.iso8601} modified=#{paths}"
24+
25+
begin
26+
build
27+
rescue Exception => ex
28+
puts ex
29+
puts ex.backtrace
30+
end
31+
end
32+
33+
trap("SIGINT") do
34+
puts
35+
exit(0)
36+
end
37+
38+
puts "Listening for changes -- ts=#{Time.now.iso8601}"
39+
listener.start
40+
sleep
41+
end
42+
43+
def build
44+
Dir["content/*"].each do |path|
45+
next if path == LAYOUT
46+
47+
content_file = ContentFile.new(path)
48+
next if content_file.partial?
49+
50+
File.open(File.join("site", content_file.stem + ".html"), "w") do |file|
51+
if content_file.fullpage?
52+
file.write(render(path))
53+
else
54+
file.write(render(LAYOUT) { render(path) })
55+
end
56+
end
57+
end
58+
end

0 commit comments

Comments
 (0)