-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.mjs
More file actions
184 lines (158 loc) · 6.11 KB
/
test.mjs
File metadata and controls
184 lines (158 loc) · 6.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env node
/**
* SimpleScheduler 测试脚本
* 用于测试调度器的各项功能
*
* 重要概念:
* - 服务定义包含具体的 URL/命令和参数映射配置
* - 客户端只需要提供业务参数(如 city, reportType 等)
* - 系统负责将业务参数映射到实际的 API 调用或命令执行
*/
const BASE_URL = 'http://localhost:3005'
async function testScheduler() {
console.log('🧪 开始测试 SimpleScheduler...\n')
try {
// 测试 0: 获取可用的预定义服务列表
console.log('📋 测试 0: 获取可用的预定义服务')
const response0 = await fetch(`${BASE_URL}/api/v1/services`)
const { services } = await response0.json()
console.log(`✅ 系统中预定义了 ${services.length} 个服务:`)
for (const service of services) {
console.log(` - ${service.id} (${service.type}): ${service.description || '无描述'}`)
console.log(` 业务参数: ${Object.keys(service.parameters).join(', ')}`)
if (service.config.url) {
console.log(` URL: ${service.config.url}`)
} else if (service.config.command) {
console.log(` 命令: ${service.config.command}`)
}
}
console.log()
// 测试 1: 提交 Web API 任务(httpbin-test)
console.log('📝 测试 1: 提交 Web API 任务')
console.log(' 使用预定义服务: httpbin-test')
console.log(' 客户端只提供业务参数: delay=2')
const response1 = await fetch(`${BASE_URL}/api/v1/schedule`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
serviceId: 'httpbin-test', // 引用预定义的服务 ID
payload: {
delay: 2 // 只提供业务参数
},
queue: 'default'
})
})
if (!response1.ok) {
throw new Error(`HTTP ${response1.status}: ${await response1.text()}`)
}
const { jobId } = await response1.json()
console.log(`✅ 任务已提交,Job ID: ${jobId}\n`)
// 测试 2: 查询任务状态
console.log('📊 测试 2: 查询任务状态')
await new Promise((resolve) => setTimeout(resolve, 3000)) // 等待 3 秒
const response2 = await fetch(`${BASE_URL}/api/v1/status/${jobId}`)
const job = await response2.json()
console.log(`✅ 任务状态: ${job.status}`)
console.log(` 创建时间: ${job.createdAt}`)
if (job.result) {
console.log(` 结果: ${JSON.stringify(job.result).substring(0, 100)}...\n`)
} else {
console.log()
}
// 测试 3: 测试参数验证(缺少必需参数)
console.log('🔍 测试 3: 参数验证(提供未定义的参数)')
const response3 = await fetch(`${BASE_URL}/api/v1/schedule`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
serviceId: 'httpbin-test',
payload: {
delay: 1,
unknownParam: 'test' // 提供未定义的参数
},
queue: 'default'
})
})
if (!response3.ok) {
const errorText = await response3.text()
console.log(`✅ 参数验证成功(按预期失败): ${errorText}\n`)
} else {
throw new Error('应该返回错误,但却成功了')
}
// 测试 4: 测试 SSE 流式接口
console.log('🔄 测试 4: SSE 流式接口')
console.log(' 提交新任务并监听状态...')
console.log(' 使用预定义服务: httpbin-test')
const response4 = await fetch(`${BASE_URL}/api/v1/schedule`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
serviceId: 'httpbin-test', // 引用预定义的服务 ID
payload: {
delay: 2 // 只提供业务参数
},
queue: 'default'
})
})
const { jobId: jobId2 } = await response4.json()
console.log(` Job ID: ${jobId2}`)
// 使用原生 fetch 监听 SSE(Node.js 18+ 支持)
const sseResponse = await fetch(`${BASE_URL}/api/v1/stream/${jobId2}`)
if (!sseResponse.body) {
throw new Error('No response body')
}
const reader = sseResponse.body.getReader()
const decoder = new TextDecoder()
console.log(' 接收事件:')
let done = false
while (!done) {
const { value, done: readerDone } = await reader.read()
done = readerDone
if (value) {
const chunk = decoder.decode(value)
const lines = chunk.split('\n').filter((line) => line.trim())
for (const line of lines) {
if (line.startsWith('data:')) {
const data = line.substring(5).trim()
try {
const parsed = JSON.parse(data)
console.log(` 📡 事件: ${parsed.eventType}`)
console.log(
` 📦 数据:`,
JSON.stringify(parsed.data, null, 2)
.split('\n')
.map((l) => ` ${l}`)
.join('\n')
)
} catch (e) {
// 忽略解析错误
console.log(` 📄 原始数据: ${data}`)
}
}
}
}
}
console.log('✅ SSE 测试完成\n')
// 测试 5: 查看队列状态
console.log('📦 测试 5: 查看队列状态')
const response5 = await fetch(`${BASE_URL}/api/v1/queues`)
const { queues } = await response5.json()
console.log('✅ 队列状态:')
for (const queue of queues) {
console.log(
` - ${queue.name}: 并发=${queue.concurrency}, 等待=${queue.pendingJobs}, 运行中=${queue.runningJobs}`
)
}
console.log('\n🎉 所有测试通过!')
console.log('\n💡 关键要点:')
console.log(' 1. 服务定义包含具体的 URL/命令和参数映射配置')
console.log(' 2. 客户端只需要提供业务参数(如 city、delay 等)')
console.log(' 3. 系统自动将业务参数映射到实际的 API 调用或命令执行')
console.log(' 4. 这种设计确保了系统的安全性和可控性')
} catch (error) {
console.error('\n❌ 测试失败:', error.message)
process.exit(1)
}
}
// 运行测试
testScheduler()