-
Notifications
You must be signed in to change notification settings - Fork 224
Expand file tree
/
Copy pathruntime.py
More file actions
1368 lines (1100 loc) · 47.5 KB
/
runtime.py
File metadata and controls
1368 lines (1100 loc) · 47.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Machinery to make the common case easy when building new runtimes
"""
from abc import ABCMeta, abstractmethod
from collections import namedtuple
import functools
import gettext
from io import BytesIO, StringIO
import importlib
import itertools
import json
import logging
import re
import threading
import warnings
from lxml import etree
import markupsafe
from web_fragments.fragment import Fragment
from xblock.core import XBlock, XBlockAside, XBlock2Mixin, XML_NAMESPACES
from xblock.fields import Field, BlockScope, Scope, ScopeIds, UserScope
from xblock.field_data import FieldData
from xblock.exceptions import (
NoSuchViewError,
NoSuchHandlerError,
NoSuchServiceError,
NoSuchUsage,
NoSuchDefinition,
FieldDataDeprecationWarning,
UserIdDeprecationWarning,
)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class KeyValueStore(metaclass=ABCMeta):
    """Abstract contract that every key-value storage backend implements."""

    class Key(namedtuple("Key", "scope, user_id, block_scope_id, field_name, block_family")):
        """
        Structured key describing where a stored value lives.

        The tuple records the scope, user id, block-scope id, field name and
        block family of a value, so a store may use any of that information
        when persisting or retrieving data.
        """

        def __new__(cls, scope, user_id, block_scope_id, field_name, block_family='xblock.v1'):
            # Only `block_family` is optional; it falls back to 'xblock.v1'.
            return super().__new__(cls, scope, user_id, block_scope_id, field_name, block_family)

    @abstractmethod
    def get(self, key):
        """Return the value stored under `key`."""

    @abstractmethod
    def set(self, key, value):
        """Store `value` under `key`."""

    @abstractmethod
    def delete(self, key):
        """Remove `key` from storage."""

    @abstractmethod
    def has(self, key):
        """Report whether `key` is present in storage."""

    def default(self, key):
        """
        Return the context-relevant default for `key`.

        This base implementation has no such default and always raises
        KeyError, which results in the field's global default being used.
        """
        raise KeyError(repr(key))

    def set_many(self, update_dict):
        """
        Store every (`key`, `value`) pair of `update_dict`.

        This default implementation simply calls :meth:`set` once per entry,
        which may be inefficient for runtimes that persist on every set; such
        implementations will want to override this method.

        :update_dict: field_name, field_value pairs for all cached changes
        """
        for key in update_dict:
            self.set(key, update_dict[key])
class DictKeyValueStore(KeyValueStore):
    """
    A `KeyValueStore` that keeps all of its data in a Python dictionary.
    """

    def __init__(self, storage=None):
        # A caller-supplied mapping is used as-is (not copied); otherwise
        # start with a fresh, empty dict.
        if storage is None:
            storage = {}
        self.db_dict = storage

    def get(self, key):
        """Return the value stored under `key` (KeyError if absent)."""
        return self.db_dict[key]

    def set(self, key, value):
        """Store `value` under `key`."""
        self.db_dict[key] = value

    def set_many(self, update_dict):
        """Store all entries of `update_dict` with a single dict update."""
        self.db_dict.update(update_dict)

    def delete(self, key):
        """Remove `key` from the dictionary (KeyError if absent)."""
        del self.db_dict[key]

    def has(self, key):
        """Report whether `key` is present in the dictionary."""
        return key in self.db_dict
class KvsFieldData(FieldData):
    """
    Adapter that turns field-name based access into access through the
    correctly scoped keys of an underlying KeyValueStore.
    """

    def __init__(self, kvs, **kwargs):
        super().__init__(**kwargs)
        self._kvs = kvs

    def __repr__(self):
        return f"{self.__class__.__name__}({self._kvs!r})"

    def _getfield(self, block, name):
        """
        Look up the field called `name` on the class of `block`.

        :param block: xblock to retrieve the field from
        :type block: :class:`~xblock.core.XBlock`
        :param name: name of the field to retrieve
        :type name: str
        :raises KeyError: when the class attribute `name` is missing or is not a Field
        """
        candidate = getattr(block.__class__, name, None)
        if isinstance(candidate, Field):
            return candidate
        # Either the class has no such attribute, or the attribute is not a
        # Field, so `name` does not name a field.
        raise KeyError(name)

    def _key(self, block, name):
        """
        Build the storage key for the field `name` of `block`, of the form:

            KeyValueStore.Key(
                scope=field.scope,
                user_id=student_id,
                block_scope_id=block_id,
                field_name=name,
                block_family=block.entry_point,
            )
        """
        field = self._getfield(block, name)
        scope = field.scope
        if scope in (Scope.children, Scope.parent):
            # The children/parent scopes are keyed by usage, without a user.
            block_id = block.scope_ids.usage_id
            user_id = None
        else:
            block_scope = scope.block
            if block_scope == BlockScope.ALL:
                block_id = None
            elif block_scope == BlockScope.USAGE:
                block_id = block.scope_ids.usage_id
            elif block_scope == BlockScope.DEFINITION:
                block_id = block.scope_ids.def_id
            elif block_scope == BlockScope.TYPE:
                block_id = block.scope_ids.block_type
            # Only per-user scopes carry a user id in the key.
            user_id = block.scope_ids.user_id if scope.user == UserScope.ONE else None
        return KeyValueStore.Key(
            scope=scope,
            user_id=user_id,
            block_scope_id=block_id,  # pylint: disable=possibly-used-before-assignment
            field_name=name,
            block_family=block.entry_point,
        )

    def get(self, block, name):
        """
        Return the stored value of the field named `name`, as read from the
        underlying key-value store.
        """
        key = self._key(block, name)
        return self._kvs.get(key)

    def set(self, block, name, value):
        """
        Store `value` for the field named `name`.
        """
        key = self._key(block, name)
        self._kvs.set(key, value)

    def delete(self, block, name):
        """
        Remove the stored value of the field named `name`, resetting it to
        the default.
        """
        key = self._key(block, name)
        self._kvs.delete(key)

    def has(self, block, name):
        """
        Report whether the field named `name` has a non-default value.

        A KeyError raised while building the key or querying the store is
        reported as False.
        """
        try:
            return self._kvs.has(self._key(block, name))
        except KeyError:
            return False

    def set_many(self, block, update_dict):
        """Store several field values at once, translating names to keys."""
        keyed_updates = {
            self._key(block, field_name): value
            for field_name, value in update_dict.items()
        }
        self._kvs.set_many(keyed_updates)

    def default(self, block, name):
        """
        Ask the kvs for the default (default implementation which other classes may override).

        :param block: block containing field to default
        :type block: :class:`~xblock.core.XBlock`
        :param name: name of the field to default
        """
        key = self._key(block, name)
        return self._kvs.default(key)
# Backwards-compatible alias: `DbModel` is the old name for `KvsFieldData`,
# kept to ease the transition.
DbModel = KvsFieldData
class IdReader(metaclass=ABCMeta):
    """Abstract interface for looking up usages, definitions and their types."""

    @abstractmethod
    def get_usage_id_from_aside(self, aside_id):
        """
        Return the XBlock `usage_id` that an aside usage id refers to.

        Args:
            aside_id: The usage id of the XBlockAside.

        Returns:
            The `usage_id` of the usage the aside is commenting on.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_definition_id_from_aside(self, aside_id):
        """
        Return the XBlock `definition_id` that an aside definition id refers to.

        Args:
            aside_id: The definition id of the XBlockAside.

        Returns:
            The `definition_id` of the xblock the aside is commenting on.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_definition_id(self, usage_id):
        """
        Return the definition that a usage is derived from.

        Args:
            usage_id: The id of the usage to query

        Returns:
            The `definition_id` the usage is derived from
        """
        raise NotImplementedError()

    @abstractmethod
    def get_block_type(self, def_id):
        """
        Return the block_type of a particular definition.

        Args:
            def_id: The id of the definition to query

        Returns:
            The `block_type` of the definition
        """
        raise NotImplementedError()

    @abstractmethod
    def get_aside_type_from_usage(self, aside_id):
        """
        Return the XBlockAside `aside_type` for an aside usage id.

        Args:
            aside_id: The usage id of the XBlockAside.

        Returns:
            The `aside_type` of the aside.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_aside_type_from_definition(self, aside_id):
        """
        Return the XBlockAside `aside_type` for an aside definition id.

        Args:
            aside_id: The definition id of the XBlockAside.

        Returns:
            The `aside_type` of the aside.
        """
        raise NotImplementedError()
