55namespace SimPod \ClickHouseClient \Client ;
66
77use DateTimeZone ;
8+ use InvalidArgumentException ;
89use Psr \Http \Client \ClientExceptionInterface ;
910use Psr \Http \Client \ClientInterface ;
1011use Psr \Http \Message \ResponseInterface ;
12+ use Psr \Http \Message \StreamInterface ;
1113use SimPod \ClickHouseClient \Client \Http \RequestFactory ;
1214use SimPod \ClickHouseClient \Client \Http \RequestOptions ;
15+ use SimPod \ClickHouseClient \Client \Http \RequestSettings ;
1316use SimPod \ClickHouseClient \Exception \CannotInsert ;
1417use SimPod \ClickHouseClient \Exception \ServerError ;
1518use SimPod \ClickHouseClient \Exception \UnsupportedParamType ;
@@ -198,6 +201,50 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat
198201 }
199202 }
200203
204+ public function insertPayload (
205+ string $ table ,
206+ Format $ inputFormat ,
207+ StreamInterface $ payload ,
208+ array $ columns = [],
209+ array $ settings = [],
210+ ): void {
211+ if ($ payload ->getSize () === 0 ) {
212+ throw CannotInsert::noValues ();
213+ }
214+
215+ $ formatSql = $ inputFormat ::toSql ();
216+
217+ $ table = Escaper::quoteIdentifier ($ table );
218+
219+ $ columnsSql = $ columns === [] ? '' : sprintf ('(%s) ' , implode (', ' , $ columns ));
220+
221+ $ sql = <<<CLICKHOUSE
222+ INSERT INTO $ table $ columnsSql $ formatSql
223+ CLICKHOUSE ;
224+
225+ $ request = $ this ->requestFactory ->initRequest (
226+ new RequestSettings (
227+ $ this ->defaultSettings ,
228+ $ settings ,
229+ ),
230+ ['query ' => $ sql ],
231+ );
232+
233+ try {
234+ $ request = $ request ->withBody ($ payload );
235+ } catch (InvalidArgumentException ) {
236+ absurd ();
237+ }
238+
239+ $ response = $ this ->client ->sendRequest ($ request );
240+
241+ if ($ response ->getStatusCode () !== 200 ) {
242+ throw ServerError::fromResponse ($ response );
243+ }
244+
245+ return ;
246+ }
247+
201248 /**
202249 * @param array<string, mixed> $params
203250 * @param array<string, float|int|string> $settings
@@ -208,13 +255,15 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat
208255 */
209256 private function executeRequest (string $ sql , array $ params , array $ settings ): ResponseInterface
210257 {
211- $ request = $ this ->requestFactory ->prepareRequest (
212- new RequestOptions (
213- $ sql ,
214- $ params ,
258+ $ request = $ this ->requestFactory ->prepareSqlRequest (
259+ $ sql ,
260+ new RequestSettings (
215261 $ this ->defaultSettings ,
216262 $ settings ,
217263 ),
264+ new RequestOptions (
265+ $ params ,
266+ ),
218267 );
219268
220269 $ response = $ this ->client ->sendRequest ($ request );
0 commit comments