From 9f19b7462fc0c4aa7206615c251fdc41e52447f2 Mon Sep 17 00:00:00 2001 From: Daniel-I-Am Date: Mon, 6 Jun 2022 13:38:22 +0200 Subject: [PATCH] Finish first version of bitwarden script --- templates/rofi/rofi-bitwarden.py | 233 +++++++++++++++++++++++++------ 1 file changed, 191 insertions(+), 42 deletions(-) diff --git a/templates/rofi/rofi-bitwarden.py b/templates/rofi/rofi-bitwarden.py index b42e28e..e646770 100755 --- a/templates/rofi/rofi-bitwarden.py +++ b/templates/rofi/rofi-bitwarden.py @@ -4,65 +4,214 @@ import os import sys import json +import shlex +import subprocess -BW_SESSION_FILE = os.path.join('/var/run/user', str(os.getuid()), 'bitwarden-session') +from io import StringIO +from dataclasses import dataclass +from urllib.parse import urlparse +from functools import cache, partial -def does_bw_session_exist(): - return os.path.exists(BW_SESSION_FILE) +from typing import Callable, Tuple -def show_rofi_pwd_prompt(): - return os.popen('rofi -dmenu -password -i -no-fixed-num-lines -p "Master Password" -theme ~/.config/rofi/themes/askpass.rasi').read().strip() +# Time until the vault locks itself +# Resets upon every retrieval +# Set to 0 to disble autolocking +BW_SESSION_AUTOLOCK_TIMEOUT = 0 -def get_bw_session(): - if not does_bw_session_exist(): - return None +''' +Utils +''' +stderr = partial(print, file=sys.stderr) - with open(BW_SESSION_FILE, 'r') as file: - return file.read() +def copy_to_clipboard(value): + run_shell_command('xclip -in -selection clipboard', input_data=value) -def is_logged_in(): - return os.system('bw login --check > /dev/null') == 0 +''' +Shell Commands +''' +@dataclass +class ShellOutput: + status_code: int + stdout: str + stderr: str -def login(): - os.spawnl(os.P_WAIT, '/usr/bin/alacritty', 'alacritty', '-e', 'sh', '-c', 'bw login') - -def unlock(): - password = show_rofi_pwd_prompt() +def run_shell_command(cmd, input_data=None): + process = subprocess.run( + shlex.split(cmd), + text=True, + input=input_data, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) - if len(password) > 0: - # TODO: pipe in the password - os.system(f'bw unlock "{password}" --raw > {BW_SESSION_FILE}') + return ShellOutput( + process.returncode, + process.stdout.strip(), + process.stderr.strip(), + ) -def lock(): - if not does_bw_session_exist(): - return False +''' +Keyctl +''' +KEYCTL_BW_SESSION_KEY='bw-session' - return os.system(f'bw --session "{get_bw_session()}" --nointeraction lock > /dev/null') == 0 +def get_bw_session_token(): + request_output = run_shell_command(f'keyctl request user {KEYCTL_BW_SESSION_KEY}') + if request_output.status_code > 0: + # Ask for password + password = show_rofi_pwd_prompt() + bw_unlock_output = unlock(password) -def is_vault_unlocked(): - if not does_bw_session_exist(): - return False + if bw_unlock_output.status_code > 0: + stderr(f'Bitwarden error: {bw_unlock_output.stderr}') + exit(1) - return os.system(f'bw --session "{get_bw_session()}" --nointeraction list items > /dev/null 2>&1') == 0 + add_output = run_shell_command(f'keyctl add user {KEYCTL_BW_SESSION_KEY} "{bw_unlock_output.stdout}" @u') + if add_output.status_code > 0: + stderr('Could not store session token') + exit(1) + key_id = add_output.stdout + else: + key_id = request_output.stdout -def debug(msg): - os.spawnl(os.P_WAIT, '/usr/bin/alacritty', 'alacritty', '-e', 'sh', '-c', f'echo "{msg}" && cat -') + # Maybe invalidate on timeout + if BW_SESSION_AUTOLOCK_TIMEOUT > 0: + print(run_shell_command(f'keyctl timeout {key_id} "{BW_SESSION_AUTOLOCK_TIMEOUT}"')) + pipe_output = run_shell_command(f'keyctl pipe {key_id}') + + if pipe_output.status_code > 0: + stderr('Could not fetch session token') + exit(1) + + return pipe_output.stdout + +''' +Bitwarden +''' +def unlock(password): + return run_shell_command(f'bw unlock "{password}" --raw') + +@cache +def get_all_entries(): + session_token = get_bw_session_token() + + output = run_shell_command(f'bw --session "{session_token}" list items') + + if output.status_code > 0: + stderr(output.stderr) + exit(1) + + return json.loads(output.stdout) + +def get_entries_by_filter(entry_to_keys): + """This function returns: + { key: str, entries: int[] }[] + """ + + output_entries = {} + entries = get_all_entries() + + for entry_index, entry in enumerate(entries): + if 'login' not in entry.keys(): + continue + + for key in entry_to_keys(entry): + if key is None: + continue + + if key in output_entries: + output_entries[key] += [entry_index] + else: + output_entries[key] = [entry_index] + + true_output = [] + for key in output_entries: + true_output += [{ + 'key': key, + 'entries': output_entries[key], + }] + return sorted(true_output, key=lambda e: e['key']) + +''' +UI +''' +def show_rofi_pwd_prompt(): + output = run_shell_command('rofi -dmenu -password -i -no-fixed-num-lines -p "Master Password" -theme ~/.config/rofi/themes/askpass.rasi') + if output.status_code > 0: + return None + return output.stdout + +def show_rofi_entries(entries, title="Bitwarden", exit_on_close=False): + output = run_shell_command(f'rofi -dmenu -i -p "{title}" -format "i:f"', input_data='\n'.join(entries)) + + if output.status_code > 0: + if exit_on_close: + exit(0) + return (None, None) + + [selection_index, selection_text] = output.stdout.split(':') + + return (int(selection_index), selection_text) + +''' +Main +''' def main(): - if not is_logged_in(): - login() - if not is_logged_in(): - print('You did nog log in', file=sys.stderr) - exit(1) + get_bw_session_token() - if not is_vault_unlocked(): - unlock() - if not is_vault_unlocked(): - print('You did not unlock the vault', file=sys.stderr) - exit(1) + main_menu_entries = [ + # (key, display, function) + ( + 'list-by-name', + 'List by Name', + lambda e: [e['name']] + ), ( + 'list-by-uri', + 'List by URI', + lambda e: [urlparse(uri['uri']).hostname for uri in e['login']['uris']] + ) + ] - print("Signed in and unlocked, cool") - debug("Done") + '''Main Menu''' + (index, _) = show_rofi_entries([entry[1] for entry in main_menu_entries], exit_on_close=True) + + '''List of items''' + selection = main_menu_entries[index] + + entry_index_map = get_entries_by_filter(selection[2]) + (index, _) = show_rofi_entries([entry['key'] for entry in entry_index_map], title=f'Bitwarden - {selection[1]}', exit_on_close=True) + + '''List of items matching key''' + entries = get_all_entries() + selected_entries = entry_index_map[index]['entries'] + + if len(selected_entries) == 1: + index = 0 + else: + (index, _) = show_rofi_entries([f'{entries[index]["name"]} ({entries[index]["login"]["username"]})' for index in selected_entries], exit_on_close=True) + + '''List copy options''' + selection_options = { + 'Password': lambda e: e['login']['password'], + 'TOTP': lambda e: e['login']['totp'], + 'Username': lambda e: e['login']['username'], + 'URI': lambda e: e['login']['uris'][0]['uri'], + } + + selected_entry = entries[selected_entries[index]] + display_options = [] + for key in selection_options: + try: + value = selection_options[key](selected_entry) + if value is not None: + display_options += [key] + except: + pass + + (index, _) = show_rofi_entries(display_options, title=f'Bitwarden - Copy for {selected_entry["name"]}', exit_on_close=True) + copy_to_clipboard(selection_options[display_options[index]](selected_entry)) if __name__ == '__main__': main()