1515package pgcatalog
1616
1717import (
18+ "fmt"
1819 "io"
1920
2021 "github.com/dolthub/go-mysql-server/sql"
@@ -37,6 +38,7 @@ func InitPgNamespace() {
3738type PgNamespaceHandler struct {}
3839
3940var _ tables.Handler = PgNamespaceHandler {}
41+ var _ tables.IndexedTableHandler = PgNamespaceHandler {}
4042
4143// Name implements the interface tables.Handler.
4244func (p PgNamespaceHandler ) Name () string {
@@ -45,45 +47,172 @@ func (p PgNamespaceHandler) Name() string {
4547
4648// RowIter implements the interface tables.Handler.
4749func (p PgNamespaceHandler ) RowIter (ctx * sql.Context , partition sql.Partition ) (sql.RowIter , error ) {
48- // Use cached data from this process if it exists
50+ // Use cached data from this session if it exists
4951 pgCatalogCache , err := getPgCatalogCache (ctx )
5052 if err != nil {
5153 return nil , err
5254 }
5355
54- if pgCatalogCache .schemaOids == nil {
55- var schemaNames []string
56- var schemaOids []id.Id
57- err := functions .IterateCurrentDatabase (ctx , functions.Callbacks {
58- Schema : func (ctx * sql.Context , schema functions.ItemSchema ) (cont bool , err error ) {
59- schemaNames = append (schemaNames , schema .Item .SchemaName ())
60- schemaOids = append (schemaOids , schema .OID .AsId ())
61- return true , nil
62- },
63- })
56+ if pgCatalogCache .pgNamespaces == nil {
57+ err = cachePgNamespaces (ctx , pgCatalogCache )
6458 if err != nil {
6559 return nil , err
6660 }
67- //schemaOids = append(schemaOids, schemaOids[len(schemaOids)-1]+1) // TODO: what is this for?
68- pgCatalogCache .schemaNames = schemaNames
69- pgCatalogCache .schemaOids = schemaOids
7061 }
7162
72- return & pgNamespaceRowIter {
73- schemas : pgCatalogCache .schemaNames ,
74- oids : pgCatalogCache .schemaOids ,
75- idx : 0 ,
63+ if namespaceIdxPart , ok := partition .(inMemIndexPartition ); ok {
64+ return & inMemIndexScanIter [* pgNamespace ]{
65+ lookup : namespaceIdxPart .lookup ,
66+ rangeConverter : p ,
67+ btreeAccess : pgCatalogCache .pgNamespaces ,
68+ rowConverter : pgNamespaceToRow ,
69+ }, nil
70+ }
71+
72+ return & pgNamespaceTableScanIter {
73+ namespaceCache : pgCatalogCache .pgNamespaces ,
74+ idx : 0 ,
7675 }, nil
7776}
7877
79- // Schema implements the interface tables.Handler.
78+ // cachePgNamespaces caches the pg_namespace data for the current database in the session.
79+ func cachePgNamespaces (ctx * sql.Context , pgCatalogCache * pgCatalogCache ) error {
80+ var namespaces []* pgNamespace
81+ oidIdx := NewUniqueInMemIndexStorage [* pgNamespace ](lessNamespaceOid )
82+ nameIdx := NewUniqueInMemIndexStorage [* pgNamespace ](lessNamespaceName )
83+
84+ err := functions .IterateCurrentDatabase (ctx , functions.Callbacks {
85+ Schema : func (ctx * sql.Context , schema functions.ItemSchema ) (cont bool , err error ) {
86+ namespace := & pgNamespace {
87+ oid : schema .OID .AsId (),
88+ oidNative : id .Cache ().ToOID (schema .OID .AsId ()),
89+ name : schema .Item .SchemaName (),
90+ }
91+ oidIdx .Add (namespace )
92+ nameIdx .Add (namespace )
93+ namespaces = append (namespaces , namespace )
94+ return true , nil
95+ },
96+ })
97+ if err != nil {
98+ return err
99+ }
100+
101+ pgCatalogCache .pgNamespaces = & pgNamespaceCache {
102+ namespaces : namespaces ,
103+ oidIdx : oidIdx ,
104+ nameIdx : nameIdx ,
105+ }
106+
107+ return nil
108+ }
109+
110+ // getIndexScanRange implements the interface RangeConverter.
111+ func (p PgNamespaceHandler ) getIndexScanRange (rng sql.Range , index sql.Index ) (* pgNamespace , bool , * pgNamespace , bool ) {
112+ var gte , lt * pgNamespace
113+ var hasLowerBound , hasUpperBound bool
114+
115+ switch index .(pgCatalogInMemIndex ).name {
116+ case "pg_namespace_oid_index" :
117+ msrng := rng .(sql.MySQLRange )
118+ oidRng := msrng [0 ]
119+ if oidRng .HasLowerBound () {
120+ lb := sql .GetMySQLRangeCutKey (oidRng .LowerBound )
121+ if lb != nil {
122+ lowerRangeCutKey := lb .(id.Id )
123+ gte = & pgNamespace {
124+ oidNative : idToOid (lowerRangeCutKey ),
125+ }
126+ hasLowerBound = true
127+ }
128+ }
129+ if oidRng .HasUpperBound () {
130+ ub := sql .GetMySQLRangeCutKey (oidRng .UpperBound )
131+ if ub != nil {
132+ upperRangeCutKey := ub .(id.Id )
133+ lt = & pgNamespace {
134+ oidNative : idToOid (upperRangeCutKey ) + 1 ,
135+ }
136+ hasUpperBound = true
137+ }
138+ }
139+
140+ case "pg_namespace_nspname_index" :
141+ msrng := rng .(sql.MySQLRange )
142+ nameRng := msrng [0 ]
143+ var nameLower , nameUpper string
144+
145+ if nameRng .HasLowerBound () {
146+ lb := sql .GetMySQLRangeCutKey (nameRng .LowerBound )
147+ if lb != nil {
148+ nameLower = lb .(string )
149+ hasLowerBound = true
150+ }
151+ }
152+ if nameRng .HasUpperBound () {
153+ ub := sql .GetMySQLRangeCutKey (nameRng .UpperBound )
154+ if ub != nil {
155+ nameUpper = ub .(string )
156+ nameUpper = fmt .Sprintf ("%s%o" , nameUpper , rune (0 ))
157+ hasUpperBound = true
158+ }
159+ }
160+
161+ if hasLowerBound {
162+ gte = & pgNamespace {
163+ name : nameLower ,
164+ }
165+ }
166+ if hasUpperBound {
167+ lt = & pgNamespace {
168+ name : nameUpper ,
169+ }
170+ }
171+ default :
172+ panic ("unknown index name: " + index .(pgCatalogInMemIndex ).name )
173+ }
174+
175+ return gte , hasLowerBound , lt , hasUpperBound
176+ }
177+
178+ // PkSchema implements the interface tables.Handler.
80179func (p PgNamespaceHandler ) PkSchema () sql.PrimaryKeySchema {
81180 return sql.PrimaryKeySchema {
82181 Schema : pgNamespaceSchema ,
83182 PkOrdinals : nil ,
84183 }
85184}
86185
186+ // Indexes implements tables.IndexedTableHandler.
187+ func (p PgNamespaceHandler ) Indexes () ([]sql.Index , error ) {
188+ return []sql.Index {
189+ pgCatalogInMemIndex {
190+ name : "pg_namespace_oid_index" ,
191+ tblName : "pg_namespace" ,
192+ dbName : "pg_catalog" ,
193+ uniq : true ,
194+ columnExprs : []sql.ColumnExpressionType {{Expression : "pg_namespace.oid" , Type : pgtypes .Oid }},
195+ },
196+ pgCatalogInMemIndex {
197+ name : "pg_namespace_nspname_index" ,
198+ tblName : "pg_namespace" ,
199+ dbName : "pg_catalog" ,
200+ uniq : true ,
201+ columnExprs : []sql.ColumnExpressionType {{Expression : "pg_namespace.nspname" , Type : pgtypes .Name }},
202+ },
203+ }, nil
204+ }
205+
206+ // LookupPartitions implements tables.IndexedTableHandler.
207+ func (p PgNamespaceHandler ) LookupPartitions (context * sql.Context , lookup sql.IndexLookup ) (sql.PartitionIter , error ) {
208+ return & inMemIndexPartIter {
209+ part : inMemIndexPartition {
210+ idxName : lookup .Index .(pgCatalogInMemIndex ).name ,
211+ lookup : lookup ,
212+ },
213+ }, nil
214+ }
215+
87216// pgNamespaceSchema is the schema for pg_namespace.
88217var pgNamespaceSchema = sql.Schema {
89218 {Name : "oid" , Type : pgtypes .Oid , Default : nil , Nullable : false , Source : PgNamespaceName },
@@ -92,34 +221,46 @@ var pgNamespaceSchema = sql.Schema{
92221 {Name : "nspacl" , Type : pgtypes .TextArray , Default : nil , Nullable : true , Source : PgNamespaceName }, // TODO: type aclitem[]
93222}
94223
95- // pgNamespaceRowIter is the sql.RowIter for the pg_namespace table.
96- type pgNamespaceRowIter struct {
97- schemas []string
98- oids []id.Id
99- idx int
224+ // lessNamespaceOid is a sort function for pgNamespace based on oid.
225+ func lessNamespaceOid (a , b * pgNamespace ) bool {
226+ return a .oidNative < b .oidNative
100227}
101228
102- var _ sql.RowIter = (* pgNamespaceRowIter )(nil )
229+ // lessNamespaceName is a sort function for pgNamespace based on name.
230+ func lessNamespaceName (a , b * pgNamespace ) bool {
231+ return a .name < b .name
232+ }
233+
234+ // pgNamespaceTableScanIter is the sql.RowIter for the pg_namespace table.
235+ type pgNamespaceTableScanIter struct {
236+ namespaceCache * pgNamespaceCache
237+ idx int
238+ }
239+
240+ var _ sql.RowIter = (* pgNamespaceTableScanIter )(nil )
103241
104242// Next implements the interface sql.RowIter.
105- func (iter * pgNamespaceRowIter ) Next (ctx * sql.Context ) (sql.Row , error ) {
106- if iter .idx >= len (iter .schemas ) {
243+ func (iter * pgNamespaceTableScanIter ) Next (ctx * sql.Context ) (sql.Row , error ) {
244+ if iter .idx >= len (iter .namespaceCache . namespaces ) {
107245 return nil , io .EOF
108246 }
109247 iter .idx ++
110- sch := iter .schemas [iter .idx - 1 ]
111- nspOID := iter .oids [iter .idx - 1 ]
248+ namespace := iter .namespaceCache .namespaces [iter .idx - 1 ]
112249
113- // TODO: columns are incomplete
114- return sql.Row {
115- nspOID , //oid
116- sch , //nspname
117- id .Null , //nspowner
118- nil , //nspacl
119- }, nil
250+ return pgNamespaceToRow (namespace ), nil
120251}
121252
122253// Close implements the interface sql.RowIter.
123- func (iter * pgNamespaceRowIter ) Close (ctx * sql.Context ) error {
254+ func (iter * pgNamespaceTableScanIter ) Close (ctx * sql.Context ) error {
124255 return nil
125256}
257+
258+ func pgNamespaceToRow (namespace * pgNamespace ) sql.Row {
259+ // TODO: columns are incomplete
260+ return sql.Row {
261+ namespace .oid , // oid
262+ namespace .name , // nspname
263+ id .Null , // nspowner
264+ nil , // nspacl
265+ }
266+ }
0 commit comments