Skip to content

Commit 487c242

Browse files
Merge pull request #174 from datastax/RUBY-220
RUBY-220 - Plumbing to support custom types: serializing and deserial…
2 parents f69fc39 + 132b983 commit 487c242

File tree

20 files changed

+153
-29
lines changed

20 files changed

+153
-29
lines changed

lib/cassandra.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ module Cassandra
6666
:connections_per_remote_node,
6767
:consistency,
6868
:credentials,
69+
:custom_types,
6970
:datacenter,
7071
:futures_factory,
7172
:heartbeat_interval,
@@ -801,6 +802,7 @@ def self.validate_and_massage_options(options)
801802
require 'cassandra/executors'
802803
require 'cassandra/future'
803804
require 'cassandra/cluster'
805+
require 'cassandra/custom_data'
804806
require 'cassandra/driver'
805807
require 'cassandra/host'
806808
require 'cassandra/session'

lib/cassandra/cluster/connector.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ def do_connect(host)
134134
@connection_options.compressor,
135135
@connection_options.heartbeat_interval,
136136
@connection_options.idle_timeout,
137-
@connection_options.requests_per_connection)
137+
@connection_options.requests_per_connection,
138+
@connection_options.custom_type_handlers)
138139
end.flat_map do |connection|
139140
# connection is a CqlProtocolHandler
140141
f = request_options(connection)

lib/cassandra/cluster/options.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class Options
2424

2525
attr_reader :auth_provider, :compressor, :connect_timeout, :credentials,
2626
:heartbeat_interval, :idle_timeout, :port, :schema_refresh_delay,
27-
:schema_refresh_timeout, :ssl
27+
:schema_refresh_timeout, :ssl, :custom_type_handlers
2828
attr_boolean :protocol_negotiable, :synchronize_schema, :nodelay
2929

3030
attr_accessor :protocol_version
@@ -45,7 +45,8 @@ def initialize(logger,
4545
schema_refresh_delay,
4646
schema_refresh_timeout,
4747
nodelay,
48-
requests_per_connection)
48+
requests_per_connection,
49+
custom_types)
4950
@logger = logger
5051
@protocol_version = protocol_version
5152
@credentials = credentials
@@ -60,6 +61,10 @@ def initialize(logger,
6061
@schema_refresh_delay = schema_refresh_delay
6162
@schema_refresh_timeout = schema_refresh_timeout
6263
@nodelay = nodelay
64+
@custom_type_handlers = {}
65+
custom_types.each do |type_klass|
66+
@custom_type_handlers[type_klass.type] = type_klass
67+
end
6368

6469
@connections_per_local_node = connections_per_local_node
6570
@connections_per_remote_node = connections_per_remote_node

lib/cassandra/cluster/schema/cql_type_parser.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ def lookup_type(node, types)
7272
Cassandra::Types.tuple(*node.children.map { |t| lookup_type(t, types)})
7373
when 'empty' then
7474
Cassandra::Types.custom('org.apache.cassandra.db.marshal.EmptyType')
75+
when /\A'/ then
76+
# Custom type.
77+
Cassandra::Types.custom(node.name[1..-2])
7578
else
7679
types.fetch(node.name) do
7780
raise IncompleteTypeError, "unable to lookup type #{node.name.inspect}"

lib/cassandra/cluster/schema/fetchers.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -836,7 +836,7 @@ def create_aggregate(aggregate_data, functions)
836836
initial_state = Util.encode_object(
837837
Protocol::Coder.read_value_v4(
838838
Protocol::CqlByteBuffer.new.append_bytes(aggregate_data['initcond']),
839-
state_type))
839+
state_type, nil))
840840

