-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathdriver.py
More file actions
229 lines (189 loc) · 7.25 KB
/
driver.py
File metadata and controls
229 lines (189 loc) · 7.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File: driver.py
# Author: Leilani H. Gilpin
# Date: 30 December 2019
# Section: 1
# Email: lhg@mit.edu
# Description: High-level reasoner code.
import argparse
import requests
import sys
import logging
import nltk
from sympy import * # python symbolic package
from sympy.logic import SOPform
import itertools
from time import process_time
# TODO we may want to do some data science stuff
import pandas as pd
import numpy as np
import csv
#import commonsense.conceptnet as kb
import synthesizer.synthesize as synthesize
import monitor.reasonableness_monitor as monitor
# We may not want to use a priority queue
from queue import PriorityQueue
needs = PriorityQueue() #we initialise the PQ class instead of using a
#function to operate upon a list.
domain = None
anchors = ['animal', 'object', 'place', 'plant']
relations = ['AtLocation', 'LocatedNear'] # technically these are vehicle relations for now
class Reason:
def __init__(self, rdf, reason):
self.rdf = rdf
self.ind = 1 # for referencing
self.reason = reason
def add_reason(self, reason):
self.reason = reason
class Synthesizer:
"""Started setting this out
But really, the synthesizer just has (1) a set of candidate
explanations and (2) a priority hierarchy
"""
def __init__(self, relations, anchors, data, rules):
self.relations = relations
self.anchors = anchors
self.data = data
self.rules = rules
"""
Section on data tables
"""
def initialize(relationFile, anchorFile):
"""" Lists the relations for the semantic database
Should also have domains attached to the some of the relations
TODO: Check format of the file, etc
"""
relations = make_data_frame(relationFile)
anchors = make_data_frame(anchorFile)
def make_data_frame(fileName):
""" Makes a dataframe from the specified file
CSV files are assumed to have a heading with a:
header
info....
"""
try:
raw_relation_data = pd.read_csv(fileName)
logging.debug("%s is parsed as a csv file" %fileName)
logging.debug(raw_relation_data.head())
except pd.errors.ParserError:
raw_relation_data = pd.DataFrame()
with open(fileName, 'r') as f:
for line in f:
raw_relation_data = pd.concat( [df,
pd.DataFrame([tuple(line.strip().split('\s+'))])],
ignore_index=True )
return raw_relation_data
def clean_relations(raw_data):
"""
Cleans relation raw data from a file
"""
for element in raw_data:
print(element)
return raw_data
# Takes symbolic reasons and turns it into a natural language explanation
def make_explanation(reasons):
return reasons
# This will "imagine the future and generate a few candidate plans
# How does one do this? * Or do we assume the planner?
def generate_candiates(state):
return ["continue-forward", "stop", "veer-slow-down"]
# TODO - How will this be defined?
# Define an API for this.
def make_needs_hierachy():
needs.put(1, symbols('passenger-safety'))
needs.put(2, symbols('passenger-perceived-safety'))
needs.put(3, symbols('passenger-comfort'))
needs.put(4, symbols('route-efficiency'))
# safety of the person in the vehicle (1)
# safety of the people around
# safety of moving things (animals)
return
def check_hierarchy():
return
# Takes in an arbitrary number of explanations (tested for up to...)
# Returns an explanation and the correct intended action
# If actions are not provided by the underlying algorithm, ...TODO
# Group Explanations / explanation alignement
def synthesize_explanations(*explanations, actions):
"""
Step 1 - alignment
Step 2 - provenance
"""
for (is_reasonable, explanation) in explanations:
logging.debug(is_reasonable)
logging.debug(explanation)
action = synthesize.diagnose(explanations)
return explanations
# Return a list of symbolic triples supporting the explanation
def get_explanation_uber(symbol_list, case):
if case=='vision':
for exp in symbol_list:
add_common_sense(exp)
if similar(symbol_list):
logging.debug("They're similar")
else:
logging.debug("They're not similar")
return (False, symbol_list)
elif case=='lidar':
for exp in symbol_list:
print(exp)
symbol_list = []
return (True, symbol_list)
else: # Then it's the case for actions
for exp in symbol_list:
print(exp)
symbol_list = []
return (True, symbol_list)
def run_uber_example():
"""
Iniatives the uber example
1. Vision reports an unknown object, a vehicle, and then a bicycle
2. Lidar reports interpretted information
3. State is moving forward quickly has been going straight, etc
"""
start_time = process_time() # starting time
# From the Uber report: The vision system reports, "unknown
# object, as a vehicle, and then as a bicycle with varying
# expectations of future travel path."
vision = symbols('unknown_object vehicle bicycle')
visionExplain = monitor.snapshot_monitor(vision, anchors,relations, True)
lidar = symbols('object_moving 5_ft_tall top_left_quadrant')
lidarExplain = (symbols('reasonable'), lidar)
state = symbols('moving_quickly proceed_straight has_been_straight')
stateExplain = (symbols('reasonable'), state)
actions = symbols('forward stop veer-slow')
explanation = synthesize_explanations(visionExplain, lidarExplain,
stateExplain, actions=actions)
#expr = SOPform(sm, minterms) # find the expression
end_time = process_time() # end time
print('explanation :', explanation)
print('\n monitor processing time (in seconds):', end_time-start_time)
syn_start_time = process_time()
synthesize.uber_synthesize(explanation)
def main():
"""
Right now, this tests the Uber example
REQUIREMENT: The anchors and commonsense files need to be .csv
"""
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--verbose", action='store_true',
help='This is the same as debug right now')
parser.add_argument("-d", "--domain", action='store_true',
help='Enter the particular domain', default='vehicle')
parser.add_argument("-r", "--relations", action='store_true',
help='The datafile with descriptions of the relations',
default='commonsense/relations.csv')
parser.add_argument("-a", "--anchors", action='store_true',
help='The datafile with descriptions of the anchors/ontology',
default='commonsense/anchors.csv')
args = parser.parse_args()
domain = args.domain
if args.verbose:
logging.basicConfig(level=logging.DEBUG,
format='%(levelname)s: %(message)s')
# Logging info for verbose and other debugging
logging.info("Verbose output.")
logging.info("Explaining behavior in the %s domain\n" %args.domain)
initialize(args.relations, args.anchors)
run_uber_example()
if __name__ == "__main__":
main()