diff --git a/app/src/libraries/streamlit.py b/app/src/libraries/streamlit.py
index ac718d4..82f8d90 100644
--- a/app/src/libraries/streamlit.py
+++ b/app/src/libraries/streamlit.py
@@ -1,90 +1,100 @@
+# Import libraries
+from snowflake.snowpark.context import get_active_session
import streamlit as st
import pandas as pd
-import altair as alt
-from snowflake.snowpark.context import get_active_session
-from snowflake.snowpark.functions import count_distinct,col,sum
-import snowflake.permissions as permission
-from sys import exit
+import snowflake.permissions as permissions
+import datetime as dt
+
+# Set page config
st.set_page_config(layout="wide")

+# Get current session
session = get_active_session()

-def load_app(orders_table,site_recovery_table):
-    with st.spinner("Loading lead time, order status, and supplier performance. Please wait..."):
-        df = session.sql(f"SELECT t1.order_id,t2.ship_order_id,t1.material_name,t1.supplier_name, t1.quantity, t1.cost, t2.status, t2.lat, t2.lon FROM {orders_table} as t1 INNER JOIN MFG_SHIPPING as t2 ON t2.ORDER_ID = t1.ORDER_ID ORDER BY t1.order_id")
-        df_order_status = df.group_by('status').agg(count_distinct('order_id').as_('TOTAL RECORDS')).order_by('status').to_pandas()
+# Title
+st.header("ACTIN TABLE COMPARISON")

-    df_cal_lead_time = session.sql(f"SELECT t1.order_id,t2.ship_order_id,t1.material_name,t1.supplier_name,t1.quantity,t1.cost,t2.status,t2.lat,t2.lon,cal_lead_time(t1.process_supply_day,t2.duration,t2.recovery_days) as lead_time FROM {orders_table} as t1 INNER JOIN (SELECT order_id, ship_order_id, status, duration, MFG_SHIPPING.lat, MFG_SHIPPING.lon, IFF(srt.recovery_weeks * 7::int = srt.recovery_weeks * 7,srt.recovery_weeks * 7,0) as recovery_days from MFG_SHIPPING LEFT OUTER JOIN {site_recovery_table} as srt ON MFG_SHIPPING.lon = srt.lon AND MFG_SHIPPING.lat = srt.lat) as t2 ON t2.ORDER_ID = t1.ORDER_ID ORDER BY t1.order_id")
-    df_supplier_perf = df_cal_lead_time.group_by('supplier_name').agg(sum(col('lead_time')).as_('TOTAL LEAD TIME')).sort('TOTAL LEAD TIME', ascending=True).limit(20).to_pandas()
-    df_lead_time = df_cal_lead_time.select('order_id','lead_time').sort('order_id', ascending=True).to_pandas()
-
-    with st.container():
-        col1,col2 = st.columns(2,gap='small')
-        with col1:
-            # Display Lead Time Status chart
-            st.subheader("Lead Time Status")
-            lead_time_base = alt.Chart(df_lead_time).encode(alt.X("ORDER_ID", title="ORDER ID", sort=None))
-            lead_time_base_bars = lead_time_base.mark_bar().encode(
-                color=alt.value("#249DC9"),
-                y=alt.Y("LEAD_TIME", title="LEAD TIME DAYS")
-            )
-            line = alt.Chart(pd.DataFrame({'y': [60]})).mark_rule(color='rgb(249,158,54)').encode(y='y')
-            lead_time_chart = alt.layer(lead_time_base_bars)
-            st.altair_chart(lead_time_chart + line, use_container_width=True)
+st.subheader(":dart: COMPARE TABLES BETWEEN A SOURCE ENVIRONMENT AND A TARGET ENVIRONMENT")
+
+st.write("----------------:one: Update the settings table to choose the tables to compare")
+st.write("----------------:two: Launch the table comparison and save the results in a new table")
+st.write("----------------:three: Visualize and interpret the results")
+
+
+# PART 1: EDIT THE INPUT LIST OF TABLES TO COMPARE
+st.subheader(":wrench: 1 : SET THE LIST OF TABLES TO BE COMPARED")
+st.write("The required information is the source (DATABASE_SOURCE, SCHEMA_SOURCE) and target (DATABASE_CIBLE, SCHEMA_CIBLE) environments, as well as the table name (TABLE_NAME)")
+st.write("DATE_SCOPE delimits the temporal scope with MIN_DATE and MAX_DATE bounds for both tables. If DATE_SCOPE is NULL, the entire table is used.")
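+# An example settings row (values mirror the defaults seeded in setup.sql):
+#   TABLE_NAME='Hypermarche_Achats', DATABASE_SOURCE='PC_ALTERYX_DB', SCHEMA_SOURCE='HYPERMARCHE_DEV',
+#   DATABASE_CIBLE='PC_ALTERYX_DB', SCHEMA_CIBLE='HYPERMARCHE',
+#   DATE_SCOPE='Date de commande', MIN_DATE='2019-01-01', MAX_DATE='2022-12-31'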
+
+# Display the settings table
+table_list="TABLE_LIST"
+table_list_df=session.table(table_list).to_pandas()
+
+# Let the user edit the settings table
+with st.form("table write"):
+    df_edited=st.experimental_data_editor(table_list_df, use_container_width=True, num_rows="dynamic")
+    write_to_snowflake=st.form_submit_button("UPDATE")
+
+# Write the edits back to Snowflake
+if write_to_snowflake:
+    with st.spinner("UPDATING..."):
+        try:
+            session.write_pandas(df_edited, table_list, overwrite=True)
+            # overwrite=True recreates the table, so the grant must be re-applied
+            session.sql('GRANT SELECT ON TABLE TABLE_LIST TO APPLICATION ROLE app_instance_role').collect()
+            st.success("The input table has been successfully updated in Snowflake!")
+        except Exception as e:
+            st.error(f"Error saving to table: {e}")
+
+st.divider()
+
+
+# PART 2: LAUNCH THE TABLE COMPARISON
+st.subheader(":telescope: 2 : LAUNCH TABLE COMPARISON")
+
+
+# Build a datetime suffix that uniquely identifies each results table
+current_time = dt.datetime.now()
+tb_result_default = current_time.strftime('%Y%m%d_%H%M%S')
+
+# Let the user change the name of the results table
+col1, col2, col3 = st.columns(3)
+with col1:
+    tb_results_name = st.text_input('CHANGE THE TABLE NAME SUFFIX TO RECORD THE VARIOUS RESULTS :', value=tb_result_default)
+    # The RESULTS_ prefix (added here) keeps the identifier valid even when the suffix is all digits
+    composed_query = """CREATE or REPLACE TABLE RESULTS_""" + tb_results_name + """ AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1;"""
+    with st.form("Proc stockées"):
+        launch_proc_stock = st.form_submit_button("LAUNCH TABLE COMPARISON")
+
+
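+# Each stored procedure below returns a result set, and
+# "CREATE TABLE ... AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))"
+# persists that result set into a regular table. A minimal sketch of the
+# pattern as a helper (persist_last_result is illustrative, not part of the app):
+#
+#   def persist_last_result(target_table: str) -> None:
+#       session.sql(f"CREATE OR REPLACE TABLE {target_table} AS "
+#                   "SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1").collect()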
+if launch_proc_stock:
+    with st.spinner("CALCULATION IN PROGRESS..."):
+
+        # Build the source data dictionary (the second argument is the fully
+        # qualified INFORMATION_SCHEMA.COLUMNS view of the source database)
+        df = session.sql("""call GET_DICTIONNARY_SOURCE('TABLE_LIST','PC_ALTERYX_DB.INFORMATION_SCHEMA.COLUMNS')""").collect()
+        df = session.sql("CREATE or REPLACE TABLE SOURCE_DICTIONNARY AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1;").collect()

-    def color_lead_time(val):
-        return f'background-color: rgb(249,158,54)'
+        # Compute per-column statistics on the source
+        df = session.sql("""call GET_STATISTICS('SOURCE_DICTIONNARY')""").collect()
+        df = session.sql("CREATE or REPLACE TABLE SOURCE_STATISTICS AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1;").collect()

-        with col2:
-            # Underlying Data
-            st.subheader("Orders with Lead Time Status >= 60days")
-            df_lead_time_60_days = df_cal_lead_time.select('lead_time','order_id','ship_order_id','material_name','supplier_name','quantity','cost').filter(col('lead_time') > 60).sort('lead_time').to_pandas()
-            df_lead_time_60_days['LEAD_TIME'] = df_lead_time_60_days['LEAD_TIME'].astype('int')
-            st.dataframe(df_lead_time_60_days.style.applymap(color_lead_time, subset=['LEAD_TIME']))
+        # Build the target data dictionary (the target INFORMATION_SCHEMA is
+        # reached through the application reference)
+        df = session.sql("""call GET_DICTIONNARY_CIBLE('TABLE_LIST')""").collect()
+        df = session.sql("CREATE or REPLACE TABLE CIBLE_DICTIONNARY AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1;").collect()

-    with st.container():
-        col1,col2 = st.columns(2,gap='small')
-        with col1:
-            # Display Supplier Performance
-            st.subheader("Supplier Performance")
-            supplier_perf_base = alt.Chart(df_supplier_perf).encode(alt.X("SUPPLIER_NAME:N", title="SUPPLIER NAME", sort=None))
-            supplier_perf_base_bars = supplier_perf_base.mark_bar().encode(
-                color=alt.value("#249DC9"),
-                y=alt.Y("TOTAL LEAD TIME", title="TOTAL LEAD TIME")
-            )
-            supplier_perf_chart = alt.layer(supplier_perf_base_bars)
-            st.altair_chart(supplier_perf_chart, use_container_width=True)
-
-            # Underlying Data
-            # st.subheader("Underlying Data")
-            # st.dataframe(df_lead_time)
+        # Compute per-column statistics on the target
+        df = session.sql("""call GET_STATISTICS('CIBLE_DICTIONNARY')""").collect()
+        df = session.sql("CREATE or REPLACE TABLE CIBLE_STATISTICS AS SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) ORDER BY 1;").collect()

-        with col2:
-            # Display Purchase Order Status
-            st.subheader("Purchase Order Status")
-            order_status_base = alt.Chart(df_order_status).encode(alt.X("STATUS", sort=['Order_confirmed','Shipped','In_Transit','Out_for_delivery','Delivered']))
-            order_status_base_bars = order_status_base.mark_bar().encode(
-                color=alt.value("#249DC9"),
-                y=alt.Y("TOTAL RECORDS", title="TOTAL RECORDS")
-            )
-            order_status_chart = alt.layer(order_status_base_bars)
-            st.altair_chart(order_status_chart, use_container_width=True)
-
-            # Underlying Data
-            # st.subheader("Underlying Data")
-            # st.dataframe(df_order_status)
-
-orders_reference_associations = permission.get_reference_associations("order_table")
-if len(orders_reference_associations) == 0:
-    permission.request_reference("order_table")
-    exit(0)
-
-site_recovery_reference_associations = permission.get_reference_associations("site_recovery_table")
-if len(site_recovery_reference_associations) == 0:
-    permission.request_reference("site_recovery_table")
-    exit(0)
-
-st.title("Where Are My Ski Goggles?")
-orders_table = "reference('order_table')"
-site_recovery_table = "reference('site_recovery_table')"
-load_app(orders_table,site_recovery_table)
+        # Compare the two statistics tables and persist the comparison under
+        # the name composed above
+        df = session.sql("""call COMPARE_STATISTICS('CIBLE_STATISTICS', 'SOURCE_STATISTICS')""").collect()
+        df = session.sql(composed_query).collect()
+
+    st.success("Successfully ran in Snowflake! Statistics are ready for viewing! :point_down:")
+
+st.divider()
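+# PART 3 (sketch): visualize and interpret the results. A minimal, illustrative
+# viewer for the table created above (not yet wired into the app):
+#
+#   if st.button("SHOW RESULTS"):
+#       st.dataframe(session.table("RESULTS_" + tb_results_name).to_pandas())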
diff --git a/app/src/manifest.yml b/app/src/manifest.yml
index 7ef92a4..1565550 100644
--- a/app/src/manifest.yml
+++ b/app/src/manifest.yml
@@ -15,20 +15,21 @@ artifacts:

#runtime configuration for this version
configuration:
  log_level: debug
-  trace_level: off
+  trace_level: always
+
references:
-  - order_table:
-      label: "Orders Table"
-      description: "Select table"
+  - information_schema:
+      label: "information_schema"
+      description: "Grants access to the consumer's INFORMATION_SCHEMA.COLUMNS view"
      privileges:
        - SELECT
-      object_type: Table
+      object_type: View
      multi_valued: false
      register_callback: app_instance_schema.update_reference
-  - site_recovery_table:
-      label: "Site Recovery Table"
-      description: "Select table"
+  - prod_table:
+      label: "Prod_Table"
+      description: "Grants access to the production table to compare"
      privileges:
        - SELECT
      object_type: Table
diff --git a/app/src/setup.sql b/app/src/setup.sql
index d18a7da..05c15a7 100644
--- a/app/src/setup.sql
+++ b/app/src/setup.sql
@@ -2,42 +2,16 @@
-- This script runs when the app is installed
-- ==========================================

--- Create Application Role and Schema
create application role if not exists app_instance_role;
-create or alter versioned schema app_instance_schema;

--- Share data
-create or replace view app_instance_schema.MFG_SHIPPING as select * from shared_content_schema.MFG_SHIPPING;

--- Create Streamlit app
-create or replace streamlit app_instance_schema.streamlit from '/libraries' main_file='streamlit.py';
+-- Create the app_instance_schema schema
+create or alter versioned schema app_instance_schema;
+grant usage on schema app_instance_schema to application role app_instance_role;

--- Create UDFs
-create or replace function app_instance_schema.cal_lead_time(i int, j int, k int)
-returns float
-language python
-runtime_version = '3.8'
-packages = ('snowflake-snowpark-python')
-imports = ('/libraries/udf.py')
-handler = 'udf.cal_lead_time';
-
-create or replace function app_instance_schema.cal_distance(slat float,slon float,elat float,elon float)
-returns float
-language python
-runtime_version = '3.8'
-packages = ('snowflake-snowpark-python','pandas','scikit-learn==1.1.1')
-imports = ('/libraries/udf.py')
-handler = 'udf.cal_distance';
-
--- Create Stored Procedure
-create or replace procedure app_instance_schema.billing_event(number_of_rows int)
-returns string
-language python
-runtime_version = '3.8'
-packages = ('snowflake-snowpark-python')
-imports = ('/libraries/procs.py')
-handler = 'procs.billing_event';
+-- The UPDATE_REFERENCE stored procedure: creation and grants
create or replace procedure app_instance_schema.update_reference(ref_name string, operation string, ref_or_alias string)
returns string
language sql
@@ -57,11 +31,171 @@
begin
end;
$$;

--- Grant usage and permissions on objects
-grant usage on schema app_instance_schema to application role app_instance_role;
-grant usage on function app_instance_schema.cal_lead_time(int,int,int) to application role app_instance_role;
-grant usage on procedure app_instance_schema.billing_event(int) to application role app_instance_role;
-grant usage on function app_instance_schema.cal_distance(float,float,float,float) to application role app_instance_role;
-grant SELECT on view app_instance_schema.MFG_SHIPPING to application role app_instance_role;
-grant usage on streamlit app_instance_schema.streamlit to application role app_instance_role;
+-- Grant usage
grant usage on procedure app_instance_schema.update_reference(string, string, string) to application role app_instance_role;
+
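+-- For illustration, a consumer binding the information_schema reference would
+-- run something like the following (app and database names are examples):
+--   CALL <app_name>.app_instance_schema.update_reference(
+--     'information_schema', 'ADD',
+--     SYSTEM$REFERENCE('VIEW', 'PC_ALTERYX_DB.INFORMATION_SCHEMA.COLUMNS', 'PERSISTENT', 'SELECT'));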
+
+-- The GET_DICTIONNARY_CIBLE, GET_DICTIONNARY_SOURCE, COMPARE_STATISTICS, and GET_STATISTICS stored procedures: creation and grants
+CREATE OR REPLACE PROCEDURE app_instance_schema.GET_DICTIONNARY_SOURCE(table_list string, information_schema_columns string)
+RETURNS TABLE ()
+LANGUAGE SQL
+EXECUTE AS OWNER
+AS 'DECLARE
+    res RESULTSET DEFAULT (
+        with
+        t1 as (
+            select * from IDENTIFIER(:table_list)
+        )
+        ,t2 as (
+            select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, IS_NULLABLE, DATA_TYPE from IDENTIFIER(:information_schema_columns)
+        )
+        select t2.*, t1.min_date, t1.max_date, t1.date_scope from t1
+        inner join t2
+        on t1.TABLE_NAME=t2.TABLE_NAME and
+           t1.schema_source=t2.TABLE_SCHEMA);
+BEGIN
+    return table(res);
+END';
+
+CREATE OR REPLACE PROCEDURE app_instance_schema.GET_DICTIONNARY_CIBLE(table_list string)
+RETURNS TABLE ()
+LANGUAGE SQL
+EXECUTE AS OWNER
+AS 'DECLARE
+    res RESULTSET DEFAULT (
+        with
+        t1 as (
+            select * from IDENTIFIER(:table_list)
+        )
+        ,t2 as (
+            select TABLE_CATALOG, TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, IS_NULLABLE, DATA_TYPE from reference(''information_schema'')
+        )
+        select t2.*, t1.min_date, t1.max_date, t1.date_scope from t1
+        inner join t2
+        on t1.TABLE_NAME=t2.TABLE_NAME and
+           t1.schema_cible=t2.TABLE_SCHEMA);
+BEGIN
+    return table(res);
+END';
+
+
+CREATE OR REPLACE PROCEDURE app_instance_schema.COMPARE_STATISTICS(table_prod_statistics string, table_dev_statistics string)
+RETURNS TABLE ()
+LANGUAGE SQL
+EXECUTE AS OWNER
+AS 'DECLARE
+    res RESULTSET DEFAULT
+    (
+        select ''PROD'' as source,
+            iff(not(a.DATA_TYPE=b.DATA_TYPE),1, 0) as data_type_issue,
+            iff(not(a.UNIQUE_VALUES=b.UNIQUE_VALUES),1, 0) as unique_values_issue,
+            iff(not(a.MAX_LENGTH=b.MAX_LENGTH and a.MIN_LENGTH=b.MIN_LENGTH ),1, 0) as length_issue,
+            iff(not(a.SUM_VALUE=b.SUM_VALUE),1, 0) as sum_issue,
+            iff(not(a.MIN_VALUE=b.MIN_VALUE and a.MIN_DATE=b.MIN_DATE),1, 0) as min_issue,
+            iff(not(a.MAX_VALUE=b.MAX_VALUE and a.MAX_DATE=b.MAX_DATE),1, 0) as max_issue,
+            a.*
+        from IDENTIFIER(:table_prod_statistics) a inner join IDENTIFIER(:table_dev_statistics) b
+        on a.TABLE_NAME=b.TABLE_NAME and
+           a.COLUMN_NAME=b.COLUMN_NAME
+        union all
+
+        select ''DEV'' as source,
+            iff(not(a.DATA_TYPE=b.DATA_TYPE),1, 0) as data_type_issue,
+            iff(not(a.UNIQUE_VALUES=b.UNIQUE_VALUES),1, 0) as unique_values_issue,
+            iff(not(a.MAX_LENGTH=b.MAX_LENGTH and a.MIN_LENGTH=b.MIN_LENGTH ),1, 0) as length_issue,
+            iff(not(a.SUM_VALUE=b.SUM_VALUE),1, 0) as sum_issue,
+            iff(not(a.MIN_VALUE=b.MIN_VALUE and a.MIN_DATE=b.MIN_DATE),1, 0) as min_issue,
+            iff(not(a.MAX_VALUE=b.MAX_VALUE and a.MAX_DATE=b.MAX_DATE),1, 0) as max_issue,
+            a.*
+        from IDENTIFIER(:table_dev_statistics) a inner join IDENTIFIER(:table_prod_statistics) b
+        on a.TABLE_NAME=b.TABLE_NAME and
+           a.COLUMN_NAME=b.COLUMN_NAME
+
+        order by table_name, column_name
+    );
+BEGIN
+    return table(res);
+END';
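+-- Illustrative call, matching the table names the Streamlit app creates; each
+-- *_issue flag is 1 when the two environments disagree on that statistic:
+--   CALL app_instance_schema.COMPARE_STATISTICS('CIBLE_STATISTICS', 'SOURCE_STATISTICS');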
+
+CREATE OR REPLACE PROCEDURE app_instance_schema.GET_STATISTICS(dictionnary_table string)
+RETURNS TABLE ()
+LANGUAGE SQL
+EXECUTE AS OWNER
+AS 'DECLARE
+    sql string;
+    final_sql string DEFAULT '''';
+    res RESULTSET DEFAULT (SELECT * FROM IDENTIFIER(:dictionnary_table));
+    c1 cursor for res;
+BEGIN
+    FOR record in c1 do
+
+        -- Numeric columns: distinct count plus SUM/MAX/MIN values
+        if (record.DATA_TYPE in (''NUMBER'',''DECIMAL'', ''NUMERIC'', ''INT'', ''INTEGER'', ''BIGINT'', ''SMALLINT'', ''TINYINT'', ''BYTEINT'', ''FLOAT'', ''FLOAT4'',''FLOAT8'',''DOUBLE'', ''DOUBLE PRECISION'',''REAL'')) then
+            sql := ''SELECT ''''''||record.TABLE_NAME||'''''' as TABLE_NAME, ''''''||REPLACE(record.COLUMN_NAME,'''''''', '''''''''''')||'''''' as COLUMN_NAME, ''''''||record.DATA_TYPE||'''''' as DATA_TYPE, count( distinct "''||record.COLUMN_NAME||''") as UNIQUE_VALUES, null as MAX_LENGTH, null as MIN_LENGTH, SUM("''||record.COLUMN_NAME||''") as SUM_VALUE, MAX("''||record.COLUMN_NAME||''") as MAX_VALUE, MIN("''||record.COLUMN_NAME||''") as MIN_VALUE, null as MAX_DATE, null as MIN_DATE FROM ''||record.TABLE_CATALOG||''.''||record.TABLE_SCHEMA||''."''||record.TABLE_NAME||''" '';
+
+        -- Date/time columns: distinct count plus MAX_DATE/MIN_DATE
+        elseif (record.DATA_TYPE in (''DATE'',''DATETIME'',''TIME'',''TIMESTAMP_LTZ'',''TIMESTAMP_NTZ'',''TIMESTAMP_TZ'')) then
+            sql := ''SELECT ''''''||record.TABLE_NAME||'''''' as TABLE_NAME, ''''''||REPLACE(record.COLUMN_NAME,'''''''', '''''''''''')||'''''' as COLUMN_NAME, ''''''||record.DATA_TYPE||'''''' as DATA_TYPE, count(distinct "''||record.COLUMN_NAME||''") as UNIQUE_VALUES, null as MAX_LENGTH, null as MIN_LENGTH, null as SUM_VALUE, null as MAX_VALUE, null as MIN_VALUE, MAX("''||record.COLUMN_NAME||''") as MAX_DATE, MIN("''||record.COLUMN_NAME||''") as MIN_DATE FROM ''||record.TABLE_CATALOG||''.''||record.TABLE_SCHEMA||''."''||record.TABLE_NAME||''" '';
+
+        -- Everything else: distinct count plus MAX_LENGTH/MIN_LENGTH
+        else
+            sql := ''SELECT ''''''||record.TABLE_NAME||'''''' as TABLE_NAME, ''''''||REPLACE(record.COLUMN_NAME,'''''''', '''''''''''')||'''''' as COLUMN_NAME, ''''''||record.DATA_TYPE||'''''' as DATA_TYPE, count( distinct "''||record.COLUMN_NAME||''") as UNIQUE_VALUES, MAX(LEN("''||record.COLUMN_NAME||''")) as MAX_LENGTH, MIN(LEN("''||record.COLUMN_NAME||''")) as MIN_LENGTH, null as SUM_VALUE, null as MAX_VALUE, null as MIN_VALUE, null as MAX_DATE, null as MIN_DATE FROM ''||record.TABLE_CATALOG||''.''||record.TABLE_SCHEMA||''."''||record.TABLE_NAME||''" '';
+
+        end if;
+
+        -- Apply the optional date filter defined by DATE_SCOPE
+        if (record.DATE_SCOPE is not null) then
+            sql := sql || ''where "''||record.DATE_SCOPE||''" between ''''''||record.MIN_DATE||'''''' and ''''''||record.MAX_DATE||'''''' '';
+        end if;
+
+        -- Chain the per-column queries with UNION ALL
+        if (final_sql <> '''') then
+            final_sql := final_sql || '' UNION ALL '';
+        end if;
+
+        final_sql := final_sql || sql;
+
+    END FOR;
+    res := (EXECUTE IMMEDIATE :final_sql);
+    RETURN TABLE (res);
+end';
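+-- For a NUMBER column, the per-column statement generated above looks like
+-- (table/schema taken from the seeded settings; the "Sales" column is illustrative):
+--   SELECT 'Hypermarche_Achats' as TABLE_NAME, 'Sales' as COLUMN_NAME, 'NUMBER' as DATA_TYPE,
+--          count(distinct "Sales") as UNIQUE_VALUES, null as MAX_LENGTH, null as MIN_LENGTH,
+--          SUM("Sales") as SUM_VALUE, MAX("Sales") as MAX_VALUE, MIN("Sales") as MIN_VALUE,
+--          null as MAX_DATE, null as MIN_DATE
+--   FROM PC_ALTERYX_DB.HYPERMARCHE_DEV."Hypermarche_Achats"
+--   where "Date de commande" between '2019-01-01' and '2022-12-31';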
+
+-- Grant usage (signatures match the procedure definitions above)
+grant usage on procedure app_instance_schema.GET_DICTIONNARY_SOURCE(string, string) to application role app_instance_role;
+grant usage on procedure app_instance_schema.GET_DICTIONNARY_CIBLE(string) to application role app_instance_role;
+grant usage on procedure app_instance_schema.COMPARE_STATISTICS(string, string) to application role app_instance_role;
+grant usage on procedure app_instance_schema.GET_STATISTICS(string) to application role app_instance_role;
+
+
+-- SETTINGS TABLE: create and seed the TABLE_LIST table
+CREATE or replace TABLE app_instance_schema.TABLE_LIST (
+    table_name VARCHAR(100) DEFAULT NULL,
+    database_source VARCHAR(100) DEFAULT NULL,
+    schema_source VARCHAR(100) DEFAULT NULL,
+    database_cible VARCHAR(100) DEFAULT NULL,
+    schema_cible VARCHAR(100) DEFAULT NULL,
+    date_scope VARCHAR(100) DEFAULT NULL,
+    min_date DATE DEFAULT NULL,
+    max_date DATE DEFAULT NULL
+);
+
+INSERT INTO app_instance_schema.TABLE_LIST
+    VALUES
+    ('Hypermarche_Achats','PC_ALTERYX_DB','HYPERMARCHE_DEV','PC_ALTERYX_DB','HYPERMARCHE','Date de commande','2019-01-01','2022-12-31'),
+    ('Hypermarche_Personnes','PC_ALTERYX_DB','HYPERMARCHE_DEV','PC_ALTERYX_DB','HYPERMARCHE',null,null,null),
+    ('Hypermarche_Retours','PC_ALTERYX_DB','HYPERMARCHE_DEV','PC_ALTERYX_DB','HYPERMARCHE',null,null,null);
+
+GRANT SELECT ON TABLE app_instance_schema.TABLE_LIST TO APPLICATION ROLE app_instance_role;
+
+
+-- CREATE THE STREAMLIT APP
+create or replace streamlit app_instance_schema.streamlit from '/libraries' main_file='streamlit.py';
+
+grant usage on streamlit app_instance_schema.streamlit to application role app_instance_role;
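+-- To inspect a comparison run afterwards (RESULTS_<suffix> is the name the
+-- Streamlit app composes; the suffix below is illustrative):
+--   SELECT * FROM RESULTS_20230904_153000 WHERE data_type_issue = 1 OR sum_issue = 1;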