Skip to content

Commit 0809507

Browse files
committed
Fixes
1 parent eb4ce71 commit 0809507

File tree

9 files changed

+126
-60
lines changed

9 files changed

+126
-60
lines changed

cloudkit-proto/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ disjoint_impls! {
123123

124124
impl<T: Serialize + DeserializeOwned + CloudKitBytesKind<Kind = sealed::PlistKind>> CloudKitBytes for T {
125125
fn from_bytes(v: Vec<u8>) -> Self {
126-
// println!("{}", encode_hex(&v));
127-
plist::from_bytes(&v).expect("Deserialization failed!")
126+
plist::from_bytes(&v).expect(&format!("Deserialization failed! {}", encode_hex(&v)))
128127
}
129128

130129
fn to_bytes(&self) -> Vec<u8> {

src/cloudkit.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use std::{collections::HashMap, io::{Cursor, Read, Write}, marker::PhantomData,
33
use aes::{cipher::consts::U12, Aes128, Aes256};
44
use aes_gcm::{AesGcm, Nonce, Tag};
55
use aes_siv::siv::CmacSiv;
6-
use cloudkit_proto::{record, request_operation::header::IsolationLevel, AssetGetResponse, AssetsToDownload, CloudKitRecord, ProtectionInfo, Record, RecordIdentifier, RecordZoneIdentifier, ResponseOperation, Zone};
6+
use cloudkit_proto::{record, request_operation::header::IsolationLevel, retrieve_changes_response::RecordChange, AssetGetResponse, AssetsToDownload, CloudKitRecord, ProtectionInfo, Record, RecordIdentifier, RecordZoneIdentifier, ResponseOperation, Zone};
77
use hkdf::Hkdf;
8+
use log::info;
89
use omnisette::{AnisetteProvider, ArcAnisetteClient};
910
use openssl::{bn::{BigNum, BigNumContext}, ec::{EcGroup, EcKey, EcPoint}, hash::MessageDigest, nid::Nid, pkcs5::pbkdf2_hmac, pkey::{HasPublic, PKey, Private}, sha::sha1, sign::{Signer, Verifier}};
1011
use plist::Value;
@@ -479,6 +480,30 @@ impl FetchRecordChangesOperation {
479480
include_mergeable_deltas: None,
480481
})
481482
}
483+
484+
pub async fn do_sync(container: &CloudKitOpenContainer<'_, impl AnisetteProvider>,
485+
zones: &[(cloudkit_proto::RecordZoneIdentifier, Option<Vec<u8>>)], assets: &cloudkit_proto::AssetsToDownload) -> Result<Vec<(Vec<AssetGetResponse>, Vec<RecordChange>, Option<Vec<u8>>)>, PushError> {
486+
let mut responses = zones.iter().map(|zone| (vec![], vec![], zone.1.clone())).collect::<Vec<_>>();
487+
488+
let mut finished_zones = vec![];
489+
while finished_zones.len() != zones.len() {
490+
let mut sync_zones_here = zones.iter().enumerate().filter(|(_, zone)| !finished_zones.contains(&zone.0)).collect::<Vec<_>>();
491+
let operations = container.perform_operations_checked(&CloudKitSession::new(),
492+
&sync_zones_here.iter().map(|(idx, zone)| FetchRecordChangesOperation::new(zone.0.clone(), responses[*idx].2.clone(), assets))
493+
.collect::<Vec<_>>(), IsolationLevel::Zone).await?;
494+
for (result, (zone_idx, zone)) in operations.into_iter().zip(sync_zones_here.iter_mut()) {
495+
if result.1.status() == 3 {
496+
// done syncing
497+
finished_zones.push(zone.0.clone());
498+
}
499+
responses[*zone_idx].0.extend(result.0);
500+
responses[*zone_idx].1.extend(result.1.change);
501+
responses[*zone_idx].2 = result.1.sync_continuation_token.clone();
502+
}
503+
}
504+
505+
Ok(responses)
506+
}
482507
}
483508

484509
pub fn should_reset(error: Option<&PushError>) -> bool {
@@ -846,10 +871,13 @@ impl<'t, T: AnisetteProvider> CloudKitOpenContainer<'t, T> {
846871
..
847872
})) => {
848873
let service = PCSPrivateKey::get_service_key(client, pcs_service, self.client.config.as_ref()).await?;
874+
875+
info!("Creating zone {} with service key {}", zone_name, encode_hex(&service.key().compress()));
849876

850-
let request = ZoneSaveOperation::new(zone.clone(), Some(&service.key()), pcs_service.global_record).unwrap();
877+
let request = ZoneSaveOperation::new(zone.clone(), Some(&service.key()), pcs_service.global_record)?;
851878
let zone = request.0.clone().zone.unwrap();
852-
self.perform(&CloudKitSession::new(), request).await.unwrap();
879+
self.perform(&CloudKitSession::new(), request).await?;
880+
info!("Created zone");
853881
zone
854882
},
855883
Err(err) => return Err(err)
@@ -978,6 +1006,9 @@ impl<'t, T: AnisetteProvider> CloudKitOpenContainer<'t, T> {
9781006
if response.status().as_u16() == 401 {
9791007
self.client.token_provider.refresh_mme().await?;
9801008
}
1009+
if response.status().as_u16() == 429 {
1010+
return Err(PushError::TooManyRequests);
1011+
}
9811012

9821013
let token: Vec<u8> = response.bytes().await?
9831014
.into();

src/error.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ pub enum PushError {
171171
NotInClique,
172172
#[error("Missing group photo!")]
173173
MissingGroupPhoto,
174-
#[error("PCS Share key not found!")]
175-
ShareKeyNotFound,
174+
#[error("PCS Share key {0} not found!")]
175+
ShareKeyNotFound(String),
176176
#[error("BatchError {0}")]
177177
BatchError(Arc<PushError>),
178178
#[error("Invalid 2fa code!")]
@@ -181,4 +181,6 @@ pub enum PushError {
181181
PCSRecordKeyMissing,
182182
#[error("Circle is over!")]
183183
CircleOver,
184+
#[error("Too many requests!")]
185+
TooManyRequests,
184186
}

src/findmy.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -420,22 +420,20 @@ impl<P: AnisetteProvider> FindMyClient<P> {
420420

421421
let mut state = self.state.state.lock().await;
422422

423-
let mut result = container.perform(&CloudKitSession::new(),
424-
FetchRecordChangesOperation::new(beacon_zone.clone(), state.state_token.clone(), &NO_ASSETS)).await;
423+
let mut result = FetchRecordChangesOperation::do_sync(&container, &[(beacon_zone.clone(), state.state_token.clone())], &NO_ASSETS).await;
425424
if should_reset(result.as_ref().err()) {
426425
state.state_token = None;
427426
state.accessories.clear();
428-
result = container.perform(&CloudKitSession::new(),
429-
FetchRecordChangesOperation::new(beacon_zone, state.state_token.clone(), &NO_ASSETS)).await;
427+
result = FetchRecordChangesOperation::do_sync(&container, &[(beacon_zone.clone(), state.state_token.clone())], &NO_ASSETS).await;
430428
}
431429

432-
let (_, changes) = result?;
430+
let (_, changes, continuation) = result?.remove(0);
433431

434-
state.state_token = changes.sync_continuation_token.clone();
432+
state.state_token = continuation.clone();
435433

436434
let accessories = &mut state.accessories;
437435

438-
for change in changes.change {
436+
for change in changes {
439437
let identifier = change.identifier.as_ref().unwrap().value.as_ref().unwrap().name().to_string();
440438
let Some(record) = change.record else {
441439
accessories.remove(&identifier);
@@ -447,7 +445,7 @@ impl<P: AnisetteProvider> FindMyClient<P> {
447445
if record.r#type.as_ref().unwrap().name() == MasterBeaconRecord::record_type() {
448446
let item = MasterBeaconRecord::from_record_encrypted(&record.record_field, Some((&pcs_key_for_record(&record, &key)?, record.record_identifier.as_ref().unwrap())));
449447

450-
println!("Got beacon {:?} {}", item, identifier);
448+
info!("Got beacon {:?} {}", item, identifier);
451449

452450
if let Some(accessory) = accessories.get_mut(&identifier) {
453451
accessory.master_record = item;
@@ -524,6 +522,7 @@ impl<P: AnisetteProvider> FindMyClient<P> {
524522
let adv = base64_encode(&sha256(&x.to_vec_padded(28)?));
525523
key_map.insert(adv.clone(), (id.clone(), key, idx));
526524
device_keys.push(adv);
525+
527526
}
528527

529528
search.push(json!({
@@ -536,6 +535,11 @@ impl<P: AnisetteProvider> FindMyClient<P> {
536535
}));
537536
}
538537

538+
if search.is_empty() {
539+
info!("Not searching, no item!");
540+
return Ok(())
541+
}
542+
539543
let mut request = self.anisette.lock().await.get_headers().await?.clone();
540544
request.remove("X-Mme-Client-Info").unwrap();
541545
let anisette_headers: HeaderMap = request.into_iter().map(|(a, b)| (HeaderName::from_str(&a).unwrap(), b.parse().unwrap())).collect();

src/imessage/cloud_messages.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ message MessageProto2 {
1313
message MessageProto {
1414
uint32 unk1 = 1; // always 1, is finished or ck sync state
1515
optional string groupTitle = 2;
16-
string text = 3;
17-
bytes attributedBody = 4;
16+
optional string text = 3;
17+
optional bytes attributedBody = 4;
1818
optional string balloonBundleId = 5;
1919
optional bytes payloadData = 6;
2020
optional bytes messageSummaryInfo = 7;

src/imessage/cloud_messages.rs

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::ops::{Deref, DerefMut};
66
use std::sync::Arc;
77
use std::time::{Duration, SystemTime};
88

9+
use backon::{ConstantBuilder, Retryable};
910
use cloudkit_derive::CloudKitRecord;
1011
use cloudkit_proto::request_operation::header::IsolationLevel;
1112
use cloudkit_proto::retrieve_changes_response::RecordChange;
@@ -400,7 +401,7 @@ impl Into<Option<MMCSAttachmentMeta>> for &Attachment {
400401
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
401402
pub struct AttachmentMetaExtra {
402403
#[serde(rename = "pgens")]
403-
pub preview_generation_state: Option<u32>, // set to 1
404+
pub preview_generation_state: Option<i32>, // set to 1
404405
}
405406

406407
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
@@ -412,7 +413,7 @@ pub struct AttachmentMeta {
412413
#[serde(rename = "tb")]
413414
pub total_bytes: u64,
414415
#[serde(rename = "st")]
415-
pub transfer_state: u32,
416+
pub transfer_state: i32,
416417
#[serde(rename = "is")]
417418
pub is_sticker: bool,
418419
#[serde(rename = "aguid")]
@@ -430,7 +431,7 @@ pub struct AttachmentMeta {
430431
#[serde(rename = "tn")]
431432
pub transfer_name: String,
432433
#[serde(rename = "vers")]
433-
pub version: u32, // set to 1
434+
pub version: i32, // set to 1
434435
#[serde(rename = "t")]
435436
pub uti: Option<String>, // uti type
436437
#[serde(rename = "cdt")]
@@ -474,26 +475,17 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
474475
return Ok(locked.clone().unwrap())
475476
}
476477

477-
// todo keep syncing until end
478-
async fn sync_records<T: CloudKitRecord>(&self, zone: &str, mut continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<T>>), PushError> {
478+
async fn sync_records<T: CloudKitRecord>(&self, zone: &str, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<T>>, i32), PushError> {
479479
let container = self.get_container().await?;
480480

481481
let zone = container.private_zone(zone.to_string());
482482
let key = container.get_zone_encryption_config(&zone, &self.keychain, &MESSAGES_SERVICE).await?;
483-
let (_assets, mut response) = container.perform(&CloudKitSession::new(),
483+
let (_assets, response) = container.perform(&CloudKitSession::new(),
484484
FetchRecordChangesOperation::new(zone.clone(), continuation_token, &NO_ASSETS)).await?;
485-
let mut items = vec![];
486-
while !response.change.is_empty() {
487-
items.extend(response.change);
488-
continuation_token = response.sync_continuation_token.clone();
489-
response = container.perform(&CloudKitSession::new(),
490-
FetchRecordChangesOperation::new(zone.clone(), continuation_token, &NO_ASSETS)).await?.1;
491-
}
492-
493485

494486
let mut results = HashMap::new();
495487

496-
for change in &items {
488+
for change in &response.change {
497489
let identifier = change.identifier.as_ref().unwrap().value.as_ref().unwrap().name().to_string();
498490

499491
let Some(record) = &change.record else {
@@ -515,7 +507,7 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
515507
results.insert(identifier, Some(item));
516508
}
517509

518-
Ok((response.sync_continuation_token().to_vec(), results))
510+
Ok((response.sync_continuation_token().to_vec(), results, response.status()))
519511
}
520512

521513
async fn save_records<T: CloudKitRecord>(&self, zone: &str, records: HashMap<String, T>) -> Result<HashMap<String, Result<(), PushError>>, PushError> {
@@ -555,13 +547,16 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
555547

556548
let zone = container.private_zone(zone.to_string());
557549

558-
let mut operations = vec![];
559-
for record_id in records {
560-
operations.push(DeleteRecordOperation::new(record_identifier(zone.clone(), record_id)));
550+
for batch in records.chunks(256) {
551+
let mut operations = vec![];
552+
for record_id in batch {
553+
operations.push(DeleteRecordOperation::new(record_identifier(zone.clone(), record_id)));
554+
}
555+
(|| async {
556+
container.perform_operations_checked(&CloudKitSession::new(), &operations, IsolationLevel::Operation).await
557+
}).retry(&ConstantBuilder::default().with_delay(Duration::from_secs(5)).with_max_times(3)).await?;
561558
}
562559

563-
container.perform_operations_checked(&CloudKitSession::new(), &operations, IsolationLevel::Operation).await?;
564-
565560
Ok(())
566561
}
567562

@@ -575,11 +570,18 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
575570
ZoneDeleteOperation::new(container.private_zone("messageManateeZone".to_string())),
576571
ZoneDeleteOperation::new(container.private_zone("attachmentManateeZone".to_string())),
577572
ZoneDeleteOperation::new(container.private_zone("chat1ManateeZone".to_string())),
573+
ZoneDeleteOperation::new(container.private_zone("messageUpdateZone".to_string())),
574+
ZoneDeleteOperation::new(container.private_zone("recoverableMessageDeleteZone".to_string())),
575+
ZoneDeleteOperation::new(container.private_zone("scheduledMessageZone".to_string())),
576+
ZoneDeleteOperation::new(container.private_zone("chatBotMessageZone".to_string())),
577+
ZoneDeleteOperation::new(container.private_zone("chatBotAttachmentZone".to_string())),
578+
ZoneDeleteOperation::new(container.private_zone("chatBotRecoverableMessageDeleteZone".to_string())),
579+
578580
], IsolationLevel::Operation).await?;
579581
Ok(())
580582
}
581583

582-
pub async fn sync_chats(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudChat>>), PushError> {
584+
pub async fn sync_chats(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudChat>>, i32), PushError> {
583585
self.sync_records("chatManateeZone", continuation_token).await
584586
}
585587

@@ -591,7 +593,7 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
591593
self.delete_records("chatManateeZone", chats).await
592594
}
593595

594-
pub async fn sync_messages(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudMessage>>), PushError> {
596+
pub async fn sync_messages(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudMessage>>, i32), PushError> {
595597
self.sync_records("messageManateeZone", continuation_token).await
596598
}
597599

@@ -603,7 +605,7 @@ impl<P: AnisetteProvider> CloudMessagesClient<P> {
603605
self.delete_records("messageManateeZone", messages).await
604606
}
605607

606-
pub async fn sync_attachments(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudAttachment>>), PushError> {
608+
pub async fn sync_attachments(&self, continuation_token: Option<Vec<u8>>) -> Result<(Vec<u8>, HashMap<String, Option<CloudAttachment>>, i32), PushError> {
607609
self.sync_records("attachmentManateeZone", continuation_token).await
608610
}
609611

src/keychain.rs

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::{BTreeMap, HashMap}, io::{Cursor, Read}, sync::Arc};
1+
use std::{collections::{BTreeMap, HashMap}, io::{Cursor, Read}, sync::Arc, time::Duration};
22

33
use aes_gcm::{AesGcm, Nonce};
44
use cloudkit_derive::CloudKitRecord;
@@ -32,6 +32,8 @@ use srp::{client::{SrpClient, SrpClientVerifier}, groups::G_2048, server::SrpSer
3232
use tokio::sync::{Mutex, RwLock};
3333
use crate::{aps::APSInterestToken, auth::MobileMeDelegateResponse, cloudkit::{CloudKitClient, CloudKitContainer, CloudKitOpenContainer, CloudKitSession, FetchRecordChangesOperation, FunctionInvokeOperation, ALL_ASSETS}, ids::{CompactECKey, IDSRecvMessage}, util::{base64_decode, base64_encode, bin_deserialize, bin_deserialize_opt_vec, bin_serialize, bin_serialize_opt_vec, decode_hex, decode_uleb128, duration_since_epoch, ec_deserialize_priv, ec_serialize_priv, encode_hex, kdf_ctr_hmac, plist_to_bin, plist_to_string, proto_deserialize, proto_deserialize_opt, proto_serialize, proto_serialize_opt, rfc6637_unwrap_key, NSData, NSDataClass, REQWEST}, APSConnection, APSMessage, IdentityManager, KeyedArchive, OSConfig, PushError};
3434

35+
use backon::{BackoffBuilder, ConstantBuilder, ExponentialBuilder};
36+
use backon::Retryable;
3537

3638
// ansi_x963_kdf (use crate later, current dependencies are broken)
3739
#[inline]
@@ -1161,23 +1163,21 @@ impl<P: AnisetteProvider> KeychainClient<P> {
11611163
let security_container = self.get_security_container().await?;
11621164

11631165
let mut state = self.state.write().await;
1164-
let mut result = security_container.perform_operations_checked(&CloudKitSession::new(),
1165-
&zones.iter().map(|zone| FetchRecordChangesOperation::new(security_container.private_zone(zone.to_string()),
1166-
state.items.get(*zone).and_then(|z| z.change_tag.clone()).map(|z| z.into()), &ALL_ASSETS)).collect::<Vec<_>>(), IsolationLevel::Zone).await;
1166+
let mut result = FetchRecordChangesOperation::do_sync(&security_container, &zones.iter()
1167+
.map(|zone| (security_container.private_zone(zone.to_string()), state.items.get(*zone).and_then(|z| z.change_tag.clone()).map(|z| z.into()))).collect::<Vec<_>>(), &ALL_ASSETS).await;
11671168
if should_reset(result.as_ref().err()) {
11681169
state.items.clear();
1169-
result = security_container.perform_operations_checked(&CloudKitSession::new(),
1170-
&zones.iter().map(|zone| FetchRecordChangesOperation::new(security_container.private_zone(zone.to_string()),
1171-
state.items.get(*zone).and_then(|z| z.change_tag.clone()).map(|z| z.into()), &ALL_ASSETS)).collect::<Vec<_>>(), IsolationLevel::Zone).await;
1170+
result = FetchRecordChangesOperation::do_sync(&security_container, &zones.iter()
1171+
.map(|zone| (security_container.private_zone(zone.to_string()), state.items.get(*zone).and_then(|z| z.change_tag.clone()).map(|z| z.into()))).collect::<Vec<_>>(), &ALL_ASSETS).await;
11721172
}
11731173
let item = result?;
11741174

11751175
let state = &mut *state;
11761176

1177-
for (zone, (_, item)) in zones.iter().zip(item.into_iter()) {
1177+
for (zone, (_, changes, change)) in zones.iter().zip(item.into_iter()) {
11781178
let saved_keychain_zone = state.items.entry(zone.to_string()).or_default();
1179-
saved_keychain_zone.change_tag = Some(item.sync_continuation_token.unwrap().into());
1180-
for change in item.change {
1179+
saved_keychain_zone.change_tag = change.clone().map(|i| i.into());
1180+
for change in changes {
11811181
let Some(record) = change.record else {
11821182
warn!("record missing change {:?}", change);
11831183
continue
@@ -1309,13 +1309,29 @@ impl<P: AnisetteProvider> KeychainClient<P> {
13091309

13101310
drop(state);
13111311

1312-
security.perform_operations_checked(&CloudKitSession::new(), &delete_ops, IsolationLevel::Zone).await?;
13131312

1314-
let _: CuttlefishResetResponse = self.invoke_cuttlefish("reset", CuttlefishResetRequest {
1315-
reason: Some(3),
1316-
}).await?;
1313+
(|| async {
1314+
(|| async { security.perform_operations_checked(&CloudKitSession::new(), &delete_ops, IsolationLevel::Zone).await })
1315+
.retry(&ConstantBuilder::default()
1316+
.with_delay(Duration::ZERO)
1317+
.with_max_times(3))
1318+
.notify(|err: &PushError, _dur: Duration| {
1319+
println!("erasing keys {:?}", err);
1320+
}).await?;
1321+
1322+
let _: CuttlefishResetResponse = self.invoke_cuttlefish("reset", CuttlefishResetRequest {
1323+
reason: Some(3),
1324+
}).await?;
13171325

1318-
self.join_clique(device_password, None, &shares, viewkeys).await?;
1326+
self.join_clique(device_password, None, &shares, viewkeys.clone()).await?;
1327+
Ok(())
1328+
})
1329+
.retry(&ConstantBuilder::default()
1330+
.with_delay(Duration::ZERO)
1331+
.with_max_times(2))
1332+
.notify(|err: &PushError, _dur: Duration| {
1333+
println!("resetting clique {:?}", err);
1334+
}).await?;
13191335

13201336
Ok(())
13211337
}

0 commit comments

Comments
 (0)