2525import org .apache .calcite .rel .type .RelDataTypeField ;
2626import org .apache .calcite .rex .*;
2727import org .apache .calcite .util .BuiltInMethod ;
28+ import org .apache .commons .collections .CollectionUtils ;
2829import org .apache .lineage .flink .sql .metadata .LineageRelColumnOrigin ;
2930import org .slf4j .Logger ;
3031import org .slf4j .LoggerFactory ;
@@ -116,6 +117,16 @@ public Set<RelColumnOrigin> getColumnOrigins(Join rel, RelMetadataQuery mq, int
116117 return set ;
117118 }
118119
120+ private static final ThreadLocal <Deque <Set <RelColumnOrigin >>> STACK_INPUT_SET = ThreadLocal .withInitial (ArrayDeque ::new );;
121+
122+ public Set <RelColumnOrigin > getColumnOrigins (Uncollect rel , RelMetadataQuery mq , int iOutputColumn ) {
123+ final RelNode input = rel .getInput ();
124+ List <String > fieldNameList = input .getRowType ().getFieldNames ();
125+ String fieldName = rel .getRowType ().getFieldNames ().get (iOutputColumn );
126+ // Set<RelColumnOrigin> columnOriginSet = mq.getColumnOrigins(input, iOutputColumn);
127+ return createDerivedColumnOrigins (STACK_INPUT_SET .get ().pop (), "UNNEST(" +String .join ("|" , fieldNameList )+DELIMITER +fieldName +")" );
128+ }
129+
119130 /**
120131 * Support the field blood relationship of table function
121132 */
@@ -125,7 +136,6 @@ public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq,
125136 if (iOutputColumn < nLeftColumns ) {
126137 return mq .getColumnOrigins (rel .getLeft (), iOutputColumn );
127138 } else {
128- Uncollect uncollect = null ;
129139 if (rel .getRight () instanceof TableFunctionScan ) {
130140 final Set <RelColumnOrigin > set = new LinkedHashSet <>();
131141 for (Integer iInput : rel .getRequiredColumns ().asList ()) {
@@ -135,27 +145,25 @@ public Set<RelColumnOrigin> getColumnOrigins(Correlate rel, RelMetadataQuery mq,
135145 TableFunctionScan tableFunctionScan = (TableFunctionScan ) rel .getRight ();
136146 String transform = computeTransform (set , tableFunctionScan .getCall ()) + DELIMITER + tableFunctionScan .getRowType ().getFieldNames ().get (iOutputColumn - nLeftColumns );
137147 return createDerivedColumnOrigins (set , transform );
138- } else if (rel .getRight () instanceof Uncollect ) {
139- uncollect = (Uncollect )rel .getRight ();
140- } else if (rel .getRight () instanceof Project ) {
141- Project project = ((Project ) rel .getRight ());
142- final RelNode input = project .getInput ();
143- if (input instanceof Uncollect ) {
144- uncollect = (Uncollect )input ;
145- }
146148 }
147- if (uncollect != null ) {
149+
150+ final Deque <Set <RelColumnOrigin >> stack = STACK_INPUT_SET .get ();
151+ final int initialSize = stack .size ();
152+ try {
148153 final Set <RelColumnOrigin > set = new LinkedHashSet <>();
149154 for (Integer iInput : rel .getRequiredColumns ().asList ()) {
150155 set .addAll (mq .getColumnOrigins (rel .getLeft (), iInput ));
156+ stack .push (set );
157+ }
158+ return mq .getColumnOrigins (rel .getRight (), iOutputColumn - nLeftColumns );
159+ } catch (Exception e ) {
160+ e .printStackTrace ();
161+ throw new RuntimeException (e );
162+ } finally {
163+ while (stack .size () > initialSize ) {
164+ stack .pop ();
151165 }
152- // List<String> nameList = uncollect.getInput().getRowType().getFieldNames(); // String.join(",", nameList) + DELIMITER +
153- String fieldName = uncollect .getRowType ().getFieldNames ().get (iOutputColumn - nLeftColumns );
154- String transform = computeTransform (set , "UNNEST(" +fieldName +")" );
155- return createDerivedColumnOrigins (set , transform );
156166 }
157-
158- return mq .getColumnOrigins (rel .getRight (), iOutputColumn - nLeftColumns );
159167 }
160168 }
161169
@@ -185,27 +193,6 @@ public Set<RelColumnOrigin> getColumnOrigins(SingleRel rel, RelMetadataQuery mq,
185193 return mq .getColumnOrigins (rel .getInput (), iOutputColumn );
186194 }
187195
188- /*public Set<RelColumnOrigin> getColumnOrigins(Uncollect rel, RelMetadataQuery mq, int iOutputColumn) {
189- final RelNode input = rel.getInput();
190- List<String> fieldNameList = input.getRowType().getFieldNames();
191- String fieldName = rel.getRowType().getFieldNames().get(iOutputColumn);
192-
193- final Set<RelColumnOrigin> set;
194- if (input instanceof Project) {
195- Project project = (Project) input;
196- final RexNode rexNode = project.getProjects().get(iOutputColumn);
197- if (rexNode instanceof RexInputRef) {
198- RexInputRef inputRef = (RexInputRef) rexNode;
199- return mq.getColumnOrigins(input, inputRef.getIndex());
200- } // RexFieldAccess
201- set = getMultipleColumns(rexNode, input, mq);
202- } else {
203- set = mq.getColumnOrigins(input, iOutputColumn);
204- }
205-
206- return createDerivedColumnOrigins(set);
207- }*/
208-
209196 /**
210197 * Support for new fields in the source table similar to those created with the LOCALTIMESTAMP function
211198 */
0 commit comments