Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 44 additions & 19 deletions donkey_gym/core/tcp_server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
'''
migrate something general here
'''
import os
import json
import time
import re
import asyncore
import json
import socket


def replace_float_notation(string):
"""
Replace unity float notation for languages like
French or German that use comma instead of dot.
This convert the json sent by Unity to a valid one.
Ex: "test": 1,2, "key": 2 -> "test": 1.2, "key": 2

:param string: (str) The incorrect json string
:return: (str) Valid JSON string
"""
regex_french_notation = r'"[a-zA-Z_]+":(?P<num>[0-9,E-]+),'
regex_end = r'"[a-zA-Z_]+":(?P<num>[0-9,E-]+)}'

for regex in [regex_french_notation, regex_end]:
matches = re.finditer(regex, string, re.MULTILINE)

for match in matches:
num = match.group('num').replace(',', '.')
string = string.replace(match.group('num'), num)
return string


class IMesgHandler(object):

def on_connect(self, socketHandler):
Expand All @@ -28,7 +50,7 @@ class SimServer(asyncore.dispatcher):
Receives network connections and establishes handlers for each client.
Each client connection is handled by a new instance of the SteeringHandler class.
"""

def __init__(self, address, msg_handler):
asyncore.dispatcher.__init__(self)

Expand All @@ -40,33 +62,33 @@ def __init__(self, address, msg_handler):

#let TCP stack know that we'd like to sit on this address and listen for connections
self.bind(address)

#confirm for users what address we are listening on
self.address = self.socket.getsockname()
print('binding to', self.address)

#let tcp stack know we plan to process one outstanding request to connect request each loop
self.listen(5)

#keep a pointer to our IMesgHandler handler
self.msg_handler = msg_handler


def handle_accept(self):
# Called when a client connects to our socket
client_info = self.accept()

print('got a new client', client_info[1])

#make a new steering handler to communicate with the client
SimHandler(sock=client_info[0], msg_handler=self.msg_handler)


def handle_close(self):
print("server shutdown")
# Called then server is shutdown
self.close()

if self.msg_handler:
self.msg_handler.on_close()

Expand All @@ -75,11 +97,11 @@ class SimHandler(asyncore.dispatcher):
"""
Handles messages from a single TCP client.
"""

def __init__(self, sock, chunk_size=(16*1024), msg_handler=None):
#we call our base class init
asyncore.dispatcher.__init__(self, sock=sock)

#msg_handler handles incoming messages
self.msg_handler = msg_handler

Expand All @@ -95,7 +117,7 @@ def __init__(self, sock, chunk_size=(16*1024), msg_handler=None):
#and image bytes is an empty list of partial bytes of the image as it comes in
self.data_to_read = []


def writable(self):
"""
We want to write if we have received data.
Expand All @@ -107,7 +129,7 @@ def writable(self):
def queue_message(self, msg):
json_msg = json.dumps(msg)
self.data_to_write.append(json_msg)


def handle_write(self):
"""
Expand Down Expand Up @@ -141,7 +163,7 @@ def handle_read(self):

#receive a chunK of data with the max size chunk_size from our client.
data = self.recv(self.chunk_size)

if len(data) == 0:
#this only happens when the connection is dropped
self.handle_close()
Expand All @@ -150,7 +172,7 @@ def handle_read(self):
self.data_to_read.append(data.decode("utf-8"))

messages = ''.join(self.data_to_read).split('\n')

self.data_to_read = []

for mesg in messages:
Expand All @@ -167,6 +189,9 @@ def handle_json_message(self, chunk):
We are expecing a json object
'''
try:
# Replace comma with dots for floats
# useful when using unity in a language different from English
chunk = replace_float_notation(chunk)
#convert data into a string with decode, and then load it as a json object
jsonObj = json.loads(chunk)
except Exception as e:
Expand All @@ -180,13 +205,13 @@ def handle_json_message(self, chunk):
except Exception as e:
print(e, '>>> failure during on_recv_message:', chunk)



def handle_close(self):
#when client drops or closes connection
if self.msg_handler:
self.msg_handler.on_disconnect()
self.msg_handler = None
print('connection dropped')

self.close()
self.close()
6 changes: 3 additions & 3 deletions donkey_gym/envs/donkey_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ def send_control(self, steer, throttle):
if not self.loaded:
return
msg = { 'msg_type' : 'control', 'steering': steer.__str__(), 'throttle':throttle.__str__(), 'brake': '0.0' }
self.queue_message(msg)
self.queue_message(msg)

def send_reset_car(self):
msg = { 'msg_type' : 'reset_car' }
self.queue_message(msg)
Expand All @@ -252,7 +252,7 @@ def queue_message(self, msg):
if self.verbose:
print('skiping:', msg)
return

if self.verbose:
print('sending', msg)
self.sock.queue_message(msg)