Refactor rofi-bitwarden script
This commit is contained in:
parent
9f19b7462f
commit
e518752bc3
@ -1,6 +1,8 @@
|
|||||||
{# symlink{~/bin/rofi-bitwarden.py} #}
|
{# symlink{~/bin/rofi-bitwarden.py} #}
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# TODO: totp
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import json
|
import json
|
||||||
@ -17,15 +19,24 @@ from typing import Callable, Tuple
|
|||||||
# Time until the vault locks itself
|
# Time until the vault locks itself
|
||||||
# Resets upon every retrieval
|
# Resets upon every retrieval
|
||||||
# Set to 0 to disble autolocking
|
# Set to 0 to disble autolocking
|
||||||
BW_SESSION_AUTOLOCK_TIMEOUT = 0
|
BW_SESSION_AUTOLOCK_TIMEOUT = 3600
|
||||||
|
|
||||||
|
# Change the key under which the secret is store using keyctl
|
||||||
|
BW_SESSION_SECRET_NAME = 'bw-session'
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Utils
|
Utils
|
||||||
'''
|
'''
|
||||||
stderr = partial(print, file=sys.stderr)
|
stderr = partial(print, file=sys.stderr)
|
||||||
|
|
||||||
def copy_to_clipboard(value):
|
class Clipboard():
|
||||||
run_shell_command('xclip -in -selection clipboard', input_data=value)
|
@staticmethod
|
||||||
|
def copy(value):
|
||||||
|
run_shell_command('xclip -in -selection clipboard', input_data=value, disable_stdout=True, disable_stderr=True)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def paste() -> str:
|
||||||
|
return run_shell_command('xclip -out -selection clipboard').stdout
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Shell Commands
|
Shell Commands
|
||||||
@ -36,68 +47,99 @@ class ShellOutput:
|
|||||||
stdout: str
|
stdout: str
|
||||||
stderr: str
|
stderr: str
|
||||||
|
|
||||||
def run_shell_command(cmd, input_data=None):
|
def run_shell_command(cmd, input_data=None, raise_on_error=False, disable_stdout=False, disable_stderr=False):
|
||||||
process = subprocess.run(
|
process = subprocess.run(
|
||||||
shlex.split(cmd),
|
cmd if type(cmd) == list else shlex.split(cmd),
|
||||||
text=True,
|
text=True,
|
||||||
input=input_data,
|
input=input_data,
|
||||||
stdout=subprocess.PIPE,
|
stdout=None if disable_stdout else subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE,
|
stderr=None if disable_stderr else subprocess.PIPE,
|
||||||
|
check=raise_on_error,
|
||||||
)
|
)
|
||||||
|
|
||||||
return ShellOutput(
|
return ShellOutput(
|
||||||
process.returncode,
|
process.returncode,
|
||||||
process.stdout.strip(),
|
None if disable_stdout else process.stdout.strip(),
|
||||||
process.stderr.strip(),
|
None if disable_stderr else process.stderr.strip(),
|
||||||
)
|
)
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Keyctl
|
Keyctl
|
||||||
'''
|
'''
|
||||||
KEYCTL_BW_SESSION_KEY='bw-session'
|
class KeyctlSecret():
|
||||||
|
def __init__(self, key, timeout = 0, keyring = '@u'):
|
||||||
|
self.key = key
|
||||||
|
self.timeout = timeout
|
||||||
|
self.keyring = keyring
|
||||||
|
|
||||||
def get_bw_session_token():
|
def get_key_id(self):
|
||||||
request_output = run_shell_command(f'keyctl request user {KEYCTL_BW_SESSION_KEY}')
|
output = run_shell_command(['keyctl', 'request', 'user', self.key])
|
||||||
if request_output.status_code > 0:
|
|
||||||
# Ask for password
|
if output.status_code > 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
self.reset_timeout()
|
||||||
|
return output.stdout
|
||||||
|
|
||||||
|
def store(self, value):
|
||||||
|
run_shell_command(['keyctl', 'add', 'user', self.key, value, self.keyring], raise_on_error=True)
|
||||||
|
self.reset_timeout()
|
||||||
|
|
||||||
|
def reset_timeout(self):
|
||||||
|
if self.timeout > 0:
|
||||||
|
run_shell_command(['keyctl', 'timeout', self.key, str(self.timeout)])
|
||||||
|
|
||||||
|
def get_value(self):
|
||||||
|
key_id = self.get_key_id()
|
||||||
|
if key_id is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return run_shell_command(['keyctl', 'pipe', key_id], raise_on_error=True).stdout
|
||||||
|
|
||||||
|
'''
|
||||||
|
Bitwarden
|
||||||
|
'''
|
||||||
|
class Bitwarden():
|
||||||
|
def __init__(self):
|
||||||
|
self.session_secret = KeyctlSecret(BW_SESSION_SECRET_NAME, BW_SESSION_AUTOLOCK_TIMEOUT)
|
||||||
|
|
||||||
|
def ensure_vault_unlocked(self):
|
||||||
|
self._get_session_token()
|
||||||
|
|
||||||
|
def get_all_entries(self):
|
||||||
|
return self._get_bw_entries()
|
||||||
|
|
||||||
|
def get_all_entries_by_uri(self, uri):
|
||||||
|
return self._get_bw_entries('--url', uri)
|
||||||
|
|
||||||
|
def _get_session_token(self):
|
||||||
|
# Check if the secret is already set
|
||||||
|
if self.session_secret.get_key_id() is None:
|
||||||
|
# If not, ask for password through rofi
|
||||||
password = show_rofi_pwd_prompt()
|
password = show_rofi_pwd_prompt()
|
||||||
bw_unlock_output = unlock(password)
|
# Use the password to unlock BW vault
|
||||||
|
session_token = self._unlock(password)
|
||||||
|
|
||||||
|
# Store the new session token
|
||||||
|
self.session_secret.store(session_token)
|
||||||
|
|
||||||
|
# Return the value stored by the secret manager
|
||||||
|
return self.session_secret.get_value()
|
||||||
|
|
||||||
|
def _unlock(self, password):
|
||||||
|
bw_unlock_output = run_shell_command(['bw', 'unlock', password, '--raw'])
|
||||||
|
|
||||||
if bw_unlock_output.status_code > 0:
|
if bw_unlock_output.status_code > 0:
|
||||||
stderr(f'Bitwarden error: {bw_unlock_output.stderr}')
|
stderr(f'Bitwarden error: {bw_unlock_output.stderr}')
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
add_output = run_shell_command(f'keyctl add user {KEYCTL_BW_SESSION_KEY} "{bw_unlock_output.stdout}" @u')
|
return bw_unlock_output.stdout
|
||||||
if add_output.status_code > 0:
|
|
||||||
stderr('Could not store session token')
|
|
||||||
exit(1)
|
|
||||||
key_id = add_output.stdout
|
|
||||||
else:
|
|
||||||
key_id = request_output.stdout
|
|
||||||
|
|
||||||
# Maybe invalidate on timeout
|
|
||||||
if BW_SESSION_AUTOLOCK_TIMEOUT > 0:
|
|
||||||
print(run_shell_command(f'keyctl timeout {key_id} "{BW_SESSION_AUTOLOCK_TIMEOUT}"'))
|
|
||||||
|
|
||||||
pipe_output = run_shell_command(f'keyctl pipe {key_id}')
|
|
||||||
|
|
||||||
if pipe_output.status_code > 0:
|
|
||||||
stderr('Could not fetch session token')
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
return pipe_output.stdout
|
|
||||||
|
|
||||||
'''
|
|
||||||
Bitwarden
|
|
||||||
'''
|
|
||||||
def unlock(password):
|
|
||||||
return run_shell_command(f'bw unlock "{password}" --raw')
|
|
||||||
|
|
||||||
@cache
|
@cache
|
||||||
def get_all_entries():
|
def _get_bw_entries(self, *filter_args):
|
||||||
session_token = get_bw_session_token()
|
session_token = self._get_session_token()
|
||||||
|
|
||||||
output = run_shell_command(f'bw --session "{session_token}" list items')
|
output = run_shell_command(['bw', '--session', session_token, 'list', 'items'] + list(filter_args))
|
||||||
|
|
||||||
if output.status_code > 0:
|
if output.status_code > 0:
|
||||||
stderr(output.stderr)
|
stderr(output.stderr)
|
||||||
@ -105,45 +147,186 @@ def get_all_entries():
|
|||||||
|
|
||||||
return json.loads(output.stdout)
|
return json.loads(output.stdout)
|
||||||
|
|
||||||
def get_entries_by_filter(entry_to_keys):
|
'''
|
||||||
"""This function returns:
|
Menu States
|
||||||
{ key: str, entries: int[] }[]
|
'''
|
||||||
"""
|
@dataclass
|
||||||
|
class MenuEntry():
|
||||||
|
key: str
|
||||||
|
label: str
|
||||||
|
|
||||||
output_entries = {}
|
class MainMenu():
|
||||||
entries = get_all_entries()
|
def __init__(self, app, bw):
|
||||||
|
self.app = app
|
||||||
|
self.bw = bw
|
||||||
|
self.desired_domain = None
|
||||||
|
|
||||||
for entry_index, entry in enumerate(entries):
|
def get_title(self):
|
||||||
|
return 'Bitwarden'
|
||||||
|
|
||||||
|
def get_entries(self):
|
||||||
|
if run_shell_command('xdotool getwindowfocus getwindowclassname').stdout == 'qutebrowser':
|
||||||
|
old_contents = Clipboard.paste()
|
||||||
|
run_shell_command('qutebrowser :yank')
|
||||||
|
self.desired_domain = Clipboard.paste()
|
||||||
|
Clipboard.copy(old_contents)
|
||||||
|
|
||||||
|
entries = []
|
||||||
|
|
||||||
|
if self.desired_domain is not None:
|
||||||
|
entries += [MenuEntry(
|
||||||
|
'search-domain',
|
||||||
|
f'Search on URL: {self.desired_domain}',
|
||||||
|
)]
|
||||||
|
|
||||||
|
entries += [
|
||||||
|
MenuEntry('list-name', 'List by Name'),
|
||||||
|
MenuEntry('list-url', 'List by URL'),
|
||||||
|
]
|
||||||
|
|
||||||
|
return entries
|
||||||
|
|
||||||
|
def handle_input(self, key, typed_input):
|
||||||
|
if key == 'list-name':
|
||||||
|
self.app.change_menu_state(ListByNameMenu(self.app, self.bw))
|
||||||
|
elif key == 'list-url':
|
||||||
|
self.app.change_menu_state(ListByURLMenu(self.app, self.bw))
|
||||||
|
elif key == 'search-domain':
|
||||||
|
all_entries = self.bw.get_all_entries()
|
||||||
|
filtered_entries = self.bw.get_all_entries_by_uri(self.desired_domain)
|
||||||
|
filtered_ids = [entry['id'] for entry in filtered_entries]
|
||||||
|
|
||||||
|
indices = [i for i, entry in enumerate(all_entries) if entry['id'] in filtered_ids]
|
||||||
|
|
||||||
|
self.app.change_menu_state(ViewListMatchMenu(self.app, self.bw, indices))
|
||||||
|
else:
|
||||||
|
stderr("Unreachable input")
|
||||||
|
|
||||||
|
class ViewEntryMenu:
|
||||||
|
def __init__(self, app, bw, entry_index):
|
||||||
|
self.app = app
|
||||||
|
self.bw = bw
|
||||||
|
self.entry = self.bw.get_all_entries()[int(entry_index)]
|
||||||
|
|
||||||
|
self.selection_options = {
|
||||||
|
'Password': lambda e: e['login']['password'],
|
||||||
|
'TOTP': lambda e: e['login']['totp'],
|
||||||
|
'Username': lambda e: e['login']['username'],
|
||||||
|
'URI': lambda e: e['login']['uris'][0]['uri'],
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_title(self):
|
||||||
|
return f'Bitwarden - Copy {self.entry["name"]}'
|
||||||
|
|
||||||
|
def get_entries(self):
|
||||||
|
|
||||||
|
entries = []
|
||||||
|
|
||||||
|
for key in self.selection_options:
|
||||||
|
try:
|
||||||
|
if self.selection_options[key](self.entry) is not None:
|
||||||
|
entries += [MenuEntry(
|
||||||
|
key,
|
||||||
|
key,
|
||||||
|
)]
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return entries
|
||||||
|
|
||||||
|
def handle_input(self, key, typed_input):
|
||||||
|
value = self.selection_options[key](self.entry)
|
||||||
|
Clipboard.copy(value)
|
||||||
|
|
||||||
|
self.app.set_finished()
|
||||||
|
|
||||||
|
class ViewListMatchMenu:
|
||||||
|
def __init__(self, app, bw, possible_indices):
|
||||||
|
self.app = app
|
||||||
|
self.bw = bw
|
||||||
|
self.possible_indices = possible_indices
|
||||||
|
|
||||||
|
def get_title(self):
|
||||||
|
return 'Bitwarden - List Matches'
|
||||||
|
|
||||||
|
def get_entries(self):
|
||||||
|
raw_entries = self.bw.get_all_entries()
|
||||||
|
|
||||||
|
menu_entries = []
|
||||||
|
for index in self.possible_indices:
|
||||||
|
entry = raw_entries[index]
|
||||||
|
key = f'{entry["name"]} ({entry["login"]["username"]})'
|
||||||
|
|
||||||
|
menu_entries += [MenuEntry(
|
||||||
|
str(index),
|
||||||
|
key,
|
||||||
|
)]
|
||||||
|
|
||||||
|
return menu_entries
|
||||||
|
|
||||||
|
def handle_input(self, key, typed_input):
|
||||||
|
self.app.change_menu_state(ViewEntryMenu(self.app, self.bw, int(key)))
|
||||||
|
|
||||||
|
class ListByAnyFieldMenu():
|
||||||
|
def __init__(self, app, bw, entry_to_keys_func):
|
||||||
|
self.app = app
|
||||||
|
self.bw = bw
|
||||||
|
self.entry_to_keys_func = entry_to_keys_func
|
||||||
|
|
||||||
|
self.key_to_entry_indices_map = {}
|
||||||
|
|
||||||
|
def get_title(self):
|
||||||
|
return 'Bitwarden - List'
|
||||||
|
|
||||||
|
def get_entries(self):
|
||||||
|
raw_entries = self.bw.get_all_entries()
|
||||||
|
|
||||||
|
if len(self.key_to_entry_indices_map.keys()) == 0:
|
||||||
|
for index, entry in enumerate(raw_entries):
|
||||||
if 'login' not in entry.keys():
|
if 'login' not in entry.keys():
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for key in entry_to_keys(entry):
|
for key in self.entry_to_keys_func(entry):
|
||||||
if key is None:
|
if key is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if key in output_entries:
|
if key in self.key_to_entry_indices_map:
|
||||||
output_entries[key] += [entry_index]
|
self.key_to_entry_indices_map[key] += [index]
|
||||||
else:
|
else:
|
||||||
output_entries[key] = [entry_index]
|
self.key_to_entry_indices_map[key] = [index]
|
||||||
|
|
||||||
true_output = []
|
menu_entries = []
|
||||||
for key in output_entries:
|
for index, key in enumerate(sorted(self.key_to_entry_indices_map.keys())):
|
||||||
true_output += [{
|
menu_entries += [MenuEntry(
|
||||||
'key': key,
|
str(index),
|
||||||
'entries': output_entries[key],
|
key,
|
||||||
}]
|
)]
|
||||||
return sorted(true_output, key=lambda e: e['key'])
|
|
||||||
|
return menu_entries
|
||||||
|
|
||||||
|
def handle_input(self, key, typed_input):
|
||||||
|
possible_indices = self.key_to_entry_indices_map[sorted(self.key_to_entry_indices_map.keys())[int(key)]]
|
||||||
|
|
||||||
|
if len(possible_indices) > 1:
|
||||||
|
self.app.change_menu_state(ViewListMatchMenu(self.app, self.bw, possible_indices))
|
||||||
|
else:
|
||||||
|
self.app.change_menu_state(ViewEntryMenu(self.app, self.bw, possible_indices[0]))
|
||||||
|
|
||||||
|
class ListByNameMenu(ListByAnyFieldMenu):
|
||||||
|
def __init__(self, app, bw):
|
||||||
|
super().__init__(app, bw, lambda e: [e['name']])
|
||||||
|
|
||||||
|
class ListByURLMenu(ListByAnyFieldMenu):
|
||||||
|
def __init__(self, app, bw):
|
||||||
|
super().__init__(app, bw, lambda e: [urlparse(uri['uri']).hostname for uri in e['login']['uris']])
|
||||||
|
|
||||||
'''
|
|
||||||
UI
|
|
||||||
'''
|
|
||||||
def show_rofi_pwd_prompt():
|
def show_rofi_pwd_prompt():
|
||||||
output = run_shell_command('rofi -dmenu -password -i -no-fixed-num-lines -p "Master Password" -theme ~/.config/rofi/themes/askpass.rasi')
|
output = run_shell_command('rofi -dmenu -password -i -no-fixed-num-lines -p "Master Password" -theme ~/.config/rofi/themes/askpass.rasi')
|
||||||
if output.status_code > 0:
|
if output.status_code > 0:
|
||||||
return None
|
return None
|
||||||
return output.stdout
|
return output.stdout
|
||||||
|
|
||||||
def show_rofi_entries(entries, title="Bitwarden", exit_on_close=False):
|
def show_rofi_entries(entries, title="Bitwarden", exit_on_close=True):
|
||||||
output = run_shell_command(f'rofi -dmenu -i -p "{title}" -format "i:f"', input_data='\n'.join(entries))
|
output = run_shell_command(f'rofi -dmenu -i -p "{title}" -format "i:f"', input_data='\n'.join(entries))
|
||||||
|
|
||||||
if output.status_code > 0:
|
if output.status_code > 0:
|
||||||
@ -155,64 +338,29 @@ def show_rofi_entries(entries, title="Bitwarden", exit_on_close=False):
|
|||||||
|
|
||||||
return (int(selection_index), selection_text)
|
return (int(selection_index), selection_text)
|
||||||
|
|
||||||
'''
|
class App():
|
||||||
Main
|
def __init__(self):
|
||||||
'''
|
self.bw = Bitwarden()
|
||||||
def main():
|
self.menu_state = MainMenu(self, self.bw)
|
||||||
get_bw_session_token()
|
self._is_finished = False
|
||||||
|
|
||||||
main_menu_entries = [
|
def is_finished(self):
|
||||||
# (key, display, function)
|
return self._is_finished
|
||||||
(
|
|
||||||
'list-by-name',
|
|
||||||
'List by Name',
|
|
||||||
lambda e: [e['name']]
|
|
||||||
), (
|
|
||||||
'list-by-uri',
|
|
||||||
'List by URI',
|
|
||||||
lambda e: [urlparse(uri['uri']).hostname for uri in e['login']['uris']]
|
|
||||||
)
|
|
||||||
]
|
|
||||||
|
|
||||||
'''Main Menu'''
|
def set_finished(self):
|
||||||
(index, _) = show_rofi_entries([entry[1] for entry in main_menu_entries], exit_on_close=True)
|
self._is_finished = True
|
||||||
|
|
||||||
'''List of items'''
|
def render(self):
|
||||||
selection = main_menu_entries[index]
|
entries = self.menu_state.get_entries()
|
||||||
|
(index, extra_text) = show_rofi_entries([entry.label for entry in entries], title=self.menu_state.get_title())
|
||||||
|
self.menu_state.handle_input(entries[index].key, extra_text)
|
||||||
|
|
||||||
entry_index_map = get_entries_by_filter(selection[2])
|
def change_menu_state(self, new_state):
|
||||||
(index, _) = show_rofi_entries([entry['key'] for entry in entry_index_map], title=f'Bitwarden - {selection[1]}', exit_on_close=True)
|
self.menu_state = new_state
|
||||||
|
|
||||||
'''List of items matching key'''
|
|
||||||
entries = get_all_entries()
|
|
||||||
selected_entries = entry_index_map[index]['entries']
|
|
||||||
|
|
||||||
if len(selected_entries) == 1:
|
|
||||||
index = 0
|
|
||||||
else:
|
|
||||||
(index, _) = show_rofi_entries([f'{entries[index]["name"]} ({entries[index]["login"]["username"]})' for index in selected_entries], exit_on_close=True)
|
|
||||||
|
|
||||||
'''List copy options'''
|
|
||||||
selection_options = {
|
|
||||||
'Password': lambda e: e['login']['password'],
|
|
||||||
'TOTP': lambda e: e['login']['totp'],
|
|
||||||
'Username': lambda e: e['login']['username'],
|
|
||||||
'URI': lambda e: e['login']['uris'][0]['uri'],
|
|
||||||
}
|
|
||||||
|
|
||||||
selected_entry = entries[selected_entries[index]]
|
|
||||||
display_options = []
|
|
||||||
for key in selection_options:
|
|
||||||
try:
|
|
||||||
value = selection_options[key](selected_entry)
|
|
||||||
if value is not None:
|
|
||||||
display_options += [key]
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
(index, _) = show_rofi_entries(display_options, title=f'Bitwarden - Copy for {selected_entry["name"]}', exit_on_close=True)
|
|
||||||
copy_to_clipboard(selection_options[display_options[index]](selected_entry))
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
app = App()
|
||||||
|
|
||||||
|
while not app.is_finished():
|
||||||
|
app.render()
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user