@@ -14,7 +14,7 @@ namespace PowerSync.Common.Client.Sync.Stream;
1414public class SyncStreamOptions
1515{
1616 public string Path { get ; set ; } = "" ;
17-
17+
1818 public StreamingSyncRequest Data { get ; set ; } = new ( ) ;
1919 public Dictionary < string , string > Headers { get ; set ; } = new ( ) ;
2020
@@ -23,6 +23,9 @@ public class SyncStreamOptions
2323
2424public class Remote
2525{
26+
27+ private const int STREAMING_POST_TIMEOUT_MS = 30_000 ;
28+
2629 private readonly HttpClient httpClient ;
2730 protected IPowerSyncBackendConnector connector ;
2831
@@ -113,34 +116,37 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
113116 var responseData = await response . Content . ReadAsStringAsync ( ) ;
114117 return JsonConvert . DeserializeObject < T > ( responseData ) ! ;
115118 }
116-
117- /// <summary>
118- /// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
119- /// </summary>
120- public async Task < Stream > PostStreamRaw ( SyncStreamOptions options )
121- {
122- var requestMessage = await BuildRequest ( HttpMethod . Post , options . Path , options . Data , options . Headers ) ;
123- var response = await httpClient . SendAsync ( requestMessage , HttpCompletionOption . ResponseHeadersRead , options . CancellationToken ) ;
124119
125- if ( response . Content == null )
120+ public static StreamingSyncLine ? ParseStreamingSyncLine ( JObject json )
121+ {
122+ // Determine the type based on available keys
123+ if ( json . ContainsKey ( "checkpoint" ) )
126124 {
127- throw new HttpRequestException ( $ "HTTP { response . StatusCode } : No content" ) ;
125+ return json . ToObject < StreamingSyncCheckpoint > ( ) ;
128126 }
129-
130- if ( response . StatusCode == System . Net . HttpStatusCode . Unauthorized ) {
131- InvalidateCredentials ( ) ;
127+ else if ( json . ContainsKey ( "checkpoint_diff" ) )
128+ {
129+ return json . ToObject < StreamingSyncCheckpointDiff > ( ) ;
132130 }
133-
134- if ( ! response . IsSuccessStatusCode )
131+ else if ( json . ContainsKey ( "checkpoint_complete" ) )
135132 {
136- var errorText = await response . Content . ReadAsStringAsync ( ) ;
137- throw new HttpRequestException ( $ "HTTP { response . StatusCode } : { errorText } ") ;
133+ return json . ToObject < StreamingSyncCheckpointComplete > ( ) ;
134+ }
135+ else if ( json . ContainsKey ( "data" ) )
136+ {
137+ return json . ToObject < StreamingSyncDataJSON > ( ) ;
138+ }
139+ else if ( json . ContainsKey ( "token_expires_in" ) )
140+ {
141+ return json . ToObject < StreamingSyncKeepalive > ( ) ;
142+ }
143+ else
144+ {
145+ return null ;
138146 }
139-
140- return await response . Content . ReadAsStreamAsync ( ) ;
141147 }
142148
143-
149+
144150 private async Task < HttpRequestMessage > BuildRequest ( HttpMethod method , string path , object ? data = null , Dictionary < string , string > ? additionalHeaders = null )
145151 {
146152 var credentials = await GetCredentials ( ) ;
0 commit comments