841841
# The state-function takes arguments: first the stype, then the args of the aggregate.
842842
state_function = functions.get(aggregate_data['state_func'],

lib/cassandra/custom_data.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# encoding: utf-8
2+
3+
#--
4+
# Copyright 2013-2016 DataStax, Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#++
18+
19+
# Use this module to mark domain object classes as custom type implementations for custom-type
20+
# columns in C*. This module has no logic of its own, but indicates that the marked class has
21+
# certain methods.
22+
# @private
23+
module Cassandra::CustomData
24+
def self.included base
25+
base.send :include, InstanceMethods
26+
base.extend ClassMethods
27+
end
28+
29+
module ClassMethods
30+
# @return [Cassandra::Types::Custom] the custom type that this class represents.
31+
def type
32+
raise NotImplementedError, "#{self.class} must implement the :type class method"
33+
end
34+
35+
# Deserialize the given data into an instance of this domain object class.
36+
# @param data [String] byte-array representation of a column value of this custom type.
37+
# @return An instance of the domain object class.
38+
# @raise [Cassandra::Errors::DecodingError] upon failure.
39+
def deserialize(data)
40+
raise NotImplementedError, "#{self.class} must implement the :deserialize class method"
41+
end
42+
end
43+
44+
module InstanceMethods
45+
# Serialize this domain object into a byte array to send to C*.
46+
# @return [String] byte-array representation of this domain object.
47+
def serialize
48+
raise NotImplementedError, "#{self.class} must implement the :serialize instance method"
49+
end
50+
end
51+
end

lib/cassandra/driver.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,12 @@ def self.let(name, &block)
139139
schema_refresh_delay,
140140
schema_refresh_timeout,
141141
nodelay,
142-
requests_per_connection
142+
requests_per_connection,
143+
custom_types
143144
)
144145
end
145146

147+
let(:custom_types) { [] }
146148
let(:port) { 9042 }
147149
let(:protocol_version) { nil }
148150
let(:connect_timeout) { 10 }

lib/cassandra/protocol.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ module Formats
2828

2929
BYTES_FORMAT = 'C*'.freeze
3030
TWO_INTS_FORMAT = 'NN'.freeze
31+
32+
# All of the formats above are big-endian (e.g. network-byte-order). Some payloads (custom types) may have
33+
# little-endian components.
34+
35+
DOUBLE_FORMAT_LE = 'E'.freeze
36+
INT_FORMAT_LE = 'V'.freeze
37+
SHORT_FORMAT_LE = 'v'.freeze
3138
end
3239

3340
module Constants

lib/cassandra/protocol/coder.rb

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ def write_value_v4(buffer, value, type)
9797
when :bigint, :counter then write_bigint(buffer, value)
9898
when :blob then write_blob(buffer, value)
9999
when :boolean then write_boolean(buffer, value)
100+
when :custom then write_custom(buffer, value, type)
100101
when :decimal then write_decimal(buffer, value)
101102
when :double then write_double(buffer, value)
102103
when :float then write_float(buffer, value)
@@ -221,19 +222,19 @@ def read_type_v4(buffer)
221222
end
222223
end
223224

224-
def read_values_v4(buffer, column_metadata)
225+
def read_values_v4(buffer, column_metadata, custom_type_handlers)
225226
::Array.new(buffer.read_int) do |_i|
226227
row = ::Hash.new
227228

228229
column_metadata.each do |(_, _, column, type)|
229-
row[column] = read_value_v4(buffer, type)
230+
row[column] = read_value_v4(buffer, type, custom_type_handlers)
230231
end
231232

232233
row
233234
end
234235
end
235236

236-
def read_value_v4(buffer, type)
237+
def read_value_v4(buffer, type, custom_type_handlers)
237238
case type.kind
238239
when :ascii then read_ascii(buffer)
239240
when :bigint, :counter then read_bigint(buffer)
@@ -253,11 +254,12 @@ def read_value_v4(buffer, type)
253254
when :smallint then read_smallint(buffer)
254255
when :time then read_time(buffer)
255256
when :date then read_date(buffer)
257+
when :custom then read_custom(buffer, type, custom_type_handlers)
256258
when :list
257259
return nil unless read_size(buffer)
258260

259261
value_type = type.value_type
260-
::Array.new(buffer.read_signed_int) { read_value_v4(buffer, value_type) }
262+
::Array.new(buffer.read_signed_int) { read_value_v4(buffer, value_type, custom_type_handlers) }
261263
when :map
262264
return nil unless read_size(buffer)
263265

@@ -266,7 +268,7 @@ def read_value_v4(buffer, type)
266268
value = ::Hash.new
267269

268270
buffer.read_signed_int.times do
269-
value[read_value_v4(buffer, key_type)] = read_value_v4(buffer, value_type)
271+
value[read_value_v4(buffer, key_type, custom_type_handlers)] = read_value_v4(buffer, value_type, custom_type_handlers)
270272
end
271273

