-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathproductionist.py
More file actions
executable file
·1790 lines (1692 loc) · 102 KB
/
productionist.py
File metadata and controls
executable file
·1790 lines (1692 loc) · 102 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import random
import copy
import re # Used to build a content unit's tree expression
import json # Used to parse JSON grammar file generated by Reductionist
import marisa_trie # Used to load a trie data structure efficiently storing all the paths through the grammar
from config import PRODUCTIONIST_REPETITION_PENALTY_MULTIPLIER, PRODUCTIONIST_REPETITION_PENALTY_RECOVERY_RATE
class Productionist(object):
"""A module for natural language generation that operates over a content bundle at runtime."""
def __init__(self, content_bundle_name, content_bundle_directory, probabilistic_mode=True,
repetition_penalty_mode=True, shuffle_candidate_sets=True, terse_mode=False, verbosity=1, seed=None):
"""Initialize a Productionist object."""
# Set the random seed, if one was specified
if seed is not None:
random.seed(seed)
self.content_bundle = content_bundle_name
# If verbosity is 0, no information will be printed out during processing; if 1, information
# about how far along Productionist is in its general processing will be printed out; if 2,
# information about the paths taken through the grammar to generate content will also be printed
self.verbosity = verbosity
# Grab the path to the directory with the content bundle
if content_bundle_directory[-1] == '/': # Strip off trailing slash, if applicable
content_bundle_directory = content_bundle_directory[:-1]
# Hold onto that path, for reference
self._grammar_file_location = content_bundle_directory
# Build the grammar in memory, as an object of the Grammar class, which is defined below
self.grammar = self._load_grammar(
grammar_file_location='{path}/{bundle_name}.grammar'.format(
path=content_bundle_directory, bundle_name=content_bundle_name
)
)
# If applicable, load the trie file at the specified location; this file contains a data structure
# (a 'trie') that efficiently stores all the semantically meaningful paths through the
# grammar; this file will have been generated by Reductionist
try:
# Check if there's a .marisa file in the content-bundle directory
trie_file_location = '{path}/{bundle_name}.marisa'.format(
path=content_bundle_directory, bundle_name=content_bundle_name
)
open(trie_file_location)
# If there is, load the trie stored in that file
self.trie = self._load_trie(trie_file_location=trie_file_location)
except IOError:
self.trie = None
# Also load a set of expressible meanings -- these pertain to each of the possible tagsets that
# generated content may come packaged with, and each expressible meaning bundles its associated
# tagset with recipes for producing that content (in the form of paths through the grammar)
self.expressible_meanings = self._load_expressible_meanings(
expressible_meanings_file_location='{path}/{bundle_name}.meanings'.format(
path=content_bundle_directory, bundle_name=content_bundle_name
)
)
# In probabilistic mode, Productionist will select which expressible meanings to target
# probabilistically, by fitting a probability distribution to the candidates using the scores
# given to them; otherwise, Productionist will simply pick the highest scoring one
self.probabilistic_mode = probabilistic_mode
# If probabilistic mode is not engaged, scoring ties between multiple candidates (expressible
# meanings, recipes, or wildcard production rules) will be broken either by simply selecting
# the first option (if shuffle_candidate_sets == False) or else by randomly selecting (if
# shuffle_candidate_sets == True)
self.shuffle_candidate_sets = shuffle_candidate_sets
# In repetition-penalty mode, semantically meaningless rules ("wildcard rules") that have been
# used to produce recently generated content are less likely to be selected again (with a decay
# rate on the penalty for selecting them); we do this by maintaining a current penalty for each
# rule that increases each time the rule is used and decays as the rule is not used
self.repetition_penalty_mode = repetition_penalty_mode
if repetition_penalty_mode:
if self.verbosity > 0:
print "Initializing new repetitions dictionary..."
self.repetition_penalties = {}
for production_rule in self.grammar.production_rules:
self.repetition_penalties[str(production_rule.head)] = 1.0
for symbol_or_runtime_expression in production_rule.body:
self.repetition_penalties[str(symbol_or_runtime_expression)] = 1.0
else:
self.repetition_penalties = {}
# In terse mode, the system will favor production rules that may produce terser dialogue
self.terse_mode = terse_mode
# The remaining path holds all the semantically meaningful production rules that the system
# is to execute as soon as they are encountered (between encountering these rules, the
# system is free to select between wildcard production rules since that only result in
# lexical/syntactic variation, i.e., not variation in the tags that are accumulated); this
# attribute gets set by self._follow_recipe()
self.remaining_path = []
# The explicit path holds all the production rules that the system ended up executing
# during generation (including ones that were selected as wildcard rules, which would thus not
# be included in the remaining path); this is saved a record of the generation process that
# produced a line, and is critically utilized to produce the bracketed expression for a line
self.explicit_path_taken = []
# The Productionist state, which includes state that was introduced by the author or by
# state updates executed upon expanding nonterminal symbols during previous generation instances;
# the state is continuously updated over the course of generating a content package
self.state = State(initial_state_dictionary=None)
# Whether Productionist is currently targeting a particular expressible meaning, which means
# that it cannot go down paths that would accumulate tags outside that meaning, or whether
# it is generating example terminal results of expanding nonterminal symbols or executing
# production rules, in which case every production rule becomes a wildcard rule
self.targeting_meaning = True
# In the course of a generation instance, we may need to score a given production rule
# thousands of times, which can become quite computationally expensive; we can cut down
# on that cost by saving any scores we compute for the duration of the generation instance,
# which we do using this dictionary (note: this is reset after each generation instance)
self.production_rule_scores = {}
# This attribute will always point to the most recent content request; it's meant to
# aid in debugging why particular generation instance has failed
self.content_request = None
def __str__(self):
"""Return string representation."""
return 'Productionist module built using the "{content_bundle}" content bundle'.format(
content_bundle=self.content_bundle
)
@property
def scoring_modes_engaged(self):
"""Return whether any mode is engaged such that candidate production rules need to be scored."""
return self.repetition_penalty_mode or self.terse_mode or self.grammar.unequal_rule_frequencies
def _load_trie(self, trie_file_location):
"""Load a trie from file (one containing the semantically meaningful paths through this grammar)."""
if self.verbosity >= 1:
print "\t-- Loading trie..."
trie = marisa_trie.Trie()
trie.load(trie_file_location)
return trie
def _load_grammar(self, grammar_file_location):
"""Load the grammar specification from file and build and return a Grammar object for it."""
if self.verbosity >= 1:
print "\t-- Loading grammar..."
grammar_object = Grammar(grammar_file_location=grammar_file_location)
if self.verbosity >= 1:
print "\t\tGrammar has {n} generable outputs".format(n=grammar_object.total_generable_outputs)
return grammar_object
def _load_expressible_meanings(self, expressible_meanings_file_location):
"""Load a set of constructed expressible meanings from file."""
if self.verbosity >= 1:
print "\t-- Loading expressible meanings..."
expressible_meanings = []
try:
f = open(expressible_meanings_file_location, 'r')
except IOError:
raise Exception(
"Cannot load expressible meanings -- there is no file located at '{filepath}'".format(
filepath=expressible_meanings_file_location
)
)
id_to_tag = self.grammar.id_to_tag
for line in f.readlines():
meaning_id, all_paths_str, all_tags_str = line.rstrip().split('\t')
if self.trie:
path_trie_keys = [int(path_trie_key) for path_trie_key in all_paths_str.split('|')]
recipes = [self.trie.restore_key(path_trie_key) for path_trie_key in path_trie_keys]
else:
recipes = [path_str for path_str in all_paths_str.split('|')]
tags = {id_to_tag[tag_id] for tag_id in all_tags_str.split(',')} if all_tags_str else set()
expressible_meanings.append(
ExpressibleMeaning(meaning_id=int(meaning_id), tags=tags, recipes=recipes)
)
expressible_meanings.sort(key=lambda em: em.id)
if self.verbosity >= 1:
print "\t\tGrammar has {n} expressible meanings".format(n=len(expressible_meanings))
return expressible_meanings
    def fulfill_content_request(self, content_request):
        """Satisfy the given content request.

        Ranks all satisficing expressible meanings and attempts their recipes one by one until
        a recipe executes successfully; on success, updates the persistent state and (if engaged)
        the repetition penalties.

        @param content_request: A request object with 'required_tags', 'prohibited_tags',
                                'scoring_metric', 'state', and 'merge_state' attributes (shape
                                inferred from usage here -- confirm against the request class).
        @return: A content package on success, or None if no satisficing expressible meaning
                 could be rendered.
        """
        # Reset temporary attributes
        self._reset_temporary_attributes()
        # Save the given content request, in case we'd like to inspect it for debugging purposes
        self.content_request = content_request
        # Prepare a new copy of the state either by merging or replacing (note: this does not alter the
        # persistent state, because we make a copy here, but eventually it will if this generation instance
        # is ultimately successful)
        if content_request.merge_state:
            initial_state_for_this_instance = self.state.merge(content_request.state)
        else:
            initial_state_for_this_instance = State(initial_state_dictionary=content_request.state)
        # Reset the temporary variable that stores state updates made in a given generation
        # instance (ultimately this information will be stored in a ContentPackage object, if
        # this generation instance is successful)
        initial_state_for_this_instance.updates_this_generation_instance = []
        # Find all of the expressible meanings that are satisficing, given the content request
        satisficing_expressible_meanings = self._compile_satisficing_expressible_meanings(
            content_request=content_request
        )
        # If there's no satisficing expressible meanings, just return None right now
        if not satisficing_expressible_meanings:
            return None
        # Otherwise, rank the satisficing expressible meanings
        satisficing_expressible_meanings = self._rank_satisficing_expressible_meanings(
            candidates=satisficing_expressible_meanings,
            scoring_metric=content_request.scoring_metric
        )
        # Target these meanings one by one, attempting to produce content (note that the only reason that
        # it may not be possible to produce content for a given candidate expressible meaning is if
        # none of its recipes may be executed due to nonterminal symbols of their paths not being
        # available, i.e., not having their preconditions satisfied)
        for expressible_meaning in satisficing_expressible_meanings:
            if self.verbosity >= 3:
                print "-- Targeting EM{em_id}...".format(em_id=expressible_meaning.id)
            # Target the grammar paths ("recipes") associated with this expressible meaning, one by one
            candidate_recipes = list(expressible_meaning.recipes)
            while candidate_recipes:
                selected_recipe = self._select_recipe_for_expressible_meaning(candidate_recipes)
                candidate_recipes.remove(selected_recipe)
                # Execute that grammar path to produce the generated content satisfying the content request
                generated_text, updated_state = self._follow_recipe(
                    recipe=selected_recipe,
                    state=initial_state_for_this_instance
                )
                if generated_text is not None:  # Note that an empty string is valid here
                    # Package that up with all the associated metadata
                    content_package = self._build_content_package(
                        generated_text=generated_text,
                        updated_state=updated_state,
                        selected_recipe=selected_recipe,
                        content_request=content_request
                    )
                    # Update Productionist's persistent state, which allows for state to be maintained
                    # across generation instances
                    self.state = updated_state
                    # Lastly, if repetition-penalty mode is engaged, penalize all the rules that we executed to produce
                    # that content (so that they will be less likely to be used again) and decay the penalties for all
                    # the other production rules in the grammar that we didn't execute this time around
                    if self.repetition_penalty_mode:
                        self._update_repetition_penalties(
                            explicit_path_taken=content_package.explicit_grammar_path_taken)
                    # Return the package (first successful recipe wins)
                    return content_package
        # Every candidate meaning was exhausted without a successful recipe
        if self.verbosity >= 3:
            print "-- Productionist could not satisfy the following content request:\n\n{content_request}".format(
                content_request=content_request
            )
        return None
    def render_expressible_meaning(self, expressible_meaning):
        """Render the given expressible meaning directly, starting from a blank state.

        Unlike fulfill_content_request(), this method does not update the persistent state
        (no assignment to self.state here), and it implicitly returns None if every recipe
        fails (the while loop falls through without a return).

        @param expressible_meaning: The expressible meaning whose recipes should be tried.
        @return: A content package, or None if no recipe could be executed.
        """
        if self.verbosity >= 3:
            print "-- Targeting EM{em_id}...".format(em_id=expressible_meaning.id)
        # Target the grammar paths ("recipes") associated with this expressible meaning, one by one
        candidate_recipes = list(expressible_meaning.recipes)
        while candidate_recipes:
            selected_recipe = self._select_recipe_for_expressible_meaning(candidate_recipes)
            candidate_recipes.remove(selected_recipe)
            # Execute that grammar path to produce the generated content satisfying the content request
            blank_state = State(initial_state_dictionary=None)
            generated_text, updated_state = self._follow_recipe(recipe=selected_recipe, state=blank_state)
            if generated_text is not None:  # Note that an empty string is valid here
                # Package that up with all the associated metadata (no content request here)
                content_package = self._build_content_package(
                    generated_text=generated_text,
                    updated_state=updated_state,
                    selected_recipe=selected_recipe,
                    content_request=None
                )
                # Lastly, if repetition-penalty mode is engaged, penalize all the rules that we executed to produce
                # that content (so that they will be less likely to be used again) and decay the penalties for all
                # the other production rules in the grammar that we didn't execute this time around
                if self.repetition_penalty_mode:
                    self._update_repetition_penalties(
                        explicit_path_taken=content_package.explicit_grammar_path_taken
                    )
                # Return the package
                return content_package
