Skip to content

task

DagTaskBreakdown

Bases: Enum

Enum to define how to break down a graph into tasks for the pipeline.

Source code in lineapy/plugins/task.py
179
180
181
182
183
184
185
186
class DagTaskBreakdown(Enum):
    """Granularity options for splitting a graph into pipeline tasks."""

    # The entire set of sessions runs as one single task.
    TaskAllSessions = 1
    # One task per session.
    TaskPerSession = 2
    # One task per artifact (or common variable).
    TaskPerArtifact = 3

TaskDefinition dataclass

Definition of an artifact; new keys (user, project, ...) can be added in the future.

Attributes:

Name Type Description
function_name str

suggested function name for this task that won't conflict with other linea generated tasks

user_input_variables List[str]

arguments that must be provided through the framework

loaded_input_variables List[str]

arguments that are provided by other tasks and must be loaded through inter task communication

typing_blocks List[str]

for user_input_variables, casts the input variables to the correct type

call_block str

line of code to call the function in module file

return_vars List[str]

outputs that need to be serialized to be used

pipeline_name str

overall pipeline name

Source code in lineapy/plugins/task.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@dataclass
class TaskDefinition:
    """
    Definition of an artifact; new keys (user, project, ...) can be added
    in the future.

    Attributes
    ----------
    function_name: str
        suggested function name for this task that won't conflict with other linea generated tasks
    user_input_variables: List[str]
        arguments that must be provided through the framework
    loaded_input_variables: List[str]
        arguments that are provided by other tasks and must be loaded through inter task communication
    typing_blocks: List[str]
        for user_input_variables, casts the input variables to the correct type
    pre_call_block: str
        lines of code to run before calling the function in the module file
    call_block: str
        line of code to call the function in module file
    post_call_block: str
        lines of code to run after calling the function in the module file
    return_vars: List[str]
        outputs that need to be serialized to be used
    pipeline_name: str
        overall pipeline name

    """

    function_name: str
    user_input_variables: List[str]
    loaded_input_variables: List[str]
    typing_blocks: List[str]
    pre_call_block: str
    call_block: str
    post_call_block: str
    return_vars: List[str]
    pipeline_name: str

TaskGraph

Bases: object

Graph representation of task dependencies. It is constructed based on the "edges" variable.

Attributes:

Name Type Description
edges TaskGraphEdge

Dictionary with task name as key and set of prerequisite tasks as value. This is the standard library graphlib style graph definition. For instance, {'C':{'A','B'}} means A and B are prerequisites for C. Both examples give us the following task dependency::

    A ---\
          \
           >---> C
          /
    B ---/

Note
  • If we only support Python 3.9+, we prefer to use graphlib in standard library instead of networkx for graph operation.
  • We might want to get rid of the mapping for renaming slice_names to task_names.
