123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- """ a clumsy attempt at a macro language to let the programmer execute code on the server (ex: determine 64bit)"""
- from . import is64bit as is64bit
- def macro_call(macro_name, args, kwargs):
- """ allow the programmer to perform limited processing on the server by passing macro names and args
- :new_key - the key name the macro will create
- :args[0] - macro name
- :args[1:] - any arguments
- :code - the value of the keyword item
- :kwargs - the connection keyword dictionary. ??key has been removed
- --> the value to put in for kwargs['name'] = value
- """
- if isinstance(args, (str, str)):
- args = [args] # the user forgot to pass a sequence, so make a string into args[0]
- new_key = args[0]
- try:
- if macro_name == "is64bit":
- if is64bit.Python(): # if on 64 bit Python
- return new_key, args[1] # return first argument
- else:
- try:
- return new_key, args[2] # else return second argument (if defined)
- except IndexError:
- return new_key, '' # else return blank
- elif macro_name == "getuser": # get the name of the user the server is logged in under
- if not new_key in kwargs:
- import getpass
- return new_key, getpass.getuser()
- elif macro_name == "getnode": # get the name of the computer running the server
- import platform
- try:
- return new_key, args[1] % platform.node()
- except IndexError:
- return new_key, platform.node()
- elif macro_name == "getenv": # expand the server's environment variable args[1]
- try:
- dflt = args[2] # if not found, default from args[2]
- except IndexError: # or blank
- dflt = ''
- return new_key, os.environ.get(args[1], dflt)
- elif macro_name == "auto_security":
- if not 'user' in kwargs or not kwargs['user']: # missing, blank, or Null username
- return new_key, 'Integrated Security=SSPI'
- return new_key, 'User ID=%(user)s; Password=%(password)s' % kwargs
- elif macro_name == "find_temp_test_path": # helper function for testing ado operation -- undocumented
- import tempfile, os
- return new_key, os.path.join(tempfile.gettempdir(), 'adodbapi_test', args[1])
- raise ValueError ('Unknown connect string macro=%s' % macro_name)
- except:
- raise ValueError ('Error in macro processing %s %s' % (macro_name, repr(args)))
- def process(args, kwargs, expand_macros=False): # --> connection string with keyword arguments processed.
- """ attempts to inject arguments into a connection string using Python "%" operator for strings
- co: adodbapi connection object
- args: positional parameters from the .connect() call
- kvargs: keyword arguments from the .connect() call
- """
- try:
- dsn = args[0]
- except IndexError:
- dsn = None
- if isinstance(dsn, dict): # as a convenience the first argument may be django settings
- kwargs.update(dsn)
- elif dsn: # the connection string is passed to the connection as part of the keyword dictionary
- kwargs['connection_string'] = dsn
- try:
- a1 = args[1]
- except IndexError:
- a1 = None
- # historically, the second positional argument might be a timeout value
- if isinstance(a1, int):
- kwargs['timeout'] = a1
- # if the second positional argument is a string, then it is user
- elif isinstance(a1, str):
- kwargs['user'] = a1
- # if the second positional argument is a dictionary, use it as keyword arguments, too
- elif isinstance(a1, dict):
- kwargs.update(a1)
- try:
- kwargs['password'] = args[2] # the third positional argument is password
- kwargs['host'] = args[3] # the fourth positional argument is host name
- kwargs['database'] = args[4] # the fifth positional argument is database name
- except IndexError:
- pass
- # make sure connection string is defined somehow
- if not 'connection_string' in kwargs:
- try: # perhaps 'dsn' was defined
- kwargs['connection_string'] = kwargs['dsn']
- except KeyError:
- try: # as a last effort, use the "host" keyword
- kwargs['connection_string'] = kwargs['host']
- except KeyError:
- raise TypeError ("Must define 'connection_string' for ado connections")
- if expand_macros:
- for kwarg in list(kwargs.keys()):
- if kwarg.startswith('macro_'): # If a key defines a macro
- macro_name = kwarg[6:] # name without the "macro_"
- macro_code = kwargs.pop(kwarg) # we remove the macro_key and get the code to execute
- new_key, rslt = macro_call(macro_name, macro_code, kwargs) # run the code in the local context
- kwargs[new_key] = rslt # put the result back in the keywords dict
- # special processing for PyRO IPv6 host address
- try:
- s = kwargs['proxy_host']
- if ':' in s: # it is an IPv6 address
- if s[0] != '[': # is not surrounded by brackets
- kwargs['proxy_host'] = s.join(('[',']')) # put it in brackets
- except KeyError:
- pass
- return kwargs
|