55namespace SimPod \ClickHouseClient \Client ;
66
77use DateTimeZone ;
8+ use InvalidArgumentException ;
89use Psr \Http \Client \ClientExceptionInterface ;
910use Psr \Http \Client \ClientInterface ;
1011use Psr \Http \Message \ResponseInterface ;
12+ use Psr \Http \Message \StreamInterface ;
1113use SimPod \ClickHouseClient \Client \Http \RequestFactory ;
1214use SimPod \ClickHouseClient \Client \Http \RequestOptions ;
15+ use SimPod \ClickHouseClient \Client \Http \RequestSettings ;
1316use SimPod \ClickHouseClient \Exception \CannotInsert ;
1417use SimPod \ClickHouseClient \Exception \ServerError ;
1518use SimPod \ClickHouseClient \Exception \UnsupportedParamType ;
@@ -198,6 +201,46 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat
198201 }
199202 }
200203
204+ public function insertPayload (
205+ string $ table ,
206+ Format $ inputFormat ,
207+ StreamInterface $ payload ,
208+ array $ columns = [],
209+ array $ settings = [],
210+ ): void {
211+ $ formatSql = $ inputFormat ::toSql ();
212+
213+ $ table = Escaper::quoteIdentifier ($ table );
214+
215+ $ columnsSql = $ columns === [] ? '' : sprintf ('(%s) ' , implode (', ' , $ columns ));
216+
217+ $ sql = <<<CLICKHOUSE
218+ INSERT INTO $ table $ columnsSql $ formatSql
219+ CLICKHOUSE ;
220+
221+ $ request = $ this ->requestFactory ->initRequest (
222+ new RequestSettings (
223+ $ this ->defaultSettings ,
224+ $ settings ,
225+ ),
226+ ['query ' => $ sql ],
227+ );
228+
229+ try {
230+ $ request = $ request ->withBody ($ payload );
231+ } catch (InvalidArgumentException ) {
232+ absurd ();
233+ }
234+
235+ $ response = $ this ->client ->sendRequest ($ request );
236+
237+ if ($ response ->getStatusCode () !== 200 ) {
238+ throw ServerError::fromResponse ($ response );
239+ }
240+
241+ return ;
242+ }
243+
201244 /**
202245 * @param array<string, mixed> $params
203246 * @param array<string, float|int|string> $settings
@@ -208,13 +251,15 @@ public function insertWithFormat(string $table, Format $inputFormat, string $dat
208251 */
209252 private function executeRequest (string $ sql , array $ params , array $ settings ): ResponseInterface
210253 {
211- $ request = $ this ->requestFactory ->prepareRequest (
212- new RequestOptions (
213- $ sql ,
214- $ params ,
254+ $ request = $ this ->requestFactory ->prepareSqlRequest (
255+ $ sql ,
256+ new RequestSettings (
215257 $ this ->defaultSettings ,
216258 $ settings ,
217259 ),
260+ new RequestOptions (
261+ $ params ,
262+ ),
218263 );
219264
220265 $ response = $ this ->client ->sendRequest ($ request );
0 commit comments