@@ -70,7 +70,6 @@ def _path_importer_cache(cls, path):
7070import itertools
7171import logging
7272import os
73- import pickle as py_pickle
7473import pstats
7574import pty
7675import signal
@@ -113,12 +112,57 @@ def set_blocking(fd, blocking):
113112 now = time .time
114113
115114if sys .version_info >= (3 , 0 ):
115+ from pickle import PicklingError , Unpickler as _Unpickler , UnpicklingError
116+ def find_deny (module , name ):
117+ raise UnpicklingError ('Denied: %s.%s' % (module , name ))
118+ class Unpickler (_Unpickler ):
119+ def __init__ (self , file , find_class = find_deny ):
120+ self .find_class = find_class
121+ super ().__init__ (file , encoding = 'bytes' )
122+ else :
123+ from cPickle import PicklingError , Unpickler as _Unpickler , UnpicklingError
124+ def find_deny (module , name ):
125+ raise UnpicklingError ('Denied: %s.%s' % (module , name ))
126+ def Unpickler (file , find_class = find_deny ):
127+ unpickler = _Unpickler (file )
128+ unpickler .find_global = find_class
129+ return unpickler
130+
131+ if sys .version_info >= (3 , 0 ):
132+ from pickle import Pickler as _Pickler
133+ class Pickler (_Pickler ):
134+ def __init__ (self , file , protocol ):
135+ self ._file = file
136+ self ._protocol = protocol
137+ super ().__init__ (file , protocol )
138+ def dump (self , obj ):
139+ if self ._protocol == 2 and type (obj ) == bytes :
140+ self ._file .write (struct .pack ('<BBBL' , 128 , 2 , 84 , len (obj )))
141+ self ._file .write (obj )
142+ self ._file .write (struct .pack ('<B' , 46 ))
143+ else :
144+ super ().dump (obj )
116145 str_partition , str_rpartition = str .partition , str .rpartition
117146 bytes_partition = bytes .partition
118147elif sys .version_info >= (2 , 5 ):
148+ from cPickle import Pickler
119149 str_partition , str_rpartition = unicode .partition , unicode .rpartition
120150 bytes_partition = str .partition
121151else :
152+ import pickle
153+ class Pickler (pickle .Pickler ):
154+ def save_exc_inst (self , obj ):
155+ if isinstance (obj , CallError ):
156+ func , args = obj .__reduce__ ()
157+ self .save (func )
158+ self .save (args )
159+ self .write (pickle .REDUCE )
160+ else :
161+ pickle .Pickler .save_inst (self , obj )
162+
163+ dispatch = pickle .Pickler .dispatch .copy ()
164+ dispatch [pickle .InstanceType ] = save_exc_inst
165+
122166 def _part (s , sep , find ):
123167 "(str|unicode).(partition|rpartition) polyfill for Python 2.4"
124168 idx = find (sep )
@@ -217,7 +261,6 @@ def any(it):
217261PY24 = sys .version_info < (2 , 5 )
218262PY3 = sys .version_info > (3 ,)
219263if sys .version_info >= (3 , 0 ):
220- import pickle
221264 import _thread as thread
222265 from io import BytesIO
223266 b = str .encode
@@ -229,7 +272,6 @@ def any(it):
229272 iteritems , iterkeys , itervalues = dict .items , dict .keys , dict .values
230273 range = range
231274else :
232- import cPickle as pickle
233275 import thread
234276 from cStringIO import StringIO as BytesIO
235277 b = str
@@ -746,54 +788,6 @@ def iter_split(buf, delim, func):
746788 return buf [start :], cont
747789
748790
749- class Py24Pickler (py_pickle .Pickler ):
750- """
751- Exceptions were classic classes until Python 2.5. Sadly for 2.4, cPickle
752- offers little control over how a classic instance is pickled. Therefore 2.4
753- uses a pure-Python pickler, so CallError can be made to look as it does on
754- newer Pythons.
755-
756- This mess will go away once proper serialization exists.
757- """
758- @classmethod
759- def dumps (cls , obj , protocol ):
760- bio = BytesIO ()
761- self = cls (bio , protocol = protocol )
762- self .dump (obj )
763- return bio .getvalue ()
764-
765- def save_exc_inst (self , obj ):
766- if isinstance (obj , CallError ):
767- func , args = obj .__reduce__ ()
768- self .save (func )
769- self .save (args )
770- self .write (py_pickle .REDUCE )
771- else :
772- py_pickle .Pickler .save_inst (self , obj )
773-
774- if sys .version_info < (2 , 5 ):
775- dispatch = py_pickle .Pickler .dispatch .copy ()
776- dispatch [py_pickle .InstanceType ] = save_exc_inst
777-
778-
779- if sys .version_info >= (3 , 0 ):
780- # In 3.x Unpickler is a class exposing find_class as an overridable, but it
781- # cannot be overridden without subclassing.
782- class _Unpickler (pickle .Unpickler ):
783- def find_class (self , module , func ):
784- return self .find_global (module , func )
785- pickle__dumps = pickle .dumps
786- elif sys .version_info < (2 , 5 ):
787- # On Python 2.4, we must use a pure-Python pickler.
788- pickle__dumps = Py24Pickler .dumps
789- _Unpickler = pickle .Unpickler
790- else :
791- pickle__dumps = pickle .dumps
792- # In 2.x Unpickler is a function exposing a writeable find_global
793- # attribute.
794- _Unpickler = pickle .Unpickler
795-
796-
797791class Message (object ):
798792 """
799793 Messages are the fundamental unit of communication, comprising fields from
@@ -835,8 +829,6 @@ class Message(object):
835829 #: :ref:`standard-handles` should explicitly declare an encoding.
836830 enc = ENC_MGC
837831
838- _unpickled = object ()
839-
840832 #: The :class:`Router` responsible for routing the message. This is
841833 #: :data:`None` for locally originated messages.
842834 router = None
@@ -928,21 +920,23 @@ def encoded(cls, obj, enc, **kwargs):
928920 raise ValueError ('Invalid explicit enc: %r' % (enc ,))
929921
930922 @classmethod
931- def pickled (cls , obj , ** kwargs ):
923+ def pickled (cls , * args , ** kwargs ):
932924 """
933925 Construct a pickled message, setting :attr:`data` to the serialization
934- of `obj `, and setting remaining fields using `kwargs`.
926+ of each object in `args `, and setting remaining fields using `kwargs`.
935927
936928 :returns:
937929 The new message.
938930 """
939- self = cls (enc = cls .ENC_PKL , ** kwargs )
940- try :
941- self .data = pickle__dumps (obj , protocol = 2 )
942- except pickle .PicklingError :
943- e = sys .exc_info ()[1 ]
944- self .data = pickle__dumps (CallError (e ), protocol = 2 )
945- return self
931+ f = BytesIO ()
932+ p = Pickler (f , protocol = 2 )
933+ for obj in args :
934+ try :
935+ p .dump (obj )
936+ except PicklingError :
937+ exc = sys .exc_info ()[1 ]
938+ p .dump (CallError (exc ))
939+ return cls (enc = cls .ENC_PKL , data = f .getvalue (), ** kwargs )
946940
947941 def reply (self , msg , router = None , ** kwargs ):
948942 """
@@ -968,11 +962,6 @@ def reply(self, msg, router=None, **kwargs):
968962 LOG .debug ('dropping reply to message with no return address: %r' ,
969963 msg )
970964
971- if sys .version_info >= (3 , 0 ):
972- UNPICKLER_KWARGS = {'encoding' : 'bytes' }
973- else :
974- UNPICKLER_KWARGS = {}
975-
976965 def _throw_dead (self ):
977966 if len (self .data ):
978967 raise ChannelError (self .data .decode ('utf-8' , 'replace' ))
@@ -986,49 +975,62 @@ def decode(self, throw=True, throw_dead=True):
986975 if self .enc == self .ENC_BIN : return self .data
987976 raise ValueError ('Invalid explicit enc: %r' % (self .enc ,))
988977
989- def unpickle (self , throw = True , throw_dead = True ):
978+ def unpickle (self , throw = True , throw_dead = True , find_class = None ):
979+ """
980+ Return the first unpickled stream in :attr:`data`, optionally raise
981+ :exc:`CallError` if the unpickled object is such.
982+
983+ `throw` and `throw_dead` behave the same as with :meth:`unpickle_iter`.
984+
985+ :param find_class:
986+ Callable that takes ``(module, func)`` and returns a constructor.
987+ Defaults to :meth:`_find_global`.
990988 """
991- Unpickle :attr:`data`, optionally raising any exceptions present.
989+ if find_class is None : find_class = self ._find_global
990+ return next (self .unpickle_iter (throw , throw_dead , find_class ))
991+
992+ def unpickle_iter (self , throw = True , throw_dead = True , find_class = find_deny ):
993+ """
994+ Return an iterator of objects unpickled from :attr:`data`, optionally
995+ raising any :exc:`CallError` exceptions present.
992996
993997 :param bool throw_dead:
994998 If :data:`True`, raise exceptions, otherwise it is the caller's
995999 responsibility.
1000+ :param find_class:
1001+ Callable that takes ``(module, func)`` and returns a constructor.
1002+ Default: :func:`find_deny`.
9961003
9971004 :raises CallError:
9981005 The serialized data contained CallError exception.
9991006 :raises ChannelError:
10001007 The `is_dead` field was set.
10011008 """
1002- _vv and IOLOG .debug ('%r.unpickle()' , self )
10031009 if self .enc not in (self .ENC_MGC , self .ENC_PKL ):
10041010 raise ValueError (
10051011 'Message %r is not pickled, invalid enc=%r' , self , self .enc ,
10061012 )
10071013 if throw_dead and self .is_dead :
10081014 self ._throw_dead ()
10091015
1010- obj = self ._unpickled
1011- if obj is Message ._unpickled :
1012- fp = BytesIO (self .data )
1013- unpickler = _Unpickler (fp , ** self .UNPICKLER_KWARGS )
1014- unpickler .find_global = self ._find_global
1016+ file = BytesIO (self .data )
1017+ unpickler = Unpickler (file , find_class )
1018+ while file .tell () < len (self .data ):
10151019 try :
10161020 # Must occur off the broker thread.
10171021 try :
10181022 obj = unpickler .load ()
10191023 except :
10201024 LOG .error ('raw pickle was: %r' , self .data )
10211025 raise
1022- self ._unpickled = obj
10231026 except (TypeError , ValueError ):
10241027 e = sys .exc_info ()[1 ]
10251028 raise StreamError ('invalid message: %s' , e )
10261029
1027- if throw :
1028- if isinstance (obj , CallError ):
1030+ if throw and isinstance (obj , CallError ):
10291031 raise obj
10301032
1031- return obj
1033+ yield obj
10321034
10331035 def __repr__ (self ):
10341036 if len (self .data ) > 60 :
@@ -1809,8 +1811,7 @@ def _request_resource(self, fullname, resource, callback):
18091811 def _on_load_resource (self , msg ):
18101812 if msg .is_dead :
18111813 return
1812- tup = msg .unpickle ()
1813- fullname , resource , content = tup
1814+ (fullname , resource ), content = msg .unpickle_iter ()
18141815
18151816 self ._lock .acquire ()
18161817 try :
0 commit comments