Source code for volttroncentral.sessions

import errno
import os
import uuid
from copy import deepcopy

[docs]class SessionHandler: """A handler for dealing with authentication of sessions The SessionHandler requires an authenticator to be handed in to this object in order to authenticate user. The authenticator must implement an interface that expects a method called authenticate with parameters username and password. The return value must be either a list of groups the user belongs two or None. If successful then the a session token is generated and added to a cache of validated users to be able to be checked against. The user's ip address is stored with the token for further checking of authentication. """ def __init__(self, authenticator): self._sessions = {} self._session_tokens = {} self._authenticator = authenticator self._stored_session_path = None
[docs] def clear(self): self._sessions.clear() self._session_tokens.clear()
[docs] def authenticate(self, username, password, ip): """Authenticates a user with the authenticator. This is the main login function for the system. """ groups = self._authenticator.authenticate(username, password) if groups: token = str(uuid.uuid4()) self._add_session(username, token, ip, ",".join(groups)) self._store_session() return token return None
def _add_session(self, user, token, ip, groups): """Add a user session to the session cache""" self._sessions[user] = {'user': user, 'token': token, 'ip': ip, 'groups': groups} self._session_tokens[token] = self._sessions[user]
[docs] def check_session(self, token, ip): """Check if a user token has been authenticated. @return: A users session information or False. """ if not self._session_tokens: self._load_auths() session = self._session_tokens.get(str(token)) if session: if session['ip'] != ip: return False return deepcopy(session) return False
def _store_session(self): # Disable the storing of sessions to disk. return # if not self._stored_session_path: # self._get_auth_storage() # # with open(self._stored_session_path, 'wb') as file: # file.write(jsonapi.dumps(self._sessions)) def _load_auths(self): self._sessions = {} return # # if not self._stored_session_path: # self._get_auth_storage() # try: # with open(self._stored_session_path) as file: # self._sessions = jsonapi.loads(file.read()) # # self._session_tokens.clear() # for k, v in self._sessions.items(): # self._session_tokens[v['token']] = v # except IOError: # pass def _get_auth_storage(self): if not os.environ.get('VOLTTRON_HOME', None): raise ValueError('VOLTTRON_HOME environment must be set!') db_path = os.path.join(os.environ.get('VOLTTRON_HOME'), 'data/volttron.central.sessions') db_dir = os.path.dirname(db_path) try: os.makedirs(db_dir) except OSError as exc: if exc.errno != errno.EEXIST or not os.path.isdir(db_dir): raise self._stored_session_path = db_path