Skip to content

Commit 6145773

Browse files
authored
Merge pull request #46 from JuliaComputing/tan/checkstaleconn
check for stale connection while framing/chunking message to send
2 parents d7bc69f + 794f8ca commit 6145773

File tree

4 files changed

+62
-4
lines changed

4 files changed

+62
-4
lines changed

.github/workflows/ci.yml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
name: CI
2+
on:
3+
push:
4+
branches: [master]
5+
tags: ["*"]
6+
pull_request:
7+
jobs:
8+
test:
9+
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }}
10+
runs-on: ${{ matrix.os }}
11+
strategy:
12+
fail-fast: false
13+
matrix:
14+
version:
15+
- '1.0'
16+
- '1' # automatically expands to the latest stable 1.x release of Julia
17+
- nightly
18+
os:
19+
- ubuntu-latest
20+
arch:
21+
- x64
22+
services:
23+
rabbitmq:
24+
image: rabbitmq:3-management
25+
ports:
26+
- 5672:5672
27+
steps:
28+
- uses: actions/checkout@v2
29+
- uses: julia-actions/setup-julia@v1
30+
with:
31+
version: ${{ matrix.version }}
32+
arch: ${{ matrix.arch }}
33+
- uses: actions/cache@v1
34+
env:
35+
cache-name: cache-artifacts
36+
with:
37+
path: ~/.julia/artifacts
38+
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
39+
restore-keys: |
40+
${{ runner.os }}-test-${{ env.cache-name }}-
41+
${{ runner.os }}-test-
42+
${{ runner.os }}-
43+
- uses: julia-actions/julia-buildpkg@v1
44+
- uses: julia-actions/julia-runtest@v1
45+
- uses: julia-actions/julia-processcoverage@v1
46+
- uses: codecov/codecov-action@v1
47+
with:
48+
file: lcov.info

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ uuid = "79c8b4cd-a41a-55fa-907c-fab5288e1383"
33
keywords = ["amqpclient", "rabbitmq", "amqp", "amqp-client", "message-queue"]
44
license = "MIT"
55
desc = "A Julia AMQP (Advanced Message Queuing Protocol) / RabbitMQ Client."
6-
version = "0.4.2"
6+
version = "0.4.3"
77

88
[deps]
99
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# AMQPClient
22

3-
[![Build Status](https://travis-ci.org/JuliaComputing/AMQPClient.jl.svg?branch=master)](https://travis-ci.org/JuliaComputing/AMQPClient.jl)
4-
[![Coverage Status](https://coveralls.io/repos/JuliaComputing/AMQPClient.jl/badge.svg?branch=master&service=github)](https://coveralls.io/github/JuliaComputing/AMQPClient.jl?branch=master)
3+
[![Build Status](https://github.com/JuliaComputing/AMQPClient.jl/workflows/CI/badge.svg)](https://github.com/JuliaComputing/AMQPClient.jl/actions?query=workflow%3ACI+branch%3Amaster)
4+
[![codecov.io](http://codecov.io/github/JuliaComputing/AMQPClient.jl/coverage.svg?branch=master)](http://codecov.io/github/JuliaComputing/AMQPClient.jl?branch=master)
55

66
A Julia [AMQP (Advanced Message Queuing Protocol)](http://www.amqp.org/) Client.
77

src/protocol.jl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,13 +267,15 @@ get_property(c::Connection, s::Symbol, default) = get(c.properties, s, default)
267267
send(c::MessageChannel, f, msgframes::Vector=[]) = send(c.conn, f, msgframes)
268268
function send(c::Connection, f, msgframes::Vector=[])
269269
#uncomment to enable synchronization (not required till we have preemptive tasks or threads)
270+
@debug("queing messageframes", nframes=length(msgframes))
270271
lck = take!(c.sendlck)
271272
try
272273
put!(c.sendq, TAMQPGenericFrame(f))
273274
for m in msgframes
274275
put!(c.sendq, TAMQPGenericFrame(m))
275276
end
276277
finally
278+
@debug("queued messageframes", nqueued=length(c.sendq.data))
277279
put!(c.sendlck, lck)
278280
end
279281
nothing
@@ -292,17 +294,25 @@ function send(c::MessageChannel, payload::TAMQPMethodPayload, msg::Union{Message
292294
# send one or more message body frames
293295
offset = 1
294296
msglen = length(message.data)
297+
framemax = c.conn.framemax
298+
if framemax <= 0
299+
errormsg = (c.conn.state == CONN_STATE_OPEN) ? "Unexpected framemax ($framemax) value for connection" : "Connection closed"
300+
throw(AMQPClientException(errormsg))
301+
end
302+
295303
while offset <= msglen
296-
msgend = min(msglen, offset + c.conn.framemax - 1)
304+
msgend = min(msglen, offset + framemax - 1)
297305
bodypayload = TAMQPBodyPayload(message.data[offset:msgend])
298306
offset = msgend + 1
307+
@debug("sending", msglen, offset)
299308
push!(msgframes, TAMQPContentBodyFrame(frameprop, bodypayload))
300309
end
301310

302311
send(c, TAMQPMethodFrame(frameprop, payload), msgframes)
303312
else
304313
send(c, TAMQPMethodFrame(frameprop, payload))
305314
end
315+
@debug("sent", methodname=method_name(payload))
306316
end
307317

308318
# ----------------------------------------

0 commit comments

Comments
 (0)