Commit 5dac3dd: Merge pull request #1622 from oracle-devrel/sylwesterdec-patch-7 (Create readme.md)
# OCI Data Flow: Reading Files from Object Storage in Streaming Mode

Sometimes you want to continuously monitor an Object Storage (S3-compatible) location and incrementally process new incoming data.
With Spark you can create a StreamingQuery that uses Object Storage as its source and process files in streaming mode, without a dedicated streaming platform.
All you need is `spark.readStream` pointed at an Object Storage (or S3-compatible) location.
It looks like this:

## START
### Define source and target locations

```python
namespace = 'objstr_namespace'
source_bucket = 'src_bucket'
inc_folder = 'inc_folder'
target_bucket = 'trg_bucket'
target_folder = 'trg_folder'
checkpoint_bucket = 'check_bucket'
checkpoint_folder = 'check_folder'

input_path = 'oci://' + source_bucket + '@' + namespace + '/' + inc_folder
# Bucket used to archive processed source files (see the cleanSource option below)
archivelocation = 'oci://archivebucket@' + namespace + '/arch_folder'
```
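String concatenation for these URIs is easy to get wrong; a small helper that assembles the `oci://bucket@namespace/path` form is one way to avoid that. `make_oci_uri` below is a hypothetical convenience function, not part of any OCI SDK:

```python
def make_oci_uri(bucket: str, namespace: str, *parts: str) -> str:
    """Build an oci://bucket@namespace/path URI from its components."""
    # Strip stray slashes so callers can pass 'folder/' or '/folder' safely
    path = '/'.join(p.strip('/') for p in parts if p)
    return f'oci://{bucket}@{namespace}' + (f'/{path}' if path else '')

input_path = make_oci_uri('src_bucket', 'objstr_namespace', 'inc_folder')
# → 'oci://src_bucket@objstr_namespace/inc_folder'
```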
### Infer schema from a sample file

```python
example_file = 'oci://' + source_bucket + '@' + namespace + '/' + inc_folder + '/example_file.parquet'
example = spark.read.option("basePath", input_path).option("inferSchema", "true").parquet(example_file)
schema = example.schema
```
### Read files in streaming mode - streaming query

```python
# Streaming DataFrame over the Object Storage location (no Kafka involved)
stream_df = spark.readStream.schema(schema).parquet(input_path)
stream_path = 'oci://' + target_bucket + '@' + namespace + '/' + target_folder

wr = (stream_df.writeStream
      .queryName('StreamObjects')
      .format("parquet")
      .option("path", stream_path)
      .option("checkpointLocation", 'oci://' + checkpoint_bucket + '@' + namespace + '/' + checkpoint_folder)
      .option("cleanSource", "archive")
      .option("sourceArchiveDir", archivelocation)
      .start())
```
### Stop the streaming query

```python
wr.awaitTermination(60)  # wait up to 60 seconds for the query to finish
wr.stop()
```
### Check streamed data

```python
nd = spark.read.option("inferSchema", "true").parquet(stream_path + '/*.parquet')
nd.count()
```
## END of code
## Additional comments

You may need to provide:

- `option("checkpointLocation")` - persists metadata about already-processed files, so the query can resume where it left off.
- `option("cleanSource")` - archives or deletes source files after processing. Valid values are `archive` and `delete`; the default is `off`.
- `option("sourceArchiveDir")` - the archive directory, required when `cleanSource` is set to `archive`.
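The `cleanSource` semantics can be pictured with a small pure-Python sketch (no Spark involved): after a file is processed, it is either moved to an archive directory, deleted, or left alone. `clean_after_processing` is a hypothetical illustration of this behavior, not Spark code:

```python
import shutil
from pathlib import Path
from typing import Optional

def clean_after_processing(src: Path, mode: str,
                           archive_dir: Optional[Path] = None) -> None:
    """Mimic the cleanSource option: 'archive', 'delete', or 'off'."""
    if mode == "archive":
        if archive_dir is None:
            raise ValueError("sourceArchiveDir is required when cleanSource=archive")
        archive_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(src), str(archive_dir / src.name))  # move file aside
    elif mode == "delete":
        src.unlink()  # remove the processed file
    elif mode != "off":
        raise ValueError(f"unknown cleanSource mode: {mode}")
```

With `mode="off"` the source file is left untouched, which means reprocessing is possible if the checkpoint is lost.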
## How to use this asset?

Review the code in the notebook and add it to your own OCI Data Flow application.
## License

Copyright (c) 2024 Oracle and/or its affiliates.

Licensed under the Universal Permissive License (UPL), Version 1.0.

See LICENSE for more details.
