diff --git a/templates/rofi/rofi-bitwarden.py b/templates/rofi/rofi-bitwarden.py index e646770..f5aa31d 100755 --- a/templates/rofi/rofi-bitwarden.py +++ b/templates/rofi/rofi-bitwarden.py @@ -1,6 +1,8 @@ {# symlink{~/bin/rofi-bitwarden.py} #} #!/usr/bin/env python3 +# TODO: totp + import os import sys import json @@ -17,15 +19,24 @@ from typing import Callable, Tuple # Time until the vault locks itself # Resets upon every retrieval # Set to 0 to disble autolocking -BW_SESSION_AUTOLOCK_TIMEOUT = 0 +BW_SESSION_AUTOLOCK_TIMEOUT = 3600 + +# Change the key under which the secret is store using keyctl +BW_SESSION_SECRET_NAME = 'bw-session' ''' Utils ''' stderr = partial(print, file=sys.stderr) -def copy_to_clipboard(value): - run_shell_command('xclip -in -selection clipboard', input_data=value) +class Clipboard(): + @staticmethod + def copy(value): + run_shell_command('xclip -in -selection clipboard', input_data=value, disable_stdout=True, disable_stderr=True) + + @staticmethod + def paste() -> str: + return run_shell_command('xclip -out -selection clipboard').stdout ''' Shell Commands @@ -36,114 +47,286 @@ class ShellOutput: stdout: str stderr: str -def run_shell_command(cmd, input_data=None): +def run_shell_command(cmd, input_data=None, raise_on_error=False, disable_stdout=False, disable_stderr=False): process = subprocess.run( - shlex.split(cmd), + cmd if type(cmd) == list else shlex.split(cmd), text=True, input=input_data, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, + stdout=None if disable_stdout else subprocess.PIPE, + stderr=None if disable_stderr else subprocess.PIPE, + check=raise_on_error, ) - + return ShellOutput( process.returncode, - process.stdout.strip(), - process.stderr.strip(), + None if disable_stdout else process.stdout.strip(), + None if disable_stderr else process.stderr.strip(), ) ''' Keyctl ''' -KEYCTL_BW_SESSION_KEY='bw-session' +class KeyctlSecret(): + def __init__(self, key, timeout = 0, keyring = '@u'): + self.key = key + self.timeout = timeout + self.keyring = keyring -def get_bw_session_token(): - request_output = run_shell_command(f'keyctl request user {KEYCTL_BW_SESSION_KEY}') - if request_output.status_code > 0: - # Ask for password - password = show_rofi_pwd_prompt() - bw_unlock_output = unlock(password) + def get_key_id(self): + output = run_shell_command(['keyctl', 'request', 'user', self.key]) + + if output.status_code > 0: + return None + + self.reset_timeout() + return output.stdout + + def store(self, value): + run_shell_command(['keyctl', 'add', 'user', self.key, value, self.keyring], raise_on_error=True) + self.reset_timeout() + + def reset_timeout(self): + if self.timeout > 0: + run_shell_command(['keyctl', 'timeout', self.key, str(self.timeout)]) + + def get_value(self): + key_id = self.get_key_id() + if key_id is None: + return None + + return run_shell_command(['keyctl', 'pipe', key_id], raise_on_error=True).stdout + +''' +Bitwarden +''' +class Bitwarden(): + def __init__(self): + self.session_secret = KeyctlSecret(BW_SESSION_SECRET_NAME, BW_SESSION_AUTOLOCK_TIMEOUT) + + def ensure_vault_unlocked(self): + self._get_session_token() + + def get_all_entries(self): + return self._get_bw_entries() + + def get_all_entries_by_uri(self, uri): + return self._get_bw_entries('--url', uri) + + def _get_session_token(self): + # Check if the secret is already set + if self.session_secret.get_key_id() is None: + # If not, ask for password through rofi + password = show_rofi_pwd_prompt() + # Use the password to unlock BW vault + session_token = self._unlock(password) + + # Store the new session token + self.session_secret.store(session_token) + + # Return the value stored by the secret manager + return self.session_secret.get_value() + + def _unlock(self, password): + bw_unlock_output = run_shell_command(['bw', 'unlock', password, '--raw']) if bw_unlock_output.status_code > 0: stderr(f'Bitwarden error: {bw_unlock_output.stderr}') exit(1) - add_output = run_shell_command(f'keyctl add user {KEYCTL_BW_SESSION_KEY} "{bw_unlock_output.stdout}" @u') - if add_output.status_code > 0: - stderr('Could not store session token') + return bw_unlock_output.stdout + + @cache + def _get_bw_entries(self, *filter_args): + session_token = self._get_session_token() + + output = run_shell_command(['bw', '--session', session_token, 'list', 'items'] + list(filter_args)) + + if output.status_code > 0: + stderr(output.stderr) exit(1) - key_id = add_output.stdout - else: - key_id = request_output.stdout - # Maybe invalidate on timeout - if BW_SESSION_AUTOLOCK_TIMEOUT > 0: - print(run_shell_command(f'keyctl timeout {key_id} "{BW_SESSION_AUTOLOCK_TIMEOUT}"')) - - pipe_output = run_shell_command(f'keyctl pipe {key_id}') - - if pipe_output.status_code > 0: - stderr('Could not fetch session token') - exit(1) - - return pipe_output.stdout + return json.loads(output.stdout) ''' -Bitwarden +Menu States ''' -def unlock(password): - return run_shell_command(f'bw unlock "{password}" --raw') +@dataclass +class MenuEntry(): + key: str + label: str -@cache -def get_all_entries(): - session_token = get_bw_session_token() +class MainMenu(): + def __init__(self, app, bw): + self.app = app + self.bw = bw + self.desired_domain = None + + def get_title(self): + return 'Bitwarden' - output = run_shell_command(f'bw --session "{session_token}" list items') + def get_entries(self): + if run_shell_command('xdotool getwindowfocus getwindowclassname').stdout == 'qutebrowser': + old_contents = Clipboard.paste() + run_shell_command('qutebrowser :yank') + self.desired_domain = Clipboard.paste() + Clipboard.copy(old_contents) - if output.status_code > 0: - stderr(output.stderr) - exit(1) + entries = [] - return json.loads(output.stdout) + if self.desired_domain is not None: + entries += [MenuEntry( + 'search-domain', + f'Search on URL: {self.desired_domain}', + )] -def get_entries_by_filter(entry_to_keys): - """This function returns: - { key: str, entries: int[] }[] - """ + entries += [ + MenuEntry('list-name', 'List by Name'), + MenuEntry('list-url', 'List by URL'), + ] - output_entries = {} - entries = get_all_entries() + return entries - for entry_index, entry in enumerate(entries): - if 'login' not in entry.keys(): - continue + def handle_input(self, key, typed_input): + if key == 'list-name': + self.app.change_menu_state(ListByNameMenu(self.app, self.bw)) + elif key == 'list-url': + self.app.change_menu_state(ListByURLMenu(self.app, self.bw)) + elif key == 'search-domain': + all_entries = self.bw.get_all_entries() + filtered_entries = self.bw.get_all_entries_by_uri(self.desired_domain) + filtered_ids = [entry['id'] for entry in filtered_entries] + + indices = [i for i, entry in enumerate(all_entries) if entry['id'] in filtered_ids] + + self.app.change_menu_state(ViewListMatchMenu(self.app, self.bw, indices)) + else: + stderr("Unreachable input") + +class ViewEntryMenu: + def __init__(self, app, bw, entry_index): + self.app = app + self.bw = bw + self.entry = self.bw.get_all_entries()[int(entry_index)] + + self.selection_options = { + 'Password': lambda e: e['login']['password'], + 'TOTP': lambda e: e['login']['totp'], + 'Username': lambda e: e['login']['username'], + 'URI': lambda e: e['login']['uris'][0]['uri'], + } + + def get_title(self): + return f'Bitwarden - Copy {self.entry["name"]}' + + def get_entries(self): + + entries = [] + + for key in self.selection_options: + try: + if self.selection_options[key](self.entry) is not None: + entries += [MenuEntry( + key, + key, + )] + except: + pass - for key in entry_to_keys(entry): - if key is None: - continue + return entries - if key in output_entries: - output_entries[key] += [entry_index] - else: - output_entries[key] = [entry_index] + def handle_input(self, key, typed_input): + value = self.selection_options[key](self.entry) + Clipboard.copy(value) - true_output = [] - for key in output_entries: - true_output += [{ - 'key': key, - 'entries': output_entries[key], - }] - return sorted(true_output, key=lambda e: e['key']) + self.app.set_finished() + +class ViewListMatchMenu: + def __init__(self, app, bw, possible_indices): + self.app = app + self.bw = bw + self.possible_indices = possible_indices + + def get_title(self): + return 'Bitwarden - List Matches' + + def get_entries(self): + raw_entries = self.bw.get_all_entries() + + menu_entries = [] + for index in self.possible_indices: + entry = raw_entries[index] + key = f'{entry["name"]} ({entry["login"]["username"]})' + + menu_entries += [MenuEntry( + str(index), + key, + )] + + return menu_entries + + def handle_input(self, key, typed_input): + self.app.change_menu_state(ViewEntryMenu(self.app, self.bw, int(key))) + +class ListByAnyFieldMenu(): + def __init__(self, app, bw, entry_to_keys_func): + self.app = app + self.bw = bw + self.entry_to_keys_func = entry_to_keys_func + + self.key_to_entry_indices_map = {} + + def get_title(self): + return 'Bitwarden - List' + + def get_entries(self): + raw_entries = self.bw.get_all_entries() + + if len(self.key_to_entry_indices_map.keys()) == 0: + for index, entry in enumerate(raw_entries): + if 'login' not in entry.keys(): + continue + + for key in self.entry_to_keys_func(entry): + if key is None: + continue + + if key in self.key_to_entry_indices_map: + self.key_to_entry_indices_map[key] += [index] + else: + self.key_to_entry_indices_map[key] = [index] + + menu_entries = [] + for index, key in enumerate(sorted(self.key_to_entry_indices_map.keys())): + menu_entries += [MenuEntry( + str(index), + key, + )] + + return menu_entries + + def handle_input(self, key, typed_input): + possible_indices = self.key_to_entry_indices_map[sorted(self.key_to_entry_indices_map.keys())[int(key)]] + + if len(possible_indices) > 1: + self.app.change_menu_state(ViewListMatchMenu(self.app, self.bw, possible_indices)) + else: + self.app.change_menu_state(ViewEntryMenu(self.app, self.bw, possible_indices[0])) + +class ListByNameMenu(ListByAnyFieldMenu): + def __init__(self, app, bw): + super().__init__(app, bw, lambda e: [e['name']]) + +class ListByURLMenu(ListByAnyFieldMenu): + def __init__(self, app, bw): + super().__init__(app, bw, lambda e: [urlparse(uri['uri']).hostname for uri in e['login']['uris']]) -''' -UI -''' def show_rofi_pwd_prompt(): output = run_shell_command('rofi -dmenu -password -i -no-fixed-num-lines -p "Master Password" -theme ~/.config/rofi/themes/askpass.rasi') if output.status_code > 0: return None return output.stdout -def show_rofi_entries(entries, title="Bitwarden", exit_on_close=False): +def show_rofi_entries(entries, title="Bitwarden", exit_on_close=True): output = run_shell_command(f'rofi -dmenu -i -p "{title}" -format "i:f"', input_data='\n'.join(entries)) if output.status_code > 0: @@ -155,64 +338,29 @@ def show_rofi_entries(entries, title="Bitwarden", exit_on_close=False): return (int(selection_index), selection_text) -''' -Main -''' -def main(): - get_bw_session_token() +class App(): + def __init__(self): + self.bw = Bitwarden() + self.menu_state = MainMenu(self, self.bw) + self._is_finished = False - main_menu_entries = [ - # (key, display, function) - ( - 'list-by-name', - 'List by Name', - lambda e: [e['name']] - ), ( - 'list-by-uri', - 'List by URI', - lambda e: [urlparse(uri['uri']).hostname for uri in e['login']['uris']] - ) - ] + def is_finished(self): + return self._is_finished - '''Main Menu''' - (index, _) = show_rofi_entries([entry[1] for entry in main_menu_entries], exit_on_close=True) + def set_finished(self): + self._is_finished = True - '''List of items''' - selection = main_menu_entries[index] + def render(self): + entries = self.menu_state.get_entries() + (index, extra_text) = show_rofi_entries([entry.label for entry in entries], title=self.menu_state.get_title()) + self.menu_state.handle_input(entries[index].key, extra_text) - entry_index_map = get_entries_by_filter(selection[2]) - (index, _) = show_rofi_entries([entry['key'] for entry in entry_index_map], title=f'Bitwarden - {selection[1]}', exit_on_close=True) - - '''List of items matching key''' - entries = get_all_entries() - selected_entries = entry_index_map[index]['entries'] - - if len(selected_entries) == 1: - index = 0 - else: - (index, _) = show_rofi_entries([f'{entries[index]["name"]} ({entries[index]["login"]["username"]})' for index in selected_entries], exit_on_close=True) - - '''List copy options''' - selection_options = { - 'Password': lambda e: e['login']['password'], - 'TOTP': lambda e: e['login']['totp'], - 'Username': lambda e: e['login']['username'], - 'URI': lambda e: e['login']['uris'][0]['uri'], - } - - selected_entry = entries[selected_entries[index]] - display_options = [] - for key in selection_options: - try: - value = selection_options[key](selected_entry) - if value is not None: - display_options += [key] - except: - pass - - (index, _) = show_rofi_entries(display_options, title=f'Bitwarden - Copy for {selected_entry["name"]}', exit_on_close=True) - copy_to_clipboard(selection_options[display_options[index]](selected_entry)) + def change_menu_state(self, new_state): + self.menu_state = new_state if __name__ == '__main__': - main() + app = App() + + while not app.is_finished(): + app.render()