Skip to content

Commit 0e140a5

Browse files
authored
[XMLParserExtractor] Add support for Schema (#1866)
1 parent d8a278d commit 0e140a5

File tree

2 files changed

+50
-19
lines changed

2 files changed

+50
-19
lines changed

src/adapter/etl-adapter-xml/src/Flow/ETL/Adapter/XML/XMLParserExtractor.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
use function Flow\ETL\DSL\array_to_rows;
88
use Flow\ETL\Exception\RuntimeException;
99
use Flow\ETL\Extractor\{FileExtractor, Limitable, LimitableExtractor, PathFiltering, Signal};
10-
use Flow\ETL\{Extractor, FlowContext};
10+
use Flow\ETL\{Extractor, FlowContext, Schema};
1111
use Flow\Filesystem\Path;
1212

1313
final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExtractor
@@ -34,6 +34,8 @@ final class XMLParserExtractor implements Extractor, FileExtractor, LimitableExt
3434

3535
private ?\XMLParser $parser = null;
3636

37+
private ?Schema $schema = null;
38+
3739
private ?\XMLWriter $writer = null;
3840

3941
private string $xmlNodePath = '';
@@ -105,7 +107,7 @@ public function extract(FlowContext $context) : \Generator
105107
$rowData = ['node' => $this->createDOMNode($element)];
106108
}
107109

108-
$signal = yield array_to_rows($rowData, $context->entryFactory(), $stream->path()->partitions());
110+
$signal = yield array_to_rows($rowData, $context->entryFactory(), $stream->path()->partitions(), $this->schema);
109111

110112
$this->incrementReturnedRows();
111113

@@ -190,6 +192,13 @@ public function withBufferSize(int $bufferSize) : self
190192
return $this;
191193
}
192194

195+
/**
 * Stores the given schema on this extractor and returns the same instance,
 * so the call can be chained with the other with*() configuration methods.
 *
 * The stored value is what extract() hands to array_to_rows() as its last
 * argument; the property defaults to null when no schema has been set.
 */
public function withSchema(Schema $schema) : self
196+
{
197+
$this->schema = $schema;
198+
199+
return $this;
200+
}
201+
193202
public function withXMLNodePath(string $xmlNodePath) : self
194203
{
195204
$this->xmlNodePath = $xmlNodePath;

src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Integration/XMLParserExtractorTest.php

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
namespace Flow\ETL\Adapter\XML\Tests\Integration;
66

77
use function Flow\ETL\Adapter\XML\from_xml;
8-
use function Flow\ETL\DSL\{config, data_frame};
9-
use function Flow\ETL\DSL\{flow_context};
8+
use function Flow\ETL\DSL\{config};
9+
use function Flow\ETL\DSL\{df, flow_context, schema, xml_schema};
1010
use function Flow\Types\DSL\type_string;
1111
use Flow\ETL\{Adapter\XML\XMLParserExtractor, Tests\FlowIntegrationTestCase};
1212
use Flow\ETL\Extractor\Signal;
@@ -16,20 +16,23 @@ final class XMLParserExtractorTest extends FlowIntegrationTestCase
1616
{
1717
public function test_limit() : void
1818
{
19-
$extractor = (new XMLParserExtractor(Path::realpath(__DIR__ . '/../Fixtures/flow_orders.xml')))->withXMLNodePath('root/row');
19+
$extractor = from_xml(Path::realpath(__DIR__ . '/../Fixtures/flow_orders.xml'))
20+
->withXMLNodePath('root/row');
2021
$extractor->changeLimit(2);
2122

22-
self::assertCount(
23-
2,
24-
\iterator_to_array($extractor->extract(flow_context(config())))
25-
);
23+
$rows = df()
24+
->extract($extractor)
25+
->fetch()
26+
->toArray();
27+
28+
self::assertCount(2, $rows);
2629
}
2730

2831
public function test_reading_deep_xml() : void
2932
{
30-
self::assertEquals(
33+
self::assertSame(
3134
5,
32-
(data_frame())
35+
df()
3336
->read(from_xml(__DIR__ . '/../Fixtures/deepest_items_flat.xml', 'root/items/item/deep'))
3437
->fetch()
3538
->count()
@@ -38,12 +41,9 @@ public function test_reading_deep_xml() : void
3841

3942
public function test_reading_xml() : void
4043
{
41-
$xml = new \DOMDocument();
42-
$xml->load(__DIR__ . '/../Fixtures/simple_items.xml');
43-
44-
self::assertEquals(
44+
self::assertSame(
4545
1,
46-
(data_frame())
46+
df()
4747
->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml'))
4848
->fetch()
4949
->count()
@@ -59,7 +59,7 @@ public function test_reading_xml_each_collection_item() : void
5959
</item>
6060
XML,
6161
type_string()->cast(
62-
(data_frame())
62+
df()
6363
->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item'))
6464
->fetch()[0]
6565
->valueOf('node')
@@ -73,7 +73,7 @@ public function test_reading_xml_each_collection_item() : void
7373
</item>
7474
XML,
7575
type_string()->cast(
76-
(data_frame())
76+
df()
7777
->read(from_xml(__DIR__ . '/../Fixtures/simple_items_flat.xml', 'root/items/item'))
7878
->fetch()[4]
7979
->valueOf('node')
@@ -104,13 +104,35 @@ public function test_reading_xml_from_path() : void
104104
</items>
105105
XML,
106106
type_string()->cast(
107-
(data_frame())
107+
df()
108108
->read(from_xml(__DIR__ . '/../Fixtures/simple_items.xml', 'root/items'))
109109
->fetch()[0]->valueOf('node')
110110
)
111111
);
112112
}
113113

114+
/**
 * Reads the simple_items.xml fixture through from_xml() with a schema attached
 * via withSchema(), declaring two XML entries: 'node' and 'missing'.
 *
 * Every fetched row is expected to be a non-empty array whose 'node' value is
 * not null and whose 'missing' value is null.
 *
 * NOTE(review): all assertions live inside the foreach below, so if $rows came
 * back empty the test would execute no assertions at all — consider asserting
 * that $rows is not empty before looping.
 */
public function test_reading_xml_with_schema() : void
115+
{
116+
$rows = df()
117+
->extract(
118+
from_xml(__DIR__ . '/../Fixtures/simple_items.xml')
119+
->withSchema(
120+
schema(
121+
xml_schema('node'),
122+
xml_schema('missing'),
123+
)
124+
)
125+
)
126+
->fetch()
127+
->toArray();
128+
129+
// Per-row checks: the row itself is not an empty array, 'node' is set, 'missing' is null.
foreach ($rows as $row) {
130+
self::assertNotSame([], $row);
131+
self::assertNotNull($row['node']);
132+
self::assertNull($row['missing']);
133+
}
134+
}
135+
114136
public function test_signal_stop() : void
115137
{
116138
$extractor = (new XMLParserExtractor(Path::realpath(__DIR__ . '/../Fixtures/flow_orders.xml')))->withXMLNodePath('root/row');

0 commit comments

Comments
 (0)