Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion images/spin/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=${BUILDPLATFORM} rust:1.79 AS build
FROM --platform=${BUILDPLATFORM} rust:1.81 AS build
WORKDIR /opt/build
COPY . .
RUN rustup target add wasm32-wasi && cargo build --target wasm32-wasi --release
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[toolchain]
channel = "1.79.0"
channel = "1.81.0"
components = ["clippy", "rustfmt"]
targets = ["wasm32-wasi", "wasm32-unknown-unknown"]
profile = "default"
13 changes: 13 additions & 0 deletions scripts/deploy-workloads.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,29 @@ if ! command -v kubectl &> /dev/null; then
sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl;
fi

update_mqtt_workload_with_broker_cluster_ip() {
local dir=$1
echo "Waiting for emqx pod to be ready"
kubectl wait --for=condition=ready --timeout=20s pod/emqx
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
# Replace "EMQX_CLUSTER_IP" with the actual ClusterIP of the EMQX service
local cluster_ip=$(kubectl get svc emqx -o jsonpath='{.spec.clusterIP}')
sed -i "s/EMQX_CLUSTER_IP/$cluster_ip/g" $dir/workloads.yaml
echo "Updated workloads.yaml with ClusterIP: $cluster_ip"
}

# apply the workloads
echo ">>> apply workloads"
kubectl apply -f tests/workloads-common
# wait for all the pods to be ready
kubectl wait --for=condition=ready --timeout=120s pod --all

if [ "$1" == "workloads-pushed-using-spin-registry-push" ]; then
update_mqtt_workload_with_broker_cluster_ip "tests/workloads-pushed-using-spin-registry-push"
echo "deploying spin apps pushed to registry using 'spin registry push' command"
kubectl apply -f tests/workloads-pushed-using-spin-registry-push
else
update_mqtt_workload_with_broker_cluster_ip "tests/workloads-pushed-using-docker-build-push"
echo "deploying spin apps pushed to registry using 'docker build && k3d image import' command"
kubectl apply -f tests/workloads-pushed-using-docker-build-push
fi
Expand Down
25 changes: 16 additions & 9 deletions tests/src/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

let forward_port = port_forward_redis(redis_port).await?;
let forward_port = port_forward_svc(redis_port, "redis").await?;

let client = redis::Client::open(format!("redis://localhost:{}", forward_port))?;
let mut con = client.get_multiplexed_async_connection().await?;
Expand Down Expand Up @@ -145,13 +145,14 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

let forward_port = port_forward_redis(redis_port).await?;
let forward_port = port_forward_svc(redis_port, "redis").await?;

let client = redis::Client::open(format!("redis://localhost:{}", forward_port))
.context("connecting to redis")?;
let mut con = client.get_multiplexed_async_connection().await?;

con.publish("testchannel", "some-payload").await?;
con.publish::<_, _, ()>("testchannel", "some-payload")
.await?;

let one_sec = time::Duration::from_secs(1);
thread::sleep(one_sec);
Expand All @@ -174,8 +175,11 @@ mod test {
anyhow::bail!("kubectl is not installed");
}

// Publish a message to the MQTT broker
let mut mqttoptions = rumqttc::MqttOptions::new("123", "test.mosquitto.org", mqtt_port);
// Port forward the emqx mqtt broker
let forward_port = port_forward_svc(mqtt_port, "emqx").await?;

// Publish a message to the emqx broker
let mut mqttoptions = rumqttc::MqttOptions::new("123", "127.0.0.1", forward_port);
mqttoptions.set_keep_alive(std::time::Duration::from_secs(1));

let (client, mut eventloop) = rumqttc::AsyncClient::new(mqttoptions, 10);
Expand Down Expand Up @@ -253,15 +257,18 @@ mod test {
}
}

async fn port_forward_redis(redis_port: u16) -> Result<u16> {
async fn port_forward_svc(svc_port: u16, svc_name: &str) -> Result<u16> {
let port = get_random_port()?;

println!(" >>> kubectl portforward redis {}:{} ", port, redis_port);
println!(
" >>> kubectl portforward svc {} {}:{} ",
svc_name, port, svc_port
);

Command::new("kubectl")
.arg("port-forward")
.arg("redis")
.arg(format!("{}:{}", port, redis_port))
.arg(svc_name)
.arg(format!("{}:{}", port, svc_port))
.spawn()?;
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Ok(port)
Expand Down
25 changes: 25 additions & 0 deletions tests/workloads-common/mqtt-broker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
apiVersion: v1
kind: Pod
metadata:
name: emqx
labels:
app: emqx
spec:
containers:
- name: emqx
image: emqx/emqx
ports:
- containerPort: 1883
---
apiVersion: v1
kind: Service
metadata:
name: emqx
spec:
selector:
app: emqx
ports:
- protocol: TCP
port: 1883
targetPort: 1883
type: ClusterIP
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ spec:
value: containerd-shim-spin/mqtt-test-17h24d
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
- name: SPIN_VARIABLE_MQTT_BROKER_URI
value: "mqtt://test.mosquitto.org"
value: "mqtt://EMQX_CLUSTER_IP:1883"
---
apiVersion: v1
kind: Service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ spec:
value: containerd-shim-spin/mqtt-test-17h24d
# The MQTT trigger cannot do DNS resolution, so we need to use the IP address of the MQTT broker
- name: SPIN_VARIABLE_MQTT_BROKER_URI
value: "mqtt://test.mosquitto.org"
value: "mqtt://EMQX_CLUSTER_IP:1883"
---
apiVersion: v1
kind: Service
Expand Down
Loading