47 changes: 26 additions & 21 deletions README.md
@@ -5,7 +5,7 @@ Build Status: ![build status](https://circleci.com/gh/alpacahq/pymarketstore/tre

Pymarketstore can query and write financial timeseries data from [MarketStore](https://github.com/alpacahq/marketstore)

Tested with 2.7, 3.3+
Tested with Python 2.7, 3.5+
Review comment from the PR author:

pandas requires Python 3.5+.


## How to install

@@ -68,43 +68,48 @@ Construct a client object with endpoint.

## Query

`pymkts.Client#query(symbols, timeframe, attrgroup, start=None, end=None, limit=None, limit_from_start=False)`
`pymkts.Client.query(symbols, timeframe, attrgroup, start=None, end=None, limit=None, limit_from_start=False)`
Review comment from the PR author:

I changed all of these to dots; it feels more "Pythonic" to me, but I'm not especially attached to it either.


You can build parameters using `pymkts.Params`.

- symbols: string for a single symbol or a list of symbol string for multi-symbol query
- timeframe: timeframe string
- attrgroup: attribute group string. symbols, timeframe and attrgroup compose a bucket key to query in the server
- start: unix epoch second (int), datetime object or timestamp string. The result will include only data timestamped equal to or after this time.
- end: unix epoch second (int), datetime object or timestamp string. The result will include only data timestamped equal to or before this time.
- limit: the number of records to be returned, counting from either start or end boundary.
- limit_from_start: boolean to indicate `limit` is from the start boundary. Defaults to False.
- `symbols`: a string for a single symbol, or a list of symbol strings for a multi-symbol query
- `timeframe`: timeframe string
- `attrgroup`: attribute group string. `symbols`, `timeframe` and `attrgroup` together compose the bucket key to query on the server
- `start`: Unix epoch seconds (int), datetime object or timestamp string. The result will include only data timestamped at or after this time.
- `end`: Unix epoch seconds (int), datetime object or timestamp string. The result will include only data timestamped at or before this time.
- `limit`: the number of records to be returned, counting from either the start or the end boundary.
- `limit_from_start`: boolean indicating that `limit` counts from the start boundary. Defaults to `False`.
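
As a minimal sketch of how the three arguments might combine into a bucket key: this assumes the key is the `/`-separated form suggested by the `*/*/*` stream pattern and by `tbk.split('/')` in `client.py`, and that multiple symbols are comma-joined. `make_bucket_key` is a hypothetical helper, not part of pymarketstore, and the `1Min` and `OHLCV` values are illustrative.

```python
def make_bucket_key(symbols, timeframe, attrgroup):
    """Hypothetical helper: join the query arguments into one bucket key."""
    # Accept a single symbol string or a list of symbol strings.
    if isinstance(symbols, str):
        symbols = [symbols]
    # Assumption: multiple symbols are comma-separated inside the key.
    return "/".join([",".join(symbols), timeframe, attrgroup])


single = make_bucket_key("BTC", "1Min", "OHLCV")
multi = make_bucket_key(["BTC", "ETH"], "1Min", "OHLCV")
print(single)
print(multi)
```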

Pass one or multiple instances of `Params` to `Client.query()`. It returns a `QueryReply` object, which holds the numpy array data returned from the server.
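
The interaction of `start`, `end`, `limit` and `limit_from_start` can be pictured with a small local emulation. This is only one reading of the parameter descriptions, not the server's implementation, and `select_records` is a hypothetical name.

```python
def select_records(epochs, start=None, end=None, limit=None,
                   limit_from_start=False):
    """Local emulation of the documented window and limit semantics."""
    # Keep timestamps inside the inclusive [start, end] window.
    window = [t for t in sorted(epochs)
              if (start is None or t >= start) and (end is None or t <= end)]
    if limit is None:
        return window
    if limit_from_start:
        # Count from the start boundary.
        return window[:limit]
    # Default: count from the end boundary.
    return window[max(len(window) - limit, 0):]


epochs = [100, 200, 300, 400, 500]
in_window = select_records(epochs, start=200, end=400)
newest_two = select_records(epochs, start=200, limit=2)
oldest_two = select_records(epochs, start=200, limit=2, limit_from_start=True)
```

Under this reading, the default returns the records closest to `end`, while `limit_from_start=True` returns the records closest to `start`.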

## Write

`pymkts.Client#write(data, tbk)`
`pymkts.Client.write(data, tbk)`

You can write a numpy array to the server via `Client.write()` method. The data parameter must be numpy's [recarray type](https://docs.scipy.org/doc/numpy-dev/reference/generated/numpy.recarray.html) with
a column named `Epoch` in int64 type at the first column. `tbk` is the bucket key of the data records.
You can write data to the server via the `Client.write()` method.

- `data`: Timeseries data to write. The supported data types are:
  - [np.recarray](https://docs.scipy.org/doc/numpy-dev/reference/generated/numpy.recarray.html) with the first column named `Epoch` in int64 type
  - [pd.Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) with an int64 type index named `Epoch`
  - [pd.DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/frame.html) with an int64 type index named `Epoch`
- `tbk`: the bucket key (as a string) of the data records.
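
For the `np.recarray` case, a value with the required shape could be built roughly as follows. This sketch only constructs the array and does not contact a server; the `Open` and `Close` columns and their values are illustrative assumptions.

```python
import numpy as np

# Illustrative rows: (Epoch, Open, Close). Only Epoch is required by the
# description above; the other column names are assumptions.
rows = [(1507299600, 4370.0, 4371.74), (1507299660, 4371.74, 4372.0)]

# Epoch comes first and is stored as int64 ("i8").
data = np.array(rows, dtype=[("Epoch", "i8"), ("Open", "f4"), ("Close", "f4")])
data = data.view(np.recarray)

print(data.dtype.names)
print(data.Epoch)
```

An array like this would then be passed as `data`, together with a bucket key string as `tbk`.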

## List Symbols

`pymkts.Client#list_symbols()`
`pymkts.Client.list_symbols()`

Returns the list of all symbols stored in the server.

## Server version

`pymkts.Client#server_version()`
`pymkts.Client.server_version()`

Returns the value of the `Marketstore-Version` header from a server response, as a string.

## Streaming

If the server supports WebSocket streaming, you can connect to it using
`pymkts.StreamConn` class. For convenience, you can call `pymkts.Client#stream()` to obtain the instance with the same server
`pymkts.StreamConn` class. For convenience, you can call `pymkts.Client.stream()` to obtain the instance with the same server
information as REST client.

Once you have this instance, you will set up some event handlers by
@@ -115,24 +120,24 @@ To actually connect and start receiving the messages from the server,
you will call `run()` with the stream names. By default, it subscribes
to all by `*/*/*`.

`pymkts.Client#stream()`
`pymkts.Client.stream()`

Return a `StreamConn` which is a websocket connection to the server.

`pymkts.StreamConn#(endpoint)`
`pymkts.StreamConn(endpoint)`

Create a connection instance to the `endpoint` server. The endpoint
string is a full URL with "ws" or "wss" scheme with the port and path.

`pymkts.StreamConn#register(stream_path, func)`
`@pymkts.StreamConn#on(stream_path)`
`pymkts.StreamConn.register(stream_path, func)`
`@pymkts.StreamConn.on(stream_path)`

Add a new message handler to the connection. The function will be called
with `handler(StreamConn, {"key": "...", "data": {...,}})` if the key
(time bucket key) matches the `stream_path` regular expression.
The `on` method is a decorator version of `register`.
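
The register/decorator contract described here can be sketched without a server. `MiniDispatcher` is a hypothetical stand-in written for illustration, not the `StreamConn` implementation; it only mimics matching a message key against a regular expression and calling the handler.

```python
import re


class MiniDispatcher:
    """Hypothetical stand-in that mimics the documented register/on contract."""

    def __init__(self):
        self._handlers = []  # list of (compiled pattern, function)

    def register(self, stream_path, func):
        # Store the handler together with its compiled regular expression.
        self._handlers.append((re.compile(stream_path), func))

    def on(self, stream_path):
        # Decorator form of register(); returns the function unchanged.
        def decorator(func):
            self.register(stream_path, func)
            return func
        return decorator

    def dispatch(self, msg):
        # Call every handler whose pattern matches the message key.
        for pattern, func in self._handlers:
            if pattern.match(msg["key"]):
                func(self, msg)


conn = MiniDispatcher()
received = []


@conn.on(r"^BTC/")
def on_btc(conn, msg):
    received.append(msg["data"])


conn.dispatch({"key": "BTC/1Min/OHLCV", "data": {"Close": 4371.74}})
conn.dispatch({"key": "ETH/1Min/OHLCV", "data": {"Close": 300.0}})
print(received)
```

Only the first message reaches `on_btc`, because the second key does not match `^BTC/`.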

`pymkts.StreamConn#run([stream1, stream2, ...])`
`pymkts.StreamConn.run([stream1, stream2, ...])`

Start communication with the server and go into an indefinite loop. It
does not return until an unhandled exception is raised, in which case the
@@ -154,4 +159,4 @@ def on_btc(conn, msg):
conn.run(['BTC/*/*']) # runs until exception

-> received btc {'Open': 4370.0, 'High': 4372.93, 'Low': 4370.0, 'Close': 4371.74, 'Volume': 3.3880948699999993, 'Epoch': 1507299600}
```
```
90 changes: 69 additions & 21 deletions pymarketstore/client.py
@@ -13,6 +13,10 @@
logger = logging.getLogger(__name__)


if six.PY2:
    memoryview = lambda array: buffer(np.ascontiguousarray(array))


data_type_conv = {
    '<f4': 'f',
    '<f8': 'd',
@@ -96,28 +100,15 @@ def query(self, params):
        reply = self._request('DataService.Query', **query)
        return QueryReply(reply)

    def write(self, recarray, tbk, isvariablelength=False):
        data = {}
        data['types'] = [
            recarray.dtype[name].str.replace('<', '')
            for name in recarray.dtype.names
        ]
        data['names'] = recarray.dtype.names
        data['data'] = [
            bytes(buffer(recarray[name])) if six.PY2
            else bytes(memoryview(recarray[name]))
            for name in recarray.dtype.names
        ]
        data['length'] = len(recarray)
        data['startindex'] = {tbk: 0}
        data['lengths'] = {tbk: len(recarray)}
        write_request = {}
        write_request['dataset'] = data
        write_request['is_variable_length'] = isvariablelength
        writer = {}
        writer['requests'] = [write_request]
    def write(self, data, tbk, isvariablelength=False):
        if not isinstance(data, (np.ndarray, np.recarray, pd.Series, pd.DataFrame)):
            raise TypeError('The `data` parameter must be an instance of '
                            'np.ndarray, np.recarray, pd.Series, or pd.DataFrame')

        try:
            reply = self.rpc.call("DataService.Write", **writer)
            reply = self.rpc.call("DataService.Write", requests=[
                _make_write_request(data, tbk, isvariablelength),
            ])
        except requests.exceptions.ConnectionError:
            raise requests.exceptions.ConnectionError(
                "Could not contact server")
@@ -177,3 +168,60 @@ def stream(self):

    def __repr__(self):
        return 'Client("{}")'.format(self.endpoint)


def _make_write_request(data, tbk, isvariablelength=False):
    dataset = dict(length=len(data),
                   startindex={tbk: 0},
                   lengths={tbk: len(data)})

    if isinstance(data, (np.ndarray, np.recarray)):
        dataset.update(_np_array_to_dataset_params(data))
    elif isinstance(data, pd.Series):
        dataset.update(_pd_series_to_dataset_params(data, tbk))
    elif isinstance(data, pd.DataFrame):
        dataset.update(_pd_dataframe_to_dataset_params(data))

    return dict(dataset=dataset, is_variable_length=isvariablelength)


def _np_array_to_dataset_params(array):
    return dict(types=[array.dtype[name].str.replace('<', '')
                       for name in array.dtype.names],
                names=list(array.dtype.names),
                data=[bytes(memoryview(array[name]))
                      for name in array.dtype.names])
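
To make the shape of the write request concrete without numpy, the sketch below packs plain Python lists with the standard `struct` module. `pack_columns` and the mapping from type codes to `struct` formats are assumptions made for illustration; the code in this change obtains the column bytes from `bytes(memoryview(array[name]))` instead.

```python
import struct

# Assumed mapping from dataset type codes to struct format characters.
STRUCT_FORMATS = {"i8": "q", "i4": "i", "f8": "d", "f4": "f"}


def pack_columns(columns, tbk, isvariablelength=False):
    """Hypothetical helper: build a write request from {name: (type, values)}."""
    names = list(columns)
    length = len(columns[names[0]][1])
    dataset = dict(length=length, startindex={tbk: 0}, lengths={tbk: length})
    dataset["types"] = [columns[name][0] for name in names]
    dataset["names"] = names
    # One bytes object per column, values laid out back to back (little-endian).
    dataset["data"] = [
        struct.pack("<%d%s" % (length, STRUCT_FORMATS[columns[name][0]]),
                    *columns[name][1])
        for name in names
    ]
    return dict(dataset=dataset, is_variable_length=isvariablelength)


request = pack_columns(
    {"Epoch": ("i8", [1507299600, 1507299660]),
     "Close": ("f4", [4371.74, 4372.0])},
    "BTC/1Min/OHLCV",
)
print(request["dataset"]["names"], request["dataset"]["types"])
```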


def _pd_series_to_dataset_params(series, tbk):
    # one row (timestamp) with multiple columns of data (ie named indexes in the array)
    if isinstance(series.index[0], str):
        epoch = series.name.to_datetime64().astype(dtype='i8') / 10 ** 9
        return dict(types=['i8'] + [series.dtype.str.replace('<', '')
                                    for _ in range(0, len(series))],
                    names=['Epoch'] + series.index.to_list(),
                    data=[bytes(memoryview(epoch.astype('i8')))] + [
                        bytes(memoryview(val)) for val in series.array])

    # many rows (timestamps) of data for the same column
    else:
        epoch = series.index.to_numpy(dtype='i8') / 10 ** 9
        return dict(types=['i8', series.dtype.str.replace('<', '')],
                    names=['Epoch', series.name or tbk.split('/')[-1]],
                    data=[bytes(memoryview(epoch.astype('i8'))),
                          bytes(memoryview(series.to_numpy()))])


def _pd_dataframe_to_dataset_params(df):
    # new_types = df.dtypes.map({
    #     np.dtype(np.float64): np.float32,
    #     np.dtype(np.int64): np.int32,
    # }).to_dict()
    # df = df.astype(new_types)
    epoch = df.index.to_numpy(dtype='i8') / 10 ** 9
    return dict(types=['i8'] + [dtype.str.replace('<', '')
                                for dtype in df.dtypes],
                names=['Epoch'] + df.columns.to_list(),
                data=[bytes(memoryview(epoch.astype('i8')))] + [
                    bytes(memoryview(df[name].to_numpy()))
                    for name in df.columns])
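
The pandas helpers above derive `Epoch` by dividing the index values by `10 ** 9`, which implies the index holds nanoseconds since the Unix epoch. The standard-library sketch below shows that unit conversion in isolation; the timestamp is illustrative, and integer floor division is used here as an alternative to the float division followed by `astype('i8')` in the helpers.

```python
from datetime import datetime, timezone

NANOS_PER_SECOND = 10 ** 9

# Illustrative moment, expressed in UTC.
moment = datetime(2017, 10, 6, 14, 20, tzinfo=timezone.utc)
epoch_seconds_direct = int(moment.timestamp())

# The same moment as nanoseconds since the epoch, plus a quarter second.
epoch_nanos = epoch_seconds_direct * NANOS_PER_SECOND + 250000000

# Floor division drops the sub-second part and stays in integer arithmetic.
epoch_seconds = epoch_nanos // NANOS_PER_SECOND
print(epoch_seconds)
```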
4 changes: 4 additions & 0 deletions requirements-dev.txt
@@ -0,0 +1,4 @@
-r requirements.txt

pytest
pytest-cov
3 changes: 1 addition & 2 deletions requirements.txt
@@ -4,6 +4,5 @@ pandas
requests
six
urllib3
pytest
setuptools>=28.8.0
websocket-client
websocket-client
2 changes: 1 addition & 1 deletion setup.cfg
@@ -10,4 +10,4 @@ exclude =
test=pytest

[tool:pytest]
addopts = --verbose --cov pymarketstore --cov-report=term-missing
addopts = -s --verbose --cov pymarketstore --cov-report=term-missing