all_tasks
Tasks can be imported piecemeal or imported in their entirety from here.
Attributes⚓︎
TaskList
module-attribute
⚓︎
# Type alias: a list whose items are each either a `Call` or a `DeferedTask`.
TaskList = List[Union[Call, DeferedTask]]
List of wrapped or normal task functions.
Classes⚓︎
Functions⚓︎
main ⚓︎
main(_ctx)
Main task pipeline.
Source code in calcipy/tasks/all_tasks.py
@task(post=with_progress(_MAIN_TASKS))
def main(_ctx: Context) -> None:
    """Main task pipeline.

    The body is intentionally empty: the tasks in `_MAIN_TASKS` are attached
    through the decorator's `post` argument after being wrapped by
    `with_progress`.

    Args:
        _ctx: task context (unused by this function itself)

    """
other ⚓︎
other(_ctx)
Run tasks that are otherwise not exercised in main.
Source code in calcipy/tasks/all_tasks.py
@task(post=with_progress(_OTHER_TASKS))
def other(_ctx: Context) -> None:
    """Run tasks that are otherwise not exercised in main.

    The body is intentionally empty: the tasks in `_OTHER_TASKS` are attached
    through the decorator's `post` argument after being wrapped by
    `with_progress`.

    Args:
        _ctx: task context (unused by this function itself)

    """
progress ⚓︎
progress(_ctx, *, index, total)
Progress Task.
Source code in calcipy/tasks/all_tasks.py
@task(
    help={
        'index': 'Current index (0-indexed)',
        'total': 'Total steps',
    },
    show_task_info=False,
)
def progress(_ctx: Context, *, index: int, total: int) -> None:
    """Progress Task.

    Logs a 'Progress' header carrying the current step and the step count.

    Args:
        _ctx: task context (unused)
        index: 0-indexed position of the current step
        total: total number of steps

    """
    # The caller supplies a 0-indexed position; the logged value is 1-indexed
    step_number = index + 1
    logger.text('Progress', is_header=True, index=step_number, total=total)
release ⚓︎
release(ctx, *, suffix=None)
Release pipeline.
Source code in calcipy/tasks/all_tasks.py
@task(
    help=cl.bump.help,  # pyright: ignore[reportFunctionMemberAccess]
    post=with_progress(
        [
            pack.lock,
            doc.build,
            doc.deploy,
            pack.publish,
        ],
        offset=1,
    ),
)
def release(ctx: Context, *, suffix: cl.SuffixT = None) -> None:
    """Release pipeline.

    Calls `cl.bump` directly, with `pack.lock`, `doc.build`, `doc.deploy`, and
    `pack.publish` attached through the decorator's `post` argument.

    The `offset=1` passed to `with_progress` shifts the progress counters by
    one, so the four post tasks are reported as steps 2-5 of 5 (presumably
    leaving step 1 for the bump performed here).

    Args:
        ctx: task context, passed through to `cl.bump`
        suffix: forwarded unchanged to `cl.bump`; the help text for this
            argument is reused from `cl.bump.help`

    """
    # Must be the same task whose help is reused in the decorator above
    cl.bump(ctx, suffix=suffix)
summary ⚓︎
summary(_ctx, *, message)
Summary Task.
Source code in calcipy/tasks/all_tasks.py
@task(
    help={
        'message': 'String message to display',
    },
    show_task_info=False,
)
def summary(_ctx: Context, *, message: str) -> None:
    """Summary Task.

    Logs the given message as a header.

    Args:
        _ctx: task context (unused)
        message: text to display

    """
    logger.text(message, is_header=True)
with_progress ⚓︎
with_progress(items, offset=0)
Inject intermediary ‘progress’ tasks.
PARAMETER | DESCRIPTION |
---|---|
items | list of tasks. TYPE: Any |
offset | Optional offset to shift counters. TYPE: int. DEFAULT: 0 |
Source code in calcipy/tasks/all_tasks.py
@beartype
def with_progress(items: Any, offset: int = 0) -> TaskList:
    """Inject intermediary 'progress' tasks.

    The result starts with a single `summary` call that names every task,
    followed by a `progress` call placed immediately before each task.

    Args:
        items: list of tasks
        offset: Optional offset to shift counters

    Returns:
        The summary call, then alternating progress calls and built tasks.

    """
    built = [_build_task(entry) for entry in items]
    names = ', '.join(str(entry.__name__) for entry in built)
    pipeline: TaskList = [
        summary.with_kwargs(message='Running tasks: ' + names),  # pyright: ignore[reportFunctionMemberAccess]
    ]
    # The offset shifts both the per-step index and the reported total
    total = len(built) + offset
    for position, entry in enumerate(built, start=offset):
        pipeline.append(
            progress.with_kwargs(index=position, total=total),  # pyright: ignore[reportFunctionMemberAccess]
        )
        pipeline.append(entry)
    return pipeline