class IdGenerator(metaclass=ABCMeta):
    """Abstract interface for creating usage and definition ids."""

    @abstractmethod
    def create_aside(self, definition_id, usage_id, aside_type):
        """
        Create the definition and usage ids for an :class:`.XBlockAside` of
        type `aside_type` commenting on the :class:`.XBlock` usage `usage_id`.

        Returns:
            (aside_definition_id, aside_usage_id)
        """
        raise NotImplementedError()

    @abstractmethod
    def create_usage(self, def_id):
        """
        Create a usage that stores its definition id.

        Returns the newly-created usage id.
        """
        raise NotImplementedError()

    @abstractmethod
    def create_definition(self, block_type, slug=None):
        """
        Create a definition that stores its block type.

        If `slug` is provided, it is a suggestion that the definition id
        incorporate the slug somehow.

        Returns the newly-created definition id.
        """
        raise NotImplementedError()
class MemoryIdManager(IdReader, IdGenerator):
    """Dictionary-backed implementation of both IdReader and IdGenerator."""

    # Aside ids are plain tuples that pair the decorated block's id with the aside type.
    ASIDE_USAGE_ID = namedtuple('MemoryAsideUsageId', 'usage_id aside_type')
    ASIDE_DEFINITION_ID = namedtuple('MemoryAsideDefinitionId', 'definition_id aside_type')

    def __init__(self):
        self._ids = itertools.count()  # one counter shared by usage and definition ids
        self._usages = {}              # usage_id -> definition_id
        self._definitions = {}         # definition_id -> block_type

    def _next_id(self, prefix):
        """Return a fresh id of the form `<prefix>_<counter>`."""
        return "{}_{}".format(prefix, next(self._ids))

    def clear(self):
        """Forget every stored usage and definition."""
        self._usages.clear()
        self._definitions.clear()

    def create_aside(self, definition_id, usage_id, aside_type):
        """Return the (aside definition id, aside usage id) pair for the aside."""
        aside_def_id = self.ASIDE_DEFINITION_ID(definition_id, aside_type)
        aside_usage_id = self.ASIDE_USAGE_ID(usage_id, aside_type)
        return (aside_def_id, aside_usage_id)

    def get_usage_id_from_aside(self, aside_id):
        """Return the decorated block's usage_id held in an aside usage id."""
        return aside_id.usage_id

    def get_definition_id_from_aside(self, aside_id):
        """Return the decorated block's definition_id held in an aside definition id."""
        return aside_id.definition_id

    def create_usage(self, def_id):
        """Create a usage id and remember which definition it belongs to."""
        new_usage_id = self._next_id("u")
        self._usages[new_usage_id] = def_id
        return new_usage_id

    def get_definition_id(self, usage_id):
        """Return the definition id of a usage; raise NoSuchUsage if unknown."""
        try:
            found = self._usages[usage_id]
        except KeyError:
            raise NoSuchUsage(repr(usage_id))  # pylint: disable= raise-missing-from
        return found

    def create_definition(self, block_type, slug=None):
        """Create a definition id (optionally embedding `slug`) and remember its block type."""
        prefix = "d_" + slug if slug else "d"
        new_def_id = self._next_id(prefix)
        self._definitions[new_def_id] = block_type
        return new_def_id

    def get_block_type(self, def_id):
        """
        Return the block type of a definition id.

        Ids that were not created by `create_definition` are treated as aside
        definition ids and answer with their `aside_type`; anything else
        raises NoSuchDefinition.
        """
        try:
            return self._definitions[def_id]
        except KeyError:
            try:
                aside_type = def_id.aside_type
            except AttributeError:
                raise NoSuchDefinition(repr(def_id))  # pylint: disable= raise-missing-from
            return aside_type

    def get_aside_type_from_definition(self, aside_id):
        """Return the aside type held in an aside definition id."""
        return aside_id.aside_type

    def get_aside_type_from_usage(self, aside_id):
        """Return the aside type held in an aside usage id."""
        return aside_id.aside_type
