1- import cPickle
1+ from future import standard_library
2+ standard_library .install_aliases ()
3+ from builtins import filter
4+ from builtins import object
25from types import FunctionType
6+ import pickle
37
48from graphql import parse , validate , specified_rules , value_from_ast , execute
59from graphql .language .ast import OperationDefinition
@@ -21,13 +25,13 @@ def __init__(self, host='localhost', port=6379, *args, **kwargs):
2125 self .greenlet = None
2226
2327 def publish (self , trigger_name , message ):
24- self .redis .publish (trigger_name , cPickle .dumps (message ))
28+ self .redis .publish (trigger_name , pickle .dumps (message ))
2529 return True
2630
2731 def subscribe (self , trigger_name , on_message_handler , options ):
2832 self .sub_id_counter += 1
2933 try :
30- if trigger_name not in self .subscriptions .values ()[0 ]:
34+ if trigger_name not in list ( self .subscriptions .values () )[0 ]:
3135 self .pubsub .subscribe (trigger_name )
3236 except IndexError :
3337 self .pubsub .subscribe (trigger_name )
@@ -42,7 +46,7 @@ def unsubscribe(self, sub_id):
4246 trigger_name , on_message_handler = self .subscriptions [sub_id ]
4347 del self .subscriptions [sub_id ]
4448 try :
45- if trigger_name not in self .subscriptions .values ()[0 ]:
49+ if trigger_name not in list ( self .subscriptions .values () )[0 ]:
4650 self .pubsub .unsubscribe (trigger_name )
4751 except IndexError :
4852 self .pubsub .unsubscribe (trigger_name )
@@ -57,9 +61,9 @@ def wait_and_get_message(self):
5761 gevent .sleep (.001 )
5862
5963 def handle_message (self , message ):
60- for sub_id , trigger_map in self .subscriptions .iteritems ():
64+ for sub_id , trigger_map in self .subscriptions .items ():
6165 if trigger_map [0 ] == message ['channel' ]:
62- trigger_map [1 ](cPickle .loads (message ['data' ]))
66+ trigger_map [1 ](pickle .loads (message ['data' ]))
6367
6468
6569class ValidationError (Exception ):
@@ -105,7 +109,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
105109 arg_definition = [
106110 arg_def
107111 for _ , arg_def in fields .get (subscription_name )
108- .args .iteritems () if arg_def .out_name == arg .name .value
112+ .args .items () if arg_def .out_name == arg .name .value
109113 ][0 ]
110114
111115 args [arg_definition .out_name ] = value_from_ast (
@@ -131,7 +135,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
131135 self .subscriptions [external_subscription_id ] = []
132136 subscription_promises = []
133137
134- for trigger_name in trigger_map .viewkeys ():
138+ for trigger_name in trigger_map .keys ():
135139 try :
136140 channel_options = trigger_map [trigger_name ].get (
137141 'channel_options' , {})
@@ -156,7 +160,7 @@ def context_promise_handler(result):
156160 return context
157161
158162 def filter_func_promise_handler (context ):
159- return Promise .all ([context , filter (root_value , context )])
163+ return Promise .all ([context , list ( filter (root_value , context ) )])
160164
161165 def context_do_execute_handler (result ):
162166 context , do_execute = result
0 commit comments