Skip to content

Commit 9dca64e

Browse files
Merge pull request #3 from hcengineering/pulse-connection
Fix hulypulse connections
2 parents 8621e73 + b360afa commit 9dca64e

File tree

12 files changed

+215
-108
lines changed

12 files changed

+215
-108
lines changed

.github/workflows/ci.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
name: CI Validation
2+
3+
on:
4+
workflow_dispatch:
5+
pull_request:
6+
push:
7+
branches:
8+
- main
9+
- master
10+
11+
jobs:
12+
rust-validate:
13+
runs-on: ubuntu-latest
14+
steps:
15+
- name: Checkout
16+
uses: actions/checkout@v4
17+
18+
- name: Install Rust toolchain
19+
uses: dtolnay/rust-toolchain@stable
20+
with:
21+
components: rustfmt, clippy
22+
23+
- name: Cache cargo registry and target
24+
uses: actions/cache@v4
25+
with:
26+
path: |
27+
~/.cargo/registry
28+
~/.cargo/git
29+
target
30+
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
31+
restore-keys: |
32+
${{ runner.os }}-cargo-
33+
34+
- name: Format check
35+
run: cargo fmt --all -- --check
36+
37+
- name: Build
38+
run: cargo build --locked
39+
40+
- name: Clippy
41+
run: cargo clippy --bin hulypulse --all-features -- -D warnings
42+
43+
- name: Unit tests
44+
run: cargo test --bin hulypulse --locked

Cargo.lock

Lines changed: 25 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "hulypulse"
3-
version = "0.4.1"
3+
version = "0.4.2"
44
edition = "2024"
55

