# OCI Data Flow: Reading files from Object Storage in Streaming mode

Sometimes you would like to continuously monitor an Object Storage (S3-compatible) location and incrementally process new incoming data.
With Spark you can create a StreamingQuery with an Object Storage source and process data from files in streaming mode, without a streaming platform.
All you need is `spark.readStream` with a location in Object Storage or any S3-compatible store.
It looks like this:

## START

### Define source and target locations

```python
# Object Storage namespace and source/target/checkpoint locations (placeholder values)
namespace = 'objstr_namespace'
source_bucket = 'src_bucket'
inc_folder = 'inc_folder'
target_bucket = 'trg_bucket'
target_folder = 'trg_folder'
checkpoint_bucket = 'check_bucket'
checkpoint_folder = 'check_folder'
archive_bucket = 'archivebucket'

input_path = 'oci://' + source_bucket + '@' + namespace + '/' + inc_folder
archive_location = 'oci://' + archive_bucket + '@' + namespace + '/arch_folder'
```

### Infer schema from sample file

```python
# Read one sample file to obtain the schema for the streaming reader
# (Parquet files carry their own schema, so no inferSchema option is needed)
example_file = 'oci://' + source_bucket + '@' + namespace + '/' + inc_folder + '/example_file.parquet'
example = spark.read.option("basePath", input_path).parquet(example_file)
schema = example.schema
```

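If you do not have a representative sample file at hand, you can also declare the schema explicitly instead of deriving it. A minimal sketch, assuming two hypothetical columns:

```python
# Hypothetical explicit schema; replace field names and types with your own
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("event_time", TimestampType(), True),
])
```
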
### Read files in streaming mode - streaming query

```python
# Source: stream new Parquet files as they arrive under input_path
stream_df = spark.readStream.schema(schema).parquet(input_path)

# Sink: write the stream back to Object Storage as Parquet
stream_path = 'oci://' + target_bucket + '@' + namespace + '/' + target_folder

wr = (stream_df.writeStream
      .queryName('StreamObjects')
      .format("parquet")
      .option("path", stream_path)
      .option("checkpointLocation", 'oci://' + checkpoint_bucket + '@' + namespace + '/' + checkpoint_folder)
      .option("cleanSource", "archive")
      .option("sourceArchiveDir", archive_location)
      .start())
```

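By default the query keeps polling the source for new files. If you would rather process whatever has arrived and then stop (batch-style incremental runs), a trigger can be added to the same query. A sketch, assuming Spark 3.3+ for `availableNow`:

```python
# Process all files available right now, then stop automatically (Spark 3.3+)
wr = (stream_df.writeStream
      .queryName('StreamObjectsOnce')
      .format("parquet")
      .option("path", stream_path)
      .option("checkpointLocation", 'oci://' + checkpoint_bucket + '@' + namespace + '/' + checkpoint_folder)
      .trigger(availableNow=True)
      .start())
```
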
### Stop streaming query

```python
# Wait up to 60 seconds for the query, then stop it
wr.awaitTermination(60)
wr.stop()
```

### Check streamed data

```python
# Read back what the streaming query has written so far
nd = spark.read.parquet(stream_path + '/*.parquet')
nd.count()
```

## END of code

## Additional comments

You may need to provide:
- `option("checkpointLocation")` - persists metadata about already processed files, so the query can resume where it left off.
- `option("cleanSource")` - archives or deletes source files after processing; valid values are `archive` and `delete`, and the default is `off`.
- `option("sourceArchiveDir")` - the archive directory, required when `cleanSource` is set to `archive`.
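
For example, if you prefer the source files to be removed rather than archived after processing, the same query can use the `delete` mode. A minimal sketch (destructive, so use with care):

```python
# Delete source files once they have been processed and committed
wr = (stream_df.writeStream
      .queryName('StreamObjectsDelete')
      .format("parquet")
      .option("path", stream_path)
      .option("checkpointLocation", 'oci://' + checkpoint_bucket + '@' + namespace + '/' + checkpoint_folder)
      .option("cleanSource", "delete")
      .start())
```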

## How to use this asset?

Review the code in the notebook and add it to your personal OCI Data Flow application.

## License