class Runtime(metaclass=ABCMeta):
"""
Access to the runtime environment for XBlocks.
"""
# Abstract methods
@abstractmethod
def handler_url(self, block, handler_name, suffix='', query='', thirdparty=False):
    """Return the actual URL that invokes a handler.

    `handler_name` names the handler function. Any additional portion of the
    url is passed to the handler as its `suffix` argument.

    The result is a complete absolute URL that routes through the runtime to
    the handler.

    :param block: The block to generate the url for
    :param handler_name: The handler on that block that the url should resolve to
    :param suffix: Any path suffix that should be added to the handler url
    :param query: Any query string that should be added to the handler url
        (which should not include an initial ? or &)
    :param thirdparty: If true, create a URL that can be used without the
        user being logged in. This is useful for URLs to be used by third-party
        services.
    """
    # Concrete runtimes must override this method.
    raise NotImplementedError("Runtime needs to provide handler_url()")
@abstractmethod
def resource_url(self, resource):
    """Return the URL of a static resource file.

    `resource` is the application local path to the resource.

    The result is a complete absolute URL that locates the resource on
    your runtime.
    """
    # Concrete runtimes must override this method.
    raise NotImplementedError("Runtime needs to provide resource_url()")
@abstractmethod
def local_resource_url(self, block, uri):
    """Return the URL that loads a static resource from an XBlock.

    `block` is the XBlock that owns the resource.

    `uri` is a relative URI to the resource. The XBlock class's
    get_local_resource(uri) method should be able to open the resource
    identified by this uri.

    Typically, this function uses `open_local_resource` defined on the
    XBlock class, which by default will only allow resources from the
    "public/" directory of the kit. Resources must be placed in "public/"
    to be successfully served with this URL.

    The result is a complete absolute URL which locates the resource on
    your runtime.
    """
    # Concrete runtimes must override this method.
    raise NotImplementedError("Runtime needs to provide local_resource_url()")
