Skip to content

Commit 986f7ab

Browse files
committed
add streaming request and save response to tmp file
1 parent 175c720 commit 986f7ab

File tree

7 files changed

+116
-12
lines changed

7 files changed

+116
-12
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### Version 1.6.2 (Jan 30, 2026)
2+
3+
* Add streaming request and save response to tmp file
4+
15
### Version 1.6.1 (Jan 20, 2026)
26

37
* Fix prepare rake task on ClickHouse 24.6

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ Action.with(ActionView.select(Arel.sql('min(date)')) => :min_date).where(Arel.sq
235235
#=> #<ActiveRecord::Relation [#<Action *** >]>
236236
```
237237

238+
### Streaming request
239+
240+
```ruby
241+
path = Action.connection.execute_streaming(Action.where(date: Date.current), format: 'CSVWithNames')
242+
# Clickhouse Stream (10.3ms) SELECT actions.* FROM actions WHERE actions.date = '2017-11-29'
243+
file = File.open(path)
244+
```
245+
238246

239247
### Migration Data Types
240248

lib/active_record/connection_adapters/clickhouse/schema_statements.rb

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,18 @@ def execute_batch(statements, name = nil, **kwargs)
5353
end
5454
end
5555

56+
# @return [ClickhouseActiverecord::StreamResponse]
57+
def execute_streaming(sql, name = nil, format: @response_format, settings: {})
58+
with_response_format(format) do
59+
log(sql, [adapter_name, 'Stream', name].compact.join(' ')) do
60+
statement = Statement.new(sql, format: @response_format)
61+
request(statement, settings: settings) do |response|
62+
return statement.streaming_response(response)
63+
end
64+
end
65+
end
66+
end
67+
5668
def exec_insert(sql, name = nil, _binds = [], _pk = nil, _sequence_name = nil, returning: nil)
5769
new_sql = sql.sub(/ (DEFAULT )?VALUES/, " VALUES")
5870
with_response_format(nil) { execute(new_sql, name) }
@@ -268,21 +280,22 @@ def has_default_function?(default) # :nodoc:
268280

269281
def raw_execute(sql, settings: {}, except_params: [])
270282
statement = Statement.new(sql, format: @response_format)
271-
statement.response = request(statement, settings: settings, except_params: except_params)
272-
statement.processed_response
283+
response = request(statement, settings: settings, except_params: except_params)
284+
statement.processed_response(response)
273285
end
274286

275287
# Make HTTP request to ClickHouse server
276288
# @param [ActiveRecord::ConnectionAdapters::Clickhouse::Statement] statement
277289
# @param [Hash] settings
278290
# @param [Array] except_params
279291
# @return [Net::HTTPResponse]
280-
def request(statement, settings: {}, except_params: [])
292+
def request(statement, settings: {}, except_params: [], &block)
281293
@lock.synchronize do
282-
@connection.post("/?#{settings_params(settings, except: except_params)}",
283-
statement.formatted_sql,
284-
'Content-Type' => 'application/x-www-form-urlencoded',
285-
'User-Agent' => ClickhouseAdapter::USER_AGENT)
294+
req = Net::HTTP::Post.new("/?#{settings_params(settings, except: except_params)}", {
295+
'Content-Type' => 'application/x-www-form-urlencoded',
296+
'User-Agent' => ClickhouseAdapter::USER_AGENT,
297+
})
298+
@connection.request(req, statement.formatted_sql, &block)
286299
end
287300
end
288301

lib/active_record/connection_adapters/clickhouse/statement.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,27 @@ module Clickhouse
99
class Statement
1010

1111
attr_reader :format
12-
attr_writer :response
1312

1413
def initialize(sql, format:)
1514
@sql = sql
1615
@format = format
1716
end
1817

18+
# @return [String]
1919
def formatted_sql
2020
@formatted_sql ||= FormatManager.new(@sql, format: @format).apply
2121
end
2222

23-
def processed_response
24-
ResponseProcessor.new(@response, @format, @sql).process
23+
# @param [Net::HTTPResponse] response
24+
# @return [String, Hash, Array, nil]
25+
def processed_response(response)
26+
ResponseProcessor.new(response, @format, @sql).process
27+
end
28+
29+
# @param [Net::HTTPResponse] response
30+
# @return [String, nil]
31+
def streaming_response(response)
32+
ResponseProcessor.new(response, @format, @sql).streaming_process
2533
end
2634

2735
end

lib/active_record/connection_adapters/clickhouse/statement/response_processor.rb

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,18 @@ class ResponseProcessor
88

99
DB_EXCEPTION_REGEXP = /\ACode:\s+\d+\.\s+DB::Exception:/.freeze
1010

11+
# @param [Net::HTTPResponse] raw_response
12+
# @param [String, nil] format
13+
# @param [String] sql
1114
def initialize(raw_response, format, sql)
1215
@raw_response = raw_response
13-
@body = raw_response.body
1416
@format = format
1517
@sql = sql
1618
end
1719

20+
# @return [String, Hash, Array, nil]
1821
def process
22+
@body = @raw_response.body
1923
if success?
2024
process_successful_response
2125
else
@@ -25,12 +29,28 @@ def process
2529
@body
2630
end
2731

32+
# @return [String, nil]
33+
def streaming_process
34+
file = Tempfile.new('clickhouse-activerecord', binmode: true)
35+
if success?
36+
@raw_response.read_body do |chunk|
37+
file.write(chunk)
38+
end
39+
file.close
40+
file.path
41+
else
42+
@body = @raw_response.body
43+
raise_database_error!
44+
end
45+
end
46+
2847
private
2948

3049
def success?
3150
@raw_response.code.to_i == 200
3251
end
3352

53+
# @return [String, Hash, Array]
3454
def process_successful_response
3555
raise_generic!(@sql) if @body.include?('DB::Exception') && @body.match?(DB_EXCEPTION_REGEXP)
3656

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
module ClickhouseActiverecord
2-
VERSION = '1.6.1'
2+
VERSION = '1.6.2'
33
end

spec/single/streaming_spec.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# frozen_string_literal: true
2+
3+
RSpec.describe 'Streaming', :migrations do
4+
class Model < ActiveRecord::Base
5+
self.table_name = 'sample'
6+
end
7+
8+
describe 'sample' do
9+
before do
10+
migrations_dir = File.join(FIXTURES_PATH, 'migrations', 'add_sample_data')
11+
quietly { ActiveRecord::MigrationContext.new(migrations_dir).up }
12+
end
13+
14+
it 'simple' do
15+
path = Model.connection.execute_streaming('SELECT count(*) AS count FROM sample')
16+
expect(path.is_a?(String)).to be_truthy
17+
expect(File.read(path)).to eq("[\"count\"]\n[\"UInt64\"]\n[\"0\"]\n")
18+
end
19+
20+
it 'JSONCompact format' do
21+
path = Model.connection.execute_streaming('SELECT count(*) AS count FROM sample', format: 'JSONCompact')
22+
data = JSON.parse(File.read(path))
23+
expect(data['data'][0][0]).to eq('0')
24+
end
25+
26+
it 'JSONEachRow format' do
27+
path = Model.connection.execute_streaming('SELECT count(*) AS count FROM sample', format: 'JSONEachRow')
28+
data = JSON.parse(File.read(path))
29+
expect(data['count']).to eq('0')
30+
end
31+
32+
it 'multiple rows JSONEachRow format' do
33+
path = Model.connection.execute_streaming('SELECT * FROM generate_series(1, 1000000)', format: 'JSONEachRow')
34+
lines = File.readlines(path)
35+
expect(JSON.parse(lines[0])).to eq('generate_series' => '1')
36+
expect(lines.size).to eq(1000000)
37+
end
38+
39+
it 'multiple rows CSVWithNames format' do
40+
path = Model.connection.execute_streaming('SELECT * FROM generate_series(1, 1000000)', format: 'CSVWithNames')
41+
lines = File.readlines(path)
42+
expect(JSON.parse(lines[0])).to eq('generate_series')
43+
expect(JSON.parse(lines[1])).to eq(1)
44+
expect(lines.size).to eq(1000001)
45+
end
46+
47+
it 'error' do
48+
expect { Model.connection.execute_streaming('error request') }.to raise_error(ActiveRecord::ActiveRecordError, include('DB::Exception'))
49+
end
50+
end
51+
end

0 commit comments

Comments
 (0)