@@ -7,39 +7,87 @@
 
 
 class TestTascomiParsingRefinement:
-
     def test_column_expansion(self, spark):
-        response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-                                                                                   ' "creation_user_id": null,'
-                                                                                   ' "title_id": "4"}'}])
-        expected = ['id', 'creation_user_id', 'title_id',
-                    'page_number', 'import_api_url_requested', 'import_api_status_code',
-                    'import_exception_thrown', 'import_datetime', 'import_timestamp',
-                    'import_year', 'import_month', 'import_day', 'import_date']
+        response = self.parse_json_into_dataframe(
+            spark,
+            "contacts",
+            [
+                {
+                    "contacts": '{"id": "34607",'
+                    ' "creation_user_id": null,'
+                    ' "title_id": "4"}'
+                }
+            ],
+        )
+        expected = [
+            "id",
+            "creation_user_id",
+            "title_id",
+            "page_number",
+            "import_api_url_requested",
+            "import_api_status_code",
+            "import_exception_thrown",
+            "import_datetime",
+            "import_timestamp",
+            "import_year",
+            "import_month",
+            "import_day",
+            "import_date",
+        ]
         TestCase().assertCountEqual(list(response[0]), expected)
 
     def test_parsed_row_data(self, spark):
-        response = self.parse_json_into_dataframe(spark, 'contacts', [{'contacts': '{"id": "34607",'
-                                                                                   ' "creation_user_id": null,'
-                                                                                   ' "title_id": "4"}'}])
-        expected = {'id': '34607', 'creation_user_id': None, 'title_id': '4', 'page_number': 691,
-                    'import_api_url_requested': 'https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691',
-                    'import_api_status_code': 200, 'import_exception_thrown': '',
-                    'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-                    'import_year': '2021', 'import_month': '09', 'import_day': '16',
-                    'import_date': '20210916'}
+        response = self.parse_json_into_dataframe(
+            spark,
+            "contacts",
+            [
+                {
+                    "contacts": '{"id": "34607",'
+                    ' "creation_user_id": null,'
+                    ' "title_id": "4"}'
+                }
+            ],
+        )
+        expected = {
+            "id": "34607",
+            "creation_user_id": None,
+            "title_id": "4",
+            "page_number": 691,
+            "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+            "import_api_status_code": 200,
+            "import_exception_thrown": "",
+            "import_datetime": datetime(2021, 9, 16, 13, 10),
+            "import_timestamp": "1631797859.247579",
+            "import_year": "2021",
+            "import_month": "09",
+            "import_day": "16",
+            "import_date": "20210916",
+        }
         assertions.dictionaryContains(response[0], expected)
 
     def parse_json_into_dataframe(self, spark, column, data):
-        data_with_imports = [{'page_number': 691,
-                              'import_api_url_requested': 'https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691',
-                              'import_api_status_code': 200, 'import_exception_thrown': '',
-                              'import_datetime': datetime(2021, 9, 16, 13, 10), 'import_timestamp': '1631797859.247579',
-                              'import_year': '2021', 'import_month': '09', 'import_day': '16',
-                              'import_date': '20210916', **i} for i in data]
+        data_with_imports = [
+            {
+                "page_number": 691,
+                "import_api_url_requested": "https://hackney-planning.idoxcloud.com/rest/v1/contacts?page=691",
+                "import_api_status_code": 200,
+                "import_exception_thrown": "",
+                "import_datetime": datetime(2021, 9, 16, 13, 10),
+                "import_timestamp": "1631797859.247579",
+                "import_year": "2021",
+                "import_month": "09",
+                "import_day": "16",
+                "import_date": "20210916",
+                **i,
+            }
+            for i in data
+        ]
         query_data = spark.createDataFrame(
-            spark.sparkContext.parallelize(
-                [Row(**i) for i in data_with_imports]
-            )
+            spark.sparkContext.parallelize([Row(**i) for i in data_with_imports])
         )
-        return [row.asDict() for row in parse_json_into_dataframe(spark, column, query_data).rdd.collect()]
+        return [
+            row.asDict()
+            for row in parse_json_into_dataframe(
+                spark, column, query_data
+            ).rdd.collect()
+        ]
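The hunk above is a pure Black-style reformat of the tests; behaviour is unchanged. The module depends on imports outside this hunk: from the code shown, at least `from datetime import datetime`, `from unittest import TestCase`, and `from pyspark.sql import Row`, plus the project-local `assertions` helper and the production `parse_json_into_dataframe` function that the identically named test helper wraps. The tests also assume a pytest `spark` fixture that is not part of this commit; a minimal sketch of such a fixture, assuming a conftest.py that this repo would provide (the fixture below is hypothetical, not the project's actual one):

    # conftest.py: hypothetical session-scoped SparkSession fixture
    # (assumed setup; not part of the commit above)
    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope="session")
    def spark():
        # A local Spark session is enough for the createDataFrame/parallelize
        # calls made by TestTascomiParsingRefinement.
        session = (
            SparkSession.builder
            .master("local[*]")
            .appName("tascomi-parsing-tests")
            .getOrCreate()
        )
        yield session
        session.stop()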