Skip to content

Commit 954211e

Browse files
author
R. Tyler Croy
committed
Merge pull request #14 from robbavey/master
Add DSL support for named streams.
2 parents 42b77a1 + c549d70 commit 954211e

File tree

11 files changed

+537
-106
lines changed

11 files changed

+537
-106
lines changed

lib/red_storm/dsl/bolt.rb

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'java'
22
require 'red_storm/configurator'
33
require 'red_storm/environment'
4+
require 'red_storm/dsl/output_fields'
45
require 'red_storm/loggable'
56
require 'pathname'
67

@@ -16,14 +17,12 @@ class Bolt
1617
include Loggable
1718
attr_reader :collector, :context, :config
1819

20+
include OutputFields
21+
1922
def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end
2023

2124
# DSL class methods
22-
23-
def self.output_fields(*fields)
24-
@fields = fields.map(&:to_s)
25-
end
26-
25+
2726
def self.configure(&configure_block)
2827
@configure_block = block_given? ? configure_block : lambda {}
2928
end
@@ -62,10 +61,18 @@ def unanchored_emit(*values)
6261
@collector.emit_tuple(Values.new(*values))
6362
end
6463

64+
def unanchored_stream_emit(stream, *values)
65+
@collector.emit_tuple_stream(stream, Values.new(*values))
66+
end
67+
6568
def anchored_emit(tuple, *values)
6669
@collector.emit_anchor_tuple(tuple, Values.new(*values))
6770
end
6871

72+
def anchored_stream_emit(stream, tuple, *values)
73+
@collector.emit_anchor_tuple_stream(stream, tuple, Values.new(*values))
74+
end
75+
6976
def ack(tuple)
7077
@collector.ack(tuple)
7178
end
@@ -80,7 +87,21 @@ def execute(tuple)
8087
output = on_receive(tuple)
8188
if output && self.class.emit?
8289
values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output
83-
values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)}
90+
values_list.each do |values|
91+
if self.class.anchor?
92+
if self.class.stream?
93+
anchored_stream_emit(self.stream, tuple, *values)
94+
else
95+
anchored_emit(tuple, *values)
96+
end
97+
else
98+
if self.class.stream?
99+
unanchored_stream_emit(self.stream, *values)
100+
else
101+
unanchored_emit(*values)
102+
end
103+
end
104+
end
84105
@collector.ack(tuple) if self.class.ack?
85106
end
86107
end
@@ -97,10 +118,6 @@ def cleanup
97118
on_close
98119
end
99120

100-
def declare_output_fields(declarer)
101-
declarer.declare(Fields.new(self.class.fields))
102-
end
103-
104121
def get_component_configuration
105122
configurator = Configurator.new
106123
configurator.instance_exec(&self.class.configure_block)
@@ -113,10 +130,6 @@ def get_component_configuration
113130
def on_init; end
114131
def on_close; end
115132

116-
def self.fields
117-
@fields ||= []
118-
end
119-
120133
def self.configure_block
121134
@configure_block ||= lambda {}
122135
end

lib/red_storm/dsl/output_collector.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,13 @@
66
class OutputCollector
77
java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")]
88
java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")]
9+
java_alias :emit_tuple_stream, :emit, [
10+
java.lang.String,
11+
java.lang.Class.for_name("java.util.List")
12+
]
13+
java_alias :emit_anchor_tuple_stream, :emit, [
14+
java.lang.String,
15+
Tuple.java_class,
16+
java.lang.Class.for_name("java.util.List")
17+
]
918
end

lib/red_storm/dsl/output_fields.rb

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
module RedStorm
2+
module DSL
3+
module OutputFields
4+
5+
def self.included(base)
6+
base.extend ClassMethods
7+
end
8+
9+
def declare_output_fields(declarer)
10+
self.class.fields.each do |stream, fields|
11+
declarer.declareStream(stream, Fields.new(fields))
12+
end
13+
end
14+
15+
def stream
16+
self.class.stream
17+
end
18+
19+
module ClassMethods
20+
21+
def output_fields(*fields)
22+
@output_fields ||= Hash.new([])
23+
fields.each do |field|
24+
case field
25+
when Hash
26+
field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] }
27+
else
28+
@output_fields['default'] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s]
29+
end
30+
end
31+
end
32+
33+
def fields
34+
@output_fields ||= Hash.new([])
35+
end
36+
37+
def stream?
38+
self.receive_options[:stream] && !self.receive_options[:stream].empty?
39+
end
40+
41+
def stream
42+
self.receive_options[:stream]
43+
end
44+
end
45+
end
46+
end
47+
end
48+

lib/red_storm/dsl/spout.rb

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
require 'red_storm/configurator'
33
require 'red_storm/environment'
44
require 'red_storm/loggable'
5+
require 'red_storm/dsl/output_fields'
56
require 'pathname'
67

78
module RedStorm
@@ -13,6 +14,8 @@ class Spout
1314
include Loggable
1415
attr_reader :config, :context, :collector
1516

17+
include OutputFields
18+
1619
def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end
1720