def _reset_temporary_attributes(self):
"""Reset any temporary attributes that were used in the course of the previous generation instance."""
self.production_rule_scores = {}
for nonterminal_symbol in self.grammar.nonterminal_symbols:
nonterminal_symbol.expansion = None
def _compile_satisficing_expressible_meanings(self, content_request):
"""Compile all satisficing expressible meanings that are satisficing.
In this case, 'satisficing' means that an expressible meaning has none of the 'must not have' tags and
all of the 'must have' tags that are specified in the content request.
"""
if self.verbosity >= 3:
print "-- Compiling satisficing expressible meanings..."
satisficing_expressible_meanings = [
em for em in self.expressible_meanings if
not (em.tags & content_request.prohibited_tags) and em.tags.issuperset(content_request.required_tags)
]
# Make sure none of these have condition tags that are currently violated
return satisficing_expressible_meanings
def _rank_satisficing_expressible_meanings(self, candidates, scoring_metric):
"""Rank all satisficing expressible meanings."""
if self.verbosity >= 3:
print "-- Ranking satisficing expressible meanings..."
if self.shuffle_candidate_sets:
random.shuffle(candidates)
if not scoring_metric:
return candidates
# If a scoring metric *was* provided in the content request, use it to rank the satisficing
# expressible meanings
scores = {}
for candidate in candidates:
scores[candidate] = self._score_expressible_meaning(
expressible_meaning=candidate, scoring_metric=scoring_metric
)
if self.verbosity >= 3:
print "-- Derived the following scores for expressible meanings:"
for candidate in scores:
print "\tEM{em_id}\t{score}".format(em_id=candidate.id, score=scores[candidate])
return sorted(candidates, key=lambda c: scores[c], reverse=True)
@staticmethod
def _score_expressible_meaning(expressible_meaning, scoring_metric):
"""Score a candidate expressible meaning using the scoring metric provided in a content request."""
score = 0
for tag, weight in scoring_metric:
if tag in expressible_meaning.tags:
score += weight
return score
    def _select_recipe_for_expressible_meaning(self, candidate_recipes):
        """Select one of the grammar paths ("recipes") associated with the given expressible meaning.

        @param candidate_recipes: Non-empty list of candidate recipes; both call sites guard
                                  with 'while candidate_recipes', and the debug block below
                                  indexes candidate_recipes[0] unconditionally.
        @return: The selected recipe (chosen randomly, probabilistically, or by max score,
                 depending on the engaged modes).
        """
        if self.verbosity >= 3:
            if len(candidate_recipes[0].expressible_meaning.recipes) == 1:
                print "  Selecting EM{em_id}'s sole recipe...".format(em_id=candidate_recipes[0].expressible_meaning.id)
            else:
                print "  Selecting one of EM{em_id}'s {n} recipes...".format(
                    em_id=candidate_recipes[0].expressible_meaning.id,
                    n=len(candidate_recipes[0].expressible_meaning.recipes)
                )
        if self.shuffle_candidate_sets:
            random.shuffle(candidate_recipes)
        # If there's only one option, we can just select that right off and move on
        if len(candidate_recipes) == 1:
            selected_recipe = candidate_recipes[0]
        # If no scoring mode is engaged, we can just select a path randomly
        elif not self.scoring_modes_engaged:
            selected_recipe = random.choice(candidate_recipes)
        else:
            # If it is engaged, we'll want to select a path that won't generate a lot of repetition; to
            # prevent repetition, we can score candidate paths according to the current repetition
            # penalties attached to the symbols in the production rules on the paths; first let's
            # compute a utility distribution over the candidate paths
            scores = {}
            for recipe in candidate_recipes:
                scores[recipe] = self._score_candidate_recipe(recipe=recipe)
            # Check if any candidate even earned any points; if not, we can just pick randomly
            if not any(scores.values()):
                selected_recipe = random.choice(candidate_recipes)
            elif self.probabilistic_mode:
                # Next, if probabilistic mode is engaged, fit a probability distribution to the utility distribution
                probability_ranges = self._fit_probability_distribution_to_decision_candidates(scores=scores)
                # Finally, select a path (using the probability distribution)
                selected_recipe = self._select_candidate_given_probability_distribution(
                    probability_ranges=probability_ranges
                )
            else:
                # Deterministic mode: simply take the highest-scoring candidate
                selected_recipe = max(candidate_recipes, key=lambda candidate: scores[candidate])
        if self.verbosity >= 3:
            print "  Selected {recipe}...".format(recipe=selected_recipe)
        return selected_recipe
