# Source code for lineapy.utils.analytics.event_schemas

from dataclasses import dataclass
from typing import Optional, Union

# Amplitude does not appear to support nested objects,
# so every event schema here is kept flat (top-level fields only).


@dataclass
class SaveEvent:
    """Flat schema for the "save" analytics event."""

    # One of "file_system", "db", or "" (for variables).
    side_effect: str
@dataclass
class GetEvent:
    """Flat schema for the "get" analytics event."""

    # Presumably True when the caller requested an explicit version;
    # verify against the code that emits this event.
    version_specified: bool
@dataclass
class CatalogEvent:
    """Flat schema for the "catalog" analytics event."""

    # Presumably the number of entries in the catalog; confirm with the emitter.
    catalog_size: int
@dataclass
class ToPipelineEvent:
    """Flat schema for the "to pipeline" analytics event."""

    # Pipeline engine name, e.g. AIRFLOW or SCRIPT.
    engine: str
    artifact_count: int
    has_task_dependency: bool
    has_config: bool
@dataclass
class LibImportEvent:
    """Flat schema for the "library import" analytics event."""

    name: str
    version: str
@dataclass
class ExceptionEvent:
    """Flat schema for the "exception" analytics event."""

    # TODO: set error_type to be string enums
    error_type: str
    # Optional message text; defaults to None when no message is supplied.
    error_msg: Optional[str] = None
@dataclass
class GetValueEvent:
    """Flat schema for the "get value" analytics event."""

    has_value: bool
@dataclass
class GetCodeEvent:
    """Flat schema for the "get code" analytics event."""

    use_lineapy_serialization: bool
    is_session_code: bool
@dataclass
class GetVersionEvent:
    """Flat schema for the "get version" analytics event."""

    # Just a dummy entry for now.
    dummy_entry: str
# Type alias accepting any one of the event schema classes listed below.
AllEvents = Union[
    CatalogEvent,
    LibImportEvent,
    ExceptionEvent,
    GetEvent,
    SaveEvent,
    ToPipelineEvent,
    GetCodeEvent,
    GetValueEvent,
    GetVersionEvent,
]