11"""Reductionist S3 Active Storage server storage interface module."""
22
33import collections .abc
4+ import http .client
45import json
56import sys
67import typing
78
9+ import numcodecs
810import numpy as np
911import requests
1012
1315DEBUG = 0
1416
1517
16- def get_session (
17- username : str ,
18- password : str ,
19- cacert : typing .Optional [str ],
20- ) -> requests .Session :
18+ def get_session (username : str , password : str ,
19+ cacert : typing .Optional [str ]) -> requests .Session :
2120 """Create and return a client session object.
2221
2322 :param username: S3 username / access key
@@ -30,25 +29,23 @@ def get_session(
3029 return session
3130
3231
33- def reduce_chunk (
34- session ,
35- server ,
36- source ,
37- bucket ,
38- object ,
39- offset ,
40- size ,
41- compression ,
42- filters ,
43- missing ,
44- dtype ,
45- shape ,
46- order ,
47- chunk_selection ,
48- axis ,
49- operation ,
50- storage_type = None ,
51- ):
32+ def reduce_chunk (session ,
33+ server ,
34+ source ,
35+ bucket ,
36+ object ,
37+ offset ,
38+ size ,
39+ compression ,
40+ filters ,
41+ missing ,
42+ dtype ,
43+ shape ,
44+ order ,
45+ chunk_selection ,
46+ axis ,
47+ operation ,
48+ storage_type = None ):
5249 """Perform a reduction on a chunk using Reductionist.
5350
5451 :param server: Reductionist server URL
@@ -76,41 +73,41 @@ def reduce_chunk(
7673 :returns: the reduced data as a numpy array or scalar
7774 :raises ReductionistError: if the request to Reductionist fails
7875 """
79- request_data = build_request_data (
80- source ,
81- bucket ,
82- object ,
83- offset ,
84- size ,
85- compression ,
86- filters ,
87- missing ,
88- dtype ,
89- shape ,
90- order ,
91- chunk_selection ,
92- axis ,
93- storage_type = storage_type ,
94- )
76+
77+ request_data = build_request_data (source ,
78+ bucket ,
79+ object ,
80+ offset ,
81+ size ,
82+ compression ,
83+ filters ,
84+ missing ,
85+ dtype ,
86+ shape ,
87+ order ,
88+ chunk_selection ,
89+ axis ,
90+ storage_type = storage_type )
9591 if DEBUG :
9692 print (f"Reductionist request data dictionary: { request_data } " )
9793 api_operation = "sum" if operation == "mean" else operation or "select"
98- url = f" { server } /v1/{ api_operation } /"
94+ url = f' { server } /v1/{ api_operation } /'
9995 response = request (session , url , request_data )
10096
10197 if response .ok :
10298 return decode_result (response )
103- decode_and_raise_error (response )
99+ else :
100+ decode_and_raise_error (response )
104101
105102
106103def encode_byte_order (dtype ):
107104 """Encode the byte order (endianness) of a dtype in a JSON-compatible format."""
108- if dtype .byteorder == "=" :
105+ if dtype .byteorder == '=' :
109106 return sys .byteorder
110- if dtype .byteorder == "<" :
111- return " little"
112- if dtype .byteorder == ">" :
113- return " big"
107+ elif dtype .byteorder == '<' :
108+ return ' little'
109+ elif dtype .byteorder == '>' :
110+ return ' big'
114111 assert False , "Unexpected byte order {dtype.byteorder}"
115112
116113
@@ -120,8 +117,9 @@ def encode_selection(selection):
120117 def encode_slice (s ):
121118 if isinstance (s , slice ):
122119 return [s .start , s .stop , s .step ]
123- # Integer - select single value
124- return [s , s + 1 , 1 ]
120+ else :
121+ # Integer - select single value
122+ return [s , s + 1 , 1 ]
125123
126124 return [encode_slice (s ) for s in selection ]
127125
@@ -130,7 +128,8 @@ def encode_filter(filter):
130128 """Encode a filter algorithm in a JSON-compatible format."""
131129 if filter .codec_id == "shuffle" :
132130 return {"id" : filter .codec_id , "element_size" : filter .elementsize }
133- raise ValueError (f"Unsupported filter { filter } )" )
131+ else :
132+ raise ValueError (f"Unsupported filter { filter } )" )
134133
135134
136135def encode_filters (filters ):
@@ -152,20 +151,21 @@ def encode_missing(missing):
152151 # fill_value and missing_value are effectively the same when reading data.
153152 missing_value = fill_value or missing_value
154153 if missing_value :
155- if isinstance (missing_value , collections .abc .Sequence ) or isinstance (
156- missing_value ,
157- np .ndarray ,
158- ):
154+ if isinstance (missing_value , collections .abc .Sequence ):
155+ return {
156+ "missing_values" : [encode_dvalue (v ) for v in missing_value ]
157+ }
158+ elif isinstance (missing_value , np .ndarray ):
159159 return {
160- "missing_values" : [encode_dvalue (v ) for v in missing_value ],
160+ "missing_values" : [encode_dvalue (v ) for v in missing_value ]
161161 }
162- return {"missing_value" : encode_dvalue (missing_value )}
162+ else :
163+ return {"missing_value" : encode_dvalue (missing_value )}
163164 if valid_min and valid_max :
164165 return {
165- "valid_range" : [
166- encode_dvalue (valid_min ),
167- encode_dvalue (valid_max ),
168- ],
166+ "valid_range" :
167+ [encode_dvalue (valid_min ),
168+ encode_dvalue (valid_max )]
169169 }
170170 if valid_min :
171171 return {"valid_min" : encode_dvalue (valid_min )}
@@ -174,33 +174,31 @@ def encode_missing(missing):
174174 assert False , "Expected missing values not found"
175175
176176
177- def build_request_data (
178- source : str ,
179- bucket : str ,
180- object : str ,
181- offset : int ,
182- size : int ,
183- compression ,
184- filters ,
185- missing ,
186- dtype ,
187- shape ,
188- order ,
189- selection ,
190- axis ,
191- storage_type = None ,
192- ) -> dict :
177+ def build_request_data (source : str ,
178+ bucket : str ,
179+ object : str ,
180+ offset : int ,
181+ size : int ,
182+ compression ,
183+ filters ,
184+ missing ,
185+ dtype ,
186+ shape ,
187+ order ,
188+ selection ,
189+ axis ,
190+ storage_type = None ) -> dict :
193191 """Build request data for Reductionist API."""
194192 request_data = {
195- " source" : source ,
196- " bucket" : bucket ,
197- " object" : object ,
198- " dtype" : dtype .name ,
199- " byte_order" : encode_byte_order (dtype ),
200- " offset" : int (offset ),
201- " size" : int (size ),
202- " order" : order ,
203- " storage_type" : storage_type ,
193+ ' source' : source ,
194+ ' bucket' : bucket ,
195+ ' object' : object ,
196+ ' dtype' : dtype .name ,
197+ ' byte_order' : encode_byte_order (dtype ),
198+ ' offset' : int (offset ),
199+ ' size' : int (size ),
200+ ' order' : order ,
201+ ' storage_type' : storage_type ,
204202 }
205203 if shape :
206204 request_data ["shape" ] = shape
@@ -217,11 +215,10 @@ def build_request_data(
217215 request_data ["missing" ] = encode_missing (missing )
218216
219217 if REDUCTIONIST_AXIS_READY :
220- request_data [" axis" ] = axis
218+ request_data [' axis' ] = axis
221219 elif axis is not None and len (axis ) != len (shape ):
222220 raise ValueError (
223- "Can't reduce over axis subset unitl reductionist is ready" ,
224- )
221+ "Can't reduce over axis subset unitl reductionist is ready" )
225222
226223 return {k : v for k , v in request_data .items () if v is not None }
227224
@@ -237,15 +234,15 @@ def request(session: requests.Session, url: str, request_data: dict):
237234
238235def decode_result (response ):
239236 """Decode a successful response, return as a 2-tuple of (numpy array or scalar, count)."""
240- dtype = response .headers [" x-activestorage-dtype" ]
241- shape = json .loads (response .headers [" x-activestorage-shape" ])
237+ dtype = response .headers [' x-activestorage-dtype' ]
238+ shape = json .loads (response .headers [' x-activestorage-shape' ])
242239
243240 # Result
244241 result = np .frombuffer (response .content , dtype = dtype )
245242 result = result .reshape (shape )
246243
247244 # Counts
248- count = json .loads (response .headers [" x-activestorage-count" ])
245+ count = json .loads (response .headers [' x-activestorage-count' ])
249246 # TODO: When reductionist is ready, we need to fix 'count'
250247
251248 # Mask the result
@@ -258,9 +255,9 @@ class ReductionistError(Exception):
258255 """Exception for Reductionist failures."""
259256
260257 def __init__ (self , status_code , error ):
261- super (ReductionistError , self ). __init__ (
262- f"Reductionist error: HTTP { status_code } : { error } " ,
263- )
258+ super (
259+ ReductionistError ,
260+ self ). __init__ ( f"Reductionist error: HTTP { status_code } : { error } " )
264261
265262
266263def decode_and_raise_error (response ):
0 commit comments