def _score_candidate_recipe(self, recipe):
"""Return a score for the given recipe according to the scores for the production rules on its path."""
# Ground out the rule references in the recipe to form a list of actual ProductionRule
# objects; note: if there's no path string, that means that the selected path is one that
# doesn't pass through any symbols with tags; in this case, Productionist can just select
# between production rules that are not semantically meaningful until it's ground out into
# a terminal expansion
try:
path = [self.grammar.production_rules[int(rule_id)] for rule_id in recipe.path.split(',')]
except IndexError:
raise Exception(
"The path for {recipe} includes IDs for rules that do not exist. Did you forget to include "
"the .marisa file in the procedural_content directory?"
)
score = sum(self._score_candidate_production_rule(rule) for rule in path)
return score
def _score_candidate_production_rule(self, production_rule):
"""Return a score for the given production rule.
The score for this rule will be calculated according to its expansion-control tags (application
frequency and usage constraint) and, if applicable, the current repetition penalties of the symbols
symbols in its body (if repetition-penalty mode is engaged) and the number and length of symbols in
its body (if terse mode is engaged).
"""
try:
return self.production_rule_scores[production_rule]
except KeyError:
score = 1.0
# If applicable, adjust score according to repetition penalties and terseness
for symbol in production_rule.body:
if self.repetition_penalty_mode:
score *= self.repetition_penalties[str(symbol)]
if self.terse_mode:
if type(symbol) == unicode:
score /= len(symbol)
else:
# Need more testing here, and the divisor should be a constant value -- idea is to penalize
# longer sentence templates so as to avoid a local-optimum situation; it does this by dividing
# the score in half for every nonterminal symbol on the rule's right-hand side
score /= 2
# Finally, adjust score according to the application frequency associated with this rule; specifically,
# multiply the score by the rule's frequency score multiplier (which will be 1.0 or less)
score *= production_rule.frequency_score_multiplier
# Save this score so that we don't need to recompute it again during this generation instance
self.production_rule_scores[production_rule] = score
return score
    def _follow_recipe(self, recipe, state):
        """Follow the given recipe to generate the desired text content.

        Sets self.remaining_path (the semantically meaningful rules still to be executed)
        and self.explicit_path_taken (the rules actually fired, including wildcard picks)
        as side effects, then expands the grammar's start symbol.

        @param recipe: A recipe whose 'path' attribute is a comma-delimited string of rule IDs.
        @param state: The State to generate against (copied here, so the caller's object is
                      untouched if this recipe fails).
        @return: (generated_text, updated_state), or (None, None) if the recipe could not
                 be followed (a rule's preconditions did not hold).
        """
        # First, create a copy of the given state, since we don't want to make any modifications
        # to a State object that will be reused on another recipe in the case that we are not
        # able to follow this one (due to a nonterminal having preconditions that don't hold)
        state = state.copy()
        # Ground out the rule references in the recipe to form a list of actual ProductionRule
        # objects; note: if this is an empty list, that means that the selected path is one that
        # doesn't pass through any symbols with tags; in this case, Productionist can just randomly
        # select production rules that are not semantically meaningful until it's ground out into
        # a terminal expansion
        if recipe.path:
            path = [self.grammar.production_rules[int(rule_id)] for rule_id in recipe.path.split(',')]
        else:
            path = []
        # Keep this list handy as the list of remaining rules to execute -- we'll
        # be consuming this as we proceed
        self.remaining_path = list(path)
        # Keep track of all the rules we ended up firing for this path, including our choices
        # for wildcards -- we'll use this later to generate a bracketed expression specifying
        # how exactly the content package was generated (for debugging/authoring purposes)
        self.explicit_path_taken = []
        # Execute the rules on the selected path in order to produce content expressing the
        # desired semantics, which are specifically the tags associated with the targeted
        # expressible meaning; this can be done by simply targeting the grammar's
        # start symbol and then only using rules on the targeted path for expansion (with
        # randomly chosen rules executed in each case of a wild card on the path)
        generated_text, updated_state = self._terminally_expand_nonterminal_symbol(
            nonterminal_symbol=self.grammar.start_symbol, state=state, n_tabs_for_debug=3
        )
        return generated_text, updated_state
    def _terminally_expand_nonterminal_symbol(self, nonterminal_symbol, state, n_tabs_for_debug):
        """Terminally expand the given symbol.

        If the next rule on the targeted path expands this symbol, that rule is used (provided
        its preconditions hold); otherwise a wildcard rule is selected. Note the asymmetry: if
        the on-path rule's preconditions fail, there is NO fallback to wildcard rules here --
        rule_to_execute stays None and (None, None) is returned.

        @return: (terminal_expansion, updated_state), or (None, None) if no executable rule
                 was found.
        """
        if self.verbosity >= 3:
            print "{whitespace}Expanding nonterminal symbol [[{symbol_name}]]...".format(
                whitespace='  ' * n_tabs_for_debug, symbol_name=nonterminal_symbol.name
            )
        # Select a production rule
        rule_to_execute = None
        if self.remaining_path and self.remaining_path[0] in nonterminal_symbol.production_rules:
            # The next on-path rule expands this symbol; consume it from the remaining path
            next_rule_on_path = self.remaining_path.pop(0)
            if next_rule_on_path.preconditions_hold(state=state, n_tabs_for_debug=n_tabs_for_debug+1):
                rule_to_execute = next_rule_on_path
        else:
            # No on-path rule applies here, so pick among wildcard rules; when not targeting
            # a meaning (e.g. generating examples), every rule counts as a wildcard
            if self.targeting_meaning:
                candidate_wildcard_rules = (
                    [rule for rule in nonterminal_symbol.production_rules if not rule.semantically_meaningful]
                )
            else:
                candidate_wildcard_rules = list(nonterminal_symbol.production_rules)
            # Try wildcard candidates until one's preconditions hold
            while candidate_wildcard_rules:
                if self.verbosity >= 3:
                    print "{whitespace}Selecting wildcard rule...".format(whitespace='  ' * n_tabs_for_debug)
                selected_wildcard_rule = self._select_wildcard_production_rule(
                    candidate_wildcard_rules=candidate_wildcard_rules
                )
                if selected_wildcard_rule.preconditions_hold(state=state, n_tabs_for_debug=n_tabs_for_debug+1):
                    rule_to_execute = selected_wildcard_rule
                    break
                candidate_wildcard_rules.remove(selected_wildcard_rule)
        if not rule_to_execute:
            return None, None
        # Terminally expand the symbol
        terminal_expansion, updated_state = self._execute_production_rule(
            rule=rule_to_execute,
            state=state,
            n_tabs_for_debug=n_tabs_for_debug + 1
        )
        # Save the terminal expansion, which may be referenced in runtime expressions using the
        # operator 'result'
        nonterminal_symbol.expansion = terminal_expansion
        return terminal_expansion, updated_state