@abstractmethod
def publish(self, block, event_type, event_data):
    """Publish an event on behalf of `block`.

    For example, to participate in the course grade, an XBlock should set
    has_score to True, and should publish a grade event whenever the grade
    changes.

    In that case the `event_type` would be `grade`, and the `event_data`
    would be a dictionary of the following form::

        {
            'value': <number>,
            'max_value': <number>,
        }

    The grade event represents a grade of value/max_value for the current
    user.

    `block` is the XBlock from which the event originates.
    """
    # Concrete runtimes must override this method.
    raise NotImplementedError("Runtime needs to provide publish()")
# Construction
def __init__(
    self, id_reader, id_generator, field_data=None, mixins=(),
    services=None, default_class=None, select=None
):
    """
    Store the id helpers, services and block-loading options of the runtime.

    Arguments:
        id_reader (IdReader): An object that allows the `Runtime` to
            map between *usage_ids*, *definition_ids*, and *block_types*.

        id_generator (IdGenerator): The :class:`.IdGenerator` to use
            for creating ids when importing XML or loading XBlockAsides.

        field_data (FieldData): The :class:`.FieldData` to use by default when
            constructing an :class:`.XBlock` from this `Runtime`.
            Passing it is deprecated and emits a FieldDataDeprecationWarning.

        mixins (tuple): Classes that should be mixed in with every :class:`.XBlock`
            created by this `Runtime`.

        services (dict): Services to make available through the
            :meth:`service` method. There's no point passing anything here
            if you are overriding :meth:`service` in your sub-class.

        default_class (class): The default class to use if a class can't be found for a
            particular `block_type` when loading an :class:`.XBlock`.

        select: A function to select from one or more XBlock-like subtypes found
            when calling :meth:`.XBlock.load_class` or :meth:`.XBlockAside.load_class`
            to resolve a `block_type`.
    """
    self.id_reader = id_reader

    # A falsy `services` is replaced by a fresh dict; otherwise the caller's
    # mapping is used directly and receives the default entries below.
    registry = services or {}
    self._services = registry
    # Provide some default implementations
    registry.setdefault("i18n", NullI18nService())

    self._deprecated_per_instance_field_data = field_data  # pylint: disable=invalid-name
    if field_data:
        warnings.warn(
            "Passing field_data as a constructor argument to Runtimes is deprecated",
            category=FieldDataDeprecationWarning,
            stacklevel=2,
        )
        # An explicitly supplied "field-data" service takes precedence.
        registry.setdefault("field-data", field_data)

    self.default_class = default_class
    self.select = select
    self._deprecated_per_instance_user_id = None  # pylint: disable=invalid-name
    self.mixologist = Mixologist(mixins)
    # Name of the view currently being rendered (see `render`); None outside a render.
    self._view_name = None
    self.id_generator = id_generator
# Block operations
@property
def field_data(self):
    """
    Return the FieldData that was passed to the constructor.

    Deprecated in favor of a 'field-data' service; every read emits a
    FieldDataDeprecationWarning.
    """
    warnings.warn(
        "Runtime.field_data is deprecated",
        category=FieldDataDeprecationWarning,
        stacklevel=2,
    )
    return self._deprecated_per_instance_field_data

