Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions docs/data-sources/source_reference.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
# generated by https://github.com/hashicorp/terraform-plugin-docs
page_title: "materialize_source_reference Data Source - terraform-provider-materialize"
subcategory: ""
description: |-
The materialize_source_reference data source retrieves a list of available upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.
---

# materialize_source_reference (Data Source)

The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.

## Example Usage

```terraform
data "materialize_source_reference" "source_references" {
source_id = materialize_source_mysql.test.id
}

output "source_references" {
value = data.materialize_source_reference.my_source_references.references
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- `source_id` (String) The ID of the source to get references for

### Optional

- `region` (String) The region in which the resource is located.

### Read-Only

- `id` (String) The ID of this resource.
- `references` (List of Object) The source references (see [below for nested schema](#nestedatt--references))

<a id="nestedatt--references"></a>
### Nested Schema for `references`

Read-Only:

- `columns` (List of String)
- `name` (String)
- `namespace` (String)
- `source_database_name` (String)
- `source_name` (String)
- `source_schema_name` (String)
- `source_type` (String)
- `updated_at` (String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
data "materialize_source_reference" "source_references" {
source_id = materialize_source_mysql.test.id
}

output "source_references" {
value = data.materialize_source_reference.my_source_references.references
}
118 changes: 118 additions & 0 deletions pkg/datasources/datasource_source_reference.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package datasources

import (
"context"

"github.com/MaterializeInc/terraform-provider-materialize/pkg/materialize"
"github.com/MaterializeInc/terraform-provider-materialize/pkg/utils"

"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func SourceReference() *schema.Resource {
return &schema.Resource{
ReadContext: sourceReferenceRead,
Description: "The `materialize_source_reference` data source retrieves a list of *available* upstream references for a given Materialize source. These references represent potential tables that can be created based on the source, but they do not necessarily indicate references the source is already ingesting. This allows users to see all upstream data that could be materialized into tables.",
Schema: map[string]*schema.Schema{
"source_id": {
Type: schema.TypeString,
Required: true,
Description: "The ID of the source to get references for",
},
"references": {
Type: schema.TypeList,
Computed: true,
Description: "The source references",
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"namespace": {
Type: schema.TypeString,
Computed: true,
Description: "The namespace of the reference",
},
"name": {
Type: schema.TypeString,
Computed: true,
Description: "The name of the reference",
},
"updated_at": {
Type: schema.TypeString,
Computed: true,
Description: "The last update timestamp of the reference",
},
"columns": {
Type: schema.TypeList,
Computed: true,
Description: "The columns of the reference",
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"source_name": {
Type: schema.TypeString,
Computed: true,
Description: "The name of the source",
},
"source_schema_name": {
Type: schema.TypeString,
Computed: true,
Description: "The schema name of the source",
},
"source_database_name": {
Type: schema.TypeString,
Computed: true,
Description: "The database name of the source",
},
"source_type": {
Type: schema.TypeString,
Computed: true,
Description: "The type of the source",
},
},
},
},
"region": RegionSchema(),
},
}
}

func sourceReferenceRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
sourceID := d.Get("source_id").(string)
sourceID = utils.ExtractId(sourceID)

var diags diag.Diagnostics

metaDb, region, err := utils.GetDBClientFromMeta(meta, d)
if err != nil {
return diag.FromErr(err)
}

sourceReference, err := materialize.ListSourceReferences(metaDb, sourceID)
if err != nil {
return diag.FromErr(err)
}

referenceFormats := []map[string]interface{}{}
for _, sr := range sourceReference {
referenceMap := map[string]interface{}{
"namespace": sr.Namespace.String,
"name": sr.Name.String,
"updated_at": sr.UpdatedAt.String,
"columns": sr.Columns,
"source_name": sr.SourceName.String,
"source_schema_name": sr.SourceSchemaName.String,
"source_database_name": sr.SourceDBName.String,
"source_type": sr.SourceType.String,
}
referenceFormats = append(referenceFormats, referenceMap)
}

if err := d.Set("references", referenceFormats); err != nil {
return diag.FromErr(err)
}

d.SetId(utils.TransformIdWithRegion(string(region), sourceID))

return diags
}
46 changes: 46 additions & 0 deletions pkg/datasources/datasource_source_reference_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datasources

import (
"context"
"testing"

"github.com/MaterializeInc/terraform-provider-materialize/pkg/testhelpers"
"github.com/MaterializeInc/terraform-provider-materialize/pkg/utils"

"github.com/DATA-DOG/go-sqlmock"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/stretchr/testify/require"
)

func TestSourceReferenceDatasource(t *testing.T) {
r := require.New(t)

in := map[string]interface{}{
"source_id": "source-id",
}
d := schema.TestResourceDataRaw(t, SourceReference().Schema, in)
r.NotNil(d)

testhelpers.WithMockProviderMeta(t, func(db *utils.ProviderMeta, mock sqlmock.Sqlmock) {
predicate := `WHERE sr.source_id = 'source-id'`
testhelpers.MockSourceReferenceScan(mock, predicate)

if err := sourceReferenceRead(context.TODO(), d, db); err != nil {
t.Fatal(err)
}

// Verify the results
references := d.Get("references").([]interface{})
r.Equal(1, len(references))

reference := references[0].(map[string]interface{})
r.Equal("namespace", reference["namespace"])
r.Equal("reference_name", reference["name"])
r.Equal("2023-10-01T12:34:56Z", reference["updated_at"])
r.Equal([]interface{}{"column1", "column2"}, reference["columns"])
r.Equal("source_name", reference["source_name"])
r.Equal("source_schema_name", reference["source_schema_name"])
r.Equal("source_database_name", reference["source_database_name"])
r.Equal("source_type", reference["source_type"])
})
}
90 changes: 90 additions & 0 deletions pkg/materialize/source_reference.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package materialize

import (
"database/sql"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

type SourceReferenceParams struct {
SourceId sql.NullString `db:"source_id"`
Namespace sql.NullString `db:"namespace"`
Name sql.NullString `db:"name"`
UpdatedAt sql.NullString `db:"updated_at"`
Columns pq.StringArray `db:"columns"`
SourceName sql.NullString `db:"source_name"`
SourceSchemaName sql.NullString `db:"source_schema_name"`
SourceDBName sql.NullString `db:"source_database_name"`
SourceType sql.NullString `db:"source_type"`
}

var sourceReferenceQuery = NewBaseQuery(`
SELECT
sr.source_id,
sr.namespace,
sr.name,
sr.updated_at,
sr.columns,
s.name AS source_name,
ss.name AS source_schema_name,
sd.name AS source_database_name,
s.type AS source_type
FROM mz_internal.mz_source_references sr
JOIN mz_sources s ON sr.source_id = s.id
JOIN mz_schemas ss ON s.schema_id = ss.id
JOIN mz_databases sd ON ss.database_id = sd.id
`)

func SourceReferenceId(conn *sqlx.DB, sourceId string) (string, error) {
p := map[string]string{
"sr.source_id": sourceId,
}
q := sourceReferenceQuery.QueryPredicate(p)

var s SourceReferenceParams
if err := conn.Get(&s, q); err != nil {
return "", err
}

return s.SourceId.String, nil
}

func ScanSourceReference(conn *sqlx.DB, id string) (SourceReferenceParams, error) {
q := sourceReferenceQuery.QueryPredicate(map[string]string{"sr.source_id": id})

var s SourceReferenceParams
if err := conn.Get(&s, q); err != nil {
return s, err
}

return s, nil
}

func refreshSourceReferences(conn *sqlx.DB, sourceName, schemaName, databaseName string) error {
query := fmt.Sprintf(`ALTER SOURCE %s REFRESH REFERENCES`, QualifiedName(databaseName, schemaName, sourceName))
_, err := conn.Exec(query)
return err
}

func ListSourceReferences(conn *sqlx.DB, id string) ([]SourceReferenceParams, error) {
source, err := ScanSource(conn, id)
if err == nil {
if err := refreshSourceReferences(conn, source.SourceName.String, source.SchemaName.String, source.DatabaseName.String); err != nil {
return nil, fmt.Errorf("error refreshing source references: %v", err)
}
}

p := map[string]string{
"sr.source_id": id,
}
q := sourceReferenceQuery.QueryPredicate(p)

var references []SourceReferenceParams
if err := conn.Select(&references, q); err != nil {
return references, err
}

return references, nil
}
Loading
Loading