def _select_wildcard_production_rule(self, candidate_wildcard_rules):
"""Select a wildcard production rule that will be used to expand the given nonterminal symbol.
A "wildcard rule" is one that is not marked as being semantically meaningful, and is thus not
included on the targeted path (stored as the 'remaining_path' attribute).
"""
if self.shuffle_candidate_sets:
random.shuffle(candidate_wildcard_rules)
# If there's only choice, we can just select that and move on
if len(candidate_wildcard_rules) == 1:
selected_wildcard_rule = candidate_wildcard_rules[0]
elif not self.scoring_modes_engaged:
# If no scoring mode is engaged, we can simply randomly select a wildcard rule
selected_wildcard_rule = random.choice(candidate_wildcard_rules)
else:
# Otherwise, we need to compute a utility distribution over the candidate wildcard rules
scores = {}
for rule in candidate_wildcard_rules:
scores[rule] = self._score_candidate_production_rule(production_rule=rule)
# Check if any candidate even earned any points; if not, we can just pick randomly
if not any(scores.values()):
selected_wildcard_rule = random.choice(candidate_wildcard_rules)
elif self.probabilistic_mode:
# Next, fit a probability distribution to the utility distribution
probability_ranges = self._fit_probability_distribution_to_decision_candidates(scores=scores)
# Finally, select a wildcard rule (using the probability distribution, if probabilistic mode is engaged)
selected_wildcard_rule = self._select_candidate_given_probability_distribution(
probability_ranges=probability_ranges
)
else:
selected_wildcard_rule = max(candidate_wildcard_rules, key=lambda c: scores[c])
return selected_wildcard_rule
    def _execute_production_rule(self, rule, state, n_tabs_for_debug):
        """Execute the given production rule.

        Expands every element of the rule's body -- nonterminal symbols are recursively
        expanded, runtime expressions are processed, terminal (unicode) symbols pass
        through -- then applies the rule's effects to the state.

        @return: (expansion_string, updated_state), or (None, None) if any body element
                 could not be expanded/evaluated (short-circuits immediately).
        """
        if self.verbosity >= 3:
            print "{whitespace}Using production rule #{rule_id}: '{rule_spec}'".format(
                whitespace='  ' * n_tabs_for_debug,
                rule_id=rule.id,
                rule_spec=str(rule)
            )
        # Create a copy of the current State object, since any modifications to the state that
        # may be made by executing this production rule should only affect the state used by
        # descendants of this rule; by copying here, we can revert back to an earlier state
        # if we need to backtrack from this rule
        state = state.copy()
        # Add to our record of the explicit path we took the grammar to produce the
        # content we'll be sending back
        self.explicit_path_taken.append(rule)
        # Terminally expand this symbol
        result_of_rule_execution = []
        for element in rule.body:
            if type(element) is NonterminalSymbol:
                # Nonterminal symbol (we must expand it, which may update the state)
                terminal_expansion_of_that_symbol, state = self._terminally_expand_nonterminal_symbol(
                    nonterminal_symbol=element,
                    state=state,
                    n_tabs_for_debug=n_tabs_for_debug+1
                )
                if terminal_expansion_of_that_symbol is None:
                    # Preconditions were violated such that this symbol could not be expanded; short-circuit
                    return None, None
                result_of_rule_execution.append(terminal_expansion_of_that_symbol)
            elif type(element) is RuntimeExpression:
                # Runtime expression (we must process it, which may update the state)
                result_of_runtime_expression, state = self._process_runtime_expression(
                    runtime_expression=element,
                    state=state,
                    n_tabs_for_debug=n_tabs_for_debug+1
                )
                if result_of_runtime_expression is None:
                    # The runtime expression referenced something that could not be evaluated; short-circuit
                    # TODO SHOULD PROBABLY SURFACE ON ERROR ON THE AUTHORING INTERFACE
                    return None, None
                # This may be a StateElement object, so we need to cast to string to make the join() call below work
                result_of_rule_execution.append(str(result_of_runtime_expression))
            else:  # type(element) is unicode
                # Terminal symbol (no need to expand)
                result_of_rule_execution.append(element)
        # Further update the state (in addition to any updates that may have been triggered by processing runtime
        # expressions in the rule body) by executing any effects that are attached to this production rule
        state = rule.execute_effects(state=state, n_tabs_for_debug=n_tabs_for_debug+1)
        # Concatenate the results and return that string, along with the updated state
        expansion_yielded_by_this_rule = ''.join(result_of_rule_execution)
        return expansion_yielded_by_this_rule, state