@field_data.setter
def field_data(self, field_data):
    """
    Replace the stored field_data.

    Deprecated in favor of a 'field-data' service; every write emits a
    FieldDataDeprecationWarning.
    """
    warnings.warn(
        "Runtime.field_data is deprecated",
        category=FieldDataDeprecationWarning,
        stacklevel=2,
    )
    self._deprecated_per_instance_field_data = field_data
@property
def user_id(self):
    """
    Return the current user ID.

    Deprecated in favor of a 'user' service; every read emits a
    UserIdDeprecationWarning.
    """
    warnings.warn(
        "Runtime.user_id is deprecated",
        category=UserIdDeprecationWarning,
        stacklevel=2,
    )
    return self._deprecated_per_instance_user_id

@user_id.setter
def user_id(self, user_id):
    """
    Replace the current user ID.

    Deprecated in favor of a 'user' service; every write emits a
    UserIdDeprecationWarning.
    """
    warnings.warn(
        "Runtime.user_id is deprecated",
        category=UserIdDeprecationWarning,
        stacklevel=2,
    )
    self._deprecated_per_instance_user_id = user_id
def load_block_type(self, block_type):
    """
    Resolve `block_type` to a subclass of :class:`.XBlock`, using this
    runtime's `default_class` and `select` settings.
    """
    fallback_class, chooser = self.default_class, self.select
    return XBlock.load_class(block_type, fallback_class, chooser)
def load_aside_type(self, aside_type):
    """
    Resolve `aside_type` to a subclass of :class:`.XBlockAside`, using this
    runtime's `select` setting.
    """
    chooser = self.select
    return XBlockAside.load_class(aside_type, select=chooser)
# pylint: disable=keyword-arg-before-vararg
def construct_xblock(self, block_type, scope_ids, field_data=None, *args, **kwargs):
    r"""
    Construct a new xblock of the type identified by block_type,
    passing \*args and \*\*kwargs into `__init__`.

    The class is resolved with :meth:`load_block_type` and construction is
    delegated to :meth:`construct_xblock_from_class`.

    :param block_type: name of the block type to load
    :param scope_ids: scope ids handed to the new block
    :param field_data: optional field data handed to the new block
    :returns: whatever :meth:`construct_xblock_from_class` returns
    """
    block_class = self.load_block_type(block_type)
    # The leading arguments are forwarded positionally. Passing them as
    # keywords next to `*args` would bind any extra positional argument to
    # `cls` first and fail with "got multiple values for argument 'cls'".
    return self.construct_xblock_from_class(block_class, scope_ids, field_data, *args, **kwargs)
# pylint: disable=keyword-arg-before-vararg
def construct_xblock_from_class(self, cls, scope_ids, field_data=None, *args, **kwargs):
    """
    Instantiate `cls` after mixing in the mixins defined for this
    application, handing it this runtime, `field_data` and `scope_ids`.
    """
    # NOTE(review): `*args` is unpacked ahead of the keyword arguments, so any
    # extra positional argument binds to the constructor's first positional
    # parameters - confirm callers never pass positional extras.
    mixed_class = self.mixologist.mix(cls)
    return mixed_class(
        runtime=self,
        field_data=field_data,
        scope_ids=scope_ids,
        *args, **kwargs
    )
def get_block(self, usage_id, for_parent=None):
    """
    Create an XBlock instance in this runtime.

    The `usage_id` is resolved through the id reader to a definition id and
    a block type, which select the XBlock class and its data. A definition
    the id reader does not know is reported as NoSuchUsage.
    """
    reader = self.id_reader
    definition_id = reader.get_definition_id(usage_id)
    try:
        block_type = reader.get_block_type(definition_id)
    except NoSuchDefinition:
        raise NoSuchUsage(repr(usage_id))  # pylint: disable= raise-missing-from
    scope_ids = ScopeIds(self.user_id, block_type, definition_id, usage_id)
    return self.construct_xblock(block_type, scope_ids, for_parent=for_parent)
def get_aside(self, aside_usage_id):
    """
    Create an XBlockAside in this runtime.

    The `aside_usage_id` is resolved to the aside type and to the usage and
    definition of the decorated block; the aside's own ids are then obtained
    from the id generator.
    """
    reader = self.id_reader
    aside_type = reader.get_aside_type_from_usage(aside_usage_id)
    block_usage_id = reader.get_usage_id_from_aside(aside_usage_id)
    block_def_id = reader.get_definition_id(block_usage_id)
    aside_def_id, aside_usage_id = self.id_generator.create_aside(block_def_id, block_usage_id, aside_type)
    scope_ids = ScopeIds(self.user_id, aside_type, aside_def_id, aside_usage_id)
    return self.create_aside(aside_type, scope_ids)
