Commit e431a01

Added experimental databricks_sql_table resource to manage tables in the Unity Catalog (#2213)
1 parent 13681a0 commit e431a01

File tree

5 files changed: 1173 additions, 0 deletions


catalog/resource_sql_table.go

Lines changed: 379 additions & 0 deletions
@@ -0,0 +1,379 @@
package catalog

import (
	"context"
	"fmt"
	"log"
	"reflect"
	"strings"

	"github.com/databricks/databricks-sdk-go/apierr"
	clustersApi "github.com/databricks/databricks-sdk-go/service/clusters"
	"github.com/databricks/terraform-provider-databricks/clusters"
	"github.com/databricks/terraform-provider-databricks/common"

	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

type SqlColumnInfo struct {
	Name     string `json:"name"`
	Type     string `json:"type_text" tf:"alias:type"`
	Comment  string `json:"comment,omitempty"`
	Nullable bool   `json:"nullable,omitempty" tf:"default:true"`
}

type SqlTableInfo struct {
	Name                  string            `json:"name"`
	CatalogName           string            `json:"catalog_name" tf:"force_new"`
	SchemaName            string            `json:"schema_name" tf:"force_new"`
	TableType             string            `json:"table_type" tf:"force_new"`
	DataSourceFormat      string            `json:"data_source_format,omitempty" tf:"force_new"`
	ColumnInfos           []SqlColumnInfo   `json:"columns,omitempty" tf:"alias:column,computed,force_new"`
	StorageLocation       string            `json:"storage_location,omitempty" tf:"suppress_diff"`
	StorageCredentialName string            `json:"storage_credential_name,omitempty" tf:"force_new"`
	ViewDefinition        string            `json:"view_definition,omitempty"`
	Comment               string            `json:"comment,omitempty"`
	Properties            map[string]string `json:"properties,omitempty" tf:"computed"`
	ClusterID             string            `json:"cluster_id,omitempty" tf:"computed"`

	exec common.CommandExecutor
}

type SqlTablesAPI struct {
	client  *common.DatabricksClient
	context context.Context
}

func NewSqlTablesAPI(ctx context.Context, m any) SqlTablesAPI {
	return SqlTablesAPI{m.(*common.DatabricksClient), context.WithValue(ctx, common.Api, common.API_2_1)}
}

func (a SqlTablesAPI) getTable(name string) (ti SqlTableInfo, err error) {
	err = a.client.Get(a.context, "/unity-catalog/tables/"+name, nil, &ti)
	return
}

func (ti *SqlTableInfo) FullName() string {
	return fmt.Sprintf("%s.%s.%s", ti.CatalogName, ti.SchemaName, ti.Name)
}

// These properties are added automatically by the backend. If we do not
// customize the diff to retain them, Terraform constantly tries to remove
// them: `properties` is essentially a "partially" computed field.
// This needs to be replaced with something a bit more robust in the future.
func sqlTableIsManagedProperty(key string) bool {
	managedProps := map[string]bool{
		"delta.lastCommitTimestamp":                                 true,
		"delta.lastUpdateVersion":                                   true,
		"delta.minReaderVersion":                                    true,
		"delta.minWriterVersion":                                    true,
		"view.catalogAndNamespace.numParts":                         true,
		"view.catalogAndNamespace.part.0":                           true,
		"view.catalogAndNamespace.part.1":                           true,
		"view.query.out.col.0":                                      true,
		"view.query.out.numCols":                                    true,
		"view.referredTempFunctionsNames":                           true,
		"view.referredTempViewNames":                                true,
		"view.sqlConfig.spark.sql.hive.convertCTAS":                 true,
		"view.sqlConfig.spark.sql.legacy.createHiveTableByDefault":  true,
		"view.sqlConfig.spark.sql.parquet.compression.codec":        true,
		"view.sqlConfig.spark.sql.session.timeZone":                 true,
		"view.sqlConfig.spark.sql.sources.commitProtocolClass":      true,
		"view.sqlConfig.spark.sql.sources.default":                  true,
		"view.sqlConfig.spark.sql.streaming.stopTimeout":            true,
	}
	return managedProps[key]
}
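
Both serializeProperties below and the CustomizeDiff in ResourceSqlTable rely on this check: backend-managed keys are filtered out of generated SQL and carried forward in the plan so they never show up as drift. A minimal sketch of the carry-forward step, with oldProps/newProps standing in for the values normally pulled from d.GetChange("properties"):

	// Sketch: keep backend-managed keys the user's config omits, so the
	// plan does not try to unset them. Values here are hypothetical.
	oldProps := map[string]string{"delta.minReaderVersion": "1", "foo": "bar"}
	newProps := map[string]string{"foo": "baz"}
	for key, value := range oldProps {
		if _, ok := newProps[key]; !ok && sqlTableIsManagedProperty(key) {
			newProps[key] = value // delta.minReaderVersion is carried forward
		}
	}
	// newProps is now {"foo": "baz", "delta.minReaderVersion": "1"}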

func (ti *SqlTableInfo) initCluster(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) (err error) {
	defaultClusterName := "terraform-sql-table"
	clustersAPI := clusters.NewClustersAPI(ctx, c)
	if ci, ok := d.GetOk("cluster_id"); ok {
		ti.ClusterID = ci.(string)
	} else {
		ti.ClusterID, err = ti.getOrCreateCluster(defaultClusterName, clustersAPI)
		if err != nil {
			return
		}
	}
	_, err = clustersAPI.StartAndGetInfo(ti.ClusterID)
	if apierr.IsMissing(err) {
		// a cluster that was previously in the tfstate was deleted
		ti.ClusterID, err = ti.getOrCreateCluster(defaultClusterName, clustersAPI)
		if err != nil {
			return
		}
		_, err = clustersAPI.StartAndGetInfo(ti.ClusterID)
	}
	if err != nil {
		return
	}
	ti.exec = c.CommandExecutor(ctx)
	return nil
}

func (ti *SqlTableInfo) getOrCreateCluster(clusterName string, clustersAPI clusters.ClustersAPI) (string, error) {
	sparkVersion := clustersAPI.LatestSparkVersionOrDefault(clusters.SparkVersionRequest{
		Latest: true,
	})
	nodeType := clustersAPI.GetSmallestNodeType(clustersApi.NodeTypeRequest{LocalDisk: true})
	aclCluster, err := clustersAPI.GetOrCreateRunningCluster(
		clusterName, clusters.Cluster{
			ClusterName:            clusterName,
			SparkVersion:           sparkVersion,
			NodeTypeID:             nodeType,
			AutoterminationMinutes: 10,
			DataSecurityMode:       "SINGLE_USER",
			SparkConf: map[string]string{
				"spark.databricks.cluster.profile": "singleNode",
				"spark.master":                     "local[*]",
			},
			CustomTags: map[string]string{
				"ResourceClass": "SingleNode",
			},
		})
	if err != nil {
		return "", err
	}
	return aclCluster.ClusterID, nil
}

func (ti *SqlTableInfo) serializeColumnInfo(col SqlColumnInfo) string {
	notNull := ""
	if !col.Nullable {
		notNull = " NOT NULL"
	}

	comment := ""
	if col.Comment != "" {
		// the comment must be a SQL string literal, hence the quotes
		comment = fmt.Sprintf(" COMMENT '%s'", col.Comment)
	}
	return fmt.Sprintf("%s %s%s%s", col.Name, col.Type, notNull, comment) // id INT NOT NULL COMMENT 'something'
}

func (ti *SqlTableInfo) serializeColumnInfos() string {
	columnFragments := make([]string, len(ti.ColumnInfos))
	for i, col := range ti.ColumnInfos {
		columnFragments[i] = ti.serializeColumnInfo(col)
	}
	return strings.Join(columnFragments, ", ") // id INT NOT NULL, name STRING, age INT
}
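
For illustration, two hypothetical columns serialize as follows:

	ti := &SqlTableInfo{ColumnInfos: []SqlColumnInfo{
		{Name: "id", Type: "INT", Nullable: false, Comment: "primary key"},
		{Name: "name", Type: "STRING", Nullable: true},
	}}
	fmt.Println(ti.serializeColumnInfos())
	// Output: id INT NOT NULL COMMENT 'primary key', name STRING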

func (ti *SqlTableInfo) serializeProperties() string {
	propsMap := make([]string, 0, len(ti.Properties))
	for key, value := range ti.Properties {
		if !sqlTableIsManagedProperty(key) {
			propsMap = append(propsMap, fmt.Sprintf("'%s'='%s'", key, value))
		}
	}
	return strings.Join(propsMap, ", ") // 'foo'='bar', 'this'='that'
}

func (ti *SqlTableInfo) buildLocationStatement() string {
	statements := make([]string, 0, 10)
	statements = append(statements, fmt.Sprintf("LOCATION '%s'", ti.StorageLocation)) // LOCATION '/mnt/csv_files'

	if ti.StorageCredentialName != "" {
		statements = append(statements, fmt.Sprintf(" WITH (CREDENTIAL `%s`)", ti.StorageCredentialName))
	}
	return strings.Join(statements, "")
}

func (ti *SqlTableInfo) getTableTypeString() string {
	if ti.TableType == "VIEW" {
		return "VIEW"
	}
	return "TABLE"
}

func (ti *SqlTableInfo) buildTableCreateStatement() string {
	statements := make([]string, 0, 10)

	isView := ti.TableType == "VIEW"

	externalFragment := ""
	if ti.TableType == "EXTERNAL" {
		externalFragment = "EXTERNAL "
	}

	createType := ti.getTableTypeString()

	statements = append(statements, fmt.Sprintf("CREATE %s%s %s", externalFragment, createType, ti.FullName()))

	if len(ti.ColumnInfos) > 0 {
		statements = append(statements, fmt.Sprintf(" (%s)", ti.serializeColumnInfos()))
	}

	if !isView && ti.DataSourceFormat != "" {
		statements = append(statements, fmt.Sprintf("\nUSING %s", ti.DataSourceFormat)) // USING CSV
	}

	if ti.Comment != "" {
		statements = append(statements, fmt.Sprintf("\nCOMMENT '%s'", ti.Comment)) // COMMENT 'this is a comment'
	}

	if len(ti.Properties) > 0 {
		statements = append(statements, fmt.Sprintf("\nTBLPROPERTIES (%s)", ti.serializeProperties())) // TBLPROPERTIES ('foo'='bar')
	}

	if isView {
		statements = append(statements, fmt.Sprintf("\nAS %s", ti.ViewDefinition))
	} else if ti.StorageLocation != "" {
		statements = append(statements, "\n"+ti.buildLocationStatement())
	}

	statements = append(statements, ";")

	return strings.Join(statements, "")
}
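
As a worked example (all names hypothetical), an external Delta table with one column and a comment produces:

	ti := &SqlTableInfo{
		Name:             "bar",
		CatalogName:      "main",
		SchemaName:       "foo",
		TableType:        "EXTERNAL",
		DataSourceFormat: "DELTA",
		StorageLocation:  "s3://ext-main/foo/bar",
		Comment:          "created via terraform",
		ColumnInfos:      []SqlColumnInfo{{Name: "id", Type: "INT", Nullable: true}},
	}
	fmt.Println(ti.buildTableCreateStatement())
	// CREATE EXTERNAL TABLE main.foo.bar (id INT)
	// USING DELTA
	// COMMENT 'created via terraform'
	// LOCATION 's3://ext-main/foo/bar';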

func (ti *SqlTableInfo) diff(oldti *SqlTableInfo) ([]string, error) {
	statements := make([]string, 0)
	typestring := ti.getTableTypeString()

	if ti.TableType == "VIEW" {
		// view-only attributes
		if ti.ViewDefinition != oldti.ViewDefinition {
			statements = append(statements, fmt.Sprintf("ALTER VIEW %s AS %s", ti.FullName(), ti.ViewDefinition))
		}
	} else {
		// table-only attributes
		if ti.StorageLocation != oldti.StorageLocation {
			statements = append(statements, fmt.Sprintf("ALTER TABLE %s SET %s", ti.FullName(), ti.buildLocationStatement()))
		}
	}

	// attributes common to both views and tables
	if ti.Comment != oldti.Comment {
		statements = append(statements, fmt.Sprintf("COMMENT ON %s %s IS '%s'", typestring, ti.FullName(), ti.Comment))
	}

	if !reflect.DeepEqual(ti.Properties, oldti.Properties) {
		// first handle removal of properties
		removeProps := make([]string, 0)
		for key := range oldti.Properties {
			if _, ok := ti.Properties[key]; !ok {
				removeProps = append(removeProps, key)
			}
		}
		if len(removeProps) > 0 {
			statements = append(statements, fmt.Sprintf("ALTER %s %s UNSET TBLPROPERTIES IF EXISTS (%s)", typestring, ti.FullName(), strings.Join(removeProps, ",")))
		}
		// next handle property changes and additions
		statements = append(statements, fmt.Sprintf("ALTER %s %s SET TBLPROPERTIES (%s)", typestring, ti.FullName(), ti.serializeProperties()))
	}

	return statements, nil
}
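
For instance (hypothetical values), changing the comment and adding a property on a managed table yields roughly:

	oldti := &SqlTableInfo{Name: "bar", CatalogName: "main", SchemaName: "foo",
		TableType: "MANAGED", Comment: "old", Properties: map[string]string{"foo": "bar"}}
	newti := &SqlTableInfo{Name: "bar", CatalogName: "main", SchemaName: "foo",
		TableType: "MANAGED", Comment: "new", Properties: map[string]string{"foo": "bar", "this": "that"}}
	statements, _ := newti.diff(oldti)
	// statements (property order within TBLPROPERTIES depends on map iteration):
	//   COMMENT ON TABLE main.foo.bar IS 'new'
	//   ALTER TABLE main.foo.bar SET TBLPROPERTIES ('foo'='bar', 'this'='that')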

func (ti *SqlTableInfo) updateTable(oldti *SqlTableInfo) error {
	statements, err := ti.diff(oldti)
	if err != nil {
		return err
	}
	for _, statement := range statements {
		err = ti.applySql(statement)
		if err != nil {
			return err
		}
	}
	return nil
}

func (ti *SqlTableInfo) createTable() error {
	return ti.applySql(ti.buildTableCreateStatement())
}

func (ti *SqlTableInfo) deleteTable() error {
	return ti.applySql(fmt.Sprintf("DROP %s %s", ti.getTableTypeString(), ti.FullName()))
}

func (ti *SqlTableInfo) applySql(sqlQuery string) error {
	log.Printf("[INFO] Executing SQL: %s", sqlQuery)
	r := ti.exec.Execute(ti.ClusterID, "sql", sqlQuery)

	if !r.Failed() {
		return nil
	}
	return fmt.Errorf("cannot execute %s: %s", sqlQuery, r.Error())
}

func ResourceSqlTable() *schema.Resource {
	tableSchema := common.StructToSchema(SqlTableInfo{},
		func(s map[string]*schema.Schema) map[string]*schema.Schema {
			s["data_source_format"].DiffSuppressFunc = func(k, old, new string, d *schema.ResourceData) bool {
				if new == "" {
					return true
				}
				// EqualFold is already case-insensitive; no lowering needed
				return strings.EqualFold(old, new)
			}
			return s
		})
	return common.Resource{
		Schema: tableSchema,
		CustomizeDiff: func(ctx context.Context, d *schema.ResourceDiff) error {
			if d.HasChange("properties") {
				old, new := d.GetChange("properties")
				oldProps := old.(map[string]any)
				newProps := new.(map[string]any)
				for key := range oldProps {
					if _, ok := newProps[key]; !ok {
						if sqlTableIsManagedProperty(key) {
							newProps[key] = oldProps[key]
						}
					}
				}
				d.SetNew("properties", newProps)
			}
			// No support yet for changing the COMMENT on a VIEW.
			// Once that is added, this can be removed.
			if d.HasChange("comment") && d.Get("table_type") == "VIEW" {
				d.ForceNew("comment")
			}
			return nil
		},
		Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			var ti = new(SqlTableInfo)
			common.DataToStructPointer(d, tableSchema, ti)
			if err := ti.initCluster(ctx, d, c); err != nil {
				return err
			}
			if err := ti.createTable(); err != nil {
				return err
			}
			d.SetId(ti.FullName())
			return nil
		},
		Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			ti, err := NewSqlTablesAPI(ctx, c).getTable(d.Id())
			if err != nil {
				return err
			}
			return common.StructToData(ti, tableSchema, d)
		},
		Update: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			var newti = new(SqlTableInfo)
			common.DataToStructPointer(d, tableSchema, newti)
			if err := newti.initCluster(ctx, d, c); err != nil {
				return err
			}
			oldti, err := NewSqlTablesAPI(ctx, c).getTable(d.Id())
			if err != nil {
				return err
			}
			return newti.updateTable(&oldti)
		},
		Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			var ti = new(SqlTableInfo)
			common.DataToStructPointer(d, tableSchema, ti)
			if err := ti.initCluster(ctx, d, c); err != nil {
				return err
			}
			return ti.deleteTable()
		},
	}.ToResource()
}
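
This file only defines the resource. Elsewhere among the five files in this commit the provider's resource map presumably gains an entry for it; a sketch of what that registration looks like (not the exact diff):

	// provider.go (sketch): wiring the new resource into the provider's
	// resource map, alongside the existing Unity Catalog resources.
	ResourcesMap: map[string]*schema.Resource{
		// ...
		"databricks_sql_table": catalog.ResourceSqlTable(),
	},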