66
[dependencies]
@@ -30,7 +30,7 @@ hulyrs = { git = "https://github.com/hcengineering/hulyrs.git", features = [ "ac
3030
secrecy = { version = "0.10.3", optional = true }
3131

3232
#redis
33-
redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel"] }
33+
redis = { version = "=0.32.5", features = ["aio", "tokio-comp", "sentinel", "connection-manager"] }
3434

3535
[[bin]]
3636
name = "hulypulse"

client/off/client.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,18 @@ export class HulypulseClient implements Disposable {
183183
// this.handleMyDataChanged(get(myData), true)
184184
}
185185

186+
private static isConnectionLikeError (err: string): boolean {
187+
const s = err.toLowerCase()
188+
return (
189+
s.includes('broken pipe') ||
190+
s.includes('connection reset') ||
191+
s.includes('connection refused') ||
192+
s.includes('connection aborted') ||
193+
s.includes('unexpected eof') ||
194+
s.includes('io error')
195+
)
196+
}
197+
186198
private handleMessage (data: string): void {
187199
if (data === 'pong') {
188200
clearTimeout(this.pingTimeout)
@@ -192,6 +204,17 @@ export class HulypulseClient implements Disposable {
192204
try {
193205
const message = JSON.parse(data); // as IncomingMessage
194206
console.log('Received message', message);
207+
if (
208+
typeof message === 'object' &&
209+
message !== null &&
210+
'error' in message &&
211+
typeof (message as { error: unknown }).error === 'string' &&
212+
HulypulseClient.isConnectionLikeError((message as { error: string }).error)
213+
) {
214+
console.warn('Pulse server reported connection-like error; reconnecting')
215+
this.reconnect()
216+
return
217+
}
195218
// const message = JSON.parse(data) as IncomingMessage
196219
// if (message.type === 'update' && message.presence !== undefined) {
197220
// onPersonUpdate(message.id, message.presence ?? [])

src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub static CONFIG: LazyLock<Config> = LazyLock::new(|| {
9494
match settings {
9595
Ok(settings) => settings,
9696
Err(error) => {
97-
eprintln!("configuration error: {}", error);
97+
eprintln!("configuration error: {error}");
9898
std::process::exit(1);
9999
}
100100
}

src/db.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::memory::{
55
MemoryBackend, memory_delete, memory_info, memory_list, memory_read, memory_save,
66
};
77
use crate::redis::{redis_delete, redis_info, redis_list, redis_read, redis_save};
8-
use redis::aio::MultiplexedConnection;
8+
use redis::aio::ConnectionManager;
99
use serde::Serialize;
1010
use tokio::sync::RwLock;
1111

@@ -80,7 +80,7 @@ pub fn deprecated_symbol_error(s: &str) -> DbResult<()> {
8080

8181
#[derive(Clone)]
8282
enum DbBackend {
83-
Redis(MultiplexedConnection),
83+
Redis(ConnectionManager),
8484
Memory {
8585
db: MemoryBackend,
8686
hub: Arc<RwLock<HubState>>,
@@ -93,7 +93,7 @@ pub struct Db {
9393
}
9494

9595
impl Db {
96-
pub fn new_redis(db: MultiplexedConnection) -> Self {
96+
pub fn new_redis(db: ConnectionManager) -> Self {
9797
Self {
9898
backend: DbBackend::Redis(db),
9999
}

src/handlers_http.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub fn map_redis_error(err: impl std::fmt::Display) -> Error {
4040
.nth(1)
4141
.unwrap_or(msg.as_str());
4242
if let Some((code, text)) = detail.split_once(": ") {
43-
let text = format!("{} {}", code, text);
43+
let text = format!("{code} {text}");
4444
return match code {
4545
"400" => actix_web::error::ErrorBadRequest(text),
4646
"404" => actix_web::error::ErrorNotFound(text),

src/handlers_ws.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
// limitations under the License.
1414
//
1515

16-
use actix_ws;
1716
use futures_util::StreamExt;
1817

1918
use futures::future::{AbortHandle, Abortable};
@@ -187,7 +186,7 @@ async fn handle_command(
187186
tracing::debug!("PERSONAL from {} to {}", &client_name, &to);
188187
let payload =
189188
json!({ "personal": client_name, "correlation": correlation, "data": data });
190-
if !send_to_name(&hub_state, &to, payload).await {
189+
if !send_to_name(hub_state, &to, payload).await {
191190
tracing::debug!("PERSONAL send from [{}] to [{}] failed", &client_name, &to);
192191
result_err("failed", &correlation, ws).await;
193192
}
@@ -201,7 +200,7 @@ async fn handle_command(
201200
} => {
202201
tracing::debug!("ANSWER from {} to {}", &client_name, &to);
203202
let payload = json!({ "correlation": correlation, "data": data });
204-
if !send_to_name(&hub_state, &to, payload).await {
203+
if !send_to_name(hub_state, &to, payload).await {
205204
tracing::debug!("PERSONAL send_to failed: no such session {}", to);
206205
}
207206
}
@@ -233,10 +232,8 @@ async fn handle_command(
233232
// TTL logic
234233
let real_ttl = if let Some(secs) = ttl {
235234
Some(Ttl::Sec(secs as usize))
236-
} else if let Some(timestamp) = expires_at {
237-
Some(Ttl::At(timestamp))
238235
} else {
239-
None
236+
expires_at.map(Ttl::At)
240237
};
241238

242239
// SaveMode logic
@@ -447,11 +444,11 @@ pub async fn handler(
447444
_ => "",
448445
};
449446

450-
if let Some(ref claim) = claims {
451-
if !test_rego_claims(claim, cmd.as_ref(), key) {
452-
let _ = session.text("Unauthorized: Rego policy").await;
453-
break;
454-
}
447+
if let Some(ref claim) = claims
448+
&& !test_rego_claims(claim, cmd.as_ref(), key)
449+
{
450+
let _ = session.text("Unauthorized: Rego policy").await;
451+
break;
455452
}
456453
}
457454

@@ -470,7 +467,7 @@ pub async fn handler(
470467
}
471468

472469
Err(err) => {
473-
let _ = session.text(format!("Invalid JSON: {}", err)).await;
470+
let _ = session.text(format!("Invalid JSON: {err}")).await;
474471
}
475472
},
476473

src/main.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,15 +117,15 @@ async fn check_workspace(
117117
let workspace = Uuid::parse_str(&request.extract::<Path<String>>().await?);
118118
let claims = request.extensions().get::<Claims>().cloned().unwrap();
119119

120-
if claims.is_system() || Ok(claims.workspace.clone()) == workspace.clone().map(Some) {
120+
if claims.is_system() || Ok(claims.workspace) == workspace.clone().map(Some) {
121121
next.call(request).await
122122
} else {
123123
warn!(
124124
expected = ?claims.workspace,
125125
actual = ?workspace,
126126
"Unauthorized request, workspace mismatch"
127127
);
128-
Err(actix_web::error::ErrorUnauthorized("Unauthorized").into())
128+
Err(actix_web::error::ErrorUnauthorized("Unauthorized"))
129129
}
130130
}
131131

@@ -145,9 +145,9 @@ async fn main() -> anyhow::Result<()> {
145145
BackendType::Redis => {
146146
let redis_client = redis::client().await?;
147147
let db_connection = redis_client
148-
.get_multiplexed_async_connection()
148+
.get_connection_manager()
149149
.await
150-
.map_err(|e| {
150+
.inspect_err(|_e| {
151151
tracing::error!(
152152
"REDIS not found: {:?}",
153153
&CONFIG
@@ -157,14 +157,11 @@ async fn main() -> anyhow::Result<()> {
157157
.collect::<Vec<_>>()
158158
.join(", ")
159159
);
160-
e
161160
})?;
162161
tokio::spawn({
163162
let hub_state = hub_state.clone();
164163
async move {
165-
if let Err(err) = crate::redis::receiver(redis_client, hub_state).await {
166-
tracing::error!("Redis receiver stopped: {err}");
167-
}
164+
crate::redis::receiver(redis_client, hub_state).await;
168165
}
169166
});
170167
Db::new_redis(db_connection)

src/memory.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -110,9 +110,7 @@ pub async fn memory_list(backend: &MemoryBackend, key_prefix: &str) -> DbResult<
110110
continue;
111111
}
112112

113-
if k.strip_prefix(key_prefix)
114-
.map_or(false, |s| s.contains('$'))
115-
{
113+
if k.strip_prefix(key_prefix).is_some_and(|s| s.contains('$')) {
116114
continue;
117115
}
118116

@@ -138,7 +136,7 @@ pub async fn memory_info(backend: &MemoryBackend) -> DbResult<String> {
138136
let map = backend.inner.read().await;
139137
let keys = map.len();
140138
let memory: usize = map.values().map(|v| v.data.len()).sum();
141-
Ok(format!("{} keys, {} bytes", keys, memory))
139+
Ok(format!("{keys} keys, {memory} bytes"))
142140
}
143141

144142
/// memory_read(&backend, "key")
@@ -215,7 +213,7 @@ pub async fn memory_save<V: AsRef<[u8]>>(
215213
if max_size != 0 && value.len() > max_size {
216214
return error(
217215
400,
218-
format!("Value in memory mode must be less than {} bytes", max_size),
216+
format!("Value in memory mode must be less than {max_size} bytes"),
219217
);
220218
}
221219

@@ -273,10 +271,7 @@ pub async fn memory_save<V: AsRef<[u8]>>(
273271
if &actual_md5 != expected_md5 {
274272
return error(
275273
412,
276-
format!(
277-
"md5 mismatch, current: {}, expected: {}",
278-
actual_md5, expected_md5
279-
),
274+
format!("md5 mismatch, current: {actual_md5}, expected: {expected_md5}"),
280275
);
281276
}
282277
*existing = Entry {
@@ -304,9 +299,7 @@ pub async fn memory_delete(
304299
let mode = mode.unwrap_or(SaveMode::Upsert);
305300

306301
match mode {
307-
SaveMode::Insert => {
308-
return error(412, "Insert mode is not supported for delete");
309-
}
302+
SaveMode::Insert => error(412, "Insert mode is not supported for delete"),
310303
SaveMode::Update | SaveMode::Upsert => {
311304
let existed = map.remove(key).is_some();
312305
Ok(existed)
@@ -320,8 +313,7 @@ pub async fn memory_delete(
320313
return error(
321314
412,
322315
format!(
323-
"md5 mismatch, current: {}, expected: {}",
324-
actual_md5, expected_md5
316+
"md5 mismatch, current: {actual_md5}, expected: {expected_md5}"
325317
),
326318
);
327319
}

0 commit comments

Comments
 (0)