add basedpyright and make it happy

This commit is contained in:
2025-04-22 02:36:40 +02:00
parent 947f154a81
commit b84c792992
3 changed files with 23 additions and 18 deletions

View File

@@ -1,3 +1,7 @@
{ {
"recommendations": ["arrterian.nix-env-selector", "jnoortheen.nix-ide"] "recommendations": [
"arrterian.nix-env-selector",
"jnoortheen.nix-ide",
"detachhead.basedpyright"
]
} }

3
pyrightconfig.json Normal file
View File

@@ -0,0 +1,3 @@
{
"allowedUntypedLibraries": ["hvac"]
}

View File

@@ -1,18 +1,16 @@
#!/usr/bin/env python #!/usr/bin/env python
from hvac.api.auth_methods.kubernetes import Kubernetes
import argparse import argparse
import os import os
from hvac.api.system_backend import mount from typing import Any, cast
import yaml
import hvac import hvac
from hvac.api.auth_methods import Kubernetes, kubernetes from hvac.api.auth_methods import Kubernetes
import yaml
# Read vault/policies dir then write what is there and delete missing # Read vault/policies dir then write what is there and delete missing
def synchronize_policies(client: hvac.Client): def synchronize_policies(client: hvac.Client):
policies = {} policies: dict[str, str] = {}
# Read all policies files # Read all policies files
policy_dir = os.path.join(os.path.dirname(__file__), '../vault/policy') policy_dir = os.path.join(os.path.dirname(__file__), '../vault/policy')
for filename in os.listdir(policy_dir): for filename in os.listdir(policy_dir):
@@ -20,7 +18,7 @@ def synchronize_policies(client: hvac.Client):
policy_name = os.path.splitext(filename)[0] policy_name = os.path.splitext(filename)[0]
policies[policy_name] = f.read() policies[policy_name] = f.read()
policies_on_vault = client.sys.list_policies()['data']['policies'] policies_on_vault: list[str] = cast(list[str], client.sys.list_policies()['data']['policies'])
# Delete policies that should not be there # Delete policies that should not be there
for policy in policies_on_vault: for policy in policies_on_vault:
@@ -37,8 +35,8 @@ def synchronize_policies(client: hvac.Client):
def synchronize_auth_kubernetes_config(client: hvac.Client): def synchronize_auth_kubernetes_config(client: hvac.Client):
config_file = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-config.yaml') config_file = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-config.yaml')
with open(config_file, 'r') as f: with open(config_file, 'r') as f:
config = yaml.safe_load(f.read()) config = cast(dict[str, str], yaml.safe_load(f.read()))
client.write_data('/auth/kubernetes/config', data=config) _ = client.write_data('/auth/kubernetes/config', data=config)
# Read vault/kubernetes-roles dir then write what is there and delete missing # Read vault/kubernetes-roles dir then write what is there and delete missing
def synchronize_kubernetes_roles(client: hvac.Client): def synchronize_kubernetes_roles(client: hvac.Client):
@@ -46,28 +44,28 @@ def synchronize_kubernetes_roles(client: hvac.Client):
policy_dir = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-roles/') policy_dir = os.path.join(os.path.dirname(__file__), '../vault/kubernetes-roles/')
roles = {} roles: dict[str, Any] = {} # pyright:ignore[reportExplicitAny]
for filename in os.listdir(policy_dir): for filename in os.listdir(policy_dir):
with open(os.path.join(policy_dir, filename), 'r') as f: with open(os.path.join(policy_dir, filename), 'r') as f:
role_name = os.path.splitext(filename)[0] role_name = os.path.splitext(filename)[0]
roles[role_name] = yaml.safe_load(f.read()) roles[role_name] = yaml.safe_load(f.read())
roles_on_vault = [] roles_on_vault: list[str] = []
try: try:
roles_on_vault = kubernetes.list_roles()['keys'] roles_on_vault = cast(list[str], kubernetes.list_roles()['keys'])
except hvac.exceptions.InvalidPath: except hvac.exceptions.InvalidPath: # pyright:ignore[reportAttributeAccessIssue, reportUnknownMemberType]
print("No roles found on server!") print("No roles found on server!")
for role in roles_on_vault: for role in roles_on_vault:
if role not in roles_on_vault: if role not in roles:
print(f'Deleting role: {role}') print(f'Deleting role: {role}')
kubernetes.delete_role(role) kubernetes.delete_role(role)
for role_name, role_content in roles.items(): for role_name, role_content in roles.items(): # pyright:ignore[reportAny]
print(f'Updating role: {role_name}') print(f'Updating role: {role_name}')
# Using write data instead of kubernetes.create_role, we can pass raw yaml # Using write data instead of kubernetes.create_role, we can pass raw yaml
client.write_data(f'/auth/kubernetes/role/{role_name}', data=role_content) _ = client.write_data(f'/auth/kubernetes/role/{role_name}', data=role_content) # pyright:ignore[reportAny]
if __name__ == '__main__': if __name__ == '__main__':
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(