
Commit cb74296

fix: Implement batch insert for processed records in data processing scripts
1 parent fa5ea9b commit cb74296

2 files changed: +92 −22 lines changed

infra/scripts/index_scripts/03_cu_process_data_text.py

Lines changed: 40 additions & 10 deletions
@@ -314,6 +314,7 @@ def create_tables():
 async def process_files():
     """Process all files with async embeddings client."""
     conversationIds, docs, counter = [], [], 0
+    processed_records = []  # Collect all records for batch insert
 
     # Create embeddings client for entire processing session
     async with (
@@ -352,11 +353,21 @@ async def process_files():
                 key_phrases = fields['keyPhrases']['valueString']
                 complaint = fields['complaint']['valueString']
                 content = fields['content']['valueString']
-                cursor.execute(
-                    "INSERT INTO processed_data (ConversationId, EndTime, StartTime, Content, summary, satisfied, sentiment, topic, key_phrases, complaint) VALUES (?,?,?,?,?,?,?,?,?,?)",
-                    (conversation_id, end_timestamp, start_timestamp, content, summary, satisfied, sentiment, topic, key_phrases, complaint)
-                )
-                conn.commit()
+
+                # Collect record for batch insert
+                processed_records.append({
+                    'ConversationId': conversation_id,
+                    'EndTime': end_timestamp,
+                    'StartTime': start_timestamp,
+                    'Content': content,
+                    'summary': summary,
+                    'satisfied': satisfied,
+                    'sentiment': sentiment,
+                    'topic': topic,
+                    'key_phrases': key_phrases,
+                    'complaint': complaint
+                })
+
                 docs.extend(await prepare_search_doc(content, conversation_id, path.name, embeddings_client))
                 counter += 1
             except Exception:
@@ -367,6 +378,12 @@ async def process_files():
     if docs:
         search_client.upload_documents(documents=docs)
 
+    # Batch insert all processed records using optimized SQL script
+    if processed_records:
+        df_processed = pd.DataFrame(processed_records)
+        columns = ['ConversationId', 'EndTime', 'StartTime', 'Content', 'summary', 'satisfied', 'sentiment', 'topic', 'key_phrases', 'complaint']
+        generate_sql_insert_script(df_processed, 'processed_data', columns, 'processed_data_batch_insert.sql')
+
     return conversationIds, counter
 
 
@@ -383,7 +400,7 @@ def bulk_import_json_to_table(json_file, table_name):
         return
 
     df = pd.DataFrame(data)
-    generate_sql_insert_script(df, table_name, list(df.columns), f'{table_name}_import.sql')
+    generate_sql_insert_script(df, table_name, list(df.columns), f'sample_import_{table_name}.sql')
 
 
 with open(file=SAMPLE_IMPORT_FILE, mode='r', encoding='utf-8') as file:
@@ -576,21 +593,34 @@ async def map_all_topics():
               "keyphrases", "complaint", "topic"]
 
     df_km = pd.DataFrame([list(row) for row in rows], columns=columns)
-    generate_sql_insert_script(df_km, 'km_processed_data', columns, 'km_processed_data_insert.sql')
+    generate_sql_insert_script(df_km, 'km_processed_data', columns, 'processed_km_data_with_mined_topics.sql')
 
     # Update processed_data_key_phrases table
     cursor.execute('''select ConversationId, key_phrases, sentiment, mined_topic as topic, StartTime from processed_data''')
     rows = [tuple(row) for row in cursor.fetchall()]
     column_names = [i[0] for i in cursor.description]
    df = pd.DataFrame(rows, columns=column_names)
     df = df[df['ConversationId'].isin(conversationIds)]
+
+    # Collect all key phrase records for batch insert
+    key_phrase_records = []
     for _, row in df.iterrows():
         key_phrases = row['key_phrases'].split(',')
         for key_phrase in key_phrases:
             key_phrase = key_phrase.strip()
-            cursor.execute("INSERT INTO processed_data_key_phrases (ConversationId, key_phrase, sentiment, topic, StartTime) VALUES (?,?,?,?,?)",
-                           (row['ConversationId'], key_phrase, row['sentiment'], row['topic'], row['StartTime']))
-            conn.commit()
+            key_phrase_records.append({
+                'ConversationId': row['ConversationId'],
+                'key_phrase': key_phrase,
+                'sentiment': row['sentiment'],
+                'topic': row['topic'],
+                'StartTime': row['StartTime']
+            })
+
+    # Batch insert using optimized SQL script
+    if key_phrase_records:
+        df_key_phrases = pd.DataFrame(key_phrase_records)
+        columns = ['ConversationId', 'key_phrase', 'sentiment', 'topic', 'StartTime']
+        generate_sql_insert_script(df_key_phrases, 'processed_data_key_phrases', columns, 'processed_new_key_phrases.sql')
 
     # Adjust dates to current date
     today = datetime.today()
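
Note on the helper both hunks now rely on: generate_sql_insert_script is defined elsewhere in these scripts and is not part of this diff; only its call signature (DataFrame, table name, column list, output filename) and, in the second file, the fact that it returns a record count are visible here. As orientation only, a minimal sketch of a helper with that shape might look like the following. This is an assumed illustration, not the repository's actual implementation.

# Hedged sketch only: the commit calls generate_sql_insert_script(df, table, columns, filename)
# but does not show its body. This is one plausible shape of such a helper.
import pandas as pd


def generate_sql_insert_script(df: pd.DataFrame, table_name: str, columns: list, output_file: str) -> int:
    """Write a multi-row INSERT statement for df to output_file and return the row count."""

    def sql_literal(value):
        # NULL for missing values, escaped single quotes for everything else (illustrative only).
        if pd.isna(value):
            return "NULL"
        return "'" + str(value).replace("'", "''") + "'"

    value_rows = [
        "(" + ", ".join(sql_literal(row[col]) for col in columns) + ")"
        for _, row in df.iterrows()
    ]
    with open(output_file, "w", encoding="utf-8") as f:
        # A production helper would typically chunk the rows (SQL Server, for example,
        # caps a single VALUES list at 1000 rows); omitted here for brevity.
        f.write(f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES\n")
        f.write(",\n".join(value_rows) + ";\n")
    return len(value_rows)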

infra/scripts/index_scripts/04_cu_process_custom_data.py

Lines changed: 52 additions & 12 deletions
@@ -359,6 +359,7 @@ def create_tables():
 async def process_files():
     """Process all files with async embeddings client."""
     conversationIds, docs, counter = [], [], 0
+    processed_records = []  # Collect all records for batch insert
 
     # Create embeddings client for entire processing session
     async with (
@@ -395,11 +396,21 @@ async def process_files():
                 key_phrases = fields['keyPhrases']['valueString']
                 complaint = fields['complaint']['valueString']
                 content = fields['content']['valueString']
-                cursor.execute(
-                    "INSERT INTO processed_data (ConversationId, EndTime, StartTime, Content, summary, satisfied, sentiment, topic, key_phrases, complaint) VALUES (?,?,?,?,?,?,?,?,?,?)",
-                    (conversation_id, end_timestamp, start_timestamp, content, summary, satisfied, sentiment, topic, key_phrases, complaint)
-                )
-                conn.commit()
+
+                # Collect record for batch insert
+                processed_records.append({
+                    'ConversationId': conversation_id,
+                    'EndTime': end_timestamp,
+                    'StartTime': start_timestamp,
+                    'Content': content,
+                    'summary': summary,
+                    'satisfied': satisfied,
+                    'sentiment': sentiment,
+                    'topic': topic,
+                    'key_phrases': key_phrases,
+                    'complaint': complaint
+                })
+
                 docs.extend(await prepare_search_doc(content, conversation_id, path.name, embeddings_client))
                 counter += 1
             except Exception:
@@ -449,9 +460,19 @@ async def process_files():
                 complaint = result['result']['contents'][0]['fields']['complaint']['valueString']
                 content = result['result']['contents'][0]['fields']['content']['valueString']
 
-                cursor.execute("INSERT INTO processed_data (ConversationId, EndTime, StartTime, Content, summary, satisfied, sentiment, topic, key_phrases, complaint) VALUES (?,?,?,?,?,?,?,?,?,?)",
-                               (conversation_id, end_timestamp, start_timestamp, content, summary, satisfied, sentiment, topic, key_phrases, complaint))
-                conn.commit()
+                # Collect record for batch insert
+                processed_records.append({
+                    'ConversationId': conversation_id,
+                    'EndTime': end_timestamp,
+                    'StartTime': start_timestamp,
+                    'Content': content,
+                    'summary': summary,
+                    'satisfied': satisfied,
+                    'sentiment': sentiment,
+                    'topic': topic,
+                    'key_phrases': key_phrases,
+                    'complaint': complaint
+                })
 
                 document_id = conversation_id
                 docs.extend(await prepare_search_doc(content, document_id, path.name, embeddings_client))
@@ -468,6 +489,12 @@ async def process_files():
 
     print(f"✓ Processed {counter} audio files")
 
+    # Batch insert all processed records using optimized SQL script
+    if processed_records:
+        df_processed = pd.DataFrame(processed_records)
+        columns = ['ConversationId', 'EndTime', 'StartTime', 'Content', 'summary', 'satisfied', 'sentiment', 'topic', 'key_phrases', 'complaint']
+        generate_sql_insert_script(df_processed, 'processed_data', columns, 'custom_processed_data_batch_insert.sql')
+
     return conversationIds
 
 # Run the async file processing
@@ -647,7 +674,7 @@ async def map_all_topics():
               "keyphrases", "complaint", "topic"]
 
     df_km = pd.DataFrame([list(row) for row in rows], columns=columns)
-    record_count = generate_sql_insert_script(df_km, 'km_processed_data', columns, 'km_processed_data_insert.sql')
+    record_count = generate_sql_insert_script(df_km, 'km_processed_data', columns, 'custom_km_data_with_mined_topics.sql')
     print(f"✓ Loaded {record_count} sample records")
 
     # Update processed_data_key_phrases table
@@ -656,13 +683,26 @@ async def map_all_topics():
     column_names = [i[0] for i in cursor.description]
     df = pd.DataFrame(rows, columns=column_names)
     df = df[df['ConversationId'].isin(conversationIds)]
+
+    # Collect all key phrase records for batch insert
+    key_phrase_records = []
     for _, row in df.iterrows():
         key_phrases = row['key_phrases'].split(',')
         for key_phrase in key_phrases:
             key_phrase = key_phrase.strip()
-            cursor.execute("INSERT INTO processed_data_key_phrases (ConversationId, key_phrase, sentiment, topic, StartTime) VALUES (?,?,?,?,?)",
-                           (row['ConversationId'], key_phrase, row['sentiment'], row['topic'], row['StartTime']))
-            conn.commit()
+            key_phrase_records.append({
+                'ConversationId': row['ConversationId'],
+                'key_phrase': key_phrase,
+                'sentiment': row['sentiment'],
+                'topic': row['topic'],
+                'StartTime': row['StartTime']
+            })
+
+    # Batch insert using optimized SQL script
+    if key_phrase_records:
+        df_key_phrases = pd.DataFrame(key_phrase_records)
+        columns = ['ConversationId', 'key_phrase', 'sentiment', 'topic', 'StartTime']
+        generate_sql_insert_script(df_key_phrases, 'processed_data_key_phrases', columns, 'custom_new_key_phrases.sql')
 
     # Adjust dates to current date
     today = datetime.today()
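
The net effect in both scripts is the same: rows that previously cost one cursor.execute plus one conn.commit round trip each are now accumulated in memory and flushed in a single pass through generate_sql_insert_script. The same batching idea can also be expressed directly against a pyodbc-style connection with executemany; the sketch below is an illustrative alternative under that assumption, not what this commit does, and flush_processed_records is a hypothetical name.

# Illustrative alternative (not this commit's approach): flush the collected records
# with a single executemany() and one commit instead of one round trip per row.
# Assumes a pyodbc-style connection, consistent with the '?' placeholders in these scripts.
def flush_processed_records(conn, processed_records):
    if not processed_records:
        return 0
    cursor = conn.cursor()
    cursor.fast_executemany = True  # pyodbc optimization for parameterized batch inserts
    params = [
        (r['ConversationId'], r['EndTime'], r['StartTime'], r['Content'], r['summary'],
         r['satisfied'], r['sentiment'], r['topic'], r['key_phrases'], r['complaint'])
        for r in processed_records
    ]
    cursor.executemany(
        "INSERT INTO processed_data (ConversationId, EndTime, StartTime, Content, summary, "
        "satisfied, sentiment, topic, key_phrases, complaint) VALUES (?,?,?,?,?,?,?,?,?,?)",
        params,
    )
    conn.commit()  # one commit for the whole batch instead of one per row
    return len(params)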
