Source code for nepse_client.dummy_id_manager

"""
Dummy ID management for NEPSE API requests.

This module manages the generation and caching of dummy IDs used in
POST request payloads, ensuring they stay synchronized with market status.
"""

import asyncio
import logging
import re
from datetime import datetime
from typing import Callable, Optional


logger = logging.getLogger(__name__)


class _DummyIDManagerBase:
    """
    Base class for dummy ID managers.

    Manages dummy IDs that are required for certain NEPSE API requests.
    The dummy ID changes based on market status and date.

    Args:
       market_status_function: Function to get market status
       date_function: Function to get current datetime (default: datetime.now)
    """

    def __init__(
        self,
        market_status_function: Optional[Callable] = None,
        date_function: Callable = datetime.now,
    ):
        """Initialize dummy ID manager."""
        self.data: Optional[dict] = None
        self.dummy_id: Optional[int] = None
        self.date_stamp: Optional[datetime] = None

        self.setDateFunction(date_function)
        self.setMarketStatusFunction(
            market_status_function or (lambda: {"id": 0, "asOf": datetime.now().isoformat()})
        )

    def setDateFunction(self, func: Callable) -> None:
        """
        Set function to get current date/time.

        Args:
           func: Callable that returns datetime
        """
        self.date_function = func

    def setMarketStatusFunction(self, func: Callable) -> None:
        """
        Set function to get market status.

        Args:
           func: Callable that returns market status dict
        """
        self.market_status_function = func
        self.data = None  # Reset data when function changes

    def convertToDateTime(self, date_time_str: str) -> datetime:
        """
        Convert date string to datetime object.

        Handles various datetime formats and microsecond precision issues.

        Args:
           date_time_str: Date/time string from API

        Returns:
           Parsed datetime object
        """
        # Handle microseconds properly (limit to 6 decimal places)
        if "." in date_time_str:
            pattern = r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(\.\d+)?"
            match = re.match(pattern, date_time_str)

            if match:
                main_part = match.group(1)
                decimal_part = match.group(2)

                if decimal_part:
                    # Limit to 6 decimal places (microseconds)
                    decimal_part = decimal_part[:7]  # . + 6 digits
                    formatted_str = main_part + decimal_part
                    try:
                        return datetime.strptime(formatted_str, "%Y-%m-%dT%H:%M:%S.%f")
                    except ValueError:
                        logger.warning("ValueError occured..")

                # Fall back to second-level parsing
                return datetime.strptime(main_part, "%Y-%m-%dT%H:%M:%S")

        # Default parsing without microseconds
        return datetime.strptime(date_time_str, "%Y-%m-%dT%H:%M:%S")

    def __repr__(self) -> str:
        """Return the string representation of the dummy ID manager."""
        if self.dummy_id is None:
            return "Dummy ID Manager: Not Initialized"

        date_str = self.date_stamp.strftime("%Y-%m-%d %H:%M:%S") if self.date_stamp else "N/A"
        return f"Dummy ID Manager (ID: {self.dummy_id}, Date: {date_str})"


[docs] class DummyIDManager(_DummyIDManagerBase): """ Synchronous dummy ID manager. Manages dummy IDs for synchronous NEPSE client, automatically updating when the date changes or market status is updated. """
[docs] def __init__( self, market_status_function: Optional[Callable] = None, date_function: Callable = datetime.now, ): """Initialize synchronous dummy ID manager.""" super().__init__(market_status_function, date_function)
[docs] def populateData(self, force: bool = False) -> None: """ Fetch and populate dummy ID data. Args: force: Force refresh even if data exists """ today = self.date_function() # Initialize data if not present or forced if self.data is None or force: logger.debug("Initializing dummy ID data") self.data = self.market_status_function() self.dummy_id = self.data["id"] self.date_stamp = today logger.info(f"Dummy ID initialized: {self.dummy_id}") return # Check if date has changed if self.date_stamp is None or self.date_stamp.date() < today.date(): # if self.date_stamp.date() < today.date(): logger.debug("Date changed, updating dummy ID") new_data = self.market_status_function() new_converted_date = self.convertToDateTime(new_data["asOf"]) # Check if NEPSE date matches current date if new_converted_date.date() == today.date(): self.data = new_data self.dummy_id = self.data["id"] self.date_stamp = new_converted_date logger.info(f"Dummy ID updated: {self.dummy_id}") else: # NEPSE is closed (holiday/weekend) # Set date stamp to today to avoid repeated checks self.data = new_data self.dummy_id = self.data["id"] self.date_stamp = today logger.debug("Market closed, using previous dummy ID")
[docs] def getDummyID(self) -> int: """ Get current dummy ID, updating if necessary. Returns: Current dummy ID """ self.populateData() assert self.dummy_id is not None return self.dummy_id
[docs] class AsyncDummyIDManager(_DummyIDManagerBase): """ Asynchronous dummy ID manager. Manages dummy IDs for asynchronous NEPSE client, with support for concurrent operations and proper async synchronization. """
[docs] def __init__( self, market_status_function: Optional[Callable] = None, date_function: Callable = datetime.now, ): """Initialize asynchronous dummy ID manager.""" super().__init__(market_status_function, date_function) # Synchronization events for concurrent operations self.update_started = asyncio.Event() self.update_completed = asyncio.Event()
[docs] async def populateData(self, force: bool = False) -> None: """ Fetch and populate dummy ID data asynchronously. Ensures only one update operation happens at a time, even with concurrent requests. Args: force: Force refresh even if data exists """ today = self.date_function() # Initialize data if not present or forced if self.data is None or force: # Check if another coroutine is already updating if not self.update_started.is_set(): self.update_started.set() self.update_completed.clear() try: logger.debug("Initializing dummy ID data") self.data = await self.market_status_function() self.dummy_id = self.data["id"] self.date_stamp = today logger.info(f"Dummy ID initialized: {self.dummy_id}") finally: self.update_completed.set() self.update_started.clear() else: # Wait for ongoing update to complete await self.update_completed.wait() return # Check if date has changed if self.date_stamp is None or self.date_stamp.date() < today.date(): # if self.date_stamp.date() < today.date(): # Check if another coroutine is already updating if self.update_started.is_set(): # Wait for ongoing update await self.update_completed.wait() else: # Start update self.update_started.set() self.update_completed.clear() try: logger.debug("Date changed, updating dummy ID") new_data = await self.market_status_function() new_converted_date = self.convertToDateTime(new_data["asOf"]) # Check if NEPSE date matches current date if new_converted_date.date() == today.date(): self.data = new_data self.dummy_id = self.data["id"] self.date_stamp = new_converted_date logger.info(f"Dummy ID updated: {self.dummy_id}") else: # NEPSE is closed (holiday/weekend) self.data = new_data self.dummy_id = self.data["id"] self.date_stamp = today logger.debug("Market closed, using previous dummy ID") finally: self.update_completed.set() self.update_started.clear()
[docs] async def getDummyID(self) -> int: """ Get current dummy ID, updating if necessary. Returns: Current dummy ID """ await self.populateData() assert self.dummy_id is not None return self.dummy_id
__all__ = [ "DummyIDManager", "AsyncDummyIDManager", ]