-
Notifications
You must be signed in to change notification settings - Fork 9.1k
Expand file tree
/
Copy pathlocal_chatgpt_auth.rs
More file actions
184 lines (161 loc) · 6.27 KB
/
local_chatgpt_auth.rs
File metadata and controls
184 lines (161 loc) · 6.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
use std::path::Path;
use codex_app_server_protocol::AuthMode;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::load_auth_dot_json;
/// ChatGPT credentials extracted from the locally stored auth data.
///
/// Values of this type are produced by `load_local_chatgpt_auth`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LocalChatgptAuth {
/// Access token taken from the stored token data.
pub(crate) access_token: String,
/// Account id: the stored `account_id` when present, otherwise the
/// `chatgpt_account_id` carried by the id token.
pub(crate) chatgpt_account_id: String,
/// Plan type reported by the id token, ASCII-lowercased; `None` when the
/// id token does not provide one.
pub(crate) chatgpt_plan_type: Option<String>,
}
/// Loads the auth data stored under `codex_home` and extracts the ChatGPT
/// credentials from it.
///
/// `auth_credentials_store_mode` is forwarded unchanged to
/// `load_auth_dot_json`. When `forced_chatgpt_workspace_id` is `Some`, the
/// resolved account id must equal it exactly.
///
/// # Errors
///
/// Returns a human-readable message when:
/// - the stored auth cannot be loaded (`failed to load local auth: …`),
/// - nothing is stored (`no local auth available`),
/// - the stored auth is in API-key mode or carries an API key,
/// - the stored auth has no token data,
/// - neither the token data nor the id token provides an account id,
/// - the account id differs from `forced_chatgpt_workspace_id`.
pub(crate) fn load_local_chatgpt_auth(
    codex_home: &Path,
    auth_credentials_store_mode: AuthCredentialsStoreMode,
    forced_chatgpt_workspace_id: Option<&str>,
) -> Result<LocalChatgptAuth, String> {
    let auth = load_auth_dot_json(codex_home, auth_credentials_store_mode)
        .map_err(|err| format!("failed to load local auth: {err}"))?
        .ok_or_else(|| "no local auth available".to_string())?;

    // Either an explicit API-key mode or the mere presence of an API key
    // disqualifies the stored auth as a ChatGPT login.
    if matches!(auth.auth_mode, Some(AuthMode::ApiKey)) || auth.openai_api_key.is_some() {
        return Err("local auth is not a ChatGPT login".to_string());
    }

    let tokens = auth
        .tokens
        .ok_or_else(|| "local ChatGPT auth is missing token data".to_string())?;
    let access_token = tokens.access_token;

    // Prefer the explicitly stored account id. The id-token claim is only
    // cloned when it is actually needed as a fallback (`or_else` is lazy,
    // unlike `or`, which would clone on every call).
    let chatgpt_account_id = tokens
        .account_id
        .or_else(|| tokens.id_token.chatgpt_account_id.clone())
        .ok_or_else(|| "local ChatGPT auth is missing chatgpt account id".to_string())?;

    if let Some(expected_workspace) = forced_chatgpt_workspace_id
        && chatgpt_account_id != expected_workspace
    {
        return Err(format!(
            "local ChatGPT auth must use workspace {expected_workspace}, but found {chatgpt_account_id:?}"
        ));
    }

    // Normalize the plan type's casing so callers get a lowercase value.
    let chatgpt_plan_type = tokens
        .id_token
        .get_chatgpt_plan_type()
        .map(|plan_type| plan_type.to_ascii_lowercase());

    Ok(LocalChatgptAuth {
        access_token,
        chatgpt_account_id,
        chatgpt_plan_type,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use base64::Engine;
    use chrono::Utc;
    use codex_app_server_protocol::AuthMode;
    use codex_core::auth::AuthDotJson;
    use codex_core::auth::save_auth;
    use codex_core::token_data::TokenData;
    use codex_login::auth::login_with_chatgpt_auth_tokens;
    use pretty_assertions::assert_eq;
    use serde::Serialize;
    use serde_json::json;
    use tempfile::TempDir;

    /// Builds a three-segment, JWT-shaped string (`header.payload.signature`)
    /// whose payload carries the given email, account id and plan type.
    /// Each segment is URL-safe base64 without padding; the signature segment
    /// is just the encoded bytes `sig`.
    fn fake_jwt(email: &str, account_id: &str, plan_type: &str) -> String {
        #[derive(Serialize)]
        struct Header {
            alg: &'static str,
            typ: &'static str,
        }

        fn b64url(bytes: &[u8]) -> String {
            base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes)
        }

        let header_bytes = serde_json::to_vec(&Header {
            alg: "none",
            typ: "JWT",
        })
        .expect("serialize header");

        let claims = json!({
            "email": email,
            "https://api.openai.com/auth": {
                "chatgpt_account_id": account_id,
                "chatgpt_plan_type": plan_type,
            },
        });
        let claims_bytes = serde_json::to_vec(&claims).expect("serialize payload");

        [
            b64url(&header_bytes),
            b64url(&claims_bytes),
            b64url(b"sig"),
        ]
        .join(".")
    }

    /// Saves a ChatGPT-mode auth record for `workspace-1` / `business` into
    /// `codex_home` using the file-backed credential store.
    fn write_chatgpt_auth(codex_home: &Path) {
        // The id token and the access token are the same fake JWT.
        let jwt = || fake_jwt("user@example.com", "workspace-1", "business");

        let id_claims = codex_core::token_data::parse_chatgpt_jwt_claims(&jwt())
            .expect("id token should parse");
        let token_data = TokenData {
            id_token: id_claims,
            access_token: jwt(),
            refresh_token: "refresh-token".to_string(),
            account_id: Some("workspace-1".to_string()),
        };
        let managed = AuthDotJson {
            auth_mode: Some(AuthMode::Chatgpt),
            openai_api_key: None,
            tokens: Some(token_data),
            last_refresh: Some(Utc::now()),
        };

        save_auth(codex_home, &managed, AuthCredentialsStoreMode::File)
            .expect("chatgpt auth should save");
    }

    /// Runs the loader against `home` with the file-backed credential store.
    fn load_from_file(home: &Path, workspace: Option<&str>) -> Result<LocalChatgptAuth, String> {
        load_local_chatgpt_auth(home, AuthCredentialsStoreMode::File, workspace)
    }

    #[test]
    fn loads_local_chatgpt_auth_from_managed_auth() {
        let home = TempDir::new().expect("tempdir");
        write_chatgpt_auth(home.path());

        let LocalChatgptAuth {
            access_token,
            chatgpt_account_id,
            chatgpt_plan_type,
        } = load_from_file(home.path(), Some("workspace-1")).expect("chatgpt auth should load");

        assert_eq!(chatgpt_account_id, "workspace-1");
        assert_eq!(chatgpt_plan_type.as_deref(), Some("business"));
        assert!(!access_token.is_empty());
    }

    #[test]
    fn rejects_missing_local_auth() {
        // Nothing has been written into the temp dir before loading.
        let home = TempDir::new().expect("tempdir");

        let message = load_from_file(home.path(), None).expect_err("missing auth should fail");

        assert_eq!(message, "no local auth available");
    }

    #[test]
    fn rejects_api_key_auth() {
        let home = TempDir::new().expect("tempdir");
        let api_key_auth = AuthDotJson {
            auth_mode: Some(AuthMode::ApiKey),
            openai_api_key: Some("sk-test".to_string()),
            tokens: None,
            last_refresh: None,
        };
        save_auth(home.path(), &api_key_auth, AuthCredentialsStoreMode::File)
            .expect("api key auth should save");

        let message = load_from_file(home.path(), None).expect_err("api key auth should fail");

        assert_eq!(message, "local auth is not a ChatGPT login");
    }

    #[test]
    fn prefers_managed_auth_over_external_ephemeral_tokens() {
        let home = TempDir::new().expect("tempdir");
        write_chatgpt_auth(home.path());

        // Register a second login for workspace-2 through the external-token
        // entry point; the loader is still expected to return workspace-1.
        let external_jwt = fake_jwt("user@example.com", "workspace-2", "enterprise");
        login_with_chatgpt_auth_tokens(
            home.path(),
            &external_jwt,
            "workspace-2",
            Some("enterprise"),
        )
        .expect("external auth should save");

        let loaded =
            load_from_file(home.path(), Some("workspace-1")).expect("managed auth should win");

        assert_eq!(loaded.chatgpt_account_id, "workspace-1");
        assert_eq!(loaded.chatgpt_plan_type.as_deref(), Some("business"));
    }
}