Skip to content

Latest commit

 

History

History
430 lines (353 loc) · 11.5 KB

File metadata and controls

430 lines (353 loc) · 11.5 KB
title Actix WebSocket Actorless
description Learn how websockets can upgrade your web service by providing live update functionality, using Actix Web.

Description

This example shows how to use a WebSocket to show the live status of the Shuttle API on a web page. The app also provides an echo service and notifies when the number of connected users change.

You can clone the example below by running the following (you'll need shuttle CLI installed):

shuttle init --from shuttle-hq/shuttle-examples --subfolder actix-web/websocket-actorless

Code

```rust src/main.rs use actix_files::NamedFile; use actix_web::{ web::{self, ServiceConfig}, HttpRequest, HttpResponse, Responder, }; use actix_ws::Message; use chrono::{DateTime, Utc}; use futures::StreamExt; use serde::Serialize; use shuttle_actix_web::ShuttleActixWeb; use std::{ sync::{atomic::AtomicUsize, Arc}, time::Duration, }; use tokio::sync::{mpsc, watch};

const PAUSE_SECS: u64 = 15; const STATUS_URI: &str = "https://api.shuttle.dev/.healthz";

type AppState = ( mpsc::UnboundedSender, watch::Receiver, );

#[derive(Debug, Clone)] enum WsState { Connected, Disconnected, }

#[derive(Serialize, Default, Clone, Debug)] struct ApiStateMessage { client_count: usize, origin: String, date_time: DateTime, is_up: bool, }

async fn echo_handler( mut session: actix_ws::Session, mut msg_stream: actix_ws::MessageStream, tx: mpsc::UnboundedSender, ) { while let Some(Ok(msg)) = msg_stream.next().await { match msg { Message::Ping(bytes) => { if session.pong(&bytes).await.is_err() { return; } } Message::Text(s) => { session.text(s.clone()).await.unwrap(); tracing::info!("Got text, {}", s); } _ => break, } }

if let Err(e) = tx.send(WsState::Disconnected) {
    tracing::error!("Failed to send disconnected state: {e:?}");
}

let _ = session.close(None).await;

}

async fn websocket( req: HttpRequest, body: web::Payload, app_state: web::Data, ) -> actix_web::Result { let app_state = app_state.into_inner(); let (response, session, msg_stream) = actix_ws::handle(&req, body)?;

let tx_ws_state = app_state.0.clone();
let tx_ws_state2 = tx_ws_state.clone();

// send connected state
if let Err(e) = tx_ws_state.send(WsState::Connected) {
    tracing::error!("Failed to send connected state: {e:?}");
}

// listen for api state changes
let mut session_clone = session.clone();
let mut rx_api_state = app_state.1.clone();
actix_web::rt::spawn(async move {
    // adding some delay to avoid getting the first message too soon.
    tokio::time::sleep(Duration::from_millis(500)).await;
    while rx_api_state.changed().await.is_ok() {
        let msg = rx_api_state.borrow().clone();
        tracing::info!("Handling ApiStateMessage: {msg:?}");
        let msg = serde_json::to_string(&msg).unwrap();
        session_clone.text(msg).await.unwrap();
    }
});

// echo handler
actix_web::rt::spawn(echo_handler(session, msg_stream, tx_ws_state2));
Ok(response)

}

async fn index() -> impl Responder { NamedFile::open_async("./static/index.html") .await .map_err(actix_web::error::ErrorInternalServerError) }

#[shuttle_runtime::main] async fn main() -> ShuttleActixWeb<impl FnOnce(&mut ServiceConfig) + Send + Clone + 'static> { // We're going to use channels to communicate between threads. // api state channel let (tx_api_state, rx_api_state) = watch::channel(ApiStateMessage::default()); // websocket state channel let (tx_ws_state, mut rx_ws_state) = mpsc::unbounded_channel::();

// create a shared state for the client counter
let client_count = Arc::new(AtomicUsize::new(0));
let client_count2 = client_count.clone();

// share tx_api_state
let shared_tx_api_state = Arc::new(tx_api_state);
let shared_tx_api_state2 = shared_tx_api_state.clone();

// share reqwest client
let client = reqwest::Client::default();
let client2 = client.clone();

// Spawn a thread to continually check the status of the api
tokio::spawn(async move {
    let duration = Duration::from_secs(PAUSE_SECS);

    loop {
        tokio::time::sleep(duration).await;
        let is_up = get_api_status(&client).await;

        let response = ApiStateMessage {
            client_count: client_count.load(std::sync::atomic::Ordering::SeqCst),
            origin: "api_update loop".to_string(),
            date_time: Utc::now(),
            is_up,
        };

        if shared_tx_api_state.send(response).is_err() {
            tracing::error!("Failed to send api state from checker thread");
            break;
        }
    }
});

// spawn a thread to continuously check the status of the websocket connections
tokio::spawn(async move {
    while let Some(state) = rx_ws_state.recv().await {
        match state {
            WsState::Connected => {
                tracing::info!("Client connected");
                client_count2.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            }
            WsState::Disconnected => {
                tracing::info!("Client disconnected");
                client_count2.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
            }
        }

        let client_count = client_count2.load(std::sync::atomic::Ordering::SeqCst);
        tracing::info!("Client count: {client_count}");

        let is_up = get_api_status(&client2).await;

        if let Err(e) = shared_tx_api_state2.send(ApiStateMessage {
            client_count,
            origin: "ws_update".to_string(),
            date_time: Utc::now(),
            is_up,
        }) {
            tracing::error!("Failed to send api state: {e:?}");
        }
    }
});

let app_state = web::Data::new((tx_ws_state, rx_api_state));

let config = move |cfg: &mut ServiceConfig| {
    cfg.service(web::resource("/").route(web::get().to(index)))
        .service(
            web::resource("/ws")
                .app_data(app_state)
                .route(web::get().to(websocket)),
        );
};
Ok(config.into())

}

async fn get_api_status(client: &reqwest::Client) -> bool { let response = client.get(STATUS_URI).send().await; response.is_ok_and(|r| r.status().is_success()) }


```html static/index.html
<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8" />
    <title>WS with Actix Web</title>

    <style>
      :root {
        font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto,
          Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif;
        font-size: 18px;
      }

      input[type='text'] {
        font-size: inherit;
      }

      #log {
        width: 30em;
        height: 20em;
        overflow: auto;
        margin: 0.5em 0;

        border: 1px solid black;
      }

      #status {
        padding: 0 0.2em;
      }

      #text {
        width: 17em;
        padding: 0.5em;
      }

      .msg {
        margin: 0;
        padding: 0.25em 0.5em;
      }

      .msg--status {
        /* a light yellow */
        background-color: #ffffc9;
      }

      .msg--message {
        /* a light blue */
        background-color: #d2f4ff;
      }

      .msg--error {
        background-color: pink;
      }
    </style>
  </head>

  <body>
    <h1>WebSocket example</h1>

    <p>
      When you connect you will be notified of the Shuttle API status and the
      amount of connected users every 15 seconds.
    </p>
    <p>
      You can also send a message to the server and you will get back the echo.
    </p>

    <div>
      <button id="connect">Connect</button>
      <span>Status:</span>
      <span id="status">disconnected</span>
    </div>

    <div id="log"></div>

    <form id="chatForm">
      <input type="text" id="text" />
      <input type="submit" id="send" value="Echo" />
    </form>

    <script>
      const $status = document.querySelector('#status');
      const $connectButton = document.querySelector('#connect');
      const $log = document.querySelector('#log');
      const $form = document.querySelector('#chatForm');
      const $input = document.querySelector('#text');

      /** @type {WebSocket | null} */
      var socket = null;

      function log(msg, type = 'status') {
        $log.innerHTML += `<p class="msg msg--${type}">${msg}</p>`;
        $log.scrollTop += 1000;
      }

      function connect() {
        disconnect();

        const { location } = window;

        const proto = location.protocol.startsWith('https') ? 'wss' : 'ws';
        const wsUri = `${proto}://${location.host}/ws`;

        log('Connecting...');
        socket = new WebSocket(wsUri);

        socket.onopen = () => {
          log('Connected');
          updateConnectionStatus();
        };

        socket.onmessage = (ev) => {
          log('Received: ' + ev.data, 'message');
        };

        socket.onclose = () => {
          log('Disconnected');
          socket = null;
          updateConnectionStatus();
        };
      }

      function disconnect() {
        if (socket) {
          log('Disconnecting...');
          socket.close();
          socket = null;

          updateConnectionStatus();
        }
      }

      function updateConnectionStatus() {
        if (socket) {
          $status.style.backgroundColor = 'transparent';
          $status.style.color = 'green';
          $status.textContent = `connected`;
          $connectButton.innerHTML = 'Disconnect';
          $input.focus();
        } else {
          $status.style.backgroundColor = 'red';
          $status.style.color = 'white';
          $status.textContent = 'disconnected';
          $connectButton.textContent = 'Connect';
        }
      }

      $connectButton.addEventListener('click', () => {
        if (socket) {
          disconnect();
        } else {
          connect();
        }

        updateConnectionStatus();
      });

      $form.addEventListener('submit', (ev) => {
        ev.preventDefault();

        const text = $input.value;

        log('Sending: ' + text);
        socket.send(text);

        $input.value = '';
        $input.focus();
      });

      updateConnectionStatus();
    </script>
  </body>
</html>
[package]
name = "websocket-actorless"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
actix-files = "0.6.2"
actix-web = "4.3.1"
actix-ws = "0.2.5"
chrono = { version = "0.4.23", features = ["serde"] }
futures = "0.3"
reqwest = "0.11"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
shuttle-actix-web = "0.57.0"
shuttle-runtime = "0.57.0"
tokio = { version = "1", features = ["rt-multi-thread", "sync"] }
tracing = "0.1"
[build]
assets = [
    "static/*",
]

Usage

Once you've cloned the example, launch it locally using shuttle run and then go to http://localhost:8000. You should be able to see a status page and if you go to your Inspect/Chrome Devtools (depending on what browser you're using), if you go to the Network tab you'll see that your browser received a HTTP status code of 101.