def _process_runtime_expression(self, runtime_expression, state, n_tabs_for_debug):
    """Resolve and execute a runtime expression, returning its result and the updated state.

    Returns (None, None) when the expression evaluates to a value that cannot be
    rendered into a text output.
    """
    # Ground any nonterminal-symbol references in the expression definition by
    # terminally expanding them against the current state
    grounded_definition = self._resolve_symbol_references_in_expression_definition(
        definition=runtime_expression.definition,
        state=state,
        n_tabs_for_debug=n_tabs_for_debug
    )
    # Evaluate the grounded expression (side effects here may also update the state)
    outcome, new_state = self._execute_runtime_expression(
        runtime_expression=runtime_expression,
        realized_expression_definition=grounded_definition,
        state=state,
        n_tabs_for_debug=n_tabs_for_debug + 1
    )
    # Only values that can be spliced into a text output are acceptable; note the
    # deliberate exact-type membership test (subclasses such as bool are rejected)
    acceptable_types = (unicode, str, int, float)
    if type(outcome) in acceptable_types:
        return outcome, new_state
    return None, None
def _resolve_symbol_references_in_expression_definition(self, definition, state, n_tabs_for_debug):
"""Terminally expand any nonterminal symbols that are referenced in the given runtime expression."""
grounded_expression_definition = []
for expression_operator_or_symbol_reference in definition:
if type(expression_operator_or_symbol_reference) is NonterminalSymbol:
referenced_symbol = expression_operator_or_symbol_reference
# Note that this will cause the state to update (while still using the same variable name 'state')
terminal_expansion_of_that_symbol, state = self._terminally_expand_nonterminal_symbol(
nonterminal_symbol=referenced_symbol,
state=state,
n_tabs_for_debug=n_tabs_for_debug
)
# Turn this into a string literal, since otherwise it will be evaluated as a key in the state
string_literal_of_terminal_expansion = "'{expansion}'".format(
expansion=terminal_expansion_of_that_symbol
)
grounded_expression_definition.append(string_literal_of_terminal_expansion)
else:
expression_operator = expression_operator_or_symbol_reference
grounded_expression_definition.append(expression_operator)
return grounded_expression_definition
def _execute_runtime_expression(self, runtime_expression, realized_expression_definition, state, n_tabs_for_debug):
    """Execute the given runtime expression.

    Args:
        runtime_expression: The runtime expression to execute; its boolean flags
            (is_ternary_expression, is_with_expression, etc.) determine how the
            realized definition is interpreted.
        realized_expression_definition: The expression's definition with all symbol
            references already expanded into string literals.
        state: The state object consulted -- and possibly mutated -- during execution.
        n_tabs_for_debug: Indentation depth for debug output.

    Returns:
        A (result, state) pair; 'result' is the value to realize in the generated
        text (the empty string for side-effect-only expressions).
    """
    # First, check if it's a ternary expression, in which case its condition must be evaluated
    # to determine which of its subexpressions we will actually be executing, which we will execute
    # recursively (since this allows for nested ternary expressions)
    if runtime_expression.is_ternary_expression:
        condition_holds = state.evaluate(predicate=runtime_expression.condition.definition)
        if condition_holds:
            nested_runtime_expression = runtime_expression.expression_if_condition_passes
        else:
            nested_runtime_expression = runtime_expression.expression_if_condition_fails
        # The selected subexpression must be grounded anew, since expanding its own
        # symbol references may itself update the state
        realized_nested_expression_definition = self._resolve_symbol_references_in_expression_definition(
            definition=nested_runtime_expression.definition,
            state=state,
            n_tabs_for_debug=n_tabs_for_debug
        )
        return self._execute_runtime_expression(
            runtime_expression=nested_runtime_expression,
            realized_expression_definition=realized_nested_expression_definition,
            state=state,
            n_tabs_for_debug=n_tabs_for_debug
        )
    # Execute the runtime expression at hand
    variable_to_set = None
    value_to_set_variable_to = None
    if runtime_expression.is_with_expression:
        # Example: ['villain.name.last', 'with', 'speaker.worst_enemy', 'as', 'villain']
        reference, with_operator, value_to_set_variable_to, as_operator, variable_to_set = (
            realized_expression_definition
        )
    elif runtime_expression.is_as_expression:
        # Example: ["'Jeff'", 'as', 'name']
        value_to_set_variable_to, as_operator, variable_to_set = realized_expression_definition
        reference = variable_to_set
    elif runtime_expression.is_simple_expression:
        # Example: [name]
        reference = realized_expression_definition[0]
    # Handle salted runtime expressions that don't produce a string; we're allowing these expressions to be
    # salted into rule bodies, but since they don't return a string, we set 'reference' to the empty string,
    # which will cause nothing to be realized in the generated text when one of these expressions is salted
    # in the body of an executed rule
    elif runtime_expression.is_declaration_expression:
        # Examples: ['name', '=', "'Jeff'"], ['story.tension', '+=', "5"]
        variable_to_set, equals_sign, value_to_set_variable_to = realized_expression_definition
        reference = ''
    else:  # runtime_expression.is_increment_expression
        # Examples: ['story.tension' '-=' '5'], ['story.tension' '+=' 'story.act.current.tension']
        variable_to_set = realized_expression_definition[0]
        operator = realized_expression_definition[1]
        current_value = state.resolve(value=variable_to_set)
        increment = state.resolve(value=realized_expression_definition[2])
        if operator == '+=':
            value_to_set_variable_to = current_value + increment
        elif operator == '-=':
            value_to_set_variable_to = current_value - increment
        elif operator == '*=':
            value_to_set_variable_to = current_value * increment
        else:  # /=
            value_to_set_variable_to = float(current_value/increment)
        reference = ''
    # Apply any pending assignment *before* resolving the reference, since the
    # reference may name the very variable that was just set (the 'as' case)
    if variable_to_set:
        state.update(key=variable_to_set, value=value_to_set_variable_to)
    result = state.resolve(value=reference)
    return result, state
