Source code for nepse_client.token_manager

# nepse_client/token_manager.py
"""
Token management for NEPSE API authentication.

This module handles automatic token generation, refresh, and validation
for both synchronous and asynchronous clients.
"""

import asyncio
import logging
import pathlib
import time
from datetime import datetime
from typing import Any, Optional, cast

import pywasm

from .exceptions import NepseValidationError


logger = logging.getLogger(__name__)


[docs] class TokenParser: """ Parse authentication tokens using WebAssembly module. This class uses a WASM module to decode and parse the authentication tokens returned by the NEPSE API. """
[docs] def __init__(self): """Initialize token parser with WASM runtime.""" self.runtime = pywasm.core.Runtime() wasm_path = pathlib.Path(__file__).parent / "data" / "css.wasm" try: self.wasm_module = self.runtime.instance_from_file(str(wasm_path)) logger.debug("WASM module loaded successfully") except Exception as e: logger.error(f"Failed to load WASM module: {e}") raise
[docs] def parse_token_response(self, token_response: dict) -> tuple[str, str]: """ Parse access and refresh tokens from API response. Args: token_response: Raw token response from API Returns: Tuple of (access_token, refresh_token) """ salts = [ token_response["salt1"], token_response["salt2"], token_response["salt3"], token_response["salt4"], token_response["salt5"], ] # Calculate indices for access token n = self.runtime.invocate(self.wasm_module, "cdx", salts)[0] access_token_l_index = self.runtime.invocate( self.wasm_module, "rdx", [salts[0], salts[1], salts[3], salts[2], salts[4]] )[0] o = self.runtime.invocate( self.wasm_module, "bdx", [salts[0], salts[1], salts[3], salts[2], salts[4]] )[0] p = self.runtime.invocate( self.wasm_module, "ndx", [salts[0], salts[1], salts[3], salts[2], salts[4]] )[0] q = self.runtime.invocate( self.wasm_module, "mdx", [salts[0], salts[1], salts[3], salts[2], salts[4]] )[0] # Calculate indices for refresh token salts_reversed = [salts[1], salts[0], salts[2], salts[4], salts[3]] a = self.runtime.invocate( self.wasm_module, "cdx", [salts[1], salts[0], salts[2], salts[4], salts[3]] )[0] b = self.runtime.invocate( self.wasm_module, "rdx", [salts[1], salts[0], salts[2], salts[3], salts[4]] )[0] c = self.runtime.invocate( self.wasm_module, "bdx", [salts[1], salts[0], salts[3], salts[2], salts[4]] )[0] d = self.runtime.invocate( self.wasm_module, "ndx", [salts[1], salts[0], salts[3], salts[2], salts[4]] )[0] e = self.runtime.invocate( self.wasm_module, "mdx", [salts[1], salts[0], salts[3], salts[2], salts[4]] )[0] print(salts_reversed) # Extract tokens access_token = token_response["accessToken"] refresh_token = token_response["refreshToken"] # Parse access token parsed_access_token = ( access_token[0:n] + access_token[n + 1 : access_token_l_index] + access_token[access_token_l_index + 1 : o] + access_token[o + 1 : p] + access_token[p + 1 : q] + access_token[q + 1 :] ) # Parse refresh token parsed_refresh_token = ( refresh_token[0:a] + refresh_token[a + 1 : b] + refresh_token[b + 1 : c] + refresh_token[c + 1 : d] + refresh_token[d + 1 : e] + refresh_token[e + 1 :] ) return (parsed_access_token, parsed_refresh_token)
class _TokenManagerBase: """ Base class for token managers. Provides common functionality for managing authentication tokens, including validation and salt extraction. Args: nepse: Reference to parent NEPSE client """ # Token validity period in seconds (45 seconds as per original) MAX_UPDATE_PERIOD = 45 def __init__(self, nepse): """Initialize token manager.""" self.nepse = nepse self.token_parser = TokenParser() # Token endpoints self.token_url = "/api/authenticate/prove" self.refresh_url = "/api/authenticate/refresh-token" # Token state self.access_token: Optional[str] = None self.refresh_token: Optional[str] = None self.token_time_stamp: Optional[int] = None self.salts: Optional[list[int]] = None def isTokenValid(self) -> bool: """ Check if current token is still valid. Returns: True if token is valid, False otherwise """ if self.token_time_stamp is None: return False elapsed = int(time.time()) - self.token_time_stamp return elapsed < self.MAX_UPDATE_PERIOD def _getValidTokenFromJSON(self, token_response: dict) -> tuple[str, str, int, list[int]]: """ Extract and validate token data from API response. Args: token_response: Raw token response from API Returns: Tuple of (access_token, refresh_token, timestamp, salts) """ # Extract salts salts = [int(token_response[f"salt{i}"]) for i in range(1, 6)] # Parse tokens access_token, refresh_token = self.token_parser.parse_token_response(token_response) # Extract timestamp timestamp = int(token_response["serverTime"] / 1000) return (access_token, refresh_token, timestamp, salts) def __repr__(self) -> str: """Return the string representation of the token manager. Returns: str: Token Manager """ if self.access_token is None or self.token_time_stamp is None: return "Token Manager: Not Initialized" timestamp_str = datetime.fromtimestamp(self.token_time_stamp).strftime("%Y-%m-%d %H:%M:%S") return ( f"Token Manager:\n" f" Access Token: {self.access_token[:20]}...\n" f" Refresh Token: {self.refresh_token[:20] if self.refresh_token else ''}...\n" f" Salts: {self.salts}\n" f" Timestamp: {timestamp_str}\n" f" Valid: {self.isTokenValid()}" )
[docs] class TokenManager(_TokenManagerBase): """ Synchronous token manager. Manages authentication tokens for synchronous NEPSE client, automatically refreshing tokens when they expire. """
[docs] def __init__(self, nepse): """Initialize synchronous token manager.""" super().__init__(nepse)
[docs] def getAccessToken(self) -> str: """ Get valid access token, refreshing if necessary. Returns: Valid access token """ if not self.isTokenValid(): self.update() assert self.access_token is not None return self.access_token
[docs] def getRefreshToken(self) -> str: """ Get valid refresh token, refreshing if necessary. Returns: Valid refresh token """ if not self.isTokenValid(): self.update() assert self.refresh_token is not None return self.refresh_token
[docs] def update(self) -> None: """Fetch and update authentication tokens.""" self._setToken()
def _setToken(self) -> None: """Fetch tokens from API and update internal state.""" logger.debug("Fetching new authentication token") json_response = self._getTokenHttpRequest() ( self.access_token, self.refresh_token, self.token_time_stamp, self.salts, ) = self._getValidTokenFromJSON(json_response) logger.info("Authentication token refreshed successfully") def _getTokenHttpRequest(self) -> dict[str, Any]: """ Make HTTP request to get token. Returns: Token response dictionary """ response = self.nepse.requestGETAPI(url=self.token_url, include_authorization_headers=False) if not isinstance(response, dict): raise NepseValidationError(f"Expected dict from token API, got {type(response)}") return cast(dict[str, Any], response)
[docs] class AsyncTokenManager(_TokenManagerBase): """ Asynchronous token manager. Manages authentication tokens for asynchronous NEPSE client, with support for concurrent token refresh operations. """
[docs] def __init__(self, nepse): """Initialize asynchronous token manager.""" super().__init__(nepse) # Synchronization events for concurrent operations self.update_started = asyncio.Event() self.update_completed = asyncio.Event()
[docs] async def getAccessToken(self) -> str: """ Get valid access token, refreshing if necessary. Returns: Valid access token """ if not self.isTokenValid(): await self.update() assert self.access_token is not None return self.access_token
[docs] async def getRefreshToken(self) -> str: """ Get valid refresh token, refreshing if necessary. Returns: Valid refresh token """ if not self.isTokenValid(): await self.update() assert self.refresh_token is not None return self.refresh_token
[docs] async def update(self) -> None: """Fetch and update authentication tokens asynchronously.""" await self._setToken()
async def _setToken(self) -> None: """ Fetch tokens from API and update internal state. Ensures only one token refresh operation happens at a time, even with concurrent requests. """ # Check if another coroutine is already updating if not self.update_started.is_set(): # Mark update as started self.update_started.set() self.update_completed.clear() try: logger.debug("Fetching new authentication token") json_response = await self._getTokenHttpRequest() ( self.access_token, self.refresh_token, self.token_time_stamp, self.salts, ) = self._getValidTokenFromJSON(json_response) logger.info("Authentication token refreshed successfully") finally: # Mark update as completed self.update_completed.set() self.update_started.clear() else: # Wait for ongoing update to complete await self.update_completed.wait() async def _getTokenHttpRequest(self) -> dict: """ Make async HTTP request to get token. Returns: Token response dictionary """ return cast( dict[Any, Any], await self.nepse.requestGETAPI(url=self.token_url, include_authorization_headers=False), )
__all__ = [ "TokenManager", "AsyncTokenManager", "TokenParser", ]