Skip to content

Commit 77d50e7

Browse files
authored
Merge pull request #3 from acompany-develop/feature/hiraoka/CC-856/reformat-handler.py
[CC-856] handler.pyの可読性を上げる
2 parents a4263db + d377053 commit 77d50e7

File tree

6 files changed

+332
-185
lines changed

6 files changed

+332
-185
lines changed

functions/cross_table/README.md

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# Cross Table Function
2+
3+
## 概要
4+
5+
Cross Table Functionは、2つのCSVファイルを結合し、クロス集計表を作成する関数です。両方のデータフレームの最左列をキーとして使用し、内部結合(INNER JOIN)を実行した後、属性列の組み合わせごとに集計を行います。Polarsライブラリを使用して高速なデータ処理を実現します。
6+
7+
## 機能
8+
9+
- **データ結合**: 2つのCSVファイルを最左列をキーとして結合
10+
- **列名リネーム**: 結合前に列名にプレフィックス(0:, 1:)を付与
11+
- **クロス集計**: 属性列の組み合わせごとに件数を集計
12+
- **閾値フィルタリング**: 集計数が閾値未満の行を除外
13+
- **ログ出力**: 処理経過とエラー情報をログファイルに記録
14+
- **処理結果**: 結果をcsvファイルに格納して返す
15+
16+
## 入力データ
17+
18+
### input_a.csv (user1専用入力データ)
19+
- **パス**: `/work/inputs/input_1/input_a.csv`
20+
- **形式**: CSVファイル
21+
- **要件**: 最左列が結合キーとして使用される
22+
23+
### input_b.csv (user2専用入力データ)
24+
- **パス**: `/work/inputs/input_2/input_b.csv`
25+
- **形式**: CSVファイル
26+
- **要件**: 最左列が結合キーとして使用される
27+
28+
## 出力データ
29+
30+
### output.csv (user1専用出力データ)
31+
- **パス**: `/work/outputs/output_1/output.csv`
32+
- **形式**: CSVファイル
33+
- **内容**: クロス集計結果(number_of_rows列が先頭、属性列がソート済み)
34+
35+
### output.csv (user2専用出力データ)
36+
- **パス**: `/work/outputs/output_2/output.csv`
37+
- **形式**: CSVファイル
38+
- **内容**: クロス集計結果(number_of_rows列が先頭、属性列がソート済み)
39+
40+
### ログファイル
41+
- **app.log**: 処理ログ(`/work/outputs/output_1/app.log``/work/outputs/output_2/app.log`
42+
43+
## アルゴリズム
44+
45+
1. **データ読み込み**
46+
- `input_a.csv`をLazyFrameとして読み込み
47+
- `input_b.csv`をLazyFrameとして読み込み
48+
49+
2. **キー抽出**
50+
- 両データフレームの最左列を結合キーとして抽出
51+
52+
3. **列名リネーム**
53+
- input_aの列名に`0:`プレフィックスを付与(キー列以外)
54+
- input_bの列名に`1:`プレフィックスを付与(キー列以外)
55+
56+
4. **データ結合**
57+
- `polars.join()`を使用して内部結合を実行
58+
- `left_on`: input_aの最左列
59+
- `right_on`: input_bの最左列
60+
61+
5. **クロス集計**
62+
- 属性列(キー列以外)でグループ化
63+
- 各組み合わせの件数を集計
64+
65+
6. **閾値フィルタリング**
66+
- 集計数が閾値(THRESHOLD=2)未満の行を除外
67+
68+
7. **列順序整理**
69+
- `number_of_rows`列を先頭に配置
70+
- 属性列をアルファベット順にソート
71+
72+
8. **結果保存**
73+
- クロス集計結果を両方の出力ディレクトリに保存
74+
75+
## 使用例
76+
77+
### 入力ファイル例
78+
79+
**input_a.csv**:
80+
```csv
81+
id,age,gender
82+
1,25,F
83+
2,30,M
84+
3,35,M
85+
4,28,F
86+
5,25,F
87+
6,30,M
88+
```
89+
90+
**input_b.csv**:
91+
```csv
92+
id,city,department
93+
1,Tokyo,Sales
94+
2,Osaka,Marketing
95+
3,Tokyo,Engineering
96+
4,Kyoto,HR
97+
5,Tokyo,Sales
98+
6,Osaka,Marketing
99+
```
100+
101+
### 出力ファイル例
102+
103+
**output.csv** (user1用・user2用):
104+
```csv
105+
number_of_rows,0:age,0:gender,1:city,1:department
106+
2,25,F,Tokyo,Sales
107+
2,30,M,Osaka,Marketing
108+
```
109+
110+
## 設定パラメータ
111+
112+
- **THRESHOLD**: 集計数がこの値未満の行は出力されない(デフォルト: 2)
113+

functions/cross_table/function/handler.py

Lines changed: 80 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -3,144 +3,100 @@
33
import traceback
44
from datetime import datetime
55

6-
sys.path.insert(0, "/work/function/packages") # functionが依存するパッケージのパス
6+
# 依存ライブラリがある場合は、packagesディレクトリをパスに追加
7+
sys.path.append("/work/function/packages")
78

9+
import polars as pl
10+
11+
# 作業ディレクトリとI/Oディレクトリのパス設定
812
WORK_DIR = "/work"
9-
INPUT_A_PATH = f"{WORK_DIR}/inputs/input_1"
10-
INPUT_B_PATH = f"{WORK_DIR}/inputs/input_2"
11-
OUTPUT_A_PATH = f"{WORK_DIR}/outputs/output_1"
12-
OUTPUT_B_PATH = f"{WORK_DIR}/outputs/output_2"
13-
DOWNLOAD_DIR = "downloads/"
13+
INPUT_DIR = f"{WORK_DIR}/inputs"
14+
OUTPUT_DIR = f"{WORK_DIR}/outputs"
15+
INPUT_1_DIR = f"{INPUT_DIR}/input_1"
16+
INPUT_2_DIR = f"{INPUT_DIR}/input_2"
17+
OUTPUT_1_DIR = f"{OUTPUT_DIR}/output_1"
18+
OUTPUT_2_DIR = f"{OUTPUT_DIR}/output_2"
1419

1520
THRESHOLD = 2 # 集計数がこの値未満の行は出力されない
1621

1722

18-
def print_log(msg):
23+
def print_log(msg: str):
1924
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
20-
try:
21-
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
22-
with open(os.path.join(DOWNLOAD_DIR, "app.log"), "a") as log_file:
23-
log_file.write(f"[{current_time}]:[handler.py]: {msg}\n")
24-
os.makedirs(OUTPUT_A_PATH, exist_ok=True)
25-
with open(os.path.join(OUTPUT_A_PATH, "app.log"), "a") as log_file:
26-
log_file.write(f"[{current_time}]:[handler.py]: {msg}\n")
27-
os.makedirs(OUTPUT_B_PATH, exist_ok=True)
28-
with open(os.path.join(OUTPUT_B_PATH, "app.log"), "a") as log_file:
29-
log_file.write(f"[{current_time}]:[handler.py]: {msg}\n")
30-
except Exception:
31-
pass # 出力ディレクトリへの書き込みに失敗しても続行
32-
33-
34-
# メモリ使用量と実行時間を計測するための関数
35-
def get_memory_usage():
36-
"""現在のメモリ使用量を取得"""
37-
import psutil
38-
39-
process = psutil.Process(os.getpid())
40-
memory_info = process.memory_info()
41-
return memory_info.rss / 1024 / 1024 # MB単位
42-
43-
44-
def print_memory_usage(stage_name):
45-
"""メモリ使用量を表示"""
46-
try:
47-
memory_mb = get_memory_usage()
48-
print_log(f"[{stage_name}] メモリ使用量: {memory_mb:.2f} MB")
49-
except Exception as e:
50-
print_log(f"Failed to get memory usage: {e.__class__.__name__}")
51-
return 0
25+
for path in [OUTPUT_1_DIR, OUTPUT_2_DIR]:
26+
os.makedirs(path, exist_ok=True)
27+
with open(os.path.join(path, "app.log"), "a") as log_file:
28+
log_file.write(f"{current_time}:{msg}\n")
29+
30+
31+
def cross_table_data(input_1_path: str, input_2_path: str):
32+
print_log("cross_table_data: Started.")
33+
34+
# 入力データを読み込む(LazyFrameとして読み込み)
35+
lf_a = pl.scan_csv(input_1_path)
36+
lf_b = pl.scan_csv(input_2_path)
37+
38+
# キー列を特定
39+
key_a = lf_a.columns[0]
40+
key_b = lf_b.columns[0]
41+
42+
# 列名をリネーム
43+
cols_to_rename_a = [col for col in lf_a.columns if col != key_a]
44+
rename_map_a = {col: f"0:{col}" for col in cols_to_rename_a}
45+
lf_a_renamed = lf_a.rename(rename_map_a)
46+
47+
cols_to_rename_b = [col for col in lf_b.columns if col != key_b]
48+
rename_map_b = {col: f"1:{col}" for col in cols_to_rename_b}
49+
lf_b_renamed = lf_b.rename(rename_map_b)
50+
51+
# データを結合
52+
lf_joined = lf_a_renamed.join(
53+
lf_b_renamed, left_on=key_a, right_on=key_b, how="inner"
54+
)
55+
56+
# 属性列を特定(キー列以外)
57+
attribute_cols = [col for col in lf_joined.columns if col != key_a and col != key_b]
58+
59+
# グループ化して集計
60+
lf_summary = lf_joined.group_by(attribute_cols).agg(
61+
pl.count().alias("number_of_rows")
62+
)
63+
64+
# 閾値でフィルタリング
65+
lf_filtered = lf_summary.filter(pl.col("number_of_rows") >= THRESHOLD)
66+
67+
# 列の順序を整理(number_of_rowsを先頭に)
68+
sorted_cols = ["number_of_rows"] + sorted(attribute_cols)
69+
lf_final = lf_filtered.select(sorted_cols)
70+
71+
# 計算を実行して結果を取得
72+
final_result = lf_final.collect(streaming=True)
73+
74+
print_log("cross_table_data: Completed.")
75+
return final_result
5276

5377

5478
def run():
5579
try:
56-
print_memory_usage("開始時")
57-
print_log("handler.run: Started.")
58-
59-
import polars as pl
60-
61-
print_log("handler.run: Imported successfully.")
62-
63-
# 入力データを読み込む
64-
lf_a = pl.scan_csv(os.path.join(INPUT_A_PATH, "input_a.csv"))
65-
print_log("handler.run: Read input_a.csv successfully.")
66-
print_memory_usage("input_a.csv読み込み後")
67-
lf_b = pl.scan_csv(os.path.join(INPUT_B_PATH, "input_b.csv"))
68-
print_log("handler.run: Read input_b.csv successfully.")
69-
print_memory_usage("input_b.csv読み込み後")
70-
71-
# キー列を特定
72-
key_a = lf_a.columns[0]
73-
key_b = lf_b.columns[0]
74-
75-
# 2. Join前のリネーム処理
76-
# dataset_a の列名をリネーム (id以外)
77-
cols_to_rename_a = [col for col in lf_a.columns if col != key_a]
78-
rename_map_a = {col: f"0:{col}" for col in cols_to_rename_a}
79-
lf_a_renamed = lf_a.rename(rename_map_a)
80-
print_memory_usage("0_列名リネーム後")
81-
82-
# dataset_b の列名をリネーム (id以外)
83-
cols_to_rename_b = [col for col in lf_b.columns if col != key_b]
84-
rename_map_b = {col: f"1:{col}" for col in cols_to_rename_b}
85-
lf_b_renamed = lf_b.rename(rename_map_b)
86-
print_memory_usage("1_列名リネーム後")
87-
print_log("handler.run: Renamed columns successfully.")
88-
89-
# 3. リネーム済みのLazyFrameをJoin
90-
lf_joined = lf_a_renamed.join(
91-
lf_b_renamed, left_on=key_a, right_on=key_b, how="inner"
80+
print_log("run: Started.")
81+
82+
df_cross_table = cross_table_data(
83+
f"{INPUT_1_DIR}/input_a.csv", f"{INPUT_2_DIR}/input_b.csv"
9284
)
93-
print_log("handler.run: Merged successfully with leftmost columns.")
94-
print_memory_usage("Join後")
85+
print_log("run: Cross table data created.")
9586

96-
# 4. 全Attribute列でGroup By & Count
97-
# id以外の全ての列(a_...とb_...)をグループ化のキーに指定
98-
attribute_cols = [
99-
col for col in lf_joined.columns if col != key_a and col != key_b
100-
]
87+
# polarsのDataFrameを直接保存
88+
df_cross_table.write_csv(f"{OUTPUT_1_DIR}/output.csv")
89+
print_log("run: Saved output.csv to output_1.")
10190

102-
lf_summary = lf_joined.group_by(attribute_cols).agg(
103-
pl.count().alias("number_of_rows")
104-
)
105-
print_memory_usage("Group By & Count後")
106-
107-
# 5. 列の整形
108-
# number_of_rows を先頭に持ってくる
109-
# 列名をソートしてから指定
110-
sorted_cols = ["number_of_rows"] + sorted(attribute_cols)
111-
lf_final = lf_summary.select(sorted_cols)
112-
113-
filtered = lf_final.filter(pl.col("number_of_rows") >= THRESHOLD)
114-
print_log(f"handler.run: Filtered successfully with threshold {THRESHOLD}.")
115-
print_memory_usage("Filter後")
116-
117-
# 計算を実行して結果を表示
118-
final_result = filtered.collect(streaming=True)
119-
print_memory_usage("Collect後")
120-
121-
# CSV形式で出力
122-
try:
123-
os.makedirs(OUTPUT_A_PATH, exist_ok=True)
124-
os.makedirs(OUTPUT_B_PATH, exist_ok=True)
125-
126-
final_result.write_csv(os.path.join(OUTPUT_A_PATH, "output.csv"))
127-
print_log("handler.run: Saved a's output.csv successfully.")
128-
print_memory_usage("a's output.csv保存後")
129-
final_result.write_csv(os.path.join(OUTPUT_B_PATH, "output.csv"))
130-
print_log("handler.run: Saved b's output.csv successfully.")
131-
print_memory_usage("b's output.csv保存後")
132-
except Exception as e:
133-
print_log(f"handler.run: Error saving results: {str(e)}")
134-
135-
print_log("handler.run: DONE.")
136-
print_memory_usage("終了時")
91+
df_cross_table.write_csv(f"{OUTPUT_2_DIR}/output.csv")
92+
print_log("run: Saved output.csv to output_2.")
13793

94+
print_log("run: Completed.")
13895
except BaseException as e:
139-
print_log(f"handler.run: ERROR: {str(e)}")
140-
try:
141-
os.makedirs(DOWNLOAD_DIR, exist_ok=True)
142-
with open(os.path.join(DOWNLOAD_DIR, "error.log"), "w") as error_file:
143-
traceback.print_exc(file=error_file)
144-
except Exception:
145-
pass # エラーログの書き込みに失敗しても続行
96+
print_log(f"error type: {type(e).__name__}")
97+
98+
tb = traceback.extract_tb(e.__traceback__)
99+
if tb:
100+
for i, frame in enumerate(tb):
101+
print_log(f"error location {i + 1}: {frame.filename}:{frame.lineno}")
146102
raise e
Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
numpy==1.26.4
2-
python-dateutil==2.8.2
32
pytz==2023.3
4-
six==1.16.0
53
tzdata==2023.3
64
polars==0.19.19
7-
psutil==5.9.5
85
pyarrow==14.0.1

0 commit comments

Comments
 (0)