|
| 1 | +# # Bitcoin Price Prediction |
| 2 | +# Here we provide a template to precit bitcoin price and deploy # |
| 3 | +# at scale on a user-defined schedule, taking advantage of # |
| 4 | +# Metis Machine curated data, and external 3rd party data. # |
| 5 | + |
| 6 | +## Import some needed dependencies |
| 7 | +import os |
| 8 | +from datetime import datetime |
| 9 | +import pandas as pd |
| 10 | +import numpy as np |
| 11 | +import quandl |
| 12 | +import torch |
| 13 | +import torch.nn as nn |
| 14 | +from torch.autograd import Variable |
| 15 | + |
| 16 | + |
| 17 | +# ## Data Prep |
| 18 | +# Specify what data to use from quandl |
| 19 | +# Here we grab bitcoin prices & market data |
| 20 | +COIN = "BCHARTS/BITSTAMPUSD" |
| 21 | +API = #<insert API key here> |
| 22 | +DATASETS = [COIN, "BCHAIN/CPTRA", "BCHAIN/NTRAT"] |
| 23 | + |
| 24 | +## Set the quandl api key so we can access historical coin data |
| 25 | +quandl.ApiConfig.api_key = API |
| 26 | +coin_df = quandl.get(DATASETS, returns="pandas") |
| 27 | + |
| 28 | +# We are interested in predicting the close price (*6pm of the next day) |
| 29 | +coin_df = coin_df[['BCHARTS/BITSTAMPUSD - Close', 'BCHAIN/NTRAT - Value', 'BCHAIN/CPTRA - Value']].dropna() |
| 30 | +coin_df.columns = ['close_price', 'num_trans', 'cost_per_trans'] |
| 31 | +coin_df_end = coin_df.index.max().date() |
| 32 | + |
| 33 | +# Use the Skafos Data Engine to pull in curated dataset |
| 34 | +from skafossdk import * |
| 35 | +print('Initializing the SDK connection', flush=True) |
| 36 | +skafos = Skafos() |
| 37 | + |
| 38 | +# Query data engine for google keyword trends |
| 39 | +res = skafos.engine.create_view( |
| 40 | + "gtrends", {"keyspace": "google_trends", "table": "crypto"}, |
| 41 | + DataSourceType.Cassandra).result() |
| 42 | +print("Created a view of historical google trends data", flush=True) |
| 43 | + |
| 44 | +print("Pulling in historical google trends data...") |
| 45 | +gtrends_json = skafos.engine.query("SELECT * from gtrends WHERE keyword IN ('bitcoin', 'blockchain', 'crypto currency', 'litecoin')").result() |
| 46 | + |
| 47 | +# Validate a single record |
| 48 | +print("Validating a single record:", flush=True) |
| 49 | +print(gtrends_json['data'][0], flush=True) |
| 50 | + |
| 51 | +# Convert to pandas df |
| 52 | +gtrends = pd.DataFrame.from_records(gtrends_json['data']) .pivot(index='date', values='interest', columns='keyword') |
| 53 | + |
| 54 | +# Set proper date format |
| 55 | +gtrends.index = pd.to_datetime(gtrends.index) |
| 56 | + |
| 57 | +# Catch the last date of gtrends data available |
| 58 | +gtrends_end = gtrends.index.max().date() |
| 59 | + |
| 60 | +# Figure out how much we might need to shift based on data availability |
| 61 | +if (coin_df_end - gtrends_end).days == 0: |
| 62 | + #Same day |
| 63 | + print("Data lined up perfectly, no shifting needed.") |
| 64 | + shifter = 0 |
| 65 | +elif (coin_df_end - gtrends_end).days == 1: |
| 66 | + # One day behind |
| 67 | + print("Gtrends Data is one day behind. Shifting once.") |
| 68 | + shifter = 1 |
| 69 | +else: |
| 70 | + # More days behind |
| 71 | + shifter = (coin_df_end - gtrends_end).days |
| 72 | + print("Gtrends Data is %s days behind. Shifting multiple." % shifter) |
| 73 | + |
| 74 | +# Join google trends with quandl coin data |
| 75 | +df = coin_df.join(gtrends, how='left') |
| 76 | + |
| 77 | + |
| 78 | +# ## Prep Inputs for Modeling |
| 79 | +# We want to use a recurrent time-series model, so our data |
| 80 | +# need to be in ascending order by date. |
| 81 | +day_zero = df.index.min() |
| 82 | +day_index_map = dict(zip((df.index - day_zero).days.values, df.index.values)) |
| 83 | + |
| 84 | +df.set_index((df.index - day_zero).days, inplace=True) |
| 85 | +df.sort_index(inplace=True) |
| 86 | + |
| 87 | +# Get rid of 0's in price and Calculate percent change in price |
| 88 | +df = df[df.close_price != df.close_price.min()] |
| 89 | +df['close_price_change'] = df.close_price.pct_change() |
| 90 | + |
| 91 | + |
| 92 | +# Shift google trends to fill gap in data availability |
| 93 | +# NOTE: This means the model is using the search volume some x days prior to |
| 94 | +# prediction. This should account for human lag in research/interest to action. |
| 95 | +# It also may not be as good as one day out or same day ofcourse. |
| 96 | +df[['bitcoin', 'blockchain', 'litecoin', 'crypto currency']] = df[['bitcoin', 'blockchain', 'litecoin', 'crypto currency']].shift(shifter) |
| 97 | +df.dropna(inplace=True) |
| 98 | + |
| 99 | +# Normalize inputs for deep learning |
| 100 | +# Most neural networks expect inputs from -1 to 1 |
| 101 | +# So we fit two standard deviations in between -1 and 1 |
| 102 | +df_scaled = df.apply(lambda c: 0.5 * (c - c.mean()) / c.std()) |
| 103 | + |
| 104 | +# Shift so that we're trying to predict tomorrow's price |
| 105 | +bitcoin_y = df_scaled['close_price_change'].copy().shift(-1) |
| 106 | + |
| 107 | +bitcoin_x = df_scaled.drop(['close_price'], axis=1) |
| 108 | + |
| 109 | +# Predict on the last day |
| 110 | +last_day = max(bitcoin_x.index) |
| 111 | + |
| 112 | + |
| 113 | +# ## Recurrent Neural Network Model |
| 114 | +# [PyTorch](http://pytorch.org) is a wonderful framnework for deep learning |
| 115 | +# since it handles backpropgation automatically. |
| 116 | + |
| 117 | +x_train = torch.autograd.Variable( |
| 118 | + torch.from_numpy(bitcoin_x.loc[:last_day - 1].as_matrix()).float(), requires_grad=False) |
| 119 | +x_pred = torch.autograd.Variable( |
| 120 | + torch.from_numpy(bitcoin_x.loc[last_day:].as_matrix()).float(), requires_grad=False) |
| 121 | +batch_size = x_train.size()[0] |
| 122 | +input_size = len(bitcoin_x.columns) |
| 123 | + |
| 124 | + |
| 125 | +y_train = torch.autograd.Variable( |
| 126 | + torch.from_numpy(bitcoin_y.loc[:last_day - 1].as_matrix()).float(), requires_grad=False) |
| 127 | +y_pred = torch.autograd.Variable( |
| 128 | + torch.from_numpy(bitcoin_y.loc[last_day:].as_matrix()).float(), requires_grad=False) |
| 129 | + |
| 130 | + |
| 131 | +class CryptoNet(torch.nn.Module): |
| 132 | + |
| 133 | + def __init__(self, hidden_layers, hidden_size, drop_out_rate): |
| 134 | + super(CryptoNet, self).__init__() |
| 135 | + # set hidden size, layers and dropout rate |
| 136 | + self.drop_out_rate = drop_out_rate |
| 137 | + self.hidden_layers = hidden_layers |
| 138 | + self.hidden_size = hidden_size |
| 139 | + # using a GRU (Gated Recurrent Unit), also try an LSTM |
| 140 | + self.rnn1 = nn.GRU(input_size=input_size, |
| 141 | + hidden_size=self.hidden_size, |
| 142 | + num_layers=self.hidden_layers) |
| 143 | + self.dropout = nn.Dropout(p=self.drop_out_rate) |
| 144 | + self.dense1 = nn.Linear(self.hidden_size, 4) |
| 145 | + self.dense2 = nn.Linear(4, 1) |
| 146 | + |
| 147 | + def forward(self, x, hidden): |
| 148 | + x_batch = x.view(len(x), 1, -1) |
| 149 | + x_r, hidden = self.rnn1(x_batch, hidden) |
| 150 | + x_d = self.dropout(x_r) |
| 151 | + x_l = self.dense1(x_d) |
| 152 | + x_l2 = self.dense2(x_l) |
| 153 | + return x_l2, hidden |
| 154 | + |
| 155 | + def init_hidden(self): |
| 156 | + return Variable(torch.randn(self.hidden_layers, 1, self.hidden_size)) |
| 157 | + |
| 158 | + |
| 159 | +# ## Train the RNN |
| 160 | + |
| 161 | +# Setup model for training and prediction |
| 162 | +torch.manual_seed(0) |
| 163 | +model = CryptoNet(hidden_layers=1, hidden_size=8, drop_out_rate=0.25) |
| 164 | +print(model) |
| 165 | + |
| 166 | +# Define loss function and optimizer, tune lr |
| 167 | +criterion = nn.MSELoss(size_average=True) |
| 168 | +optimizer = torch.optim.Adadelta(model.parameters(), lr=0.5) |
| 169 | + |
| 170 | +# Initialize the hidden layer during training, but keep it for later prediction. |
| 171 | +hidden = model.init_hidden() |
| 172 | + |
| 173 | +# Train the model on 500 epochs |
| 174 | +# Ideally this number is tuned precisely |
| 175 | +NUM_EPOCHS = 500 |
| 176 | +for i in range(NUM_EPOCHS): |
| 177 | + def closure(): |
| 178 | + model.zero_grad() |
| 179 | + hidden = model.init_hidden() |
| 180 | + out, hidden = model(x_train, hidden) |
| 181 | + loss = criterion(out, y_train) |
| 182 | + if i % 10 == 0: |
| 183 | + print('{:%H:%M:%S} epoch {} loss: {}'.format(datetime.now(), i, loss.data.numpy()[0]), flush=True) |
| 184 | + loss.backward() |
| 185 | + return loss |
| 186 | + optimizer.step(closure) |
| 187 | + |
| 188 | +###################################################### |
| 189 | + |
| 190 | +# Predict over the holdout test set and retain the hidden state |
| 191 | +pred, new_hidden = model(x_pred, hidden) |
| 192 | + |
| 193 | +def unnormalize(x): |
| 194 | + """Undo the normalization step performed prior to training the model.""" |
| 195 | + return (2. * x * df['close_price_change'].std())+df['close_price_change'].mean() |
| 196 | + |
| 197 | +# Unnormalize data and get close price |
| 198 | +predicted_value = unnormalize(pred.view(1).data.numpy()[0]) |
| 199 | +previous_close_price = df.loc[last_day:].close_price.values[0] |
| 200 | + |
| 201 | +# Get the prediction and date value |
| 202 | +predicted_price = (predicted_value + 1)*previous_close_price |
| 203 | +prediction_date = pd.to_datetime(day_index_map.get(last_day), "%Y-%m-%d") |
| 204 | + |
| 205 | +print("The RNN predicts the closing price for: \n%s to be %s $" % (prediction_date, predicted_price), flush=True) |
| 206 | + |
| 207 | +data_out = [{'price_prediction': predicted_price, |
| 208 | + 'date': prediction_date.date(), |
| 209 | + 'date_updated': datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"), |
| 210 | + 'coin': 'bitcoin'}] |
| 211 | + |
| 212 | + |
| 213 | +# ## Persist Predictions |
| 214 | +# define the schema for this dataset |
| 215 | +schema = { |
| 216 | + "table_name": "crypto_predictions", |
| 217 | + "options": { |
| 218 | + "primary_key": ["coin", "date", "date_updated"], |
| 219 | + "order_by": ["date asc"] |
| 220 | + }, |
| 221 | + "columns": { |
| 222 | + "coin": "text", |
| 223 | + "date": "date", |
| 224 | + "date_updated": "timestamp", |
| 225 | + "price_prediction": "float" |
| 226 | + } |
| 227 | +} |
| 228 | + |
| 229 | +# Save out using the data engine |
| 230 | +print("Saving to the data engine.", flush=True) |
| 231 | +skafos.engine.save(schema, data_out).result() |
| 232 | +print("Done.", flush=True) |
0 commit comments