|
| 1 | +# Note: This docstring is also used by this script's command line help. |
| 2 | +"""A one-stop helper for desktop app to acquire an authorization code. |
| 3 | +
|
| 4 | +It starts a web server to listen redirect_uri, waiting for auth code. |
| 5 | +It optionally opens a browser window to guide a human user to manually login. |
| 6 | +After obtaining an auth code, the web server will automatically shut down. |
| 7 | +""" |
| 8 | + |
| 9 | +import argparse |
| 10 | +import webbrowser |
| 11 | +import logging |
| 12 | + |
| 13 | +try: # Python 3 |
| 14 | + from http.server import HTTPServer, BaseHTTPRequestHandler |
| 15 | + from urllib.parse import urlparse, parse_qs, urlencode |
| 16 | +except ImportError: # Fall back to Python 2 |
| 17 | + from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler |
| 18 | + from urlparse import urlparse, parse_qs |
| 19 | + from urllib import urlencode |
| 20 | + |
| 21 | +from .oauth2 import Client |
| 22 | + |
| 23 | + |
| 24 | +logger = logging.getLogger(__file__) |
| 25 | + |
| 26 | +def obtain_auth_code(listen_port, auth_uri=None): |
| 27 | + """This function will start a web server listening on http://localhost:port |
| 28 | + and then you need to open a browser on this device and visit your auth_uri. |
| 29 | + When interaction finishes, this function will return the auth code, |
| 30 | + and then shut down the local web server. |
| 31 | +
|
| 32 | + :param listen_port: |
| 33 | + The local web server will listen at http://localhost:<listen_port> |
| 34 | + Unless the authorization server supports dynamic port, |
| 35 | + you need to use the same port when you register with your app. |
| 36 | + :param auth_uri: If provided, this function will try to open a local browser. |
| 37 | + :return: Hang indefinitely, until it receives and then return the auth code. |
| 38 | + """ |
| 39 | + exit_hint = "Visit http://localhost:{p}?code=exit to abort".format(p=listen_port) |
| 40 | + logger.warn(exit_hint) |
| 41 | + if auth_uri: |
| 42 | + page = "http://localhost:{p}?{q}".format(p=listen_port, q=urlencode({ |
| 43 | + "text": "Open this link to sign in. You may use incognito window", |
| 44 | + "link": auth_uri, |
| 45 | + "exit_hint": exit_hint, |
| 46 | + })) |
| 47 | + browse(page) |
| 48 | + server = HTTPServer(("", int(listen_port)), AuthCodeReceiver) |
| 49 | + try: |
| 50 | + server.authcode = None |
| 51 | + while not server.authcode: |
| 52 | + # Derived from |
| 53 | + # https://docs.python.org/2/library/basehttpserver.html#more-examples |
| 54 | + server.handle_request() |
| 55 | + return server.authcode |
| 56 | + finally: |
| 57 | + server.server_close() |
| 58 | + |
| 59 | +def browse(auth_uri): |
| 60 | + controller = webbrowser.get() # Get a default controller |
| 61 | + # Some Linux Distro does not setup default browser properly, |
| 62 | + # so we try to explicitly use some popular browser, if we found any. |
| 63 | + for browser in ["chrome", "firefox", "safari", "windows-default"]: |
| 64 | + try: |
| 65 | + controller = webbrowser.get(browser) |
| 66 | + break |
| 67 | + except webbrowser.Error: |
| 68 | + pass # This browser is not installed. Try next one. |
| 69 | + logger.info("Please open a browser on THIS device to visit: %s" % auth_uri) |
| 70 | + controller.open(auth_uri) |
| 71 | + |
| 72 | +class AuthCodeReceiver(BaseHTTPRequestHandler): |
| 73 | + def do_GET(self): |
| 74 | + # For flexibility, we choose to not check self.path matching redirect_uri |
| 75 | + #assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP') |
| 76 | + qs = parse_qs(urlparse(self.path).query) |
| 77 | + if qs.get('code'): # Then store it into the server instance |
| 78 | + ac = self.server.authcode = qs['code'][0] |
| 79 | + self._send_full_response('Authcode:\n{}'.format(ac)) |
| 80 | + # NOTE: Don't do self.server.shutdown() here. It'll halt the server. |
| 81 | + elif qs.get('text') and qs.get('link'): # Then display a landing page |
| 82 | + self._send_full_response( |
| 83 | + '<a href={link}>{text}</a><hr/>{exit_hint}'.format( |
| 84 | + link=qs['link'][0], text=qs['text'][0], |
| 85 | + exit_hint=qs.get("exit_hint", [''])[0], |
| 86 | + )) |
| 87 | + else: |
| 88 | + self._send_full_response("This web service serves your redirect_uri") |
| 89 | + |
| 90 | + def _send_full_response(self, body, is_ok=True): |
| 91 | + self.send_response(200 if is_ok else 400) |
| 92 | + content_type = 'text/html' if body.startswith('<') else 'text/plain' |
| 93 | + self.send_header('Content-type', content_type) |
| 94 | + self.end_headers() |
| 95 | + self.wfile.write(body.encode("utf-8")) |
| 96 | + |
| 97 | + |
| 98 | +if __name__ == '__main__': |
| 99 | + logging.basicConfig(level=logging.INFO) |
| 100 | + p = parser = argparse.ArgumentParser( |
| 101 | + description=__doc__ + "The auth code received will be shown at stdout.") |
| 102 | + p.add_argument('endpoint', |
| 103 | + help="The auth endpoint for your app. For example: " |
| 104 | + "https://login.microsoftonline.com/your_tenant/oauth2/authorize") |
| 105 | + p.add_argument('client_id', help="The client_id of your application") |
| 106 | + p.add_argument('redirect_port', type=int, help="The port in redirect_uri") |
| 107 | + args = parser.parse_args() |
| 108 | + client = Client(args.client_id, authorization_endpoint=args.endpoint) |
| 109 | + auth_uri = client.build_auth_request_uri("code") |
| 110 | + print(obtain_auth_code(args.redirect_port, auth_uri)) |
| 111 | + |
0 commit comments