@@ -212,6 +212,7 @@ typedef struct {
212212 Relation rel ;
213213 FmgrInfo * key_info ;
214214 FmgrInfo * value_info ;
215+ FmgrInfo * flags_info ;
215216 AttrNumber key_junk_no ;
216217} KtFdwModifyState ;
217218
@@ -282,7 +283,6 @@ static bool isValidOption(const char *option, Oid context)
282283#ifdef DEBUG
283284 elog (NOTICE , "isValidOption %s" , option );
284285#endif
285-
286286 for (opt = valid_options ; opt -> optname ; opt ++ ){
287287 if (context == opt -> optcontext && strcmp (opt -> optname , option ) == 0 ) {
288288 return true;
@@ -950,8 +950,8 @@ ktIterateForeignScan(ForeignScanState *node)
950950
951951 if (found )
952952 {
953- dvalues = (Datum * ) palloc (2 * sizeof (Datum ));
954- nulls = (bool * ) palloc (2 * sizeof (bool ));
953+ dvalues = (Datum * ) palloc (3 * sizeof (Datum ));
954+ nulls = (bool * ) palloc (3 * sizeof (bool ));
955955 for (i = 0 ; i < 2 ; i ++ )
956956 {
957957 nulls [i ] = false;
@@ -981,6 +981,8 @@ ktIterateForeignScan(ForeignScanState *node)
981981 nulls [i ] = true;
982982 }
983983 }
984+ /* set flags to null */
985+ nulls [2 ] = true;
984986
985987 tuple = heap_form_tuple (estate -> attinmeta -> tupdesc , dvalues , nulls );
986988 ExecStoreTuple (tuple , slot , InvalidBuffer , false);
@@ -1179,6 +1181,7 @@ ktBeginForeignModify(ModifyTableState *mtstate,
11791181 fmstate -> rel = rel ;
11801182 fmstate -> key_info = (FmgrInfo * ) palloc0 (sizeof (FmgrInfo ));
11811183 fmstate -> value_info = (FmgrInfo * ) palloc0 (sizeof (FmgrInfo ));
1184+ fmstate -> flags_info = (FmgrInfo * ) palloc0 (sizeof (FmgrInfo ));
11821185
11831186
11841187 if (operation == CMD_UPDATE || operation == CMD_DELETE ) {
@@ -1210,6 +1213,16 @@ ktBeginForeignModify(ModifyTableState *mtstate,
12101213 getTypeBinaryOutputInfo (attr -> atttypid , & typefnoid , & isvarlena );
12111214 fmgr_info (typefnoid , fmstate -> value_info );
12121215
1216+ #if PG_VERSION_NUM >= 110000
1217+ attr = & RelationGetDescr (rel )-> attrs [2 ];
1218+ #else
1219+ attr = RelationGetDescr (rel )-> attrs [2 ];
1220+ #endif
1221+ Assert (!attr -> attisdropped );
1222+
1223+ getTypeBinaryOutputInfo (attr -> atttypid , & typefnoid , & isvarlena );
1224+ fmgr_info (typefnoid , fmstate -> flags_info );
1225+
12131226 initTableOptions (& (fmstate -> opt ));
12141227 getTableOptions (RelationGetRelid (rel ), & (fmstate -> opt ));
12151228
@@ -1254,7 +1267,8 @@ ktExecForeignInsert(EState *estate,
12541267
12551268 Datum value ;
12561269 bool isnull ;
1257- bytea * bkey , * bval ;
1270+ bool isSet = false;
1271+ bytea * bkey , * bval , * sval ;
12581272 KtFdwModifyState * fmstate = (KtFdwModifyState * ) rinfo -> ri_FdwState ;
12591273
12601274 elog (DEBUG1 ,"entering function %s" ,__func__ );
@@ -1271,9 +1285,24 @@ ktExecForeignInsert(EState *estate,
12711285
12721286 bval = SendFunctionCall (fmstate -> value_info , value );
12731287
1274- if (!ktaddl (fmstate -> db , VARDATA (bkey ), VARSIZE (bkey ) - VARHDRSZ ,
1275- VARDATA (bval ), VARSIZE (bval ) - VARHDRSZ ))
1276- elog (ERROR , "Error from kt: %s" , ktgeterror (fmstate -> db ));
1288+ value = slot_getattr (planSlot , 3 , & isnull );
1289+ if (!isnull ) {
1290+ sval = SendFunctionCall (fmstate -> flags_info , value );
1291+ if (memcmp (VARDATA (sval ), "kt_set" , VARSIZE (sval ) - VARHDRSZ ) == 0 ) {
1292+ isSet = true;
1293+ }
1294+ }
1295+ if (isSet ) {
1296+ if (!ktsetl (fmstate -> db , VARDATA (bkey ), VARSIZE (bkey ) - VARHDRSZ ,
1297+ VARDATA (bval ), VARSIZE (bval ) - VARHDRSZ )) {
1298+ elog (ERROR , "Error from kt: %s" , ktgeterror (fmstate -> db ));
1299+ }
1300+ } else {
1301+ if (!ktaddl (fmstate -> db , VARDATA (bkey ), VARSIZE (bkey ) - VARHDRSZ ,
1302+ VARDATA (bval ), VARSIZE (bval ) - VARHDRSZ )) {
1303+ elog (ERROR , "%d, Error from kt: %s" , ktgeterror (fmstate -> db ));
1304+ }
1305+ }
12771306
12781307 return slot ;
12791308}
0 commit comments