Commit b279b08
docs: schema validation testing scripts (#61)
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
1 parent 97d782f commit b279b08

8 files changed

+467
-0
lines changed
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
# Copy to .env and fill in.

# Pulsar admin service URL (required)
PULSAR_ADMIN_URL=https://your-pulsar-admin.example.com

# Full topic name: persistent://tenant/namespace/topic-name (required for register-schema and publish scripts)
PULSAR_TOPIC=persistent://tenant/namespace/test-topic

# JWT for pulsarctl and REST publish (required)
PULSAR_AUTH_TOKEN=your-jwt-token

# Optional: number of messages for publish-messages.sh (default: 15)
# NUM_MESSAGES=15
Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
# Register schema and publish messages

Scripts to register schemas on Pulsar topics and publish test messages for schema validation testing. All connection settings come from a `.env` file.

## Prerequisites

- **pulsarctl** – for schema registration (e.g. `brew install pulsarctl`)
- **Python 3** – for the generic Avro encoder used by `publish-messages.sh`
- **curl** – used by the publish script to POST messages
- **avro** (Python package, `pip install avro`) – required by `encode_avro_generic.py` for encoding

## Setup

1. Copy the example env file and fill in your Pulsar details:

   ```bash
   cp .env.example .env
   ```

2. Edit `.env` and set (use `KEY=value` with no spaces around `=`):

   | Variable | Required | Description |
   |----------|----------|-------------|
   | `PULSAR_ADMIN_URL` | Yes | Pulsar admin service URL (e.g. StreamNative Cloud or your cluster). |
   | `PULSAR_TOPIC` | Yes | Full topic name: `persistent://tenant/namespace/topic-name`. |
   | `PULSAR_AUTH_TOKEN` | Yes | JWT for admin and REST publish. |
   | `NUM_MESSAGES` | No | Number of messages sent by `publish-messages.sh` (default: 15). |
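
As a quick sanity check before running the scripts, the `.env` rules above (`KEY=value`, no spaces around `=`, three required variables) can be verified with a few lines of Python. This is a minimal sketch and not part of the repository: the helper names `parse_env` and `missing_required` are hypothetical, and it handles only plain `KEY=value` lines and `#` comments, not quoting or `export`.

```python
REQUIRED = ("PULSAR_ADMIN_URL", "PULSAR_TOPIC", "PULSAR_AUTH_TOKEN")


def parse_env(text):
    """Parse plain KEY=value lines; skip blank lines and # comments."""
    values = {}
    for lineno, line in enumerate(text.splitlines(), start=1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        key, sep, value = stripped.partition("=")
        # publish-messages.sh sources this file with bash, where spaces
        # around "=" would not be read as an assignment.
        if not sep or key != key.rstrip() or value != value.lstrip():
            raise ValueError(f"line {lineno}: expected KEY=value with no spaces around '='")
        values[key] = value
    return values


def missing_required(values):
    """Return the required variable names that are absent or empty."""
    return [name for name in REQUIRED if not values.get(name)]


sample = """\
# Copy to .env and fill in.
PULSAR_ADMIN_URL=https://your-pulsar-admin.example.com
PULSAR_TOPIC=persistent://tenant/namespace/test-topic
# NUM_MESSAGES=15
"""
print(missing_required(parse_env(sample)))  # ['PULSAR_AUTH_TOKEN']
```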

---

## Register a schema

**Script:** `register-schema.sh`

Uploads a schema to a topic and turns on schema validation for that topic’s namespace.

**Usage:**

```bash
./register-schema.sh <schema-file> [topic]
```

| Argument | Required | Description |
|----------|----------|-------------|
| `schema-file` | Yes | Path to the schema JSON file (relative to this directory or absolute). |
| `topic` | No | Full topic name. If omitted, uses `PULSAR_TOPIC` from `.env`. |

**Examples:**

```bash
# Use topic from .env
./register-schema.sh schema-test-topic.json
./register-schema.sh schema-test-topic-avro.json

# Override topic
./register-schema.sh schema-test-topic-avro.json persistent://other-tenant/ns/my-topic
```

**Included schema files:**

- `schema-test-topic.json` – JSON schema (TestMessage: name, topic)
- `schema-test-topic-avro.json` – AVRO schema (TestMessage: name, topic)
- `schema-test-topic-avro-name-only.json` – AVRO schema (single field: name)
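
For orientation, the Pulsar wrapper format (`{"type":"AVRO","schema":"..."}`) can be produced from a plain Avro record definition as sketched below. The record name and the two field names come from the list above; the `string` field types and the helper name `wrap_for_pulsar` are assumptions made for illustration, so compare the output with the bundled `schema-*.json` files before relying on it.

```python
import json


def wrap_for_pulsar(avro_schema, schema_type="AVRO"):
    """Wrap an Avro schema dict as {"type": ..., "schema": "<JSON string>"}."""
    return {"type": schema_type, "schema": json.dumps(avro_schema)}


# Assumed shape of TestMessage: both fields are taken to be strings.
test_message = {
    "type": "record",
    "name": "TestMessage",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "topic", "type": "string"},
    ],
}

wrapped = wrap_for_pulsar(test_message)
print(json.dumps(wrapped, indent=2))

# Round trip: the inner string parses back to the original Avro schema.
assert json.loads(wrapped["schema"]) == test_message
```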

---

## Publish messages

**Script:** `publish-messages.sh`

Sends messages to a topic by running a **generic Avro encoder** once per message (using the schema file you pass) and POSTing the binary output to the Pulsar admin REST API.

**Why encoding?** When a topic has an Avro schema, the broker expects the message body to be **Avro binary** (the same format a real Avro producer would send). You cannot POST raw JSON or plain text and have it accepted as a valid Avro message. The generic encoder produces that binary from your schema (and optional record data) so the curl request body is in the correct format.
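
To make the format concrete: in the Avro binary encoding, a record is the concatenation of its field values in schema order, and a string is written as its byte length (a zigzag-encoded variable-length `long`) followed by the UTF-8 bytes. The sketch below hand-encodes a two-string record without the `avro` package. The helper names are hypothetical, and treating both `TestMessage` fields as strings is an assumption.

```python
def zigzag_varint(n):
    """Encode a signed integer as an Avro int/long (zigzag, then base-128 varint)."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small unsigned values
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s):
    """Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_record(values):
    """Record of string fields: field encodings concatenated in schema order."""
    return b"".join(encode_string(v) for v in values)


body = encode_record(["test-message", "test-topic"])
print(body)  # b'\x18test-message\x14test-topic'
```

The length prefixes `0x18` and `0x14` are the zigzag forms of 12 and 10. Bytes laid out this way are what the request body has to contain for this record; `encode_avro_generic.py` delegates the same job to the `avro` package so that it works for arbitrary record schemas.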

**Usage:**

```bash
./publish-messages.sh <schema-file> [topic] [num-messages] [data-file]
```

| Argument | Required | Description |
|----------|----------|-------------|
| `schema-file` | Yes | Path to the Avro schema (JSON or Pulsar format). Same files you use with `register-schema.sh`. Relative to this directory or absolute. |
| `topic` | No | Full topic name. If omitted, uses `PULSAR_TOPIC` from `.env`. |
| `num-messages` | No | Number of messages to send. Default: `NUM_MESSAGES` from `.env`, or 15 if that is unset. |
| `data-file` | No | JSON file with one record matching the schema. If omitted, the encoder generates default values from the schema (e.g. placeholder strings, zeros). |

**Examples:**

```bash
# Default topic and 15 messages (encoder uses default record from schema)
./publish-messages.sh schema-test-topic-avro.json
./publish-messages.sh schema-test-topic-avro-name-only.json

# Override topic and count
./publish-messages.sh schema-test-topic-avro-name-only.json persistent://tenant/ns/my-topic 10

# Use a custom JSON record for each message
./publish-messages.sh schema-test-topic-avro.json persistent://tenant/ns/my-topic 5 my-record.json
```
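
The publish script derives the REST URL from the topic name. The sketch below mirrors that step in Python so the mapping is easy to see. The path template is the one `publish-messages.sh` uses; the function name `publish_url` is hypothetical, and the admin URL is the placeholder from `.env.example`.

```python
import re

TOPIC_RE = re.compile(r"persistent://([^/]+)/([^/]+)/([^/]+)")


def publish_url(admin_url, topic):
    """Build the message-publish URL for persistent://tenant/namespace/topic-name."""
    match = TOPIC_RE.fullmatch(topic)
    if match is None:
        raise ValueError(
            f"Topic must be persistent://tenant/namespace/topic-name (got: {topic})"
        )
    tenant, namespace, name = match.groups()
    # Drop a single trailing slash, as the script does with ${PULSAR_ADMIN_URL%/}.
    base = admin_url[:-1] if admin_url.endswith("/") else admin_url
    return f"{base}/admin/rest/topics/v1/persistent/{tenant}/{namespace}/{name}/message"


print(publish_url("https://your-pulsar-admin.example.com/",
                  "persistent://tenant/namespace/test-topic"))
```

Topic names that do not have exactly three segments after `persistent://` are rejected before any request is made, which matches the script's own check.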

**Generic encoder:** `encode_avro_generic.py`

- Works with **any** Avro record schema. You pass the schema file path (and optionally a JSON record).
- Accepts raw Avro schema JSON or Pulsar format (`{"type":"AVRO","schema":"..."}`).
- Without a data file: builds a default record from the schema (strings → `"test-message"`, or `"test-topic"` for a field named `topic`; numbers → `0`; etc.) so you can publish without writing record JSON.
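
The default-record idea can be sketched as a small recursive function. This is an illustration rather than the encoder's actual code: the name `default_value` is hypothetical, the string placeholder mirrors the encoder's, and picking the first non-null branch of a union is an assumed policy. Handling of unions written as JSON lists, enums, and fixed types is likewise an assumption about what a fuller version might cover.

```python
def default_value(schema):
    """Return a placeholder value for an Avro schema given as parsed JSON."""
    if isinstance(schema, list):  # union, e.g. ["null", "string"]
        non_null = [branch for branch in schema if branch != "null"]
        return default_value(non_null[0]) if non_null else None
    if isinstance(schema, str):  # primitive type name
        primitives = {
            "string": "test-message", "int": 0, "long": 0,
            "float": 0.0, "double": 0.0, "boolean": False,
            "bytes": b"", "null": None,
        }
        return primitives.get(schema)
    kind = schema["type"]
    if kind == "record":
        return {f["name"]: default_value(f["type"]) for f in schema["fields"]}
    if kind == "enum":
        return schema["symbols"][0]
    if kind == "array":
        return []
    if kind == "map":
        return {}
    if kind == "fixed":
        return b"\x00" * schema["size"]
    return default_value(kind)  # e.g. {"type": "string"} or a nested definition


example = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "note", "type": ["null", "string"]},
        {"name": "count", "type": "long"},
    ],
}
print(default_value(example))
# {'name': 'test-message', 'note': 'test-message', 'count': 0}
```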

---

## Example workflows

**Register full Avro schema and publish valid messages:**

```bash
./register-schema.sh schema-test-topic-avro.json
./publish-messages.sh schema-test-topic-avro.json
```

**Register name-only schema and publish matching messages:**

```bash
./register-schema.sh schema-test-topic-avro-name-only.json
./publish-messages.sh schema-test-topic-avro-name-only.json
```

**Test schema validation:**

Register the full schema, then publish name-only messages (wrong schema). Any message that is accepted does not match the registered schema, so consuming from the topic should fail with a Java I/O exception.

```bash
./register-schema.sh schema-test-topic-avro.json
./publish-messages.sh schema-test-topic-avro-name-only.json
# Messages the REST endpoint refuses are reported as "rejected"
```
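
Why the mismatch surfaces as a read error can be seen by decoding by hand. A reader using the full schema expects two strings, but a name-only message contains one, so the second read runs past the end of the payload. The sketch below is a simplified stand-in for a real Avro decoder: the helper names are hypothetical, only string fields are supported, and both `TestMessage` fields are assumed to be strings. A Java consumer would report the same condition through its own exception type.

```python
import io


def read_long(stream):
    """Read one zigzag varint (Avro int/long); raise EOFError if the data runs out."""
    shift, accum = 0, 0
    while True:
        byte = stream.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro payload")
        accum |= (byte[0] & 0x7F) << shift
        if not byte[0] & 0x80:  # continuation bit clear: last byte of the varint
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)  # undo zigzag


def read_string(stream):
    """Read an Avro string: length prefix, then that many UTF-8 bytes."""
    length = read_long(stream)
    data = stream.read(length)
    if len(data) < length:
        raise EOFError("unexpected end of Avro payload")
    return data.decode("utf-8")


def decode_strings(payload, field_names):
    """Decode a record whose fields are all strings, in schema order."""
    stream = io.BytesIO(payload)
    return {name: read_string(stream) for name in field_names}


name_only = b"\x18test-message"  # one string field, as the name-only schema writes it
print(decode_strings(name_only, ["name"]))
try:
    decode_strings(name_only, ["name", "topic"])  # reader expects the full schema
except EOFError as exc:
    print("decode failed:", exc)
```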

---

## Adding your own schema

- Create a schema file (see existing `schema-*.json` for format) and pass it to `register-schema.sh`.
- Use the **same schema file** with `publish-messages.sh` to publish messages. The generic encoder works with any Avro record schema. Optionally pass a JSON data file so each message uses your record instead of default values.
Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
#!/usr/bin/env python3
# Generic Avro encoder: takes a schema file path and optional record data, writes one Avro binary message to stdout.
# Works with any Avro record schema. Record data can be from a JSON file, stdin, or auto-generated defaults.
#
# Usage: encode_avro_generic.py <schema-file> [data-file]
#   schema-file   Path to schema (Avro JSON or Pulsar format with "type":"AVRO" and "schema" key).
#   data-file     Optional. JSON file with one record matching the schema. If omitted, reads JSON from stdin;
#                 if stdin is empty, generates a default record from the schema.

import json
import io
import sys
from pathlib import Path


def load_schema_from_file(path: str):
    """Load Avro schema. Supports raw Avro JSON or Pulsar wrapper {"type":"AVRO","schema":"..."}."""
    with open(path, "r") as f:
        raw = json.load(f)
    if isinstance(raw, dict) and "schema" in raw:
        schema_str = raw["schema"]
        if isinstance(schema_str, str):
            raw = json.loads(schema_str)
        else:
            raw = schema_str
    return raw


def default_for_schema(schema_obj, field_name=None):
    """Build a default value for an Avro schema (for records: dict of field defaults)."""
    if isinstance(schema_obj, dict):
        type_name = schema_obj.get("type")
    elif isinstance(schema_obj, str):
        type_name = schema_obj
    else:
        return None
    if isinstance(schema_obj, dict) and type_name == "record":
        return {
            f["name"]: default_for_schema(f["type"], f["name"])
            for f in schema_obj.get("fields", [])
        }
    if type_name == "string":
        if field_name == "topic":
            return "test-topic"
        return "test-message"
    if type_name in ("int", "long"):
        return 0
    if type_name in ("float", "double"):
        return 0.0
    if type_name == "boolean":
        return False
    if type_name == "null":
        return None
    if type_name == "bytes":
        return b""
    if type_name == "array":
        return []
    if type_name == "map":
        return {}
    if isinstance(type_name, list):
        for t in type_name:
            if t != "null":
                return default_for_schema(t, field_name)
        return None
    return None


def main():
    if len(sys.argv) < 2:
        print("Usage: encode_avro_generic.py <schema-file> [data-file]", file=sys.stderr)
        sys.exit(1)

    schema_path = sys.argv[1]
    data_path = sys.argv[2] if len(sys.argv) > 2 else None

    if not Path(schema_path).is_file():
        print(f"Error: schema file not found: {schema_path}", file=sys.stderr)
        sys.exit(1)

    schema_dict = load_schema_from_file(schema_path)

    try:
        import avro.schema
        import avro.io
    except ImportError:
        print("Error: Python 'avro' package required. Run: pip install avro", file=sys.stderr)
        sys.exit(1)

    schema = avro.schema.parse(json.dumps(schema_dict))

    if data_path:
        with open(data_path, "r") as f:
            record = json.load(f)
    else:
        if not sys.stdin.isatty():
            try:
                line = sys.stdin.readline()
                if line.strip():
                    record = json.loads(line)
                else:
                    record = default_for_schema(schema_dict)
            except json.JSONDecodeError as e:
                print(f"Error: invalid JSON from stdin: {e}", file=sys.stderr)
                sys.exit(1)
        else:
            record = default_for_schema(schema_dict)

    writer = avro.io.DatumWriter(schema)
    buf = io.BytesIO()
    encoder = avro.io.BinaryEncoder(buf)
    writer.write(record, encoder)
    sys.stdout.buffer.write(buf.getvalue())


if __name__ == "__main__":
    main()
Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
#!/usr/bin/env bash
# Publish messages to a Pulsar topic by invoking an encoder script per message and POSTing the output.
# All connection config from .env: PULSAR_ADMIN_URL, PULSAR_TOPIC, PULSAR_AUTH_TOKEN.
#
# Usage: publish-messages.sh <schema-file> [topic] [num-messages] [data-file]
#
#   First argument: path to schema file (Avro JSON or Pulsar format). Used by the generic encoder.
#   topic         Optional. Full topic (persistent://tenant/namespace/name). Default: PULSAR_TOPIC from .env.
#   num-messages  Optional. How many messages to send. Default: 15 or NUM_MESSAGES from .env.
#   data-file     Optional. JSON file with one record matching the schema. If omitted, encoder uses defaults.
#
# Examples:
#   publish-messages.sh schema-test-topic-avro.json
#   publish-messages.sh schema-test-topic-avro-name-only.json persistent://tenant/ns/my-topic 10
#   publish-messages.sh schema-test-topic-avro.json persistent://tenant/ns/my-topic 5 record.json

set -e

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
if [[ ! -f "${SCRIPT_DIR}/.env" ]]; then
  echo "Error: .env file not found. Copy .env.example to .env and set PULSAR_ADMIN_URL, PULSAR_TOPIC, PULSAR_AUTH_TOKEN." >&2
  exit 1
fi
source "${SCRIPT_DIR}/.env"

if [[ -z "${PULSAR_ADMIN_URL:-}" ]]; then
  echo "Error: PULSAR_ADMIN_URL is not set in .env." >&2
  exit 1
fi
if [[ -z "${PULSAR_AUTH_TOKEN:-}" ]]; then
  echo "Error: PULSAR_AUTH_TOKEN is not set in .env." >&2
  exit 1
fi

if [[ $# -lt 1 ]]; then
  echo "Usage: $(basename "$0") <schema-file> [topic] [num-messages] [data-file]" >&2
  echo "  schema-file   Path to Avro schema (JSON or Pulsar format)." >&2
  echo "  topic         Optional. Default: PULSAR_TOPIC from .env." >&2
  echo "  num-messages  Optional. Default: 15 or NUM_MESSAGES from .env." >&2
  echo "  data-file     Optional. JSON record matching the schema. If omitted, encoder uses defaults." >&2
  exit 1
fi

SCHEMA_FILE="$1"
if [[ "$SCHEMA_FILE" != /* ]]; then
  SCHEMA_FILE="${SCRIPT_DIR}/${SCHEMA_FILE}"
fi
if [[ ! -f "$SCHEMA_FILE" ]]; then
  echo "Error: Schema file not found: $SCHEMA_FILE" >&2
  exit 1
fi

ENCODER_SCRIPT="${SCRIPT_DIR}/encode_avro_generic.py"
if [[ ! -f "$ENCODER_SCRIPT" ]]; then
  echo "Error: Encoder script not found: $ENCODER_SCRIPT" >&2
  exit 1
fi

if [[ -n "${2:-}" ]]; then
  TOPIC="$2"
else
  TOPIC="${PULSAR_TOPIC:-}"
  if [[ -z "$TOPIC" ]]; then
    echo "Error: PULSAR_TOPIC is not set in .env and no topic argument was provided." >&2
    exit 1
  fi
fi

NUM_MESSAGES="${3:-${NUM_MESSAGES:-15}}"
DATA_FILE="${4:-}"

# Parse persistent://tenant/namespace/topic-name
if [[ ! "$TOPIC" =~ ^persistent://([^/]+)/([^/]+)/([^/]+)$ ]]; then
  echo "Error: Topic must be persistent://tenant/namespace/topic-name (got: $TOPIC)" >&2
  exit 1
fi
TENANT="${BASH_REMATCH[1]}"
NAMESPACE="${BASH_REMATCH[2]}"
TOPIC_NAME="${BASH_REMATCH[3]}"

BASE_URL="${PULSAR_ADMIN_URL%/}"
URL="${BASE_URL}/admin/rest/topics/v1/persistent/${TENANT}/${NAMESPACE}/${TOPIC_NAME}/message"

echo "Publishing $NUM_MESSAGES messages to $TOPIC (schema: $SCHEMA_FILE) ..."
echo ""

TMP=$(mktemp)
trap 'rm -f "$TMP"' EXIT

success=0
rejected=0
for i in $(seq 1 "$NUM_MESSAGES"); do
  if [[ -n "$DATA_FILE" ]]; then
    python3 "$ENCODER_SCRIPT" "$SCHEMA_FILE" "$DATA_FILE" > "$TMP" || { echo "Error: Failed to encode message." >&2; exit 1; }
  else
    python3 "$ENCODER_SCRIPT" "$SCHEMA_FILE" > "$TMP" || { echo "Error: Failed to encode message." >&2; exit 1; }
  fi
  code=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 10 --max-time 30 -X POST "$URL" \
    -H "Authorization: Bearer ${PULSAR_AUTH_TOKEN}" \
    -H "Accept: application/json" \
    -H "Content-Type: application/octet-stream" \
    --data-binary "@$TMP")
  if [[ "$code" -ge 200 && "$code" -lt 300 ]]; then
    (( success++ )) || true
    echo "  message $i -> HTTP $code"
  else
    (( rejected++ )) || true
    echo "  message $i -> HTTP $code (rejected)"
  fi
done

echo ""
echo "Done: $success published, $rejected rejected."