Skip to content

Commit 0b878f3

Browse files
authored
[ExcelExtractor] Add support for Schema (#1864)
* [ExcelExtractor] Add support for `Schema`
* [ExcelExtractor] Add support for `Schema`
1 parent 6e957ce commit 0b878f3

File tree

2 files changed

+112
-85
lines changed

2 files changed

+112
-85
lines changed

src/adapter/etl-adapter-excel/src/Flow/ETL/Adapter/Excel/ExcelExtractor.php

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
Adapter\Excel\Sheet\SheetsManager,
1010
Exception\InvalidArgumentException,
1111
Extractor,
12-
FlowContext
13-
};
12+
FlowContext,
13+
Schema};
1414
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
1515
use Flow\Filesystem\{Path, SourceStream};
1616
use OpenSpout\Common\Entity\{Cell, Row};
@@ -28,6 +28,8 @@ final class ExcelExtractor implements Extractor, FileExtractor, LimitableExtract
2828

2929
private XlsxReader|OdsReader|null $reader = null;
3030

31+
private ?Schema $schema = null;
32+
3133
private ?string $sheetName = null;
3234

3335
private bool $withHeader = true;
@@ -57,8 +59,7 @@ public function extract(FlowContext $context) : \Generator
5759
foreach ($context->streams()->list($this->path, $this->filter()) as $stream) {
5860
foreach ($this->extractRows($stream, $headers, $offset) as $row) {
5961
// Ensure $row is an array before passing to array_to_rows
60-
$rowArray = \is_array($row) ? $row : [];
61-
$signal = yield array_to_rows($rowArray, $context->entryFactory(), $stream->path()->partitions());
62+
$signal = yield array_to_rows(\is_array($row) ? $row : [], $context->entryFactory(), $stream->path()->partitions(), schema: $this->schema);
6263
$this->incrementReturnedRows();
6364

6465
if ($signal === Signal::STOP || $this->reachedLimit()) {
@@ -112,6 +113,13 @@ public function withReader(ExcelReader $reader) : self
112113
return $this;
113114
}
114115

116+
public function withSchema(Schema $schema) : self
117+
{
118+
$this->schema = $schema;
119+
120+
return $this;
121+
}
122+
115123
public function withSheetName(string $sheetName) : self
116124
{
117125
SheetNameAssertion::assert($sheetName);

src/adapter/etl-adapter-excel/tests/Flow/ETL/Adapter/Excel/Tests/Integration/ExcelExtractorTest.php

Lines changed: 100 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
namespace Flow\ETL\Adapter\Excel\Tests\Integration;
66

77
use function Flow\ETL\Adapter\Excel\DSL\from_excel;
8-
use function Flow\ETL\DSL\{config, df, flow_context};
8+
use function Flow\ETL\DSL\{config, df, flow_context, int_schema, schema, string_schema};
99
use Flow\ETL\Adapter\Excel\ExcelReader;
1010
use Flow\ETL\Exception\InvalidArgumentException;
11-
use Flow\ETL\{Extractor\Signal, Row, Rows};
11+
use Flow\ETL\{Extractor\Signal, Rows};
1212
use Flow\ETL\Tests\FlowTestCase;
1313
use Flow\Filesystem\{Partition, Path};
1414
use PHPUnit\Framework\Attributes\DataProvider;
@@ -68,19 +68,17 @@ public function test_extract_excel_file_with_limit(string $fixtureName) : void
6868
$extractor = from_excel($fixtureName);
6969
$extractor->changeLimit(5);
7070

71-
$total = 0;
71+
$rows = df()
72+
->extract($extractor)
73+
->fetch()
74+
->toArray();
7275

73-
foreach ($extractor->extract(flow_context(config())) as $rows) {
74-
$rows->each(function (Row $row) : void {
75-
$this->assertSame(
76-
['id', 'name', 'email'],
77-
\array_keys($row->toArray())
78-
);
79-
});
80-
$total += $rows->count();
81-
}
76+
self::assertCount(5, $rows);
8277

83-
self::assertSame(5, $total);
78+
foreach ($rows as $row) {
79+
self::assertSame(['id', 'name', 'email'], \array_keys($row));
80+
self::assertCount(3, $row);
81+
}
8482
}
8583

8684
#[DataProvider('provide_fixtures')]
@@ -89,19 +87,17 @@ public function test_extract_excel_file_with_offset(string $fixtureName) : void
8987
$extractor = from_excel($fixtureName);
9088
$extractor->withOffset(5);
9189

92-
$total = 0;
90+
$rows = df()
91+
->extract($extractor)
92+
->fetch()
93+
->toArray();
9394

94-
foreach ($extractor->extract(flow_context(config())) as $rows) {
95-
$rows->each(function (Row $row) : void {
96-
$this->assertSame(
97-
['id', 'name', 'email'],
98-
\array_keys($row->toArray())
99-
);
100-
});
101-
$total += $rows->count();
102-
}
95+
self::assertCount(7, $rows);
10396

104-
self::assertSame(7, $total);
97+
foreach ($rows as $row) {
98+
self::assertSame(['id', 'name', 'email'], \array_keys($row));
99+
self::assertCount(3, $row);
100+
}
105101
}
106102

107103
#[DataProvider('provide_fixtures')]
@@ -111,112 +107,135 @@ public function test_extract_excel_file_with_offset_without_header(string $fixtu
111107
$extractor->withHeader(false);
112108
$extractor->withOffset(5);
113109

114-
$total = 0;
110+
$rows = df()
111+
->extract($extractor)
112+
->fetch()
113+
->toArray();
115114

116-
foreach ($extractor->extract(flow_context(config())) as $rows) {
117-
$rows->each(function (Row $row) : void {
118-
$this->assertSame(
119-
['e00', 'e01', 'e02'],
120-
\array_keys($row->toArray())
121-
);
122-
});
123-
$total += $rows->count();
124-
}
115+
self::assertCount(6, $rows);
125116

126-
self::assertSame(6, $total);
117+
foreach ($rows as $row) {
118+
self::assertSame(['e00', 'e01', 'e02'], \array_keys($row));
119+
self::assertCount(3, $row);
120+
}
127121
}
128122

129123
#[DataProvider('provide_fixtures')]
130124
public function test_extract_excel_file_with_selected_sheet_name(string $fixtureName) : void
131125
{
132-
$extractor = from_excel($fixtureName);
133-
$extractor->withSheetName('Sheet2');
126+
$rows = df()
127+
->extract(
128+
from_excel($fixtureName)
129+
->withSheetName('Sheet2')
130+
)
131+
->fetch()
132+
->toArray();
134133

135-
$total = 0;
134+
self::assertCount(5, $rows);
136135

137-
foreach ($extractor->extract(flow_context(config())) as $rows) {
138-
$rows->each(function (Row $row) : void {
139-
$this->assertSame(
140-
['id', 'name', 'email'],
141-
\array_keys($row->toArray())
142-
);
143-
});
144-
$total += $rows->count();
136+
foreach ($rows as $row) {
137+
self::assertSame(['id', 'name', 'email'], \array_keys($row));
138+
self::assertCount(3, $row);
145139
}
146-
147-
self::assertSame(5, $total);
148140
}
149141

150142
#[DataProvider('provide_fixtures')]
151143
public function test_extract_excel_file_with_unknown_sheet_name(string $fixtureName) : void
152144
{
153-
$extractor = from_excel($fixtureName);
154-
$extractor->withSheetName('unknown');
155-
156145
$this->expectException(InvalidArgumentException::class);
157146
$this->expectExceptionMessage("Sheet with name: 'unknown' not found.");
158147

159-
iterator_to_array($extractor->extract(flow_context(config())));
148+
df()
149+
->extract(
150+
from_excel($fixtureName)
151+
->withSheetName('unknown')
152+
)
153+
->fetch()
154+
->toArray();
160155
}
161156

162157
#[DataProvider('provide_fixtures')]
163158
public function test_extract_excel_file_without_header(string $fixtureName) : void
164159
{
165-
$extractor = from_excel($fixtureName);
166-
$extractor->withHeader(false);
160+
$rows = df()
161+
->extract(
162+
from_excel($fixtureName)
163+
->withHeader(false)
164+
)
165+
->fetch()
166+
->toArray();
167167

168-
$total = 0;
168+
self::assertCount(10, $rows);
169169

170-
foreach ($extractor->extract(flow_context(config())) as $rows) {
171-
$rows->each(function (Row $row) : void {
172-
$this->assertSame(
173-
['e00', 'e01', 'e02'],
174-
\array_keys($row->toArray())
175-
);
176-
});
177-
$total += $rows->count();
170+
foreach ($rows as $row) {
171+
self::assertSame(['e00', 'e01', 'e02'], \array_keys($row));
172+
self::assertCount(3, $row);
178173
}
179-
180-
self::assertSame(10, $total);
181174
}
182175

183176
#[DataProvider('provide_nullable_fixtures')]
184177
public function test_extract_excel_nullable_file(string $fixtureName) : void
185178
{
186-
$extractor = from_excel($fixtureName);
179+
$rows = df()
180+
->extract(from_excel($fixtureName))
181+
->fetch()
182+
->toArray();
187183

188-
$total = 0;
184+
self::assertCount(5, $rows);
189185

190-
foreach ($extractor->extract(flow_context(config())) as $rows) {
191-
$rows->each(function (Row $row) : void {
192-
$this->assertSame(['id', 'name', 'email'], \array_keys($row->toArray()));
193-
$this->assertCount(3, $row->toArray());
194-
});
195-
$total += $rows->count();
186+
foreach ($rows as $row) {
187+
self::assertSame(['id', 'name', 'email'], \array_keys($row));
188+
self::assertCount(3, $row);
196189
}
190+
}
197191

198-
self::assertSame(5, $total);
192+
#[DataProvider('provide_fixtures')]
193+
public function test_extract_excel_puts_null_in_not_matching_schema_rows(string $fixtureName) : void
194+
{
195+
$rows = df()
196+
->extract(
197+
from_excel($fixtureName)
198+
->withSchema(
199+
schema(
200+
int_schema('id'),
201+
string_schema('name'),
202+
string_schema('email'),
203+
string_schema('missing'),
204+
)
205+
)
206+
)
207+
->fetch()
208+
->toArray();
209+
210+
foreach ($rows as $row) {
211+
self::assertNotSame([], $row);
212+
self::assertNull($row['missing']);
213+
}
199214
}
200215

201216
public function test_extract_with_unknown_file() : void
202217
{
203-
$extractor = from_excel(__DIR__ . '/../Fixtures/empty_file');
204-
205218
$this->expectException(InvalidArgumentException::class);
206219
$this->expectExceptionMessage('Unsupported file format: n/a');
207220

208-
iterator_to_array($extractor->extract(flow_context(config())));
221+
df()
222+
->extract(from_excel(__DIR__ . '/../Fixtures/empty_file'))
223+
->fetch()
224+
->toArray();
209225
}
210226

211227
public function test_extract_with_wrongly_selected_reader() : void
212228
{
213-
$extractor = from_excel(__DIR__ . '/../Fixtures/fixture.xlsx');
214-
$extractor->withReader(ExcelReader::ODS);
215-
216229
$this->expectException(InvalidArgumentException::class);
217230
$this->expectExceptionMessage('Failed to open file: Could not open');
218231

219-
iterator_to_array($extractor->extract(flow_context(config())));
232+
df()
233+
->extract(
234+
from_excel(__DIR__ . '/../Fixtures/fixture.xlsx')
235+
->withReader(ExcelReader::ODS)
236+
)
237+
->fetch()
238+
->toArray();
220239
}
221240

222241
public function test_loading_data_from_all_partitions() : void

0 commit comments

Comments
 (0)