def _resolve_conditional_expression_in_runtime_expression(self, conditional_expression, state):
"""Parse the given conditional expression by referencing the state.
Here is an example conditional expression: "best_friend.name if best_friend else 'John'". Note
that a ternary operator is at work here and that it may be chained to produce conditional expressions
like "best_friend.name if best_friend else worst_enemy.name if worst_enemy else 'John". Finally,
the argument passed for 'conditional_expression' will more specifically be a list, e.g.,
['best_friend.name', 'if', "best_friend.name != 'Paul'", 'else', 'John'].
"""
# Test the first condition (note that we treat a variable not being included in the state as a failed test)
condition_str = ''.join(
conditional_expression[conditional_expression.index('if'):conditional_expression.index('else')]
)
if state.resolve(value=condition_str):
resolved_conditional_expression = conditional_expression[0]
else:
resolved_conditional_expression = conditional_expression[conditional_expression.index('else')+1:]
# If there's a nested conditional expression here, we need to also process this
if 'if' in resolved_conditional_expression:
resolved_conditional_expression = self._resolve_conditional_expression_in_runtime_expression(
conditional_expression=conditional_expression,
state=state
)
return resolved_conditional_expression
def _build_content_package(self, generated_text, updated_state, targeted_symbol=None, selected_recipe=None,
                           content_request=None):
    """Package the generated text together with its accumulated tags and other metadata."""
    # Gather every tag attached along the grammar path we traversed -- these are the
    # semantics bundled with the generated content
    accumulated_tags = set()
    for rule in self.explicit_path_taken:
        accumulated_tags.update(rule.tags)
    # Snapshot the explicit path, since building a bracketed expression would consume it
    path_snapshot = list(self.explicit_path_taken)
    # Bracketed-expression rendering (a debugging/authoring trace of the derivation)
    # is currently disabled
    bracketed_expression = ''
    content_package = ContentPackage(
        text=generated_text,
        tags=accumulated_tags,
        recipe=selected_recipe,
        explicit_grammar_path_taken=path_snapshot,
        bracketed_expression=bracketed_expression,
        updated_state=updated_state
    )
    # If this content is meant to fulfill a content request, verify that it actually does
    if content_request:
        content_fulfills_the_request = (
            content_package.tags.issuperset(content_request.required_tags) and
            not (content_package.tags & content_request.prohibited_tags)
        )
        assert content_fulfills_the_request, "The generated content package does not satisfy the content request."
    return content_package
