lineapy.plugins package
Submodules
lineapy.plugins.airflow module
- class lineapy.plugins.airflow.AirflowDagConfig(_typename, _fields=None, /, **kwargs)
- class lineapy.plugins.airflow.AirflowPlugin(db: RelationalLineaDB, session_id: LineaID)[source]
- to_airflow(dag_name: str, task_names: List[str], output_dir_path: Path, task_graph: TaskGraph, airflow_dag_config: Optional[AirflowDagConfig] = {}) → None [source]
Create an Airflow DAG.
- Parameters
dag_name – Name of the DAG and the python file it is saved in
task_dependencies – Task dependencies in graphlib format, e.g. {'B': {'A', 'C'}}; this means tasks A and C are prerequisites for task B.
airflow_dag_config – Configs of Airflow DAG model. See https://airflow.apache.org/_api/airflow/models/dag/index.html#airflow.models.dag.DAG for the full spec.
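The graphlib format mentioned above can be illustrated with the standard library alone. The following is a minimal sketch, assuming Python 3.9+ (where `graphlib` is available); it does not call any lineapy code and only shows how a mapping such as {'B': {'A', 'C'}} is read.

```python
from graphlib import TopologicalSorter

# Graphlib-style mapping: each key depends on every task in its value set.
# Here, A and C are prerequisites for B.
task_dependencies = {"B": {"A", "C"}}

# static_order() yields prerequisites before the tasks that depend on them.
order = list(TopologicalSorter(task_dependencies).static_order())

# B always comes last; A and C may appear in either order.
print(order)
```

Because A and C do not depend on each other, their relative order is not guaranteed; only their position ahead of B is.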
lineapy.plugins.base module
- class lineapy.plugins.base.BasePlugin(db: lineapy.db.db.RelationalLineaDB, session_id: lineapy.data.types.LineaID)[source]
- generate_infra(module_name: str, output_dir_path: Path)[source]
Generates templates to test the Airflow module. Currently, this produces a <module_name>_Dockerfile and a <module_name>_requirements.txt file. These can be used to test the DAG generated by LineaPy. For more details, see Testing locally.
- generate_python_module(module_name: str, artifacts_code: Dict[str, CodeSlice], output_dir_path: Path)[source]
Generate python module code and save to a file.
- slice_dag_helper(slice_names: List[str], module_name: Optional[str] = None, task_dependencies: Dict[str, Set[str]] = {}, output_dir: Optional[str] = None) → Tuple[str, List[str], Path, TaskGraph] [source]
A generic function, shared by the Script and Airflow plugins, that creates a DAG from the sliced code. The output includes a Python file with one function per slice, a task dependencies file in Airflow format, and an example Dockerfile and requirements.txt that can be used to run it.
- Parameters
slice_names – list of slice names to be used as tasks.
module_name – name of the Python module the generated code will be saved to.
task_dependencies – task dependencies in graphlib format, e.g. {'B': {'A', 'C'}}; this means tasks A and C are prerequisites for task B.
output_dir – directory to save the generated code to.
airflow_dag_config – Configs of Airflow DAG model.
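To make the phrase "task dependencies in Airflow format" concrete, here is a minimal sketch of one way a graphlib-style mapping could be rendered as Airflow's `upstream >> downstream` statements. The helper name `airflow_dependency_lines` is hypothetical and is not part of lineapy; the actual plugin may generate its output differently.

```python
from typing import Dict, List, Set


def airflow_dependency_lines(task_dependencies: Dict[str, Set[str]]) -> List[str]:
    """Hypothetical helper: render one 'prerequisite >> task' line per edge."""
    lines = []
    # Sort for deterministic output, since sets have no stable order.
    for task in sorted(task_dependencies):
        for prerequisite in sorted(task_dependencies[task]):
            lines.append(f"{prerequisite} >> {task}")
    return lines


lines = airflow_dependency_lines({"B": {"A", "C"}})
print("\n".join(lines))
```

In Airflow, `A >> B` declares that A runs before B, which matches the "prerequisite" reading of the graphlib mapping.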
lineapy.plugins.script module
- class lineapy.plugins.script.ScriptDagConfig(_typename, _fields=None, /, **kwargs)
- class lineapy.plugins.script.ScriptPlugin(db: RelationalLineaDB, session_id: LineaID)[source]
lineapy.plugins.task module
- class lineapy.plugins.task.TaskGraph(nodes: List[str], mapping: Dict[str, str], edges: Dict[str, Set[str]] = {})[source]
Graph representing task dependencies. It is constructed from the "edges" variable.
- Parameters
edges – Dictionary with a task name as key and the set of its prerequisite tasks as value. This is the standard library graphlib style of graph definition. For instance, {'C': {'A', 'B'}} means A and B are prerequisites for C. This example gives the following task dependency:
    A ---\
          \
           >---> C
          /
    B ---/
Note
If we only supported Python 3.9+, we would prefer to use graphlib from the standard library instead of networkx for graph operations.
We might want to get rid of the mapping for renaming slice_names to task_names.
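The role of the nodes, mapping, and edges arguments can be sketched with the standard library. This is an illustrative sketch only, assuming Python 3.9+ and assuming that mapping translates slice names to task names as the note suggests; `remap_edges` is a hypothetical helper, not the TaskGraph implementation.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def remap_edges(edges: Dict[str, Set[str]], mapping: Dict[str, str]) -> Dict[str, Set[str]]:
    """Hypothetical helper: rename every node in a graphlib-style edge dict."""
    return {
        mapping.get(task, task): {mapping.get(p, p) for p in prerequisites}
        for task, prerequisites in edges.items()
    }


nodes: List[str] = ["A", "B", "C"]
mapping = {"A": "task_a", "B": "task_b", "C": "task_c"}
edges = {"C": {"A", "B"}}

renamed = remap_edges(edges, mapping)

sorter = TopologicalSorter(renamed)
# Register every node explicitly so tasks without any edges are not dropped.
for node in nodes:
    sorter.add(mapping[node])

order = list(sorter.static_order())
print(order)  # task_c is last; task_a and task_b may come in either order
```

Registering the nodes separately from the edges matters because a task with no prerequisites and no dependents would otherwise never appear in the sorted output.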