lineapy.plugins package

Submodules

lineapy.plugins.airflow module

class lineapy.plugins.airflow.AirflowDagConfig(_typename, _fields=None, /, **kwargs)
class lineapy.plugins.airflow.AirflowPlugin(db: RelationalLineaDB, session_id: LineaID)[source]
to_airflow(dag_name: str, task_names: List[str], output_dir_path: Path, task_graph: TaskGraph, airflow_dag_config: Optional[AirflowDagConfig] = {}) → None[source]

Create an Airflow DAG.

Parameters

lineapy.plugins.base module

class lineapy.plugins.base.BasePlugin(db: lineapy.db.db.RelationalLineaDB, session_id: lineapy.data.types.LineaID)[source]
generate_infra(module_name: str, output_dir_path: Path)[source]

Generates templates to test the Airflow module. Currently, we produce a <module_name>_Dockerfile and a <module_name>_requirements.txt file. These can be used to test the DAG that gets generated by linea. For more details, see Testing locally.

generate_python_module(module_name: str, artifacts_code: Dict[str, CodeSlice], output_dir_path: Path)[source]

Generate Python module code and save it to a file.

prepare_output_dir(copy_dst: str)[source]

This helper creates the directories if they are missing.
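
The "create if missing" behaviour can be illustrated with the standard library. The following is a minimal sketch, not the method's actual implementation: ensure_output_dir is a hypothetical stand-in, and the real helper may do more than create directories.

```python
import tempfile
from pathlib import Path


def ensure_output_dir(copy_dst: str) -> Path:
    """Hypothetical stand-in: create copy_dst and any missing parents."""
    path = Path(copy_dst)
    # parents=True creates intermediate directories;
    # exist_ok=True makes the call a no-op when the directory already exists.
    path.mkdir(parents=True, exist_ok=True)
    return path


with tempfile.TemporaryDirectory() as tmp:
    out = ensure_output_dir(f"{tmp}/airflow/dags")
    # Calling it a second time does not raise, because the directory exists.
    out_again = ensure_output_dir(f"{tmp}/airflow/dags")
    created = out.is_dir()
```

Because the call is idempotent, a caller does not need to check whether the output directory exists before writing generated files into it.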

slice_dag_helper(slice_names: List[str], module_name: Optional[str] = None, task_dependencies: Dict[str, Set[str]] = {}, output_dir: Optional[str] = None) → Tuple[str, List[str], Path, TaskGraph][source]

A generic function shared by the Script and Airflow plugins.

Creates a DAG from the sliced code. The output includes a Python file with one function per slice, a task dependencies file in Airflow format, and an example Dockerfile and requirements.txt that can be used to run it.

Parameters
  • slice_names – list of slice names to be used as tasks.

  • module_name – name of the Python module the generated code will be saved to.

  • task_dependencies – task dependencies in graphlib format, e.g. {'B': {'A', 'C'}}, which means tasks A and C are prerequisites for task B.

  • output_dir – directory to save the generated code to.

  • airflow_dag_config – Configs of Airflow DAG model.
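
As an illustration of the task_dependencies format described above, the sketch below checks that every task named in the dictionary is also present in slice_names. The helper unknown_tasks is hypothetical and not part of lineapy; it assumes that dependency keys and values use the same names as slice_names.

```python
from typing import Dict, List, Set


def unknown_tasks(
    slice_names: List[str], task_dependencies: Dict[str, Set[str]]
) -> Set[str]:
    """Return names used in task_dependencies that are not in slice_names."""
    mentioned = set(task_dependencies)  # tasks that have prerequisites
    for prerequisites in task_dependencies.values():
        mentioned |= prerequisites  # the prerequisite tasks themselves
    return mentioned - set(slice_names)


slice_names = ["A", "B", "C"]

# A and C are prerequisites for B, as in the parameter description.
missing = unknown_tasks(slice_names, {"B": {"A", "C"}})

# "D" is not a known slice, so it is reported.
typo = unknown_tasks(slice_names, {"B": {"A", "D"}})
```

A check of this kind can catch misspelled task names before any files are generated.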

lineapy.plugins.script module

class lineapy.plugins.script.ScriptDagConfig(_typename, _fields=None, /, **kwargs)
class lineapy.plugins.script.ScriptPlugin(db: RelationalLineaDB, session_id: LineaID)[source]
to_script(dag_name: str, output_dir_path: Path, task_graph: TaskGraph) → None[source]

Create a Python script DAG.

Parameters
  • dag_name – Name of the DAG and of the Python file it is saved in

  • output_dir_path – Directory in which the DAG and its Python file are saved

  • task_graph

lineapy.plugins.task module

class lineapy.plugins.task.TaskGraph(nodes: List[str], mapping: Dict[str, str], edges: Dict[str, Set[str]] = {})[source]

Graph representing task dependencies. It is constructed from the "edges" variable.

Parameters

edges

Dictionary with task name as key and set of prerequisite tasks as value. This is the standard library graphlib style graph definition. For instance, {'C': {'A', 'B'}} means A and B are prerequisites for C. This gives the following task dependency:

A ---\
      \
       >---> C
      /
B ---/

Note

  • If we only supported Python 3.9+, we would prefer graphlib from the standard library over networkx for graph operations.

  • We might want to get rid of the mapping for renaming slice_names to task_names.
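
To make the edges format concrete, the sketch below feeds the example dictionary to the standard library's graphlib.TopologicalSorter (Python 3.9+). It only illustrates how the format is read; it is not how TaskGraph is implemented, which per the note above relies on networkx.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Task name as key, set of prerequisite tasks as value:
# A and B are prerequisites for C.
edges = {"C": {"A", "B"}}

# static_order() yields every task after all of its prerequisites.
# A and B have no ordering constraint between them; C always comes last.
order = list(TopologicalSorter(edges).static_order())
```

Any order returned here is a valid execution order for the diagram shown above.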

lineapy.plugins.utils module

lineapy.plugins.utils.load_plugin_template(template_name: str) → Template[source]

Loads a Jinja template for a plugin (currently only Airflow) from the jinja_templates folder.

lineapy.plugins.utils.safe_var_name(name: str) → str[source]

Returns a Python-safe variable name for the given string. E.g., for name = "p value", "p_value" is returned.
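
One way such a conversion can work is sketched below. The function safe_var_name_sketch is a hypothetical illustration, not the library's code, and the handling of a leading digit is an assumption; the actual function may treat edge cases differently.

```python
import re


def safe_var_name_sketch(name: str) -> str:
    """Hypothetical sketch: turn an arbitrary string into an identifier."""
    # \W matches any character that is not a letter, digit, or underscore.
    cleaned = re.sub(r"\W", "_", name)
    # Assumption: an identifier must not start with a digit, so prefix "_".
    if cleaned and cleaned[0].isdigit():
        cleaned = "_" + cleaned
    return cleaned


example = safe_var_name_sketch("p value")
leading_digit = safe_var_name_sketch("2nd run")
```

With this sketch, "p value" becomes "p_value", matching the example in the description, so an artifact name containing spaces can be used as a function or variable name in generated code.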

Module contents