From 29ad46ced9eee2d7e7974b9c7edcfbc7acc20d3b Mon Sep 17 00:00:00 2001
From: Lumpiasty
Date: Tue, 22 Apr 2025 02:36:40 +0200
Subject: [PATCH] add basedpyright and make it happy
---
 .vscode/extensions.json | 6 +++++-
 pyrightconfig.json | 3 +++
 utils/synchronize-vault.py | 32 +++++++++++++++-----------------
 3 files changed, 23 insertions(+), 18 deletions(-)
 create mode 100644 pyrightconfig.json
diff --git a/.vscode/extensions.json b/.vscode/extensions.json
index 7c6bf15..931f30e 100644
--- a/.vscode/extensions.json
+++ b/.vscode/extensions.json
@@ -1,3 +1,7 @@
 {
- "recommendations": ["arrterian.nix-env-selector", "jnoortheen.nix-ide"]
+ "recommendations": [
+ "arrterian.nix-env-selector",
+ "jnoortheen.nix-ide",
+ "detachhead.basedpyright"
+ ]
 }
diff --git a/pyrightconfig.json b/pyrightconfig.json
new file mode 100644
index 0000000..71c2e34
--- /dev/null
+++ b/pyrightconfig.json
@@ -0,0 +1,3 @@
+{
+ "allowedUntypedLibraries": ["hvac"]
+}
diff --git a/utils/synchronize-vault.py b/utils/synchronize-vault.py
index 517de38..6ca39bf 100755
--- a/utils/synchronize-vault.py
+++ b/utils/synchronize-vault.py
@@ -1,18 +1,16 @@
 #!/usr/bin/env python
-from hvac.api.auth_methods.kubernetes import Kubernetes
-
-
 import argparse
 import os
-from hvac.api.system_backend import mount
-import yaml
+from typing import Any, cast
+
 import hvac
-from hvac.api.auth_methods import Kubernetes, kubernetes
+from hvac.api.auth_methods import Kubernetes
+import yaml
 # Read vault/policies dir then write what is there and delete missing
 def synchronize_policies(client: hvac.Client):
- policies = {}
+ policies: dict[str, str] = {}
 # Read all policies files
 policy_dir = os.path.join(os.path.dirname(__file__), '../vault/policy')
 for filename in os.listdir(policy_dir):
@@ -20,7 +18,7 @@ def synchronize_policies(client: hvac.Client):
 policy_name = os.path.splitext(filename)[0]
 policies[policy_name] = f.read()
- policies_on_vault = client.sys.list_policies()['data']['policies']
+ policies_on_vault: list[str] = cast(list[str], client.sys.list_policies()['data']['policies'])
 # Delete policies that should not be there
 for policy in policies_on_vault:
@@ -37,8 +35,8 @@ def synchronize_policies(client: hvac.Client):
 def synchronize_auth_kubernetes_config(client: hvac.Client):
 config_file = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-config.yaml')
 with open(config_file, 'r') as f:
- config = yaml.safe_load(f.read())
- client.write_data('/auth/kubernetes/config', data=config)
+ config = cast(dict[str, str], yaml.safe_load(f.read()))
+ _ = client.write_data('/auth/kubernetes/config', data=config)
 # Read vault/kubernetes-roles dir then write what is there and delete missing
 def synchronize_kubernetes_roles(client: hvac.Client):
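Review bot: The `cast(list[str], client.sys.list_policies()['data']['policies'])` in `synchronize_policies` only asserts a type to the checker; nothing verifies the Vault response at runtime, although this list decides which policies get deleted. The same pattern appears in `synchronize_auth_kubernetes_config`, where `cast(dict[str, str], yaml.safe_load(f.read()))` hides that an empty file parses to `None` and that YAML booleans or lists are not `str`, and again on the `list_roles()` result in the `@@ -46,28` hunk. I would back the config cast with a check such as `if not isinstance(config, dict): raise ValueError('kubernetes-config.yaml must contain a mapping')` and type the values as `object` rather than `str`. The explicit `: list[str]` annotation is also redundant next to the cast. `synchronize_policies` starts and ends outside its hunk.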
@@ -46,28 +44,28 @@ def synchronize_kubernetes_roles(client: hvac.Client):
 policy_dir = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-roles/')
- roles = {}
+ roles: dict[str, Any] = {} # pyright:ignore[reportExplicitAny]
 for filename in os.listdir(policy_dir):
 with open(os.path.join(policy_dir, filename), 'r') as f:
 role_name = os.path.splitext(filename)[0]
 roles[role_name] = yaml.safe_load(f.read())
- roles_on_vault = []
+ roles_on_vault: list[str] = []
 try:
- roles_on_vault = kubernetes.list_roles()['keys']
- except hvac.exceptions.InvalidPath:
+ roles_on_vault = cast(list[str], kubernetes.list_roles()['keys'])
+ except hvac.exceptions.InvalidPath: # pyright:ignore[reportAttributeAccessIssue, reportUnknownMemberType]
 print("No roles found on server!")
 for role in roles_on_vault:
- if role not in roles_on_vault:
+ if role not in roles:
 print(f'Deleting role: {role}')
 kubernetes.delete_role(role)
- for role_name, role_content in roles.items():
+ for role_name, role_content in roles.items(): # pyright:ignore[reportAny]
 print(f'Updating role: {role_name}')
 # Using write data instead of kubernetes.create_role, we can pass raw yaml
- client.write_data(f'/auth/kubernetes/role/{role_name}', data=role_content)
+ _ = client.write_data(f'/auth/kubernetes/role/{role_name}', data=role_content) # pyright:ignore[reportAny]
 if __name__ == '__main__':
 parser = argparse.ArgumentParser(
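Review bot: With `if role not in roles:` the delete loop becomes effective for the first time (the old test against `roles_on_vault` could never be true), but `roles` is still built from every file in the directory with the `yaml.safe_load` result accepted as `Any`. A stray or empty file therefore becomes a role entry and its content, possibly `None`, is passed to `client.write_data` unchecked; I would validate before storing, e.g. `if not isinstance(content, dict): raise ValueError(f'{filename}: expected a YAML mapping')`. The commit subject only mentions type checking, so the behaviour change deserves a line in the message. Importing the exception directly, `from hvac.exceptions import InvalidPath`, would likely make the two-rule `# pyright:ignore` on the `except` line unnecessary. The start of the function body lies above this hunk.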