272274
value
@@ -277,7 +279,7 @@ def read_value_v4(buffer, type)
277279
value = ::Set.new
278280

279281
buffer.read_signed_int.times do
280-
value << read_value_v4(buffer, value_type)
282+
value << read_value_v4(buffer, value_type, custom_type_handlers)
281283
end
282284

283285
value
@@ -295,7 +297,7 @@ def read_value_v4(buffer, type)
295297
values[field.name] = if length - buffer.length >= size
296298
nil
297299
else
298-
read_value_v4(buffer, field.type)
300+
read_value_v4(buffer, field.type, custom_type_handlers)
299301
end
300302
end
301303

@@ -308,7 +310,7 @@ def read_value_v4(buffer, type)
308310

309311
members.each do |member_type|
310312
break if buffer.empty?
311-
values << read_value_v4(buffer, member_type)
313+
values << read_value_v4(buffer, member_type, custom_type_handlers)
312314
end
313315

314316
values.fill(nil, values.length, (members.length - values.length))
@@ -774,6 +776,15 @@ def read_boolean(buffer)
774776
read_size(buffer) && buffer.read(1) == Constants::TRUE_BYTE
775777
end
776778

779+
def read_custom(buffer, type, custom_type_handlers)
780+
# Lookup the type-name to get the Class that can deserialize buffer data into a custom domain object.
781+
unless custom_type_handlers && custom_type_handlers.key?(type)
782+
raise Errors::DecodingError, %(Unsupported custom column type: #{type.name})
783+
end
784+
num_bytes = read_size(buffer)
785+
custom_type_handlers[type].deserialize(buffer.read(num_bytes)) if num_bytes && num_bytes > 0
786+
end
787+
777788
def read_decimal(buffer)
778789
size = read_size(buffer)
779790
size && buffer.read_decimal(size)
@@ -860,6 +871,15 @@ def write_boolean(buffer, value)
860871
buffer.append(value ? Constants::TRUE_BYTE : Constants::FALSE_BYTE)
861872
end
862873

874+
def write_custom(buffer, value, type)
875+
# Verify that the given type-name matches the value's cql type name.
876+
if value.class.type != type
877+
raise Errors::EncodingError, "type mismatch: value is a #{value.type} and column is a #{type}"
878+
end
879+
880+
buffer.append_bytes(value.serialize)
881+
end
882+
863883
def write_decimal(buffer, value)
864884
buffer.append_bytes(CqlByteBuffer.new.append_decimal(value))
865885
end

lib/cassandra/protocol/cql_byte_buffer.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ def read_double
9292
"Not enough bytes available to decode a double: #{e.message}", e.backtrace
9393
end
9494

95+
def read_double_le
96+
read(8).unpack(Formats::DOUBLE_FORMAT_LE).first
97+
rescue RangeError => e
98+
raise Errors::DecodingError,
99+
"Not enough bytes available to decode a double: #{e.message}", e.backtrace
100+
end
101+
95102
def read_float
96103
read(4).unpack(Formats::FLOAT_FORMAT).first
97104
rescue RangeError => e
@@ -108,13 +115,27 @@ def read_signed_int
108115
"Not enough bytes available to decode an int: #{e.message}", e.backtrace
109116
end
110117

118+
def read_unsigned_int_le
119+
read(4).unpack(Formats::INT_FORMAT_LE).first
120+
rescue RangeError => e
121+
raise Errors::DecodingError,
122+
"Not enough bytes available to decode an int: #{e.message}", e.backtrace
123+
end
124+
111125
def read_unsigned_short
112126
read_short
113127
rescue RangeError => e
114128
raise Errors::DecodingError,
115129
"Not enough bytes available to decode a short: #{e.message}", e.backtrace
116130
end
117131

132+
def read_unsigned_short_le
133+
read(2).unpack(Formats::SHORT_FORMAT_LE).first
134+
rescue RangeError => e
135+
raise Errors::DecodingError,
136+
"Not enough bytes available to decode a short: #{e.message}", e.backtrace
137+
end
138+
118139
def read_string
119140
length = read_unsigned_short
120141
string = read(length)

0 commit comments

Comments
 (0)