Skip to content

Pipelines

This is the most important part of the checker. Pipelines are used to actually check and grade the solution.
Each pipeline is a sequence of plugins. Each plugin (pipeline stage) have arguments, run_if condition and return exclusion result.

Please refer to the plugins configuration for more details on pre-defined and custom plugins.

3 pipeline types

There are 3 types of pipelines you need to define in .checker.yml file: * global_pipeline - pipeline to be executed once for all checker repository.
You can place here any general compilation, installation, etc.
* task_pipeline - pipeline to be executed for each task.
You can place here any task-specific compilation, installation, etc.
For example, you run pytest by default, but for some tasks you want to have MR checked first.
(can be re-defined in .task.yml file) * report_pipeline - pipeline to be executed for each task after all tests are passed (not failed).
You can place here any task-specific score reporting, etc.
For example, you can report the score to the Manytask platform, but for some tasks you want to have MR checked first.
(can be re-defined in .task.yml file)

# .checker.yml
...
testing:
  # once
  global_pipeline:
    - name: "Install requirements"
      run: "run_script"
      args:
        ...
  # for each task
  task_pipeline:
    - name: "Run pytest"
      run: "pytest"
      args:
        ...
  # for each task after task_pipeline is passed
  report_pipeline:
    - name: "Report Score Manytask"
      run: "report_score_manytask"
      args: 
        ...
...

Single pipeline stage

Each pipeline stage is a plugin called with arguments. Here is the example of a single pipeline stage:

  - name: "Check forbidden regexps"  
    fail: fast
    run: "check_regexps"
    run_if: true
    register_output: "forbidden_regexps"
    args:
      origin: "/tmp/origin"
      patterns: ["**/*.py"]
      regexps: ["exit(0)"]

  • name: Human-readable name of the pipeline stage to be shown in the logs.
  • fail: Defines how to handle the failure of this pipeline stage.
    • fast - (default) fail fast, stop the pipeline and fail the task immediately.
    • after_all - fail after all pipeline stages are executed.
    • never - ignore the failure of this pipeline stage.
  • run: key name of the plugin to be executed. Will be searched within pre-defined and custom plugins.
  • run_if: condition to run this pipeline stage. Cast to bool, true by default.
  • register_output: name of the output to be registered in outputs variable. The PipelineStageResult object will be stored in outputs dict with this name.
  • args: arguments to be passed to the plugin.
    Arguments are validated by pydantic library as defined by each individual plugin.

Templating in Tester Pipelines

You can use jinja2 templating in .checker.yml file pipeline arguments and run_if conditions.
They can be used with ${{ ... }} syntax, expression within this brackets will be evaluated before plugin execution. For example:

  report_pipeline:
    - name: "Report Score Manytask"
      run: "report_score_manytask"
      args:
        origin: "${{ global.temp_dir }}/${{ task.task_sub_path }}"
        patterns: ["**/*.py"]
        username: ${{ global.username }}
        task_name: ${{ task.task_name }}
        score: ${{ outputs.test_output.percentage }}

The available variables are:

  • global - global parameters

    Base variables passed in pipeline stages.

    Source code in checker/tester.py
    16
    17
    18
    19
    20
    21
    22
    23
    24
    @dataclass
    class GlobalPipelineVariables:
        """Base variables passed in pipeline stages."""
    
        ref_dir: str
        repo_dir: str
        temp_dir: str
        task_names: list[str]
        task_sub_paths: list[str]
    
  • task - task parameters

    Variables passed in pipeline stages for each task.

    Source code in checker/tester.py
    27
    28
    29
    30
    31
    32
    @dataclass
    class TaskPipelineVariables:
        """Variables passed in pipeline stages for each task."""
    
        task_name: str
        task_sub_path: str
    
  • parameters - default parameters

    Bases: RootModel[dict[str, TParamType]]

    Source code in checker/configs/checker.py
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    class CheckerParametersConfig(RootModel[dict[str, TParamType]]):
        root: dict[str, TParamType]
    
        def __getitem__(self, item: str) -> TParamType:
            return self.root[item]
    
        def __contains__(self, item: str) -> bool:
            return item in self.root
    
        @property
        def __dict__(self) -> dict[str, TParamType]:
            return self.root
    
        @__dict__.setter
        def __dict__(self, value: dict[str, TParamType]) -> None:
            self.root = value
    
  • env - environment variables dict in the moment of running checker

  • outputs - outputs of previous pipeline step if register_output is set, dict of string to checker.plugins.PluginOutput objects

    Result of a single pipeline stage. :param name: name of the stage :param failed: if True, stage failed :param skipped: if True, stage was skipped :param percentage: optional percentage of points earned :param elapsed_time: optional elapsed time in seconds :param output: output of the stage

    Source code in checker/pipeline.py
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @dataclass
    class PipelineStageResult:
        """Result of a single pipeline stage.
        :param name: name of the stage
        :param failed: if True, stage failed
        :param skipped: if True, stage was skipped
        :param percentage: optional percentage of points earned
        :param elapsed_time: optional elapsed time in seconds
        :param output: output of the stage
        """
    
        name: str
        failed: bool
        skipped: bool
        percentage: float | None = None
        elapsed_time: float | None = None
        output: str = ""
    
        def __str__(self) -> str:  # pragma: no cover
            return (
                f"PipelineStageResult: failed={int(self.failed)}, "
                f"skipped={int(self.skipped)}, percentage={self.percentage or 1.0:.2f}, name='{self.name}'"
            )
    

Pipeline stage result

Each stage can optionally register its output in outputs context to be used by the next stages.
e.g. register percentage of passed tests to be used in the next stage to report the score.

Each pipeline processes internally as PipelineStageResult object. It contains the following fields:

Result of a single pipeline stage. :param name: name of the stage :param failed: if True, stage failed :param skipped: if True, stage was skipped :param percentage: optional percentage of points earned :param elapsed_time: optional elapsed time in seconds :param output: output of the stage

Source code in checker/pipeline.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@dataclass
class PipelineStageResult:
    """Result of a single pipeline stage.
    :param name: name of the stage
    :param failed: if True, stage failed
    :param skipped: if True, stage was skipped
    :param percentage: optional percentage of points earned
    :param elapsed_time: optional elapsed time in seconds
    :param output: output of the stage
    """

    name: str
    failed: bool
    skipped: bool
    percentage: float | None = None
    elapsed_time: float | None = None
    output: str = ""

    def __str__(self) -> str:  # pragma: no cover
        return (
            f"PipelineStageResult: failed={int(self.failed)}, "
            f"skipped={int(self.skipped)}, percentage={self.percentage or 1.0:.2f}, name='{self.name}'"
        )

And can be accessed in the next pipeline stages using templating syntax ${{ outputs.<registered-name>.<result-field> }}