From be9d8cc0a8258b77ef28c4c399943df5e55d9e6c Mon Sep 17 00:00:00 2001 From: tridao Date: Tue, 26 Apr 2022 10:07:15 -0300 Subject: [PATCH] support parallel encoding --- main.go | 11 ++++++++++- scripts/test.py | 32 ++++++++++++++++---------------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/main.go b/main.go index aabc868..1b4a422 100644 --- a/main.go +++ b/main.go @@ -48,9 +48,18 @@ func evalFileInto(file, mimetype string, out *os.File) error { return fmt.Errorf("Unknown mimetype for file: %s.\n", file) } + parallelEncoding := false + stat, err := os.Stat(file) + if err != nil { + return err + } + if stat.Size() > runner.ParallelEncodingMin { + parallelEncoding = true + } + return runner.TransformFile(file, runner.ContentTypeInfo{ Type: mimetype, - }, out) + }, out, parallelEncoding) } func getShape(resultFile, panelId string) (*runner.Shape, error) { diff --git a/scripts/test.py b/scripts/test.py index 9d09da9..5874b65 100755 --- a/scripts/test.py +++ b/scripts/test.py @@ -263,27 +263,27 @@ def test(name, to_run, want, fail=False, sort=False, winSkip=False, within_secon os.remove(f) to_run = """ -./dsq --cache taxi.csv "SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC" +./dsq --cache taxi.csv "SELECT passenger_count, COUNT(*), ROUND(AVG(total_amount),10) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC" """ want = """ -[{"AVG(total_amount)":17.641883306799908,"passenger_count":"1","COUNT(*)":1533197}, -{"AVG(total_amount)":18.097587071145647,"passenger_count":"2","COUNT(*)":286461}, -{"AVG(total_amount)":32.23715114825533,"passenger_count":"","COUNT(*)":128020}, -{"AVG(total_amount)":17.915395871092315,"passenger_count":"3","COUNT(*)":72852}, -{"AVG(total_amount)":17.270924817567234,"passenger_count":"5","COUNT(*)":50291}, -{"passenger_count":"0","COUNT(*)":42228,"AVG(total_amount)":17.021401676615067}, -{"passenger_count":"6","COUNT(*)":32623,"AVG(total_amount)":17.600296416636713}, -{"passenger_count":"4","COUNT(*)":25510,"AVG(total_amount)":18.452774990196012}, -{"COUNT(*)":2,"AVG(total_amount)":95.705,"passenger_count":"8"}, -{"passenger_count":"7","COUNT(*)":2,"AVG(total_amount)":87.17}, -{"passenger_count":"9","COUNT(*)":1,"AVG(total_amount)":113.6}]""" +[{"COUNT(*)": 1533197, "ROUND(AVG(total_amount),10)": 17.6418833068, "passenger_count": "1"}, +{"COUNT(*)": 286461, "ROUND(AVG(total_amount),10)": 18.0975870711, "passenger_count": "2"}, +{"COUNT(*)": 128020, "ROUND(AVG(total_amount),10)": 32.2371511483, "passenger_count": ""}, +{"COUNT(*)": 72852, "ROUND(AVG(total_amount),10)": 17.9153958711, "passenger_count": "3"}, +{"COUNT(*)": 50291, "ROUND(AVG(total_amount),10)": 17.2709248176, "passenger_count": "5"}, +{"COUNT(*)": 42228, "ROUND(AVG(total_amount),10)": 17.0214016766, "passenger_count": "0"}, +{"COUNT(*)": 32623, "ROUND(AVG(total_amount),10)": 17.6002964166, "passenger_count": "6"}, +{"COUNT(*)": 25510, "ROUND(AVG(total_amount),10)": 18.4527749902, "passenger_count": "4"}, +{"COUNT(*)": 2, "ROUND(AVG(total_amount),10)": 95.705, "passenger_count": "8"}, +{"COUNT(*)": 2, "ROUND(AVG(total_amount),10)": 87.17, "passenger_count": "7"}, +{"COUNT(*)": 1, "ROUND(AVG(total_amount),10)": 113.6, "passenger_count": "9"}]""" want_stderr = "Cache invalid, re-import required.\n" cmd("curl https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-04.csv -o taxi.csv", doNotReplaceWin=True) test("Caching from file (first time so import is required)", to_run, want, want_stderr=want_stderr, sort=True) to_run = """ -cat taxi.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC' +cat taxi.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), ROUND(AVG(total_amount),10) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC' """ test("Caching from pipe (second time so import not required)", to_run, want, sort=True, winSkip=True, within_seconds=5) @@ -300,9 +300,9 @@ def test(name, to_run, want, fail=False, sort=False, winSkip=False, within_secon f.write(''.join(lines)) to_run = """ -cat taxi.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), AVG(total_amount) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC'""" -want = """[{"COUNT(*)":9,"AVG(total_amount)":20.571111111111115,"passenger_count":"1"}, -{"passenger_count":"0","COUNT(*)":1,"AVG(total_amount)":43.67}]""" +cat taxi.csv | ./dsq --cache -s csv 'SELECT passenger_count, COUNT(*), ROUND(AVG(total_amount),10) FROM {} GROUP BY passenger_count ORDER BY COUNT(*) DESC'""" +want = """[{"COUNT(*)": 9, "ROUND(AVG(total_amount),10)": 20.5711111111, "passenger_count": "1"}, +{"COUNT(*)": 1, "ROUND(AVG(total_amount),10)": 43.67, "passenger_count": "0"}]""" want_stderr = "Cache invalid, re-import required.\n" test("Re-imports when file changes with cache on", to_run, want, want_stderr=want_stderr, sort=True, winSkip=True)