Source code in lineapy/plugins/task.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class TaskGraph(object):
    """
    Graph representation of task dependencies.
    It is constructed based on the "edges" variable.

    Attributes
    ----------
    edges: TaskGraphEdge
        Dictionary with task name as key and set of prerequisite
        tasks as value. This is the standard library `graphlib` style graph
        definition. For instance, {'C':{'A','B'}} means A and B are
        prerequisites for C. Both examples give us following task dependency::

            A ---\\
                  \\
                   >---> C
                  /
            B ---/


    ??? note

        - If we only support Python 3.9+, we prefer to use graphlib in standard
          library instead of networkx for graph operation.
        - We might want to get rid of the mapping for renaming slice_names to
          task_names.

    """

    def __init__(
        self,
        nodes: List[str],
        edges: Optional[TaskGraphEdge] = None,
    ):
        # `None` sentinel instead of a mutable `{}` default, which would be
        # shared across all calls.
        self.graph = nx.DiGraph()
        self.graph.add_nodes_from(nodes)
        # parsing the other format to our tuple-based format
        # note that nesting is not allowed (enforced by the type signature)
        # per the spec in https://docs.python.org/3/library/graphlib.html
        graph_edges = list(
            chain.from_iterable(
                ((node, to_node) for node in from_node)
                for to_node, from_node in (edges or {}).items()
            )
        )
        self.graph.add_edges_from(graph_edges)

    def copy(
        self,
    ) -> TaskGraph:
        """Return a TaskGraph holding an independent copy of this graph."""
        copied_taskgraph = TaskGraph([])
        copied_taskgraph.graph = self.graph.copy()
        return copied_taskgraph

    def remap_nodes(self, mapping: Dict[str, str]) -> TaskGraph:
        """Return a new TaskGraph with node names replaced per ``mapping``."""
        remapped_taskgraph = TaskGraph([])
        remapped_taskgraph.graph = nx.relabel_nodes(
            self.graph, mapping, copy=True
        )
        return remapped_taskgraph

    def insert_setup_task(self, setup_task_name: str):
        """
        insert_setup_task adds a setup task that will be run before all the original source tasks
        """

        self.graph.add_node(setup_task_name)

        # the freshly added node is itself a source, so skip it
        for old_source in self.source_nodes:
            if not old_source == setup_task_name:
                self.graph.add_edge(setup_task_name, old_source)

    def insert_teardown_task(self, cleanup_task_name: str):
        """
        insert_teardown_task adds a cleanup task that will be run after all the original sink tasks
        """

        self.graph.add_node(cleanup_task_name)

        # the freshly added node is itself a sink, so skip it
        for old_sink in self.sink_nodes:
            if not old_sink == cleanup_task_name:
                self.graph.add_edge(old_sink, cleanup_task_name)

    def get_taskorder(self) -> List[str]:
        """Return tasks in a topological order; raise on circular dependencies."""
        try:
            return list(nx.topological_sort(self.graph))
        except NetworkXUnfeasible:
            raise Exception(
                "Current implementation of LineaPy demands it be able to linearly order different sessions, "
                "which prohibits any circular dependencies between sessions. "
                "Please check if your provided dependencies include such circular dependencies between sessions, "
                "e.g., Artifact A (Session 1) -> Artifact B (Session 2) -> Artifact C (Session 1)."
            )

    def remove_self_loops(self):
        """Drop edges of the form (node, node), which carry no ordering information."""
        self.graph.remove_edges_from(nx.selfloop_edges(self.graph))

    @property
    def sink_nodes(self):
        """Nodes with no outgoing edges (nothing depends on them)."""
        return [
            node
            for node in self.graph.nodes
            if self.graph.out_degree(node) == 0
        ]

    @property
    def source_nodes(self):
        """Nodes with no incoming edges (they have no prerequisites)."""
        return [
            node
            for node in self.graph.nodes
            if self.graph.in_degree(node) == 0
        ]

insert_setup_task(setup_task_name)

insert_setup_task adds a setup task that will be run before all the original source tasks

Source code in lineapy/plugins/task.py
77
78
79
80
81
82
83
84
85
86
def insert_setup_task(self, setup_task_name: str):
    """Add a setup task wired to run before all the original source tasks."""

    self.graph.add_node(setup_task_name)

    # The newly added node is itself a source; guard against a self-edge.
    for source in self.source_nodes:
        if source != setup_task_name:
            self.graph.add_edge(setup_task_name, source)

insert_teardown_task(cleanup_task_name)

insert_teardown_task adds a cleanup task that will be run after all the original sink tasks

Source code in lineapy/plugins/task.py
88
89
90
91
92
93
94
95
96
97
def insert_teardown_task(self, cleanup_task_name: str):
    """
    insert_teardown_task adds a cleanup task that will be run after all the original sink tasks
    """

    self.graph.add_node(cleanup_task_name)

    # The newly added node is itself a sink; guard against a self-edge.
    for old_sink in self.sink_nodes:
        if not old_sink == cleanup_task_name:
            self.graph.add_edge(old_sink, cleanup_task_name)

TaskSerializer

Bases: Enum

Enum to define what type of object serialization to use for inter task communication.

Source code in lineapy/plugins/task.py
189
190
191
192
193
194
195
196
197
198
class TaskSerializer(Enum):
    """Enum to define what type of object serialization to use for inter task communication."""

    # Pickle into a local directory under /tmp/.
    TmpDirPickle = 1
    # Pickle into a directory supplied as a pipeline parameter.
    ParametrizedPickle = 2
    # Pickle next to the pipeline files (current working directory).
    CWDPickle = 3

render_task_definitions(task_defs, pipeline_name, task_serialization, task_name_fn=lambda task_def: task_def.function_name, function_decorator_fn=lambda task_def: '', user_input_variables_fn=lambda task_def: ', '.join(task_def.user_input_variables), typing_blocks_fn=lambda task_def: task_def.typing_blocks, pre_call_block_fn=lambda task_def: task_def.pre_call_block, call_block_fn=lambda task_def: task_def.call_block, post_call_block_fn=lambda task_def: task_def.post_call_block, return_block_fn=lambda task_def: '', include_imports_locally=False)