def _produce_bracketed_expression(self, symbol_to_start_from=None):
    """Produce a bracketed expression for a given grammar path.

    Bracketed expressions can be useful for debugging purposes, since they provide an explicit
    account of how a content package was generated.
    """
    # Default to the grammar's start symbol unless a specific symbol was targeted
    # (the latter supports live authoring feedback)
    start_symbol = symbol_to_start_from or self.grammar.start_symbol
    return self._expand_nonterminal_symbol_to_produce_bracketed_expression_fragment(
        nonterminal_symbol=start_symbol
    )
def _expand_nonterminal_symbol_to_produce_bracketed_expression_fragment(self, nonterminal_symbol):
"""Expand the given symbol to produce the next fragment of the bracketed expression being produced."""
try:
# Retrieve the next production rule
next_rule = self.explicit_path_taken.pop(0)
# Make sure that the next production rule on the path is one of this symbol's
# rules; if it's not, throw an error
assert next_rule in nonterminal_symbol.production_rules, (
"Error: Expected rule #{rule_id} to be a production rule of the symbol {symbol_name}".format(
rule_id=next_rule.id,
symbol_name=nonterminal_symbol.name
)
)
# Use the next production rule on the path to produce the next fragment of the bracketed expression
return self._execute_production_rule_to_produce_bracketed_expression_fragment(rule=next_rule)
except IndexError:
# This nonterminal symbol currently has no production rules, so we'll just return the bracketed
# expression
bracketed_expression_fragment = "{head}{head_tags}[{results}]".format(
head=nonterminal_symbol.name,
head_tags=' <{tags}>'.format(tags=', '.join(t for t in nonterminal_symbol.tags)),
results='[[{symbol}]]'.format(symbol=nonterminal_symbol.name)
)
return bracketed_expression_fragment
def _execute_production_rule_to_produce_bracketed_expression_fragment(self, rule):
"""Execute the given production rule to produce the next fragment of the bracketed expression being produced."""
results_of_executing_this_rule = []
for symbol in rule.body:
if type(symbol) == unicode: # Terminal symbol (no need to expand)
results_of_executing_this_rule.append(
'"{terminal_symbol}"'.format(terminal_symbol=symbol)
)
elif type(symbol) is NonterminalSymbol: # Nonterminal symbol (we must expand it)
terminal_expansion_of_that_symbol = (
self._expand_nonterminal_symbol_to_produce_bracketed_expression_fragment(nonterminal_symbol=symbol)
)
results_of_executing_this_rule.append(terminal_expansion_of_that_symbol)
else: # type(symbol) == RuntimeExpression
pass # TODO
# Concatenate the results and return that string
bracketed_expression_fragment = "{head}{head_tags}[{results}]".format(
head=rule.head.name,
head_tags=' <{tags}>'.format(tags=', '.join(t for t in rule.head.tags)) if rule.head.tags else '',
results='|'.join(results_of_executing_this_rule)
)
return bracketed_expression_fragment
def _update_repetition_penalties(self, explicit_path_taken):
    """Update repetition penalties: compound the penalty of every symbol we just used and
    let the penalty of every unused symbol recover.
    """
    # Collect every symbol that appeared in the body of a rule we just executed
    used_symbols = {symbol for rule in explicit_path_taken for symbol in rule.body}
    for symbol in self.grammar.nonterminal_symbols + self.grammar.terminal_symbols:
        key = str(symbol)
        if symbol in used_symbols:
            # Just used: compound its repetition penalty
            self.repetition_penalties[key] *= PRODUCTIONIST_REPETITION_PENALTY_MULTIPLIER
        else:
            # Unused: let its penalty recover, but never past the neutral value of 1.0
            recovered = self.repetition_penalties[key] * PRODUCTIONIST_REPETITION_PENALTY_RECOVERY_RATE
            self.repetition_penalties[key] = min(1.0, recovered)
