1414import  org .elasticsearch .action .bulk .BulkResponse ;
1515import  org .elasticsearch .action .index .IndexRequest ;
1616import  org .elasticsearch .action .support .SubscribableListener ;
17+ import  org .elasticsearch .action .update .UpdateRequest ;
1718import  org .elasticsearch .client .internal .Client ;
19+ import  org .elasticsearch .common .hash .MurmurHash3 ;
1820import  org .elasticsearch .compute .data .Block ;
1921import  org .elasticsearch .compute .data .Page ;
2022import  org .elasticsearch .core .Releasables ;
2325
2426import  java .io .IOException ;
2527import  java .io .UncheckedIOException ;
28+ import  java .util .Base64 ;
2629import  java .util .List ;
2730import  java .util .function .Function ;
2831
2932public  class  CollectOperator  implements  Operator  {
30-     public  interface  Writer  {
31-         XContentBuilder  write (XContentBuilder  builder , int  valueIndex ) throws  IOException ;
33+     public  static  final  long  ID_SEED  = 0 ;
34+ 
35+     public  interface  FieldWriter  {
36+         XContentBuilder  write (XContentBuilder  builder , int  position ) throws  IOException ;
37+     }
38+ 
39+     public  interface  IdWriter  {
40+         void  write (BreakingBytesRefBuilder  builder , int  position );
3241    }
3342
34-     public  record  Factory (Client  client , String  index , List <Function <Block , Writer >> writers ) implements  OperatorFactory  {
43+     public  record  Factory (
44+         Client  client ,
45+         String  index ,
46+         List <Function <Block , FieldWriter >> fieldWriters ,
47+         List <Function <Page , IdWriter >> idWriters 
48+     ) implements  OperatorFactory  {
3549        @ Override 
3650        public  CollectOperator  get (DriverContext  driverContext ) {
37-             return  new  CollectOperator (client , driverContext , index , writers );
51+             return  new  CollectOperator (client , driverContext , index , fieldWriters ,  idWriters );
3852        }
3953
4054        @ Override 
@@ -48,7 +62,9 @@ public String describe() {
4862    private  final  Client  client ;
4963    private  final  DriverContext  driverContext ;
5064    private  final  String  index ;
51-     private  final  List <Function <Block , Writer >> writers ;
65+     private  final  List <Function <Block , FieldWriter >> fieldWriters ;
66+     private  final  List <Function <Page , IdWriter >> idWriters ;
67+     private  final  BreakingBytesRefBuilder  idBuilder ;
5268
5369    private  volatile  Phase  phase  = Phase .COLLECTING ;
5470    private  volatile  IsBlockedResult  blocked  = NOT_BLOCKED ;
@@ -59,11 +75,19 @@ public String describe() {
5975    private  long  rowsEmitted ;
6076    private  long  bulkBytesSent ;
6177
62-     public  CollectOperator (Client  client , DriverContext  driverContext , String  index , List <Function <Block , Writer >> writers ) {
78+     public  CollectOperator (
79+         Client  client ,
80+         DriverContext  driverContext ,
81+         String  index ,
82+         List <Function <Block , FieldWriter >> fieldWriters ,
83+         List <Function <Page , IdWriter >> idWriters 
84+     ) {
6385        this .client  = client ;
6486        this .driverContext  = driverContext ;
6587        this .index  = index ;
66-         this .writers  = writers ;
88+         this .fieldWriters  = fieldWriters ;
89+         this .idWriters  = idWriters ;
90+         this .idBuilder  = new  BreakingBytesRefBuilder (driverContext .breaker (), "id" );
6791    }
6892
6993    @ Override 
@@ -77,7 +101,6 @@ public void addInput(Page page) {
77101        checkFailure ();
78102        pagesReceived ++;
79103        rowsReceived  += page .getPositionCount ();
80- 
81104        try  {
82105            BulkRequest  request  = request (page );
83106            bulkBytesSent  += request .estimatedSizeInBytes ();
@@ -96,14 +119,29 @@ private BulkRequest request(Page page) throws IOException {
96119        }
97120        for  (int  b  = 0 ; b  < page .getBlockCount (); b ++) {
98121            Block  block  = page .getBlock (b );
99-             Writer  writer  = writers .get (b ).apply (block );
122+             FieldWriter  writer  = fieldWriters .get (b ).apply (block );
100123            for  (int  p  = 0 ; p  < block .getPositionCount (); p ++) {
101124                writer .write (source [p ], p );
102125            }
103126        }
127+         List <IdWriter > idWriters  = this .idWriters .stream ().map (w  -> w .apply (page )).toList ();
104128        BulkRequest  request  = new  BulkRequest ();
129+         MurmurHash3 .Hash128  hash  = new  MurmurHash3 .Hash128 ();
130+         byte [] hashBytes  = hash .getBytes ();
105131        for  (int  p  = 0 ; p  < page .getPositionCount (); p ++) {
106-             request .add (new  IndexRequest (index ).source (source [p ].endObject ()));
132+             source [p ].endObject ();
133+             if  (idWriters .isEmpty ()) {
134+                 request .add (new  IndexRequest (index ).source (source [p ]));
135+             } else  {
136+                 for  (IdWriter  idWriter  : idWriters ) {
137+                     idWriter .write (idBuilder , p );
138+                 }
139+                 MurmurHash3 .hash128 (idBuilder .bytes (), 0 , idBuilder .length (), ID_SEED , hash );
140+                 hash .getBytes (hashBytes , 0 );
141+                 String  id  = Base64 .getUrlEncoder ().encodeToString (hashBytes );
142+                 request .add (new  UpdateRequest ().index (index ).id (id ).doc (source [p ]).docAsUpsert (true ));
143+                 idBuilder .clear ();
144+             }
107145        }
108146        return  request ;
109147    }
0 commit comments