lineapy.plugins package

Submodules

lineapy.plugins.pipeline_writers module

class lineapy.plugins.pipeline_writers.AirflowPipelineWriter(artifact_collection: lineapy.graph_reader.artifact_collection.ArtifactCollection, dependencies: Dict[str, Set[str]] = {}, keep_lineapy_save: bool = False, pipeline_name: str = 'pipeline', output_dir: str = '.', dag_config: Optional[lineapy.plugins.task.AirflowDagConfig] = {})

Pipeline file writer for the “AIRFLOW” framework.

get_artifact_task_definitions(indentation=4) → Dict[str, lineapy.plugins.task.TaskDefinition]

Add logic that deserializes the input variables and serializes the output variables around each artifact function's call_block, and wrap the result in a new function definition.
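The wrapping step can be pictured as plain code generation. The sketch below is an illustration only. It assumes pickle files in a storage directory as the serialization mechanism. The helper `wrap_call_block`, its `storage_dir` parameter, and the `task_` name prefix are hypothetical and are not part of lineapy's API or its actual generated code. Only the `indentation` parameter mirrors the signature above.

```python
import textwrap
from typing import List


def wrap_call_block(
    function_name: str,
    call_block: str,
    input_vars: List[str],
    output_vars: List[str],
    storage_dir: str = "/tmp",  # hypothetical location for pickle files
    indentation: int = 4,
) -> str:
    """Hypothetical helper: return source for a task function that loads its
    inputs, runs call_block, and saves its outputs as pickle files."""
    pad = " " * indentation

    def pickle_path(var: str) -> str:
        # Source expression for <storage_dir>/<var>.pickle in the generated code.
        return f"pathlib.Path({storage_dir!r}) / {var + '.pickle'!r}"

    lines = [f"def task_{function_name}():"]
    # Deserialize each input variable written by an upstream task.
    for var in input_vars:
        lines.append(f"{pad}{var} = pickle.loads(({pickle_path(var)}).read_bytes())")
    # Re-indent the original artifact call so it becomes the function body.
    lines.append(textwrap.indent(textwrap.dedent(call_block).strip(), pad))
    # Serialize each output variable so downstream tasks can load it.
    for var in output_vars:
        lines.append(f"{pad}({pickle_path(var)}).write_bytes(pickle.dumps({var}))")
    return "\n".join(lines)


if __name__ == "__main__":
    print(wrap_call_block("clean_data", "df2 = clean(df)", ["df"], ["df2"]))
```

The generated source assumes that `pickle` and `pathlib` are imported in the module it is written into. It also assumes that any functions called inside `call_block` are defined there.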

class lineapy.plugins.pipeline_writers.BasePipelineWriter(artifact_collection: lineapy.graph_reader.artifact_collection.ArtifactCollection, dependencies: Dict[str, Set[str]] = {}, keep_lineapy_save: bool = False, pipeline_name: str = 'pipeline', output_dir: str = '.', dag_config: Optional[lineapy.plugins.task.AirflowDagConfig] = {})

The pipeline writer uses modularized artifact code to generate and write out standard pipeline files, including Python modules, a DAG script, and infrastructure files (e.g., a Dockerfile).

Base class for pipeline file writers; corresponds to the “SCRIPT” framework.

write_pipeline_files() → None

Write out pipeline files.
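The final write step might look roughly like the sketch below. It assumes that the generated module, DAG, and infrastructure contents are already available as strings. The helper name `write_generated_files` and the file-name suffixes are hypothetical and do not reflect the names lineapy actually uses. Only the `pipeline_name` and `output_dir` defaults mirror the constructor signature above.

```python
from pathlib import Path
from typing import Dict, List


def write_generated_files(
    files: Dict[str, str],
    pipeline_name: str = "pipeline",
    output_dir: str = ".",
) -> List[Path]:
    """Hypothetical sketch: write each generated file into output_dir.

    `files` maps an illustrative file-name suffix (e.g. "_module.py")
    to the text that should be written under that name.
    """
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)  # create output_dir if missing
    written = []
    for suffix, text in files.items():
        path = out / f"{pipeline_name}{suffix}"
        path.write_text(text)
        written.append(path)
    return written
```

Returning the written paths makes it easy for a caller to log or verify what was produced.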

lineapy.plugins.pipeline_writers.get_session_task_definition(sa: lineapy.graph_reader.session_artifacts.SessionArtifacts, pipeline_name: str, indentation=4) → str

Add logic that serializes the output artifacts of the session function's call_block, and wrap the result in a new function definition.

lineapy.plugins.task module

class lineapy.plugins.task.AirflowDagConfig(**kwargs)
class lineapy.plugins.task.AirflowDagFlavor(value)

An enumeration.

class lineapy.plugins.task.TaskDefinition(**kwargs)

Definition of an artifact task; it can be extended with new keys (user, project, …) in the future.

class lineapy.plugins.task.TaskGraph(nodes: List[str], mapping: Dict[str, str], edges: Dict[str, Set[str]] = {})

Graph representing task dependencies. It is constructed from the “edges” variable.

Parameters

edges

Dictionary with a task name as key and the set of its prerequisite tasks as value. This is the graph definition style used by the standard library graphlib. For instance, {'C': {'A', 'B'}} means A and B are prerequisites for C, which gives the following task dependency:

A ---\
      \
       >---> C
      /
B ---/
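Because the edges use the graphlib format, the standard library `graphlib.TopologicalSorter` (Python 3.9+) can consume them directly. The sketch below assumes graphlib rather than the networkx code TaskGraph may use internally. It groups the tasks into batches whose prerequisites are all satisfied. Tasks within a batch could run in parallel.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Task name -> set of prerequisite task names, as in the "edges" parameter.
edges: Dict[str, Set[str]] = {"C": {"A", "B"}}

sorter = TopologicalSorter(edges)
sorter.prepare()
batches: List[Set[str]] = []
while sorter.is_active():
    ready = sorter.get_ready()  # tasks whose prerequisites are all done
    batches.append(set(ready))
    sorter.done(*ready)  # mark this batch finished, unlocking dependents

print(batches)  # A and B form the first batch, then C
```

For a single linear ordering, `list(TopologicalSorter(edges).static_order())` also works and always places C after A and B.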

Note

  • If we supported only Python 3.9+, we would prefer graphlib from the standard library over networkx for graph operations.

  • We might want to get rid of the mapping for renaming slice_names to task_names.

lineapy.plugins.utils module

lineapy.plugins.utils.load_plugin_template(template_name: str) → jinja2.environment.Template

Loads a jinja template for a plugin (currently only airflow) from the jinja_templates folder.

lineapy.plugins.utils.slugify(value, allow_unicode=False)

Taken from https://github.com/django/django/blob/master/django/utils/text.py

Convert to ASCII if ‘allow_unicode’ is False. Convert spaces or repeated dashes to single dashes. Remove characters that aren’t alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip leading and trailing whitespace, dashes, and underscores. Lastly, replace all dashes with underscores.
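The steps above can be sketched as follows. The sketch is adapted from Django's `slugify`, with the final dash-to-underscore replacement added. It follows the description given here and may differ in detail from lineapy's own source.

```python
import re
import unicodedata


def slugify(value, allow_unicode=False):
    value = str(value)
    if allow_unicode:
        value = unicodedata.normalize("NFKC", value)
    else:
        # Decompose accented characters, then drop anything non-ASCII.
        value = (
            unicodedata.normalize("NFKD", value)
            .encode("ascii", "ignore")
            .decode("ascii")
        )
    # Remove characters that aren't word characters, whitespace, or hyphens.
    value = re.sub(r"[^\w\s-]", "", value.lower())
    # Collapse whitespace/dash runs to one dash, then strip edge dashes/underscores.
    value = re.sub(r"[-\s]+", "-", value).strip("-_")
    # Finally, replace all dashes with underscores.
    return value.replace("-", "_")


print(slugify("Hello World!"))  # → hello_world
```

The underscore output is a valid Python identifier fragment, which fits names used for generated modules and tasks.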

Copyright (c) Django Software Foundation and individual contributors. All rights reserved.

Module contents