# Saving field data changes
def save_block(self, block):
    """
    Finalize/commit the field data changes of the given block.

    Called at the end of an XBlock's save() method. Runtimes may ignore this
    as generally the field data implementation is responsible for persisting
    changes.

    (The main use case is a runtime and field data implementation that want
    to store field data in XML format - the only way to correctly serialize
    a block to XML is to ask the block to serialize itself all at once, so
    such implementations cannot persist changes on a field-by-field basis.)

    :param block: the block being saved
    :type block: :class:`~xblock.core.XBlock`
    """
    # Deliberately a no-op: implementing this hook is optional.
# Parsing XML
def parse_xml_string(self, xml):
    """Parse XML given as `bytes` or `str`, returning a usage id."""
    # Wrap the text in the matching in-memory file type and reuse the file parser.
    stream_type = BytesIO if isinstance(xml, bytes) else StringIO
    return self.parse_xml_file(stream_type(xml))
def parse_xml_file(self, fileobj):
    """Parse an open XML file, returning the usage id of its root element."""
    document = etree.parse(fileobj)
    # The root element has no parent block, hence the `None` parent id.
    return self._usage_id_from_node(document.getroot(), None)
def _usage_id_from_node(self, node, parent_id):
    """Create a new usage id from an XML dom node.

    A definition and a usage are generated for the node, its aside children
    are parsed and removed from it, and the remaining node is parsed into a
    block which is saved with `parent_id` as its parent.

    Args:
        node (lxml.etree.Element): The DOM node to interpret.
        parent_id: The usage ID of the parent block
    """
    block_type = node.tag
    # remove xblock-family from elements
    node.attrib.pop('xblock-family', None)
    # TODO: a way for this node to be a usage to an existing definition?
    def_id = self.id_generator.create_definition(block_type)
    usage_id = self.id_generator.create_usage(def_id)
    scope_ids = ScopeIds(None, block_type, def_id, usage_id)
    mixed_class = self.mixologist.mix(self.load_block_type(block_type))

    # First pass: collect the aside children. The xblock-family attribute is
    # stripped from every child along the way.
    aside_nodes = []
    for child_node in node.iterchildren():
        family_id = child_node.attrib.pop('xblock-family', None)
        if not family_id:
            continue
        family_class = self._family_id_to_superclass(family_id)
        if issubclass(family_class, XBlockAside):
            aside_nodes.append(child_node)

    # Second pass: parse the asides and take them out of the xml payload so
    # the block parser below never sees them.
    for aside_node in aside_nodes:
        self._aside_from_xml(aside_node, def_id, usage_id)
        node.remove(aside_node)

    block = mixed_class.parse_xml(node, self, scope_ids)
    block.parent = parent_id
    block.save()
    return usage_id
def _aside_from_xml(self, node, block_def_id, block_usage_id):
    """
    Parse `node` into an aside attached to the given block ids and save it.
    """
    aside_type = node.tag
    aside_class = self.load_aside_type(aside_type)
    def_id, usage_id = self.id_generator.create_aside(block_def_id, block_usage_id, aside_type)
    scope_ids = ScopeIds(None, aside_type, def_id, usage_id)
    aside_class.parse_xml(node, self, scope_ids).save()
def add_node_as_child(self, block, node):
    """
    Called by XBlock.parse_xml to treat a child node as a child block.

    The node is turned into a usage whose parent is `block`, and the new
    usage id is appended to `block.children`.
    """
    parent_usage_id = block.scope_ids.usage_id
    child_usage_id = self._usage_id_from_node(node, parent_usage_id)
    block.children.append(child_usage_id)
# Exporting XML
def export_to_xml(self, block, xmlfile):
    """
    Export the block to XML, writing the XML to `xmlfile`.

    The block serializes itself into the root element; every aside of the
    block that reports `needs_serialization()` is serialized into its own
    element and added as a child of that root.

    :param block: the block to export
    :param xmlfile: file name or file object handed to lxml's `tree.write`
    """
    root = etree.Element("unknown_root", nsmap=XML_NAMESPACES)
    tree = etree.ElementTree(root)
    block.add_xml_to_node(root)
    # write asides as children
    for aside in self.get_asides(block):
        if aside.needs_serialization():
            aside_node = etree.Element("unknown_root", nsmap=XML_NAMESPACES)
            aside.add_xml_to_node(aside_node)
            # Attach to the XML root, not to the XBlock object: the import
            # side (`_usage_id_from_node`) reads asides from the child
            # elements of the block's node.
            root.append(aside_node)
    tree.write(xmlfile, xml_declaration=True, pretty_print=True, encoding='utf-8')
