1515package internal
1616
1717import (
18- "fmt "
18+ "context "
1919 "log/slog"
2020 "os"
2121
2222 "github.com/DataBridgeTech/dbqcore"
23+ "github.com/DataBridgeTech/dbqcore/dbq"
2324
2425 "github.com/spf13/cobra"
2526 "github.com/spf13/viper"
@@ -30,7 +31,7 @@ type DbqCliApp interface {
3031 PingDataSource (srcId string ) (string , error )
3132 ImportDatasets (srcId string , filter string ) ([]string , error )
3233 ProfileDataset (srcId string , dataset string , sample bool , maxConcurrent int ) (* dbqcore.TableMetrics , error )
33- RunCheck (check * dbqcore.Check , dataSource * dbqcore.DataSource , dataset string , defaultWhere string ) (bool , string , error )
34+ RunCheck (check * dbqcore.DataQualityCheck , dataSource * dbqcore.DataSource , dataset string , defaultWhere string ) (bool , string , error )
3435 GetDbqConfig () * dbqcore.DbqConfig
3536 SaveDbqConfig () error
3637 SetLogLevel (level slog.Level )
@@ -41,26 +42,29 @@ type DbqAppImpl struct {
4142 dbqConfigPath string
4243 dbqConfig * dbqcore.DbqConfig
4344 logLevel slog.Level
45+ logger * slog.Logger
4446}
4547
4648func NewDbqCliApp (dbqConfigPath string ) DbqCliApp {
4749 dbqConfig , dbqConfigUsedPath := initConfig (dbqConfigPath )
50+ logger := slog .New (slog .NewTextHandler (os .Stdout , & slog.HandlerOptions {Level : slog .LevelError }))
4851 return & DbqAppImpl {
4952 dbqConfigPath : dbqConfigUsedPath ,
5053 dbqConfig : dbqConfig ,
5154 logLevel : slog .LevelError ,
55+ logger : logger , // todo: fix logger init
5256 }
5357}
5458
5559func (app * DbqAppImpl ) PingDataSource (srcId string ) (string , error ) {
5660 var dataSource = app .FindDataSourceById (srcId )
5761
58- cnn , err := getDbqConnector ( * dataSource , app .logLevel )
62+ cnn , err := dbq . NewDbqConnector ( dataSource , app .logger )
5963 if err != nil {
6064 return "" , err
6165 }
6266
63- info , err := cnn .Ping ()
67+ info , err := cnn .Ping (context . Background ()) // todo: ctx propagation
6468 if err != nil {
6569 return "" , err
6670 }
@@ -71,23 +75,23 @@ func (app *DbqAppImpl) PingDataSource(srcId string) (string, error) {
7175func (app * DbqAppImpl ) ImportDatasets (srcId string , filter string ) ([]string , error ) {
7276 var dataSource = app .FindDataSourceById (srcId )
7377
74- cnn , err := getDbqConnector ( * dataSource , app .logLevel )
78+ cnn , err := dbq . NewDbqConnector ( dataSource , app .logger )
7579 if err != nil {
7680 return []string {}, err
7781 }
7882
79- return cnn .ImportDatasets (filter )
83+ return cnn .ImportDatasets (context . Background (), filter ) // todo: ctx propagation
8084}
8185
8286func (app * DbqAppImpl ) ProfileDataset (srcId string , dataset string , sample bool , maxConcurrent int ) (* dbqcore.TableMetrics , error ) {
8387 var dataSource = app .FindDataSourceById (srcId )
8488
85- cnn , err := getDbqConnector ( * dataSource , app .logLevel )
89+ dbqProfiler , err := dbq . NewDbqProfiler ( dataSource , app .logger )
8690 if err != nil {
8791 return nil , err
8892 }
8993
90- return cnn .ProfileDataset (dataset , sample , maxConcurrent )
94+ return dbqProfiler .ProfileDataset (context . Background (), dataset , sample , maxConcurrent ) // todo: ctx propagation
9195}
9296
9397func (app * DbqAppImpl ) GetDbqConfig () * dbqcore.DbqConfig {
@@ -117,12 +121,13 @@ func (app *DbqAppImpl) FindDataSourceById(srcId string) *dbqcore.DataSource {
117121 return nil
118122}
119123
120- func (app * DbqAppImpl ) RunCheck (check * dbqcore.Check , dataSource * dbqcore.DataSource , dataset string , defaultWhere string ) (bool , string , error ) {
121- cnn , err := getDbqConnector ( * dataSource , app .logLevel )
124+ func (app * DbqAppImpl ) RunCheck (check * dbqcore.DataQualityCheck , dataSource * dbqcore.DataSource , dataset string , defaultWhere string ) (bool , string , error ) {
125+ dbqValidator , err := dbq . NewDbqValidator ( dataSource , app .logger )
122126 if err != nil {
123127 return false , "" , err
124128 }
125- return cnn .RunCheck (check , dataset , defaultWhere )
129+
130+ return dbqValidator .RunCheck (context .Background (), check , dataset , defaultWhere ) // todo: ctx propagation
126131}
127132
128133func (app * DbqAppImpl ) SetLogLevel (logLevel slog.Level ) {
@@ -154,13 +159,3 @@ func initConfig(dbqConfigPath string) (*dbqcore.DbqConfig, string) {
154159
155160 return & dbqConfig , v .ConfigFileUsed ()
156161}
157-
158- func getDbqConnector (ds dbqcore.DataSource , logLevel slog.Level ) (dbqcore.DbqConnector , error ) {
159- logHandler := slog .NewTextHandler (os .Stdout , & slog.HandlerOptions {Level : logLevel })
160- switch ds .Type {
161- case dbqcore .DataSourceTypeClickhouse :
162- return dbqcore .NewClickhouseDbqConnector (ds , slog .New (logHandler ))
163- default :
164- return nil , fmt .Errorf ("data source type '%s' is not supported" , ds .Type )
165- }
166- }
0 commit comments