Commit 706cd28

Merge pull request #144 from microsoft/bugfix11904
fix: For double batch processing
2 parents: a661399 + 92be78c · commit 706cd28

File tree

1 file changed (+1, -1 lines changed)

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
1-
{"cells":[{"cell_type":"code","execution_count":null,"id":"3b73b213-58af-4209-9efd-ac34c9e1e1d7","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# IMPORTANT: This notebook manipulates sample data to guarantee that the Power BI report includes data for the current date, the last two days, and the last seven days. \n","# It is OPTIONAL and is only used to ensure the Power BI report can display data during each deployment."]},{"cell_type":"code","execution_count":null,"id":"e8e036de-0d34-4ea5-ab75-b624ddc2e220","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%%sql\n","--# RUN TO MOVE THE DATES FORWARD TO TODAY\n","UPDATE ckm_conv_processed\n","SET StartTime = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), StartTime),\n","EndTime = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), EndTime),\n","ConversationDate = DATEADD(day, (SELECT DATEDIFF(NOW(), MAX(ConversationDate)) FROM ckm_conv_processed), ConversationDate)"]},{"cell_type":"code","execution_count":null,"id":"82c35c12-b919-4e55-959a-2300f0412ee0","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# This code manipulates sample data that allocates a percentage of the data\n","# across a two weeks period to support storytelling and demo\n","\n","import pandas as pd\n","from datetime import date, datetime, timedelta\n","from pyspark.sql.functions import col\n","\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed\")\n","\n","# Convert string columns to timestamp types\n","df = df.withColumn(\"StartTime\", col(\"StartTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"EndTime\", col(\"EndTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"ConversationDate\", col(\"ConversationDate\").cast(\"timestamp\"))\n","\n","dfp = df.toPandas()\n","dfp = dfp.sample(frac=1) # This line randomly shuffles the df for a new distribution and demo percentages\n","\n","# Following list are date weights from Today-0 to Today-13 (two weeks)\n","weights = [30, 26, 5, 5, 5, 5, 15, 2, 2, 1, 1, 1, 1, 1]\n","dfindex = 0 # index loop through all conversations\n","daysback = 0 # start at today and work backwards\n","for row in weights:\n"," numconvos = int((row/100.00) * df.count())\n"," for i in range(numconvos):\n"," dfp.at[dfindex, 'StartTime'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'StartTime'].time())\n"," dfp.at[dfindex, 'EndTime'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'EndTime'].time())\n"," dfp.at[dfindex, 'ConversationDate'] = datetime.combine(date.today() - timedelta(days = daysback) , dfp.at[dfindex, 'ConversationDate'].time())\n"," dfindex += 1\n"," daysback += 1\n","df = spark.createDataFrame(dfp)\n","\n","# Write to temp table, then update final results table\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"true\").saveAsTable('ckm_conv_processed_temp')\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed_temp \")\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", 
\"false\").saveAsTable('ckm_conv_processed')"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"e6ad9dad-e3da-4da5-bca6-6572c466b69a","default_lakehouse_name":"ckm_lakehouse","default_lakehouse_workspace_id":"0d98d480-171b-4b4d-a8e7-80fbd031d1a6","known_lakehouses":[{"id":"e6ad9dad-e3da-4da5-bca6-6572c466b69a"}]}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"[email protected]"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
1+
{"cells":[{"cell_type":"code","execution_count":null,"id":"3b73b213-58af-4209-9efd-ac34c9e1e1d7","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# IMPORTANT: This notebook manipulates sample data to guarantee that the Power BI report includes data for the current date, the last two days, and the last seven days. \n","# It is OPTIONAL and is only used to ensure the Power BI report can display data during each deployment."]},{"cell_type":"code","execution_count":null,"id":"e8e036de-0d34-4ea5-ab75-b624ddc2e220","metadata":{"collapsed":false,"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["%%sql\n","--# RUN TO MOVE THE DATES FORWARD TO TODAY\n","UPDATE ckm_conv_processed\n","SET StartTime = DATEADD(day, (SELECT DATEDIFF(CURRENT_DATE, MAX(ConversationDate)) FROM ckm_conv_processed), StartTime),\n"," EndTime = DATEADD(day, (SELECT DATEDIFF(CURRENT_DATE, MAX(ConversationDate)) FROM ckm_conv_processed), EndTime),\n"," ConversationDate = DATEADD(day, (SELECT DATEDIFF(CURRENT_DATE, MAX(ConversationDate)) FROM ckm_conv_processed), ConversationDate)"]},{"cell_type":"code","execution_count":null,"id":"82c35c12-b919-4e55-959a-2300f0412ee0","metadata":{"jupyter":{"outputs_hidden":false,"source_hidden":false},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"nteract":{"transient":{"deleting":false}}},"outputs":[],"source":["# This code manipulates sample data that allocates a percentage of the data\n","# across a two weeks period to support storytelling and demo\n","\n","import pandas as pd\n","from datetime import date, datetime, timedelta\n","from pyspark.sql.functions import col\n","\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed\")\n","\n","# Convert string columns to timestamp types\n","df = df.withColumn(\"StartTime\", col(\"StartTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"EndTime\", col(\"EndTime\").cast(\"timestamp\"))\n","df = df.withColumn(\"ConversationDate\", col(\"ConversationDate\").cast(\"timestamp\"))\n","\n","dfp = df.toPandas()\n","dfp = dfp.sample(frac=1) # Randomly shuffle the df\n","\n","# Following list are date weights from Today-0 to Today-13 (two weeks)\n","weights = [30, 26, 5, 5, 5, 5, 15, 2, 2, 1, 1, 1, 1, 1]\n","dfindex = 0 # index loop through all conversations\n","daysback = 0 # start at today and work backwards\n","\n","# Create a default time (e.g., noon) to use when NaT is encountered\n","default_time = datetime.strptime('12:00:00', '%H:%M:%S').time()\n","\n","for row in weights:\n"," numconvos = int((row/100.00) * df.count())\n"," for i in range(numconvos):\n"," # Handle NaT values by using default time when necessary\n"," start_time = dfp.at[dfindex, 'StartTime'].time() if pd.notna(dfp.at[dfindex, 'StartTime']) else default_time\n"," end_time = dfp.at[dfindex, 'EndTime'].time() if pd.notna(dfp.at[dfindex, 'EndTime']) else default_time\n"," conv_time = dfp.at[dfindex, 'ConversationDate'].time() if pd.notna(dfp.at[dfindex, 'ConversationDate']) else default_time\n"," \n"," # Combine dates with times\n"," dfp.at[dfindex, 'StartTime'] = datetime.combine(date.today() - timedelta(days=daysback), start_time)\n"," dfp.at[dfindex, 'EndTime'] = datetime.combine(date.today() - timedelta(days=daysback), end_time)\n"," dfp.at[dfindex, 
'ConversationDate'] = datetime.combine(date.today() - timedelta(days=daysback), conv_time)\n"," \n"," dfindex += 1\n"," daysback += 1\n","\n","# Convert back to Spark DataFrame and save\n","df = spark.createDataFrame(dfp)\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"true\").saveAsTable('ckm_conv_processed_temp')\n","df = spark.sql(\"SELECT * FROM ckm_conv_processed_temp\")\n","df.write.format('delta').mode('overwrite').option(\"overwriteSchema\", \"false\").saveAsTable('ckm_conv_processed')"]}],"metadata":{"dependencies":{"lakehouse":{"default_lakehouse":"e6ad9dad-e3da-4da5-bca6-6572c466b69a","default_lakehouse_name":"ckm_lakehouse","default_lakehouse_workspace_id":"0d98d480-171b-4b4d-a8e7-80fbd031d1a6","known_lakehouses":[{"id":"e6ad9dad-e3da-4da5-bca6-6572c466b69a"}]}},"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"display_name":"Synapse PySpark","language":"Python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"nteract":{"version":"[email protected]"},"spark_compute":{"compute_id":"/trident/default"},"synapse_widget":{"state":{},"version":"0.1"},"widgets":{}},"nbformat":4,"nbformat_minor":5}
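
The substantive edits sit in two cells of the flattened notebook JSON above. In the %%sql cell, the day offset is now computed with DATEDIFF(CURRENT_DATE, MAX(ConversationDate)) instead of DATEDIFF(NOW(), MAX(ConversationDate)), making the whole-day date arithmetic explicit, and the SET clauses are indented consistently. A rough PySpark equivalent of the corrected shift, offered as a sketch only: it assumes the notebook's live spark session and its table and column names.

    from pyspark.sql import functions as F

    # Day gap between today and the newest ConversationDate in the table
    df = spark.table("ckm_conv_processed")
    shift_days = df.select(
        F.datediff(F.current_date(), F.max("ConversationDate")).alias("d")
    ).first()["d"]

    # Slide all three timestamp columns forward by that many days
    for c in ["StartTime", "EndTime", "ConversationDate"]:
        df = df.withColumn(c, F.col(c) + F.expr(f"INTERVAL {shift_days} DAYS"))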
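
In the Python cell, the old loop called .time() directly on each stored timestamp. pandas represents missing timestamps as NaT, and NaT does not support .time(), so a single null StartTime, EndTime, or ConversationDate aborted the redistribution. The new code falls back to a noon default instead. A minimal, self-contained reproduction of that guard on invented toy data (only the column name matches the notebook):

    import pandas as pd
    from datetime import date, datetime, timedelta

    # Fallback used when a timestamp is missing, mirroring the commit
    default_time = datetime.strptime('12:00:00', '%H:%M:%S').time()

    # Toy frame: the second StartTime is NaT, the case the old loop crashed on
    dfp = pd.DataFrame({'StartTime': pd.to_datetime(['2024-01-01 09:15:00', None])})

    daysback = 1
    for idx in dfp.index:
        # pd.notna() guards the .time() call; NaT rows get the default time
        t = dfp.at[idx, 'StartTime'].time() if pd.notna(dfp.at[idx, 'StartTime']) else default_time
        dfp.at[idx, 'StartTime'] = datetime.combine(date.today() - timedelta(days=daysback), t)

    print(dfp)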
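
Both versions spread the shuffled conversations across the same 14-day window: the weights list runs from Today-0 back to Today-13 and sums to 100, so one pass over it consumes roughly the whole frame. A quick sanity check, with a hypothetical row count standing in for df.count():

    # Weights copied from the notebook cell: today first, back to Today-13
    weights = [30, 26, 5, 5, 5, 5, 15, 2, 2, 1, 1, 1, 1, 1]
    assert sum(weights) == 100

    total_convos = 1000  # hypothetical count standing in for df.count()
    per_day = [int((w / 100.0) * total_convos) for w in weights]
    print(per_day)  # 300 conversations land on today, 260 on yesterday, ...

The closing two-step Delta write is unchanged: the temp table is written with overwriteSchema set to true, while the final overwrite of ckm_conv_processed keeps it false, so the published table's schema cannot drift.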