def add_block_as_child_node(self, block, node):
    """
    Export `block` as a child node of `node`.
    """
    # The element is created with the placeholder tag "unknown" and handed to
    # the block, which writes its own XML into it.
    block.add_xml_to_node(etree.SubElement(node, "unknown"))
# Rendering
def render(self, block, view_name, context=None):
    """
    Render a block by invoking its view.

    Looks up the view named `view_name` on `block`. When the block has no
    such view, its `fallback_view` is used instead and receives `view_name`
    as its first argument. If neither exists, NoSuchViewError is raised.

    The view is called with `context`. Its result is wrapped by
    :meth:`wrap_xblock` and then passed through `render_asides`, whose
    result is returned.
    """
    # Record the active view so that :function:`render_child` can use it
    # as a default
    previous_view_name = self._view_name
    self._view_name = view_name
    try:
        renderer = getattr(block, view_name, None)
        if renderer is None:
            fallback = getattr(block, "fallback_view", None)
            if fallback is None:
                raise NoSuchViewError(block, view_name)
            renderer = functools.partial(fallback, view_name)

        fragment = renderer(context)

        # Explicitly save because render action may have changed state
        block.save()
        wrapped_fragment = self.wrap_xblock(block, view_name, fragment, context)
        return self.render_asides(block, view_name, wrapped_fragment, context)
    finally:
        # Restore the view that was active before entering this method
        self._view_name = previous_view_name
def render_child(self, child, view_name=None, context=None):
    """A shortcut to render a child block.

    Use this method to render your children from your own view function.

    When `view_name` is not provided, the view currently being rendered by
    this runtime is used instead.

    Returns the same value as :func:`render`.
    """
    if not view_name:
        view_name = self._view_name
    return child.render(view_name, context)
def render_children(self, block, view_name=None, context=None):
    """Render a block's children, returning a list of results.

    Each child id in `block.children` is loaded with :meth:`get_block` and
    rendered just as :func:`render_child` does.

    Returns a list of values, each as provided by :func:`render`.
    """
    return [
        self.render_child(self.get_block(child_id), view_name, context)
        for child_id in block.children
    ]
def wrap_xblock(self, block, view, frag, context):
    """
    Wrap `frag` in a div that identifies the xblock.

    If the runtime defines a `wrap_child` method, that method is called
    instead, after logging a deprecation warning.

    Otherwise the fragment is wrapped through `_wrap_ele`, passing the
    block's name along when it has one. If you have javascript, you'll need
    to override this impl.
    """
    if hasattr(self, 'wrap_child'):
        log.warning("wrap_child is deprecated in favor of wrap_xblock and wrap_aside %s", self.__class__)
        return self.wrap_child(block, view, frag, context)

    extra_data = {}
    if block.name:
        extra_data['name'] = block.name
    return self._wrap_ele(block, view, frag, extra_data)
def wrap_aside(self, block, aside, view, frag, context):  # pylint: disable=unused-argument
    """
    Wrap `frag` in a div that identifies the aside and points back to the
    block it decorates.

    The wrapping itself is done by `_wrap_ele`, which receives the usage id
    of `block` and the 'asideBaseUrl' url selector as extra data. If you
    have javascript, you'll need to override this impl.
    """
    aside_data = {
        'block_id': block.scope_ids.usage_id,
        'url_selector': 'asideBaseUrl',
    }
    return self._wrap_ele(aside, view, frag, aside_data)
