11package com .linkedin .hoptimator .jdbc ;
22
3+ import com .linkedin .hoptimator .Database ;
4+ import com .linkedin .hoptimator .Sink ;
5+ import com .linkedin .hoptimator .Source ;
6+ import com .linkedin .hoptimator .util .ConnectionService ;
37import java .sql .PreparedStatement ;
48import java .sql .SQLException ;
59import java .sql .Statement ;
610import java .util .ArrayList ;
7- import java .util .Collections ;
811import java .util .List ;
912import java .util .Locale ;
1013import java .util .Map ;
14+ import java .util .Objects ;
1115import java .util .Properties ;
1216import java .util .stream .Collectors ;
1317
18+ import org .apache .avro .Schema ;
1419import org .apache .calcite .jdbc .CalciteConnection ;
1520import org .apache .calcite .jdbc .CalcitePrepare ;
21+ import org .apache .calcite .jdbc .CalciteSchema ;
1622import org .apache .calcite .plan .RelOptMaterialization ;
1723import org .apache .calcite .rel .RelNode ;
1824
1925import com .linkedin .hoptimator .avro .AvroConverter ;
2026import com .linkedin .hoptimator .util .DelegatingConnection ;
27+ import org .apache .calcite .util .Util ;
2128
2229
2330public class HoptimatorConnection extends DelegatingConnection {
@@ -44,9 +51,12 @@ public ResolvedTable resolve(List<String> tablePath, Map<String, String> hints)
4451 .limit (tablePath .size () - 1 )
4552 .collect (Collectors .joining ("." ));
4653 String schemaName = tablePath .get (tablePath .size () - 1 ).toLowerCase (Locale .ROOT );
47- org .apache .avro .Schema avroSchema = AvroConverter .avro (namespace , schemaName , tableRel .getRowType ());
48- // TODO: generate source and sink configs via ConnectionService
49- return new ResolvedTable (tablePath , avroSchema , Collections .emptyMap (), Collections .emptyMap ());
54+ Schema avroSchema = AvroConverter .avro (namespace , schemaName , tableRel .getRowType ());
55+ String database = databaseName (this .createPrepareContext (), tablePath );
56+ Source source = new Source (database , tablePath , hints );
57+ Sink sink = new Sink (database , tablePath , hints );
58+ return new ResolvedTable (tablePath , avroSchema , ConnectionService .configure (source , this ),
59+ ConnectionService .configure (sink , this ));
5060 } catch (Exception e ) {
5161 throw new SQLException ("Failed to resolve " + String .join ("." , tablePath ) + ": " + e .getMessage (), e );
5262 }
@@ -92,4 +102,16 @@ private void registerMaterialization(List<String> viewPath, RelNode tableRel, Re
92102 public List <RelOptMaterialization > materializations () {
93103 return materializations ;
94104 }
105+
106+ private static String databaseName (CalcitePrepare .Context context , List <String > tablePath ) throws SQLException {
107+ final List <String > path = Util .skipLast (tablePath );
108+ CalciteSchema schema = context .getRootSchema ();
109+ for (String p : path ) {
110+ schema = Objects .requireNonNull (schema ).getSubSchema (p , true );
111+ }
112+ if (schema == null || !(schema .schema instanceof Database )) {
113+ throw new SQLException (tablePath + " is not a physical database." );
114+ }
115+ return ((Database ) schema .schema ).databaseName ();
116+ }
95117}
0 commit comments