Returns rendered tasks for the pipeline tasks.

Source code in lineapy/plugins/task.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def render_task_definitions(
    task_defs: Dict[str, TaskDefinition],
    pipeline_name: str,
    task_serialization: Optional[TaskSerializer],
    task_name_fn: Callable = lambda task_def: task_def.function_name,
    function_decorator_fn: Callable = lambda task_def: "",
    user_input_variables_fn: Callable = lambda task_def: ", ".join(
        task_def.user_input_variables
    ),
    typing_blocks_fn: Callable = lambda task_def: task_def.typing_blocks,
    pre_call_block_fn: Callable = lambda task_def: task_def.pre_call_block,
    call_block_fn: Callable = lambda task_def: task_def.call_block,
    post_call_block_fn: Callable = lambda task_def: task_def.post_call_block,
    return_block_fn: Callable = lambda task_def: "",
    include_imports_locally: bool = False,
) -> List[str]:
    """
    Returns rendered tasks for the pipeline tasks.
    """
    task_template = load_plugin_template("task/task_function.jinja")
    rendered_tasks: List[str] = []
    for task_def in task_defs.values():
        # Serialization blocks are only emitted when a serializer is chosen.
        if task_serialization is not None:
            loading, dumping = render_task_io_serialize_blocks(
                task_def, task_serialization
            )
        else:
            loading, dumping = [], []

        rendered_tasks.append(
            task_template.render(
                MODULE_NAME=pipeline_name + "_module",
                function_name=task_name_fn(task_def),
                function_decorator=function_decorator_fn(task_def),
                user_input_variables=user_input_variables_fn(task_def),
                typing_blocks=typing_blocks_fn(task_def),
                loading_blocks=loading,
                pre_call_block=pre_call_block_fn(task_def),
                call_block=call_block_fn(task_def),
                post_call_block=post_call_block_fn(task_def),
                dumping_blocks=dumping,
                return_block=return_block_fn(task_def),
                include_imports_locally=include_imports_locally,
            )
        )

    return rendered_tasks

render_task_io_serialize_blocks(taskdef, task_serialization)

render_task_io_serialize_blocks renders object ser and deser code blocks.

These code blocks can be used for inter task communication. This function returns the task deserialization block first, since this block should be included first in the function to load the variables.

Source code in lineapy/plugins/task.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
def render_task_io_serialize_blocks(
    taskdef: TaskDefinition, task_serialization: TaskSerializer
) -> Tuple[List[str], List[str]]:
    """
    render_task_io_serialize_blocks renders object ser and deser code blocks.

    These code blocks can be used for inter task communication.
    This function returns the task deserialization block first,
    since this block should be included first in the function to load the variables.

    Raises
    ------
    ValueError
        If ``task_serialization`` is not a supported serializer.
    """
    task_serialization_blocks = []
    task_deserialization_block = []

    # Dispatch on the serializer to find its template directory.
    # Add more renderable task serializers here.
    if task_serialization == TaskSerializer.TmpDirPickle:
        template_subdir = "tmpdirpickle"
    elif task_serialization == TaskSerializer.ParametrizedPickle:
        template_subdir = "parameterizedpickle"
    elif task_serialization == TaskSerializer.CWDPickle:
        template_subdir = "cwdpickle"
    else:
        # Fail loudly instead of hitting an UnboundLocalError below when a
        # new TaskSerializer member is added without a matching branch.
        raise ValueError(
            f"Unsupported task serializer: {task_serialization}"
        )

    SERIALIZER_TEMPLATE = load_plugin_template(
        f"task/{template_subdir}/task_ser.jinja"
    )
    DESERIALIZER_TEMPLATE = load_plugin_template(
        f"task/{template_subdir}/task_deser.jinja"
    )

    for loaded_input_variable in taskdef.loaded_input_variables:
        task_deserialization_block.append(
            DESERIALIZER_TEMPLATE.render(
                loaded_input_variable=loaded_input_variable,
                pipeline_name=taskdef.pipeline_name,
            )
        )
    for return_variable in taskdef.return_vars:
        task_serialization_blocks.append(
            SERIALIZER_TEMPLATE.render(
                return_variable=return_variable,
                pipeline_name=taskdef.pipeline_name,
            )
        )

    return task_deserialization_block, task_serialization_blocks

Was this helpful?

Help us improve docs with your feedback!