def _wrap_ele(self, block, view, frag, extra_data=None):
    """
    Does the guts of the wrapping the same way for both xblocks and asides. Their
    wrappers provide other info in extra_data which gets put into the dom data- attrs.

    :param block: the xblock or aside being wrapped; its `scope_ids` supply
        the `usage` and `block-type` data attributes
    :param view: name of the view, used to derive the CSS classes
    :param frag: the fragment produced by the view
    :param extra_data: optional dict of additional data- attributes; `None`
        (the default) adds nothing
    :returns: a new Fragment holding the wrapping div and the resources of `frag`
    """
    wrapped = Fragment()
    data = {
        'usage': block.scope_ids.usage_id,
        'block-type': block.scope_ids.block_type,
    }
    # `extra_data` defaults to None, which dict.update() would reject.
    if extra_data:
        data.update(extra_data)

    if frag.js_init_fn:
        data['init'] = frag.js_init_fn
        data['runtime-version'] = frag.js_init_version

    json_init = ""
    # TODO/Note: We eventually want to remove: hasattr(frag, 'json_init_args')
    # However, I'd like to maintain backwards-compatibility with older XBlock
    # for at least a little while so as not to adversely effect developers.
    # pmitros/Jun 28, 2014.
    if hasattr(frag, 'json_init_args') and frag.json_init_args is not None:
        json_init = (
            '<script type="json/xblock-args" class="xblock_json_init_args">'
            '{data}</script>'
        ).format(data=json.dumps(frag.json_init_args))

    css_classes = self._css_classes_for(block, view)
    # NOTE(review): unlike the class list, the data-* values are interpolated
    # without HTML escaping - confirm they can never contain a quote.
    html = "<div class='{}'{properties}>{body}{js}</div>".format(
        markupsafe.escape(' '.join(css_classes)),
        properties="".join(" data-%s='%s'" % item for item in data.items()),
        body=frag.body_html(),
        js=json_init)

    wrapped.add_content(html)
    wrapped.add_fragment_resources(frag)
    return wrapped
def _css_classes_for(self, block, view):
"""
Get the list of CSS classes that the wrapping <div> should have for the
specified xblock or aside's view.
"""
block_css_entrypoint = block.entry_point.replace('.', '-')
css_classes = [
block_css_entrypoint,
f'{block_css_entrypoint}-{view}',
]
return css_classes
# Asides
def create_aside(self, block_type, keys):
    """
    The aside version of construct_xblock: take a type and key. Return an instance

    The aside class is resolved through :meth:`load_aside_type`, so the
    runtime's `select` setting and any subclass override of that method are
    honoured, as they are elsewhere in this runtime.

    :param block_type: the aside type to instantiate
    :param keys: the scope ids handed to the new aside
    :returns: an instance of the aside class bound to this runtime
    """
    aside_cls = self.load_aside_type(block_type)
    return aside_cls(runtime=self, scope_ids=keys)
def get_asides(self, block):
    """
    Return instances for all of the asides that will decorate this `block`.

    Arguments:
        block (:class:`.XBlock`): The block to retrieve asides for.

    Returns:
        List of XBlockAside instances
    """
    # Build every candidate first; filtering happens only afterwards.
    candidates = []
    for aside_type in self.applicable_aside_types(block):
        candidates.append(self.get_aside_of_type(block, aside_type))

    applicable = []
    for candidate in candidates:
        if candidate.should_apply_to_block(block):
            applicable.append(candidate)
    return applicable
# pylint: disable=unused-argument
def applicable_aside_types(self, block):
    """
    Return the applicable aside types for this runtime and block.

    This default implementation ignores `block` and lists the type of every
    aside class reported by `XBlockAside.load_classes()`. The method exists
    so a runtime can filter the asides it wants to support, down to the
    block or block_type level. We may extend this in the future to also take
    the user or user roles.
    """
    return [type_name for type_name, _aside_class in XBlockAside.load_classes()]
def get_aside_of_type(self, block, aside_type):
    """
    Return the aside of the given aside_type which might be decorating this `block`.

    Arguments:
        block (:class:`.XBlock`): The block to retrieve asides for.
        aside_type (`str`): the type of the aside

    Raises:
        Exception: when the runtime has no id generator.
    """
    # TODO: This function will need to be extended if we want to allow:
    #    a) XBlockAsides to statically indicated which types of blocks they can comment on
    #    b) XBlockRuntimes to limit the selection of asides to a subset of the installed asides
    #    c) Optimize by only loading asides that actually decorate a particular view
    generator = self.id_generator
    if generator is None:
        raise Exception("Runtimes must be supplied with an IdGenerator to load XBlockAsides.")

    block_usage_id = block.scope_ids.usage_id
    aside_class = self.load_aside_type(aside_type)
    block_def_id = self.id_reader.get_definition_id(block_usage_id)
    aside_def_id, aside_usage_id = generator.create_aside(block_def_id, block_usage_id, aside_type)
    aside_scope_ids = ScopeIds(self.user_id, aside_type, aside_def_id, aside_usage_id)
    return aside_class(runtime=self, scope_ids=aside_scope_ids)