@staticmethod
def _fit_probability_distribution_to_decision_candidates(scores):
"""Return a dictionary mapping each of the decision candidates to a probability range."""
candidates = sorted(scores.keys())
# Determine the individual probabilities of each candidate
individual_probabilities = {}
sum_of_all_scores = float(sum(scores.values()))
for candidate in candidates:
probability = scores[candidate] / sum_of_all_scores
individual_probabilities[candidate] = probability
# Use those individual probabilities to associate each candidate with a specific
# probability range, such that generating a random value between 0.0 and 1.0 will fall
# into one and only one candidate's probability range
probability_ranges = {}
current_bound = 0.0
for candidate in candidates:
probability = individual_probabilities[candidate]
probability_range_for_this_candidate = (current_bound, current_bound + probability)
probability_ranges[candidate] = probability_range_for_this_candidate
current_bound += probability
# Make sure the last bound indeed extends to 1.0 (necessary because of float rounding issues)
last_candidate_to_have_a_range_attributed = candidates[-1]
probability_ranges[last_candidate_to_have_a_range_attributed] = (
probability_ranges[last_candidate_to_have_a_range_attributed][0], 1.0
)
return probability_ranges
def _select_candidate_given_probability_distribution(self, probability_ranges):
"""Return a selected decision candidate.
If probabilistic mode is engaged, the system will probabilistically select; otherwise, it
will simply return the highest scoring candidate.
"""
if self.probabilistic_mode:
x = random.random()
selection = next(
candidate for candidate in probability_ranges if
probability_ranges[candidate][0] <= x <= probability_ranges[candidate][1]
)
else: # Pick the highest-scoring one, i.e., the most probable one
selection = max(
probability_ranges,
key=lambda candidate: probability_ranges[candidate][1] - probability_ranges[candidate][0]
)
return selection
class ExpressibleMeaning(object):
    """A particular meaning (i.e., a collection of tags) bundled with recipes (i.e., compressed
    grammar paths) for generating content that will come packaged with those tags.

    The recipes for generating the desired content are specified as compressed paths through the
    grammar, reified as objects of the Recipe class defined below.
    """

    def __init__(self, meaning_id, tags, recipes):
        """Initialize an ExpressibleMeaning object."""
        self.id = meaning_id
        # The set of tags associated with this expressible meaning -- the semantics shared
        # by every grammar path that this expressible meaning indexes
        self.tags = tags
        # Recipe objects for generating content that expresses the associated meaning; each
        # wraps a compressed grammar path (one whose rules, executed in order, accumulate
        # exactly this meaning's tag set)
        self.recipes = self._init_build_recipes(recipes=recipes)

    def __str__(self):
        """Return string representation."""
        return "An expressible meaning associated with the following tags:\n\t- {}".format(
            '\n\t- '.join(self.tags)
        )

    def _init_build_recipes(self, recipes):
        """Return a list of Recipe objects, one per grammar path associated with this
        expressible meaning, ordered by recipe id.
        """
        built = [
            Recipe(recipe_id=index, expressible_meaning=self, grammar_path=path)
            for index, path in enumerate(recipes)
        ]
        built.sort(key=lambda recipe: recipe.id)
        return built
class Recipe(object):
    """A recipe, in the form of a compressed grammar path, for generating content associated with a
    specific expressible meaning.

    A 'compressed grammar path' is a chain of semantically meaningful production rules that, when
    executed in the given order, produces the desired content. Rules that carry no semantics are
    omitted from these paths (hence the compression), leaving Productionist free to choose among
    candidate rules whenever the symbol it is expanding has no rule on the target path. That choice
    cannot introduce unwanted tags (such 'wildcard' rules accumulate none), so it yields only lexical
    -- not semantic -- variation; more precisely, wildcard rules are still scored by every non-semantic
    concern: repetition penalties, author-assigned application frequencies, usage constraints, etc.
    """

    def __init__(self, recipe_id, expressible_meaning, grammar_path):
        """Initialize a Recipe object."""
        self.id = recipe_id
        self.expressible_meaning = expressible_meaning
        # Human-readable identifier combining the meaning id with this recipe's id
        self.name = '{0}-{1}'.format(expressible_meaning.id, recipe_id)
        # The compressed grammar path itself
        self.path = grammar_path

    def __str__(self):
        """Return string representation."""
        return "Recipe {name}".format(name=self.name)
class ContentRequest(object):
    """A content request submitted to a Productionist module."""

    def __init__(self, required_tags=None, prohibited_tags=None, scoring_metric=None, state=None, merge_state=True):
        """Initialize a ContentRequest object."""
        # Tags that must come packaged with the generated content (strings of the form 'tagset:tag')
        self.required_tags = required_tags or set()
        # Tags that must *not* come packaged with the generated content (same form)
        self.prohibited_tags = prohibited_tags or set()
        # (tag, weight) tuples expressing the desirability of optional tags
        self.scoring_metric = scoring_metric or set()
        # This state is installed as the initial Productionist state before the module
        # attempts to fulfill the request
        self.state = {} if state is None else state
        # Whether the given state is merged into (True) or replaces (False) the existing
        # Productionist state
        self.merge_state = merge_state

    def __str__(self):
        """Return string representation."""
        required = ', '.join(self.required_tags) if self.required_tags else 'N/A'
        prohibited = ', '.join(self.prohibited_tags) if self.prohibited_tags else 'N/A'
        metric = ', '.join(str(t) for t in self.scoring_metric) if self.scoring_metric else 'N/A'
        return (
            "\n\tRequired tags: {}".format(required) +
            "\n\tProhibited tags: {}".format(prohibited) +
            "\n\tScoring metric: {}".format(metric)
        )
class State(object):
"""A chunk of system state that is used to evaluate preconditions attached to nonterminal symbols and to
fill in template gaps that may appear in generated content.
"""
def __init__(self, initial_state_dictionary, updates_this_generation_instance=None):
"""Initialize a State object."""
self.initial = initial_state_dictionary if initial_state_dictionary is not None else {}
self.now = copy.deepcopy(self.initial)
if updates_this_generation_instance is None:
self.updates_this_generation_instance = []
else:
self.updates_this_generation_instance = updates_this_generation_instance
def resolve(self, value, suppress_warning=False):
"""Resolve the given value, which is either a primitive type or a state reference."""
# If it's a string literal, strip off the single quotes and return the result (the actual string
# being referenced in the string literal)
if value.startswith("'") or value.startswith('"'):
return value[1:-1]
# If it evaluates to a valid primitive type (int, float, bool), return the evaluated form
try:
if type(eval(value)) in (int, float, bool):
return eval(value)
except NameError:
pass
# If it includes an arithmetic operator, resolve the operand values, do the (recursive)
# arithmetic, and return the result
for arithmetic_operator in ('+', '-', '*', '/'):
if arithmetic_operator in value:
# Only split on first occurrence of the operator, since this allows nested operators
left_operand, right_operand = value.split(arithmetic_operator, 1)
# Strip off whitespace (e.g., as in 'scene.tension * 2')
left_operand, right_operand = left_operand.strip(), right_operand.strip()
resolved_left_operand = self.resolve(value=left_operand)
resolved_right_operand = self.resolve(value=right_operand)
operation_str = '{left_operand} {arithmetic_operator} {right_operand}'.format(