1821
# DSL class methods
@@ -21,10 +24,6 @@ def self.configure(&configure_block)
2124
@configure_block = block_given? ? configure_block : lambda {}
2225
end
2326

24-
def self.output_fields(*fields)
25-
@fields = fields.map(&:to_s)
26-
end
27-
2827
def self.on_send(*args, &on_send_block)
2928
options = args.last.is_a?(Hash) ? args.pop : {}
3029
method_name = args.first
@@ -120,10 +119,6 @@ def deactivate
120119
on_deactivate
121120
end
122121

123-
def declare_output_fields(declarer)
124-
declarer.declare(Fields.new(self.class.fields))
125-
end
126-
127122
def ack(msg_id)
128123
on_ack(msg_id)
129124
end
@@ -148,10 +143,6 @@ def on_deactivate; end
148143
def on_ack(msg_id); end
149144
def on_fail(msg_id); end
150145

151-
def self.fields
152-
@fields ||= []
153-
end
154-
155146
def self.configure_block
156147
@configure_block ||= lambda {}
157148
end
@@ -171,7 +162,7 @@ def self.reliable?
171162
# below non-dry see Bolt class
172163
def self.inherited(subclass)
173164
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(SpoutError, "unable to extract base topology class path from #{caller.first.inspect}")
174-
subclass.base_class_path = File.expand_path(path)
165+
subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s
175166
end
176167

177168
def self.base_class_path=(path)

lib/red_storm/dsl/topology.rb

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
java_import 'backtype.storm.topology.TopologyBuilder'
77
java_import 'backtype.storm.generated.SubmitOptions'
8+
java_import 'backtype.storm.utils.Utils'
89

910
module RedStorm
1011
module DSL
@@ -27,16 +28,45 @@ def initialize(component_class, constructor_args, id, parallelism)
2728
@constructor_args = constructor_args
2829
@id = id.to_s
2930
@parallelism = parallelism
30-
@output_fields = []
31+
@output_fields = Hash.new([])
32+
33+
initialize_output_fields
3134
end
3235

33-
def output_fields(*args)
34-
args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
36+
def output_fields(*fields)
37+
default_fields = []
38+
fields.each do |field|
39+
case field
40+
when Hash
41+
field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] }
42+
else
43+
default_fields |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s]
44+
end
45+
end
46+
@output_fields[Utils::DEFAULT_STREAM_ID] = default_fields unless default_fields.empty?
47+
48+
@output_fields
3549
end
3650

3751
def is_java?
3852
@clazz.name.split('::').first.downcase == 'java'
3953
end
54+
55+
private
56+
57+
def initialize_output_fields
58+
if @clazz.ancestors.include?(RedStorm::DSL::OutputFields)
59+
@output_fields = @clazz.fields.clone
60+
end
61+
end
62+
63+
def java_safe_fields
64+
java_hash = java.util.HashMap.new()
65+
@output_fields.each do |k, v|
66+
java_hash.put(k, v.to_java('java.lang.String')) unless v.empty?
67+
end
68+
java_hash
69+
end
4070
end
4171

4272
class SpoutDefinition < ComponentDefinition
@@ -48,7 +78,7 @@ def new_instance
4878
elsif is_java?
4979
@clazz.new(*constructor_args)
5080
else
51-
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
81+
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
5282
end
5383
end
5484
end
@@ -61,29 +91,33 @@ def initialize(*args)
6191
@sources = []
6292
end
6393

64-
def source(source_id, grouping)
65-
@sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
94+
def source(source_id, grouping, stream = Utils::DEFAULT_STREAM_ID)
95+
@sources << [
96+
source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s,
97+
grouping.is_a?(Hash) ? grouping : {grouping => nil},
98+
stream.to_s
99+
]
66100
end
67101

68102
def define_grouping(declarer)
69-
@sources.each do |source_id, grouping|
103+
@sources.each do |source_id, grouping, stream|
70104
grouper, params = grouping.first
71105
# declarer.fieldsGrouping(source_id, Fields.new())
72106
case grouper
73107
when :fields
74-
declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
108+
declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s))))
75109
when :global
76-
declarer.globalGrouping(source_id)
110+
declarer.globalGrouping(source_id, stream)
77111
when :shuffle
78-
declarer.shuffleGrouping(source_id)
112+
declarer.shuffleGrouping(source_id, stream)
79113
when :local_or_shuffle
80-
declarer.localOrShuffleGrouping(source_id)
114+
declarer.localOrShuffleGrouping(source_id, stream)
81115
when :none
82-
declarer.noneGrouping(source_id)
116+
declarer.noneGrouping(source_id, stream)
83117
when :all
84-
declarer.allGrouping(source_id)
118+
declarer.allGrouping(source_id, stream)
85119
when :direct
86-
declarer.directGrouping(source_id)
120+
declarer.directGrouping(source_id, stream)
87121
else
88122
raise("unknown grouper=#{grouper.inspect}")
89123
end
@@ -97,7 +131,7 @@ def new_instance
97131
elsif is_java?
98132
@clazz.new(*constructor_args)
99133
else
100-
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
134+
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
101135
end
102136
end
103137
end

0 commit comments

Comments
 (0)