Skip to content

HEDit Python API

This page documents the Python modules and classes in HEDit.

API Models

The API request/response models are defined using Pydantic:

src.api.models

Pydantic models for API requests and responses.

AnnotationRequest

Bases: BaseModel

Request model for HED annotation generation.

Attributes:

Name Type Description
description str

Natural language event description to annotate

schema_version str

HED schema version to use

max_validation_attempts int

Maximum validation retry attempts

run_assessment bool

Whether to run final assessment (adds extra time)

model str | None

Override model for annotation (BYOK mode only)

provider str | None

Override provider preference (BYOK mode only)

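temperature float | None

Override LLM temperature (BYOK mode only)

no_extend bool

If True, prohibit tag extensions (use only existing HED vocabulary)

telemetry_enabled bool

Allow telemetry collection for this request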

Source code in hedit/src/api/models.py
class AnnotationRequest(BaseModel):
    """Request model for HED annotation generation.

    Attributes:
        description: Natural language event description to annotate
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        run_assessment: Whether to run final assessment (adds extra time)
        model: Override model for annotation (BYOK mode only)
        provider: Override provider preference (BYOK mode only)
        temperature: Override LLM temperature (BYOK mode only)
        no_extend: If True, prohibit tag extensions (use only existing HED vocabulary)
        telemetry_enabled: Allow telemetry collection for this request
    """

    # Required; min_length=1 rejects an empty description.
    description: str = Field(
        ...,
        description="Natural language event description",
        min_length=1,
        examples=["A red circle appears on the left side of the screen"],
    )
    schema_version: str = Field(
        default="8.4.0",
        description="HED schema version",
        examples=["8.3.0", "8.4.0"],
    )
    # Accepted range is 1..10 inclusive (ge/le bounds).
    max_validation_attempts: int = Field(
        default=3,
        description="Maximum validation retry attempts (total iterations = this + 1)",
        ge=1,
        le=10,
    )
    run_assessment: bool = Field(
        default=False,
        description="Run final assessment for completeness (adds extra processing time)",
    )
    # BYOK model configuration (optional, only used when X-OpenRouter-Key is provided)
    model: str | None = Field(
        default=None,
        description="Override model for annotation (BYOK mode only, e.g., 'openai/gpt-4o')",
        examples=["anthropic/claude-haiku-4.5", "qwen/qwen3.5-122b-a10b"],
    )
    provider: str | None = Field(
        default=None,
        description="Override provider preference (BYOK mode only, e.g., 'anthropic')",
        examples=["anthropic", "alibaba", None],
    )
    # Accepted range is 0.0..1.0 inclusive when provided; None means "no override".
    temperature: float | None = Field(
        default=None,
        description="Override LLM temperature (BYOK mode only, 0.0-1.0)",
        ge=0.0,
        le=1.0,
        examples=[0.1, 0.3, 0.7],
    )
    # Per-request behavior flags.
    no_extend: bool = Field(
        default=False,
        description="If True, prohibit tag extensions (use only existing HED vocabulary)",
    )
    telemetry_enabled: bool = Field(
        default=True,
        description="Allow telemetry collection for this request",
    )

AnnotationResponse

Bases: BaseModel

Response model for HED annotation generation.

Attributes:

Name Type Description
annotation str

Generated HED annotation string

is_valid bool

Whether the annotation passed validation

is_faithful bool

Whether the annotation is faithful to description

is_complete bool

Whether the annotation is complete

validation_attempts int

Number of validation attempts made

validation_errors list[str]

List of validation errors (if any)

validation_warnings list[str]

List of validation warnings (if any)

evaluation_feedback str

Evaluation agent feedback

assessment_feedback str

Assessment agent feedback

status str

Overall workflow status

Source code in hedit/src/api/models.py
class AnnotationResponse(BaseModel):
    """Response model for HED annotation generation.

    Attributes:
        annotation: Generated HED annotation string
        is_valid: Whether the annotation passed validation
        is_faithful: Whether the annotation is faithful to description
        is_complete: Whether the annotation is complete
        validation_attempts: Number of validation attempts made
        validation_errors: List of validation errors (if any); defaults to empty
        validation_warnings: List of validation warnings (if any); defaults to empty
        evaluation_feedback: Evaluation agent feedback; defaults to ""
        assessment_feedback: Assessment agent feedback; defaults to ""
        status: Overall workflow status
    """

    annotation: str = Field(..., description="Generated HED annotation string")
    is_valid: bool = Field(..., description="Validation status")
    is_faithful: bool = Field(..., description="Faithfulness to original description")
    is_complete: bool = Field(..., description="Completeness status")
    validation_attempts: int = Field(..., description="Number of validation attempts")
    # The optional fields below carry descriptions too, so the generated schema
    # documents every field rather than only the required ones.
    validation_errors: list[str] = Field(
        default_factory=list,
        description="List of validation errors (if any)",
    )
    validation_warnings: list[str] = Field(
        default_factory=list,
        description="List of validation warnings (if any)",
    )
    evaluation_feedback: str = Field(default="", description="Evaluation agent feedback")
    assessment_feedback: str = Field(default="", description="Assessment agent feedback")
    status: str = Field(..., description="Workflow status", examples=["success", "failed"])

ValidationRequest

Bases: BaseModel

Request model for HED validation only.

Attributes:

Name Type Description
hed_string str

HED annotation string to validate

schema_version str

HED schema version to use

Source code in hedit/src/api/models.py
class ValidationRequest(BaseModel):
    """Request model for HED validation only.

    Attributes:
        hed_string: HED annotation string to validate (must be non-empty)
        schema_version: HED schema version to use (defaults to 8.4.0)
    """

    hed_string: str = Field(..., min_length=1, description="HED annotation string")
    schema_version: str = Field(default="8.4.0", description="HED schema version")

ValidationResponse

Bases: BaseModel

Response model for HED validation.

Attributes:

Name Type Description
is_valid bool

Whether the HED string is valid

errors list[str]

List of validation errors

warnings list[str]

List of validation warnings

parsed_string str | None

Normalized HED string (if valid)

Source code in hedit/src/api/models.py
class ValidationResponse(BaseModel):
    """Response model for HED validation.

    Attributes:
        is_valid: Whether the HED string is valid
        errors: List of validation errors; defaults to empty
        warnings: List of validation warnings; defaults to empty
        parsed_string: Normalized HED string (if valid); defaults to None
    """

    is_valid: bool = Field(..., description="Validation status")
    # Descriptions added so these fields are documented in the generated schema
    # like is_valid above.
    errors: list[str] = Field(
        default_factory=list,
        description="List of validation errors",
    )
    warnings: list[str] = Field(
        default_factory=list,
        description="List of validation warnings",
    )
    parsed_string: str | None = Field(
        default=None,
        description="Normalized HED string (if valid)",
    )

ImageAnnotationRequest

Bases: BaseModel

Request model for image-based HED annotation generation.

Attributes:

Name Type Description
image str

Base64 encoded image or data URI

prompt str | None

Optional custom prompt for vision model (uses default if not provided)

schema_version str

HED schema version to use

max_validation_attempts int

Maximum validation retry attempts

run_assessment bool

Whether to run final assessment (adds extra time)

model str | None

Override model for annotation (BYOK mode only)

vision_model str | None

Override vision model for image description (BYOK mode only)

provider str | None

Override provider preference (BYOK mode only)

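temperature float | None

Override LLM temperature (BYOK mode only)

vision_provider str | None

Override vision model provider (BYOK mode only)

no_extend bool

If True, prohibit tag extensions (use only existing HED vocabulary)

telemetry_enabled bool

Allow telemetry collection for this request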

Source code in hedit/src/api/models.py
class ImageAnnotationRequest(BaseModel):
    """Request model for image-based HED annotation generation.

    Attributes:
        image: Base64 encoded image or data URI
        prompt: Optional custom prompt for vision model (uses default if not provided)
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        run_assessment: Whether to run final assessment (adds extra time)
        model: Override model for annotation (BYOK mode only)
        vision_model: Override vision model for image description (BYOK mode only)
        vision_provider: Override vision model provider (BYOK mode only)
        provider: Override provider preference (BYOK mode only)
        temperature: Override LLM temperature (BYOK mode only)
        no_extend: If True, prohibit tag extensions (use only existing HED vocabulary)
        telemetry_enabled: Allow telemetry collection for this request
    """

    # Required; min_length=1 rejects an empty payload.
    image: str = Field(
        ...,
        description="Base64 encoded image or data URI (data:image/png;base64,...)",
        min_length=1,
    )
    prompt: str | None = Field(
        default=None,
        description="Optional custom prompt for vision model",
        examples=["Describe the visual elements in this image"],
    )
    schema_version: str = Field(
        default="8.4.0",
        description="HED schema version",
        examples=["8.3.0", "8.4.0"],
    )
    # Accepted range is 1..10 inclusive (ge/le bounds).
    max_validation_attempts: int = Field(
        default=3,
        description="Maximum validation retry attempts (total iterations = this + 1)",
        ge=1,
        le=10,
    )
    run_assessment: bool = Field(
        default=False,
        description="Run final assessment for completeness (adds extra processing time)",
    )
    # BYOK model configuration (optional, only used when X-OpenRouter-Key is provided)
    model: str | None = Field(
        default=None,
        description="Override model for annotation (BYOK mode only, e.g., 'openai/gpt-4o')",
        examples=["openai/gpt-4o", "anthropic/claude-3.5-sonnet"],
    )
    vision_model: str | None = Field(
        default=None,
        description="Override vision model for image description (BYOK mode only)",
        examples=["qwen/qwen3.5-122b-a10b", "qwen/qwen3-vl-235b-a22b-instruct"],
    )
    vision_provider: str | None = Field(
        default=None,
        description="Override vision model provider (BYOK mode only, e.g., 'alibaba')",
        examples=["alibaba", "novita", None],
    )
    provider: str | None = Field(
        default=None,
        description="Override annotation provider preference (BYOK mode only, e.g., 'anthropic')",
        examples=["anthropic", "alibaba", None],
    )
    # Accepted range is 0.0..1.0 inclusive when provided; None means "no override".
    temperature: float | None = Field(
        default=None,
        description="Override LLM temperature (BYOK mode only, 0.0-1.0)",
        ge=0.0,
        le=1.0,
        examples=[0.1, 0.3, 0.7],
    )
    # Per-request behavior flags.
    no_extend: bool = Field(
        default=False,
        description="If True, prohibit tag extensions (use only existing HED vocabulary)",
    )
    telemetry_enabled: bool = Field(
        default=True,
        description="Allow telemetry collection for this request",
    )

ImageAnnotationResponse

Bases: BaseModel

Response model for image-based HED annotation generation.

Attributes:

Name Type Description
image_description str

Generated description from vision model

annotation str

Generated HED annotation string

is_valid bool

Whether the annotation passed validation

is_faithful bool

Whether the annotation is faithful to description

is_complete bool

Whether the annotation is complete

validation_attempts int

Number of validation attempts made

validation_errors list[str]

List of validation errors (if any)

validation_warnings list[str]

List of validation warnings (if any)

evaluation_feedback str

Evaluation agent feedback

assessment_feedback str

Assessment agent feedback

status str

Overall workflow status

image_metadata dict

Metadata about the processed image

Source code in hedit/src/api/models.py
class ImageAnnotationResponse(BaseModel):
    """Response model for image-based HED annotation generation.

    Attributes:
        image_description: Generated description from vision model
        annotation: Generated HED annotation string
        is_valid: Whether the annotation passed validation
        is_faithful: Whether the annotation is faithful to description
        is_complete: Whether the annotation is complete
        validation_attempts: Number of validation attempts made
        validation_errors: List of validation errors (if any); defaults to empty
        validation_warnings: List of validation warnings (if any); defaults to empty
        evaluation_feedback: Evaluation agent feedback; defaults to ""
        assessment_feedback: Assessment agent feedback; defaults to ""
        status: Overall workflow status
        image_metadata: Metadata about the processed image; defaults to empty dict
    """

    image_description: str = Field(..., description="Generated image description")
    annotation: str = Field(..., description="Generated HED annotation string")
    is_valid: bool = Field(..., description="Validation status")
    is_faithful: bool = Field(..., description="Faithfulness to description")
    is_complete: bool = Field(..., description="Completeness status")
    validation_attempts: int = Field(..., description="Number of validation attempts")
    # The optional fields below carry descriptions too, so the generated schema
    # documents every field rather than only the required ones.
    validation_errors: list[str] = Field(
        default_factory=list,
        description="List of validation errors (if any)",
    )
    validation_warnings: list[str] = Field(
        default_factory=list,
        description="List of validation warnings (if any)",
    )
    evaluation_feedback: str = Field(default="", description="Evaluation agent feedback")
    assessment_feedback: str = Field(default="", description="Assessment agent feedback")
    status: str = Field(..., description="Workflow status", examples=["success", "failed"])
    image_metadata: dict = Field(default_factory=dict, description="Image metadata")

HealthResponse

Bases: BaseModel

Response model for health check.

Attributes:

Name Type Description
status str

Service status

version str

API version

llm_available bool

Whether LLM is available

validator_available bool

Whether HED validator is available

Source code in hedit/src/api/models.py
class HealthResponse(BaseModel):
    """Response model for health check.

    All four fields are required.

    Attributes:
        status: Service status
        version: API version
        llm_available: Whether LLM is available
        validator_available: Whether HED validator is available
    """

    # Descriptions added so the health endpoint's fields are documented in the
    # generated schema; every field remains required with the same type.
    status: str = Field(..., description="Service status", examples=["healthy", "degraded"])
    version: str = Field(..., description="API version", examples=["0.1.0"])
    llm_available: bool = Field(..., description="Whether LLM is available")
    validator_available: bool = Field(..., description="Whether HED validator is available")

FeedbackRequest

Bases: BaseModel

Request model for submitting user feedback.

Attributes:

Name Type Description
type str

Feedback type (text or image annotation)

version str | None

App version that generated the annotation

description str | None

Original input description (for text mode)

image_description str | None

Image description (for image mode)

annotation str

Generated HED annotation

is_valid bool

Whether the annotation was valid

is_faithful bool | None

Whether the annotation was faithful

is_complete bool | None

Whether the annotation was complete

validation_errors list[str]

List of validation errors

validation_warnings list[str]

List of validation warnings

evaluation_feedback str

Evaluation agent feedback

assessment_feedback str

Assessment agent feedback

user_comment str | None

Optional user comment about the annotation

Source code in hedit/src/api/models.py
class FeedbackRequest(BaseModel):
    """Request model for submitting user feedback.

    Only ``annotation`` is required; every other field has a default.

    Attributes:
        type: Feedback type (text or image annotation)
        version: App version that generated the annotation
        description: Original input description (for text mode)
        image_description: Image description (for image mode)
        annotation: Generated HED annotation
        is_valid: Whether the annotation was valid
        is_faithful: Whether the annotation was faithful
        is_complete: Whether the annotation was complete
        validation_errors: List of validation errors
        validation_warnings: List of validation warnings
        evaluation_feedback: Evaluation agent feedback
        assessment_feedback: Assessment agent feedback
        user_comment: Optional user comment about the annotation
    """

    type: str = Field(
        default="text",
        description="Feedback type",
        examples=["text", "image"],
    )
    version: str | None = Field(
        default=None,
        description="App version that generated the annotation",
    )
    description: str | None = Field(
        default=None,
        description="Original input description (for text mode)",
    )
    image_description: str | None = Field(
        default=None,
        description="Image description (for image mode)",
    )
    # The only required field; min_length=1 rejects an empty annotation.
    annotation: str = Field(
        ...,
        description="Generated HED annotation",
        min_length=1,
    )
    is_valid: bool = Field(
        default=False,
        description="Whether the annotation was valid",
    )
    is_faithful: bool | None = Field(
        default=None,
        description="Whether the annotation was faithful",
    )
    is_complete: bool | None = Field(
        default=None,
        description="Whether the annotation was complete",
    )
    # Descriptions added so these fields are documented in the generated schema
    # like the rest of the model.
    validation_errors: list[str] = Field(
        default_factory=list,
        description="List of validation errors",
    )
    validation_warnings: list[str] = Field(
        default_factory=list,
        description="List of validation warnings",
    )
    evaluation_feedback: str = Field(
        default="",
        description="Evaluation agent feedback",
    )
    assessment_feedback: str = Field(
        default="",
        description="Assessment agent feedback",
    )
    user_comment: str | None = Field(
        default=None,
        description="Optional user comment about the annotation",
    )

FeedbackResponse

Bases: BaseModel

Response model for feedback submission.

Attributes:

Name Type Description
success bool

Whether feedback was saved successfully

feedback_id str

Unique identifier for the feedback

message str

Status message

Source code in hedit/src/api/models.py
class FeedbackResponse(BaseModel):
    """Response model for feedback submission.

    All three fields are required.

    Attributes:
        success: Whether feedback was saved successfully
        feedback_id: Unique identifier for the feedback
        message: Status message
    """

    success: bool = Field(
        ...,
        description="Whether feedback was saved",
    )
    feedback_id: str = Field(
        ...,
        description="Unique identifier for the feedback",
    )
    message: str = Field(
        ...,
        description="Status message",
    )

CLI Module

The CLI is built with Typer:

src.cli.main

HEDit CLI - Main entry point.

Command-line interface for generating HED annotations from natural language. Supports two execution modes:

- API mode (default): Uses the api.annotation.garden backend
- Standalone mode: Runs the LangGraph workflow locally (requires hedit[standalone])

get_executor(config, api_key, mode_override=None, user_id=None)

Get the appropriate execution backend based on configuration.

Parameters:

Name Type Description Default
config CLIConfig

CLI configuration

required
api_key str | None

OpenRouter API key

required
mode_override str | None

Override mode from --standalone/--api flags

None
user_id str | None

Custom user ID for cache optimization (default: auto-generated)

None

Returns:

Type Description
ExecutionBackend

Configured ExecutionBackend instance

Raises:

Type Description
Exit

If standalone mode requested but dependencies not available

Source code in hedit/src/cli/main.py
def get_executor(
    config: CLIConfig,
    api_key: str | None,
    mode_override: str | None = None,
    user_id: str | None = None,
) -> ExecutionBackend:
    """Build the execution backend selected by the CLI flags or saved config.

    Args:
        config: CLI configuration
        api_key: OpenRouter API key
        mode_override: Override mode from --standalone/--api flags
        user_id: Custom user ID for cache optimization (default: auto-generated)

    Returns:
        Configured ExecutionBackend instance

    Raises:
        typer.Exit: If standalone mode requested but dependencies not available
    """
    # An explicit (truthy) override wins; otherwise use the configured mode.
    selected_mode = mode_override if mode_override else config.execution.mode

    if selected_mode != "standalone":
        # Every mode other than "standalone" is served by the API backend.
        # The backend module is imported lazily, only on this path.
        from src.cli.api_executor import APIExecutionBackend

        models = config.models
        # NOTE(review): vision_provider is passed to the local backend below but
        # not here - confirm APIExecutionBackend intentionally omits it.
        return APIExecutionBackend(
            api_url=config.api.url,
            api_key=api_key,
            model=models.default,
            eval_model=models.evaluation,
            eval_provider=models.eval_provider,
            vision_model=models.vision,
            provider=models.provider,
            temperature=models.temperature,
            user_id=user_id,
        )

    # Imported lazily, only when standalone mode is actually selected.
    from src.cli.local_executor import LocalExecutionBackend

    models = config.models
    local_backend = LocalExecutionBackend(
        api_key=api_key,
        model=models.default,
        eval_model=models.evaluation,
        eval_provider=models.eval_provider,
        vision_model=models.vision,
        vision_provider=models.vision_provider,
        provider=models.provider,
        temperature=models.temperature,
        user_id=user_id,
    )

    if local_backend.is_available():
        return local_backend

    output.print_error(
        "Standalone mode requires additional dependencies",
        hint="Install with: pip install hedit[standalone]",
    )
    raise typer.Exit(1)

version_callback(value)

Print version and exit.

Source code in hedit/src/cli/main.py
def version_callback(value: bool) -> None:
    """Print the hedit version and exit when the flag value is truthy."""
    if not value:
        # Flag not given: do nothing so normal command processing continues.
        return
    console.print(f"hedit version {__version__}")
    raise typer.Exit()

main(version=False)

HEDit CLI - Generate HED annotations from natural language.

Convert event descriptions to valid HED (Hierarchical Event Descriptors) annotations using AI-powered multi-agent system.

Get started

hedit init --api-key YOUR_OPENROUTER_KEY
hedit annotate "A red circle appears on screen"

Source code in hedit/src/cli/main.py
@app.callback()
def main(
    version: Annotated[
        bool,
        typer.Option(
            "--version",
            "-V",
            # version_callback prints the version and raises typer.Exit when the
            # flag is set, so the body below is never reached in that case.
            callback=version_callback,
            # NOTE(review): eager so the flag is handled before other parameters
            # are processed - per Typer/Click semantics, confirm if changing.
            is_eager=True,
            help="Show version and exit",
        ),
    ] = False,
) -> None:
    """HEDit CLI - Generate HED annotations from natural language.

    Convert event descriptions to valid HED (Hierarchical Event Descriptors)
    annotations using AI-powered multi-agent system.

    Get started:
        hedit init --api-key YOUR_OPENROUTER_KEY
        hedit annotate "A red circle appears on screen"
    """
    # Intentionally empty: this callback only declares the app-level --version
    # option; the docstring above presumably doubles as the top-level help text.
    pass

init(api_key=None, api_url=None, model=None, provider=None, temperature=None, standalone=False)

Initialize HEDit CLI with your API key and preferences.

This saves your configuration to ~/.config/hedit/ so you don't need to provide the API key for every command.

Get an OpenRouter API key at: https://openrouter.ai/keys

Examples:

hedit init --api-key YOUR_KEY                # API mode (default)
hedit init --api-key YOUR_KEY --standalone   # Standalone mode

Source code in hedit/src/cli/main.py
@app.command()
def init(
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            "-k",
            help="OpenRouter API key (get one at https://openrouter.ai/keys)",
            prompt="OpenRouter API key",
            hide_input=True,
        ),
    ] = None,
    api_url: ApiUrlOption = None,
    model: Annotated[
        str | None,
        typer.Option(
            "--model",
            "-m",
            help="Default model for annotation",
        ),
    ] = None,
    provider: Annotated[
        str | None,
        typer.Option(
            "--provider",
            help="Provider preference (e.g., Cerebras for fast inference)",
        ),
    ] = None,
    temperature: Annotated[
        float | None,
        typer.Option(
            "--temperature",
            "-t",
            help="LLM temperature (0.0-1.0, lower = more consistent)",
        ),
    ] = None,
    standalone: Annotated[
        bool,
        typer.Option(
            "--standalone",
            help="Set default mode to standalone (run locally without backend)",
        ),
    ] = False,
) -> None:
    """Initialize HEDit CLI with your API key and preferences.

    This saves your configuration to ~/.config/hedit/ so you don't need
    to provide the API key for every command.

    Get an OpenRouter API key at: https://openrouter.ai/keys

    Examples:
        hedit init --api-key YOUR_KEY           # API mode (default)
        hedit init --api-key YOUR_KEY --standalone  # Standalone mode
    """
    # The docstring above is kept verbatim: it is the command's help text.

    # One-time telemetry notice, then remember that it has been shown.
    if is_first_run():
        show_telemetry_disclosure()
        mark_first_run_complete()

    # Start from whatever is already persisted so unspecified options keep
    # their previous values.
    settings = load_config()
    credentials = load_credentials()

    # Only overwrite values the user actually supplied.
    if api_key:
        credentials.openrouter_api_key = api_key
    if api_url:
        settings.api.url = api_url
    if model:
        settings.models.default = model
    if provider:
        settings.models.provider = provider
    # 0.0 is a legitimate temperature, hence the explicit None check.
    if temperature is not None:
        settings.models.temperature = temperature
    # The flag can only switch the saved mode *to* standalone; omitting it
    # leaves the saved mode untouched.
    if standalone:
        settings.execution.mode = "standalone"

    save_credentials(credentials)
    save_config(settings)

    output.print_success("Configuration saved!")
    for summary_line in (
        f"Config file: {CONFIG_FILE}",
        f"Credentials: {CREDENTIALS_FILE}",
        f"Execution mode: {settings.execution.mode}",
    ):
        output.print_info(summary_line)

    # Without a key there is nothing to test.
    stored_key = credentials.openrouter_api_key
    if not stored_key:
        return

    if settings.execution.mode == "standalone":
        output.print_progress("Checking standalone mode dependencies")
        try:
            report = get_executor(settings, stored_key).health()
            if report.get("status") != "healthy":
                output.print_info(f"Status: {report.get('status', 'unknown')}")
            else:
                output.print_success("Standalone mode ready!")
                if not report.get("validator_available"):
                    output.print_info(
                        "Note: hedtools not installed; local validation unavailable"
                    )
        except ExecutionError as err:
            output.print_error(f"Standalone mode issue: {err}", hint=err.detail)
        return

    # API mode: a failed connection test is reported but never aborts init,
    # since the configuration has already been saved.
    output.print_progress("Testing API connection")
    try:
        report = get_executor(settings, stored_key).health()
        if report.get("status") != "healthy":
            output.print_info(f"API status: {report.get('status', 'unknown')}")
        else:
            output.print_success("API connection successful!")
    except ExecutionError as err:
        output.print_error(f"Could not connect to API: {err}", hint=err.detail)
    except APIError as err:
        output.print_error(
            f"Could not connect to API: {err}", hint="Check your API key and URL"
        )
    except Exception as err:
        output.print_error(f"Connection test failed: {err}")

annotate(description, api_key=None, api_url=None, model=None, eval_model=None, eval_provider=None, provider=None, temperature=None, schema_version=None, output_format='text', max_attempts=5, assessment=False, no_streaming=False, no_extend=False, standalone=False, api_mode=False, verbose=False, user_id=None)

Generate HED annotation from a text description.

Examples:

hedit annotate "A red circle appears on the left side of the screen"
hedit annotate "Participant pressed the spacebar" --schema 8.4.0
hedit annotate "Audio beep plays" -o json > result.json
hedit annotate "..." --model gpt-4o-mini --temperature 0.2
hedit annotate "..." --standalone               # Run locally
hedit annotate "..." --no-streaming             # Disable live progress
hedit annotate "..." --standalone --no-extend   # No tag extensions

Source code in hedit/src/cli/main.py
@app.command()
def annotate(
    description: Annotated[
        str,
        typer.Argument(help="Natural language event description"),
    ],
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    model: ModelOption = None,
    eval_model: EvalModelOption = None,
    eval_provider: EvalProviderOption = None,
    provider: ProviderOption = None,
    temperature: TemperatureOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
    max_attempts: Annotated[
        int,
        typer.Option(
            "--max-attempts",
            help="Maximum validation attempts",
        ),
    ] = 5,
    assessment: Annotated[
        bool,
        typer.Option(
            "--assessment/--no-assessment",
            help="Run completeness assessment",
        ),
    ] = False,
    no_streaming: Annotated[
        bool,
        typer.Option(
            "--no-streaming",
            help="Disable streaming progress (use batch mode)",
        ),
    ] = False,
    no_extend: NoExtendOption = False,
    standalone: StandaloneOption = False,
    api_mode: ApiModeOption = False,
    verbose: VerboseOption = False,
    user_id: UserIdOption = None,
) -> None:
    """Generate HED annotation from a text description.

    Examples:
        hedit annotate "A red circle appears on the left side of the screen"
        hedit annotate "Participant pressed the spacebar" --schema 8.4.0
        hedit annotate "Audio beep plays" -o json > result.json
        hedit annotate "..." --model gpt-4o-mini --temperature 0.2
        hedit annotate "..." --standalone  # Run locally
        hedit annotate "..." --no-streaming  # Disable live progress
        hedit annotate "..." --standalone --no-extend  # No tag extensions
    """
    # The docstring above is kept verbatim: it is the command's help text.

    # One-time telemetry notice, then remember that it has been shown.
    if is_first_run():
        show_telemetry_disclosure()
        mark_first_run_complete()

    # --standalone takes precedence over --api; with neither flag the
    # configured mode is used.
    mode_override = "standalone" if standalone else ("api" if api_mode else None)

    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        model=model,
        eval_model=eval_model,
        eval_provider=eval_provider,
        provider=provider,
        temperature=temperature,
        schema_version=schema_version,
        output_format=output_format,
        user_id=user_id,
    )

    if not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key",
        )
        raise typer.Exit(1)

    mode_name = mode_override or config.execution.mode
    # Live progress is only used when enabled in config, not disabled by flag,
    # running against the API, and stdout is not piped.
    use_streaming = config.output.streaming and not (
        no_streaming or mode_name != "api" or output.is_piped()
    )

    try:
        executor = get_executor(config, effective_key, mode_override, config.settings.user_id)

        # Both the streaming and the batch call take the same arguments.
        request = {
            "description": description,
            "schema_version": schema_version or config.settings.schema_version,
            "max_validation_attempts": max_attempts,
            "run_assessment": assessment,
            "no_extend": no_extend,
        }

        if use_streaming and hasattr(executor, "annotate_stream"):
            # Streaming path: relay each event to the status display and keep
            # the payload of the "result" event.
            result = None
            with output.streaming_status("Connecting to API...") as status:
                for event_type, data in executor.annotate_stream(**request):
                    output.update_streaming_status(status, event_type, data)
                    if event_type == "error":
                        output.print_error(data.get("message", "Unknown error"))
                        raise typer.Exit(1)
                    if event_type == "result":
                        result = data

            if not result:
                output.print_error("No result received from streaming API")
                raise typer.Exit(1)
            output.print_annotation_result(result, output_format, verbose)
        else:
            # Batch path: a single blocking call; skip the progress line when
            # stdout is piped.
            if not output.is_piped():
                output.print_progress(f"Generating HED annotation ({mode_name} mode)")

            result = executor.annotate(**request)
            output.print_annotation_result(result, output_format, verbose)

        # Exit non-zero unless the workflow succeeded with a valid annotation.
        if result.get("status") != "success" or not result.get("is_valid"):
            raise typer.Exit(1)

    except ExecutionError as err:
        output.print_error(str(err), hint=err.detail)
        raise typer.Exit(1) from None
    except APIError as err:
        output.print_error(str(err), hint=err.detail)
        raise typer.Exit(1) from None

annotate_image(image, prompt=None, api_key=None, api_url=None, model=None, eval_model=None, eval_provider=None, provider=None, temperature=None, schema_version=None, output_format='text', max_attempts=5, assessment=False, no_streaming=False, no_extend=False, standalone=False, api_mode=False, verbose=False, user_id=None)

Generate HED annotation from an image.

First generates a description using a vision model, then annotates it.

Examples:

hedit annotate-image stimulus.png
hedit annotate-image photo.jpg --prompt "Describe the experimental setup"
hedit annotate-image screen.png -o json > result.json
hedit annotate-image stimulus.png --standalone               # Run locally
hedit annotate-image stimulus.png --no-streaming             # Disable live progress
hedit annotate-image stimulus.png --standalone --no-extend   # No tag extensions

Source code in hedit/src/cli/main.py
@app.command("annotate-image")
def annotate_image(
    image: Annotated[
        Path,
        typer.Argument(help="Path to image file (PNG, JPG, etc.)"),
    ],
    prompt: Annotated[
        str | None,
        typer.Option("--prompt", help="Custom prompt for vision model"),
    ] = None,
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    model: ModelOption = None,
    eval_model: EvalModelOption = None,
    eval_provider: EvalProviderOption = None,
    provider: ProviderOption = None,
    temperature: TemperatureOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
    max_attempts: Annotated[
        int,
        typer.Option("--max-attempts", help="Maximum validation attempts"),
    ] = 5,
    assessment: Annotated[
        bool,
        typer.Option("--assessment/--no-assessment", help="Run completeness assessment"),
    ] = False,
    no_streaming: Annotated[
        bool,
        typer.Option("--no-streaming", help="Disable streaming progress (use batch mode)"),
    ] = False,
    no_extend: NoExtendOption = False,
    standalone: StandaloneOption = False,
    api_mode: ApiModeOption = False,
    verbose: VerboseOption = False,
    user_id: UserIdOption = None,
) -> None:
    """Generate HED annotation from an image.

    First generates a description using a vision model, then annotates it.

    Examples:
        hedit annotate-image stimulus.png
        hedit annotate-image photo.jpg --prompt "Describe the experimental setup"
        hedit annotate-image screen.png -o json > result.json
        hedit annotate-image stimulus.png --standalone  # Run locally
        hedit annotate-image stimulus.png --no-streaming  # Disable live progress
        hedit annotate-image stimulus.png --standalone --no-extend  # No tag extensions
    """
    # One-time privacy notice before doing any work.
    if is_first_run():
        show_telemetry_disclosure()
        mark_first_run_complete()

    # Fail fast when the path does not exist.
    if not image.exists():
        output.print_error(f"Image file not found: {image}")
        raise typer.Exit(1)

    # --standalone takes priority over --api when both flags are set.
    mode_override = "standalone" if standalone else ("api" if api_mode else None)

    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        model=model,
        eval_model=eval_model,
        eval_provider=eval_provider,
        provider=provider,
        temperature=temperature,
        schema_version=schema_version,
        output_format=output_format,
        user_id=user_id,
    )

    if not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key",
        )
        raise typer.Exit(1)

    mode_name = mode_override or config.execution.mode
    # Live progress needs all of: enabled in config, not disabled on the
    # command line, API mode, and an interactive (non-piped) output.
    stream_live = (
        config.output.streaming
        and not no_streaming
        and mode_name == "api"
        and not output.is_piped()
    )

    try:
        executor = get_executor(config, effective_key, mode_override, config.settings.user_id)

        # Both the streaming and the batch call take the same arguments.
        request = {
            "image_path": image,
            "prompt": prompt,
            "schema_version": schema_version or config.settings.schema_version,
            "max_validation_attempts": max_attempts,
            "run_assessment": assessment,
            "no_extend": no_extend,
        }

        if stream_live and hasattr(executor, "annotate_image_stream"):
            # Streaming path: forward every event to the status display and
            # remember the final "result" payload.
            final = None
            with output.streaming_status("Connecting to API...") as status:
                for event_type, data in executor.annotate_image_stream(**request):
                    output.update_streaming_status(status, event_type, data)
                    if event_type == "error":
                        output.print_error(data.get("message", "Unknown error"))
                        raise typer.Exit(1)
                    if event_type == "result":
                        final = data

            if not final:
                output.print_error("No result received from streaming API")
                raise typer.Exit(1)
        else:
            # Batch path: a single blocking call.
            if not output.is_piped():
                output.print_progress(
                    f"Analyzing image and generating HED annotation ({mode_name} mode)"
                )
            final = executor.annotate_image(**request)

        output.print_image_annotation_result(final, output_format, verbose)

        # Non-zero exit code unless the annotation succeeded and validated.
        if final.get("status") != "success" or not final.get("is_valid"):
            raise typer.Exit(1)

    except (ExecutionError, APIError) as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None

validate(hed_string, api_key=None, api_url=None, schema_version=None, output_format='text', standalone=False, api_mode=False)

Validate a HED annotation string.

Checks if the HED string is syntactically correct and semantically valid according to the HED schema.

Examples:

hedit validate "Sensory-event, Visual-presentation"
hedit validate "(Red, Circle)" --schema 8.4.0
hedit validate "Event" -o json
hedit validate "Event" --standalone  # Validate locally with hedtools

Source code in hedit/src/cli/main.py
@app.command()
def validate(
    hed_string: Annotated[
        str,
        typer.Argument(help="HED annotation string to validate"),
    ],
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
    standalone: StandaloneOption = False,
    api_mode: ApiModeOption = False,
) -> None:
    """Validate a HED annotation string.

    Checks if the HED string is syntactically correct and semantically valid
    according to the HED schema.

    Examples:
        hedit validate "Sensory-event, Visual-presentation"
        hedit validate "(Red, Circle)" --schema 8.4.0
        hedit validate "Event" -o json
        hedit validate "Event" --standalone  # Validate locally with hedtools
    """
    # One-time privacy notice before doing any work.
    if is_first_run():
        show_telemetry_disclosure()
        mark_first_run_complete()

    # --standalone takes priority over --api when both flags are set.
    mode_override = "standalone" if standalone else ("api" if api_mode else None)

    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        schema_version=schema_version,
        output_format=output_format,
    )

    # Only non-standalone modes require an API key.
    effective_mode = mode_override or config.execution.mode
    key_required = effective_mode != "standalone"
    if key_required and not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key, or use --standalone for local validation",
        )
        raise typer.Exit(1)

    if not output.is_piped():
        output.print_progress(f"Validating HED string ({effective_mode} mode)")

    try:
        executor = get_executor(config, effective_key, mode_override)
        report = executor.validate(
            hed_string=hed_string,
            schema_version=schema_version or config.settings.schema_version,
        )
        output.print_validation_result(report, output_format)

        # Non-zero exit code for an invalid string.
        if not report.get("is_valid"):
            raise typer.Exit(1)

    except (ExecutionError, APIError) as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None

config_show(show_key=False)

Show current configuration.

Source code in hedit/src/cli/main.py
@config_app.command("show")
def config_show(
    show_key: Annotated[
        bool,
        typer.Option("--show-key", help="Show full API key (default: masked)"),
    ] = False,
) -> None:
    """Show current configuration."""
    # Settings and credentials are loaded separately and merged into one
    # dictionary purely for display.
    settings = load_config()
    stored = load_credentials()
    merged = {
        **settings.model_dump(),
        "credentials": {"openrouter_api_key": stored.openrouter_api_key},
    }
    output.print_config(merged, show_key)

    # Tell the user where the configuration lives on disk.
    locations = get_config_paths()
    output.print_info(f"\nConfig directory: {locations['config_dir']}")

config_set(key, value)

Set a configuration value.

Examples:

hedit config set models.default gpt-4o
hedit config set models.temperature 0.2
hedit config set api.url https://api.example.com/hedit

Source code in hedit/src/cli/main.py
@config_app.command("set")
def config_set(
    key: Annotated[
        str,
        # The temperature setting is a field of the "models" section; the
        # "settings" section has no such field.
        typer.Argument(help="Config key (e.g., models.default, models.temperature)"),
    ],
    value: Annotated[
        str,
        typer.Argument(help="New value"),
    ],
) -> None:
    """Set a configuration value.

    Examples:
        hedit config set models.default gpt-4o
        hedit config set models.temperature 0.2
        hedit config set api.url https://api.example.com/hedit
    """
    try:
        update_config(key, value)
        output.print_success(f"Set {key} = {value}")
    except ValueError as e:
        # update_config reports bad keys and unparsable values as ValueError;
        # show the message and exit with a failure code instead of a traceback.
        output.print_error(str(e))
        raise typer.Exit(1) from None

config_path()

Show configuration file paths.

Source code in hedit/src/cli/main.py
@config_app.command("path")
def config_path() -> None:
    """Show configuration file paths."""
    # One line per location, in a fixed order: directory, config, credentials.
    locations = get_config_paths()
    console.print(f"Config directory: {locations['config_dir']}")
    console.print(f"Config file: {locations['config_file']}")
    console.print(f"Credentials file: {locations['credentials_file']}")

config_clear_credentials(force=False)

Remove stored API credentials.

Source code in hedit/src/cli/main.py
@config_app.command("clear-credentials")
def config_clear_credentials(
    force: Annotated[
        bool,
        typer.Option("--force", "-f", help="Skip confirmation"),
    ] = False,
) -> None:
    """Remove stored API credentials."""
    # Ask before deleting unless --force was given; a "no" aborts the command.
    if not force and not typer.confirm("Are you sure you want to remove stored credentials?"):
        raise typer.Abort()

    clear_credentials()
    output.print_success("Credentials removed")

config_reset_cmd(force=False, clear_key=False)

Reset configuration to defaults.

By default, this keeps your stored API key intact. Use --clear-key to also remove it.

Examples:

hedit config reset              # Reset to defaults, keep API key
hedit config reset --clear-key  # Reset everything including API key
hedit config reset -f           # Reset without confirmation

Source code in hedit/src/cli/main.py
@config_app.command("reset")
def config_reset_cmd(
    force: Annotated[
        bool,
        typer.Option("--force", "-f", help="Skip confirmation"),
    ] = False,
    clear_key: Annotated[
        bool,
        typer.Option(
            "--clear-key",
            help="Also remove stored API key (default: keep API key)",
        ),
    ] = False,
) -> None:
    """Reset configuration to defaults.

    By default, this keeps your stored API key intact. Use --clear-key to also remove it.

    Examples:
        hedit config reset              # Reset to defaults, keep API key
        hedit config reset --clear-key  # Reset everything including API key
        hedit config reset -f           # Reset without confirmation
    """
    # Confirm first (unless --force); the wording depends on whether the
    # stored key is going to be removed as well.
    if not force:
        question = (
            "Reset all configuration AND remove stored API key?"
            if clear_key
            else "Reset configuration to defaults? (API key will be preserved)"
        )
        if not typer.confirm(question):
            raise typer.Abort()

    # Write the defaults, then drop the credentials only on request.
    defaults = reset_config()
    if clear_key:
        clear_credentials()
        output.print_success("Configuration reset to defaults (API key removed)")
    else:
        output.print_success("Configuration reset to defaults (API key preserved)")

    # Summarize the values that are now in effect.
    model_defaults = defaults.models
    output.print_info("\nNew default settings:")
    output.print_info(f"  Model: {model_defaults.default}")
    output.print_info(f"  Provider: {model_defaults.provider}")
    output.print_info(f"  Temperature: {model_defaults.temperature}")
    output.print_info(f"  Schema: {defaults.settings.schema_version}")
    output.print_info(f"  Streaming: {defaults.output.streaming}")

health(api_url=None, standalone=False, api_mode=False)

Check health status of the execution backend.

Examples:

hedit health                 # Check API health
hedit health --standalone    # Check standalone mode dependencies

Source code in hedit/src/cli/main.py
@app.command()
def health(
    api_url: ApiUrlOption = None,
    standalone: StandaloneOption = False,
    api_mode: ApiModeOption = False,
) -> None:
    """Check health status of the execution backend.

    Examples:
        hedit health                 # Check API health
        hedit health --standalone    # Check standalone mode dependencies
    """

    def _mark(ok: object) -> str:
        # Rich markup: green "[x]" for available, red "[ ]" otherwise.
        return "[green][x][/]" if ok else "[red][ ][/]"

    # --standalone takes priority over --api when both flags are set.
    mode_override = "standalone" if standalone else ("api" if api_mode else None)

    config, _ = get_effective_config(api_url=api_url)
    effective_mode = mode_override or config.execution.mode

    try:
        # The health check is performed without an API key.
        executor = get_executor(config, api_key=None, mode_override=mode_override)
        report = executor.health()

        status = report.get("status", "unknown")
        version = report.get("version", "unknown")
        mode = report.get("mode", effective_mode)
        llm = _mark(report.get("llm_available"))
        validator = _mark(report.get("validator_available"))

        console.print(f"Mode: [bold]{mode}[/]")
        if mode == "api":
            console.print(f"API: {config.api.url}")
        console.print(f"Status: [bold]{status}[/]")
        console.print(f"Version: {version}")
        console.print(f"LLM: {llm}")
        console.print(f"Validator: {validator}")

        # Standalone reports may carry a per-dependency availability map.
        if mode == "standalone" and "dependencies" in report:
            console.print("\nDependencies:")
            for dep, available in report["dependencies"].items():
                console.print(f"  {_mark(available)} {dep}")

    except (ExecutionError, APIError) as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None
    except Exception as e:
        # Any other failure is still reported as a failed health check.
        output.print_error(f"Health check failed: {e}")
        raise typer.Exit(1) from None

show_telemetry_disclosure()

Display first-run telemetry disclosure notice.

Source code in hedit/src/cli/main.py
def show_telemetry_disclosure() -> None:
    """Print the first-run privacy notice inside a bordered panel.

    The notice lists what usage data is collected, what is not, and the
    commands for disabling telemetry and viewing the configuration.
    """
    from rich.panel import Panel

    # One entry per displayed line; empty entries produce the blank lines.
    notice_lines = [
        "[bold]Welcome to HEDit![/]",
        "",
        "HEDit collects anonymous usage data to improve the annotation service:",
        "  • Input descriptions and generated annotations",
        "  • Model performance metrics (latency, iterations)",
        "  • Validation results",
        "",
        "[dim]What is NOT collected:[/]",
        "  • API keys or credentials",
        "  • Personal information",
        "  • File paths or system details",
        "",
        "[bold cyan]To disable:[/] hedit config set telemetry.enabled false",
        "[bold cyan]To view config:[/] hedit config show",
    ]

    notice = Panel(
        "\n".join(notice_lines),
        title="[bold]Privacy & Data Collection[/]",
        border_style="cyan",
        padding=(1, 2),
    )

    # Blank line above and below the panel.
    console.print()
    console.print(notice)
    console.print()

cli()

Entry point for CLI.

Source code in hedit/src/cli/main.py
def cli() -> None:
    """Run the HEDit command-line application."""
    # Delegates to the Typer application object.
    app()

Configuration

src.cli.config

Configuration management for HEDit CLI.

Handles persistent storage of API keys and settings in a cross-platform config directory. Supports environment variables as fallback/override.

CredentialsConfig

Bases: BaseModel

Credentials stored separately with restricted permissions.

Source code in hedit/src/cli/config.py
class CredentialsConfig(BaseModel):
    """API credentials, modelled separately from the general CLI settings.

    Attributes:
        openrouter_api_key: OpenRouter API key, or None when none is set
    """

    openrouter_api_key: str | None = Field(
        description="OpenRouter API key",
        default=None,
    )

ModelsConfig

Bases: BaseModel

Model configuration for different agents.

Source code in hedit/src/cli/config.py
class ModelsConfig(BaseModel):
    """Model and provider choices for annotation, evaluation and vision.

    Attributes:
        default: Model used for annotation
        provider: Provider for the annotation model (None for no preference)
        evaluation: Model used by the evaluation/assessment agents
        eval_provider: Provider for the evaluation model
        vision: Vision model used for images
        vision_provider: Provider for the vision model
        temperature: Model temperature, constrained to the range 0.0-1.0
    """

    # Annotation model and its provider.
    default: str = Field(description="Default model for annotation", default=DEFAULT_MODEL)
    provider: str | None = Field(
        description="Provider for annotation model",
        default=DEFAULT_PROVIDER,
    )

    # Evaluation/assessment model and its provider.
    evaluation: str | None = Field(
        description="Model for evaluation/assessment agents",
        default=DEFAULT_EVAL_MODEL,
    )
    eval_provider: str | None = Field(
        description="Provider for evaluation model (default: alibaba)",
        default=DEFAULT_EVAL_PROVIDER,
    )

    # Vision model and its provider.
    vision: str = Field(description="Vision model for images", default=DEFAULT_VISION_MODEL)
    vision_provider: str | None = Field(
        description="Provider for vision model (alibaba for qwen)",
        default=DEFAULT_VISION_PROVIDER,
    )

    temperature: float = Field(description="Model temperature", default=0.1, ge=0.0, le=1.0)

ExecutionMode

Bases: BaseModel

Execution mode configuration.

Source code in hedit/src/cli/config.py
class ExecutionMode(BaseModel):
    """Where commands are executed.

    Attributes:
        mode: "api" to use the backend or "standalone" to run locally;
            defaults to "api"
    """

    mode: str = Field(
        description="Execution mode: 'api' (use backend) or 'standalone' (run locally)",
        default="api",
    )

SettingsConfig

Bases: BaseModel

General settings.

Source code in hedit/src/cli/config.py
class SettingsConfig(BaseModel):
    """General annotation settings.

    Attributes:
        schema_version: HED schema version (default "8.4.0")
        max_validation_attempts: Retry limit, constrained to 1-10 (default 5)
        run_assessment: Whether assessment runs by default
        user_id: Optional custom user ID for cache optimization
    """

    schema_version: str = Field(description="HED schema version", default="8.4.0")
    max_validation_attempts: int = Field(description="Max retries", default=5, ge=1, le=10)
    run_assessment: bool = Field(description="Run assessment by default", default=False)
    user_id: str | None = Field(
        description="Custom user ID for cache optimization (default: auto-generated machine ID)",
        default=None,
    )

OutputConfig

Bases: BaseModel

Output formatting settings.

Source code in hedit/src/cli/config.py
class OutputConfig(BaseModel):
    """How results are presented.

    Attributes:
        format: Output format, "text" or "json" (default "text")
        color: Whether colored output is enabled
        verbose: Whether verbose output is enabled
        streaming: Whether the streaming progress display is enabled
    """

    format: str = Field(description="Output format (text, json)", default="text")
    color: bool = Field(description="Enable colored output", default=True)
    verbose: bool = Field(description="Verbose output", default=False)
    streaming: bool = Field(description="Enable streaming progress display", default=True)

APIConfig

Bases: BaseModel

API endpoint configuration.

Source code in hedit/src/cli/config.py
class APIConfig(BaseModel):
    """Backend API endpoint settings.

    Attributes:
        url: API endpoint URL (defaults to DEFAULT_API_URL)
    """

    url: str = Field(description="API endpoint URL", default=DEFAULT_API_URL)

TelemetryConfig

Bases: BaseModel

Telemetry configuration.

Source code in hedit/src/cli/config.py
class TelemetryConfig(BaseModel):
    """Telemetry collection settings.

    Attributes:
        enabled: Whether telemetry collection is enabled (default True)
        model_blacklist: Models to exclude from telemetry
    """

    enabled: bool = Field(description="Enable telemetry collection", default=True)
    # A factory is used so each instance gets its own list.
    model_blacklist: list[str] = Field(
        description="Models to exclude from telemetry",
        default_factory=lambda: ["openai/gpt-oss-120b"],
    )

CLIConfig

Bases: BaseModel

Complete CLI configuration.

Source code in hedit/src/cli/config.py
class CLIConfig(BaseModel):
    """Top-level CLI configuration, one sub-model per section.

    Every section falls back to a default-constructed sub-model, so
    ``CLIConfig()`` yields the full default configuration.

    Attributes:
        api: API endpoint section
        models: Model and provider section
        settings: General settings section
        output: Output formatting section
        execution: Execution mode section
        telemetry: Telemetry section
    """

    api: APIConfig = Field(default_factory=APIConfig)
    models: ModelsConfig = Field(default_factory=ModelsConfig)
    settings: SettingsConfig = Field(default_factory=SettingsConfig)
    output: OutputConfig = Field(default_factory=OutputConfig)
    execution: ExecutionMode = Field(default_factory=ExecutionMode)
    telemetry: TelemetryConfig = Field(default_factory=TelemetryConfig)

ensure_config_dir()

Create config directory if it doesn't exist.

Source code in hedit/src/cli/config.py
def ensure_config_dir() -> None:
    """Make sure CONFIG_DIR exists, creating missing parents as needed.

    Does nothing when the directory is already present.
    """
    CONFIG_DIR.mkdir(exist_ok=True, parents=True)

load_credentials()

Load credentials from file or environment.

Environment variables take precedence over stored credentials.

Source code in hedit/src/cli/config.py
def load_credentials() -> CredentialsConfig:
    """Load credentials from file or environment.

    Environment variables take precedence over stored credentials.

    Returns:
        Credentials read from CREDENTIALS_FILE when it exists and parses,
        otherwise defaults; in both cases a non-empty OPENROUTER_API_KEY
        environment variable replaces the stored key.
    """
    creds = CredentialsConfig()

    # Try loading from file first
    if CREDENTIALS_FILE.exists():
        try:
            with open(CREDENTIALS_FILE) as f:
                data = yaml.safe_load(f) or {}
            # model_validate (rather than ``CredentialsConfig(**data)``) also
            # rejects non-mapping YAML such as a list or bare scalar with a
            # ValidationError, which is a ValueError subclass, instead of an
            # uncaught TypeError.
            creds = CredentialsConfig.model_validate(data)
        except (yaml.YAMLError, ValueError):
            pass  # Use defaults if file is corrupted

    # Environment variables override file
    env_key = os.environ.get("OPENROUTER_API_KEY")
    if env_key:
        creds.openrouter_api_key = env_key

    return creds

save_credentials(creds)

Save credentials to file with restricted permissions.

Source code in hedit/src/cli/config.py
def save_credentials(creds: CredentialsConfig) -> None:
    """Save credentials to file with restricted permissions.

    Args:
        creds: Credentials to persist; fields that are None are omitted.
    """
    ensure_config_dir()

    # Create the file with owner-only permissions from the start, so the key
    # is never on disk with default (possibly group/world-readable)
    # permissions between the write and the chmod below.
    fd = os.open(CREDENTIALS_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as f:
        yaml.dump(creds.model_dump(exclude_none=True), f, default_flow_style=False)

    # The creation mode only applies to new files; tighten a pre-existing
    # file as well (Unix only).
    try:
        os.chmod(CREDENTIALS_FILE, 0o600)
    except (OSError, AttributeError):
        pass  # Windows doesn't support chmod the same way

load_config()

Load configuration from file.

Source code in hedit/src/cli/config.py
def load_config() -> CLIConfig:
    """Load configuration from file.

    Returns:
        The configuration stored in CONFIG_FILE, or the default configuration
        when the file is missing, is not valid YAML, or fails validation.
    """
    if not CONFIG_FILE.exists():
        return CLIConfig()

    try:
        with open(CONFIG_FILE) as f:
            data = yaml.safe_load(f) or {}
        # model_validate (rather than ``CLIConfig(**data)``) also rejects
        # non-mapping YAML such as a list or bare scalar with a
        # ValidationError, which is a ValueError subclass, instead of an
        # uncaught TypeError.
        return CLIConfig.model_validate(data)
    except (yaml.YAMLError, ValueError):
        return CLIConfig()

save_config(config)

Save configuration to file.

Source code in hedit/src/cli/config.py
def save_config(config: CLIConfig) -> None:
    """Write the configuration to CONFIG_FILE as block-style YAML.

    Args:
        config: Configuration to persist; the config directory is created
            first if needed.
    """
    ensure_config_dir()

    serialized = config.model_dump()
    with open(CONFIG_FILE, "w") as handle:
        yaml.dump(serialized, handle, default_flow_style=False)

get_api_key(override=None)

Get API key with priority: override > env > stored.

Parameters:

Name Type Description Default
override str | None

Explicit API key from command line

None

Returns:

Type Description
str | None

API key or None if not configured

Source code in hedit/src/cli/config.py
def get_api_key(override: str | None = None) -> str | None:
    """Get API key with priority: override > env > stored.

    Args:
        override: Explicit API key from command line

    Returns:
        API key or None if not configured
    """
    if override:
        return override

    creds = load_credentials()
    return creds.openrouter_api_key

get_effective_config(api_key=None, api_url=None, model=None, eval_model=None, eval_provider=None, provider=None, temperature=None, schema_version=None, output_format=None, mode=None, user_id=None)

Get effective config with command-line overrides applied.

Parameters:

Name Type Description Default
api_key str | None

Override API key

None
api_url str | None

Override API URL

None
model str | None

Override model (if non-default, clears provider unless explicitly set)

None
eval_model str | None

Override evaluation model (for consistent benchmarking)

None
eval_provider str | None

Override provider for evaluation model (e.g., "alibaba")

None
provider str | None

Override provider preference (e.g., "anthropic")

None
temperature float | None

Override temperature

None
schema_version str | None

Override schema version

None
output_format str | None

Override output format

None
mode str | None

Override execution mode ("api" or "standalone")

None
user_id str | None

Override user ID for cache optimization

None

Returns:

Type Description
tuple[CLIConfig, str | None]

Tuple of (effective config, effective API key)

Note

When a custom model is specified without an explicit provider, the provider is cleared. This is because a pinned provider may only support specific models.

Source code in hedit/src/cli/config.py
def get_effective_config(
    api_key: str | None = None,
    api_url: str | None = None,
    model: str | None = None,
    eval_model: str | None = None,
    eval_provider: str | None = None,
    provider: str | None = None,
    temperature: float | None = None,
    schema_version: str | None = None,
    output_format: str | None = None,
    mode: str | None = None,
    user_id: str | None = None,
) -> tuple[CLIConfig, str | None]:
    """Load the stored config and layer command-line overrides on top.

    The overrides only affect the returned object; nothing is written back
    to disk here.

    Args:
        api_key: Override API key
        api_url: Override API URL
        model: Override annotation model; a model other than DEFAULT_MODEL
            also clears the provider unless ``provider`` is passed
        eval_model: Override evaluation model
        eval_provider: Override evaluation provider; an empty string clears it
        provider: Override annotation provider; an empty string clears it
        temperature: Override temperature (0.0 is honoured)
        schema_version: Override schema version
        output_format: Override output format
        mode: Override execution mode, "api" or "standalone"
        user_id: Override user ID

    Returns:
        Tuple of (effective config, effective API key)

    Raises:
        ValueError: If ``mode`` is given and is not "api" or "standalone"
    """
    config = load_config()
    effective_key = get_api_key(api_key)
    models = config.models

    if api_url:
        config.api.url = api_url

    # Model/provider interplay: switching to a non-default model without
    # naming a provider clears the provider (a pinned provider may not
    # support the custom model).
    if model:
        models.default = model
        if model != DEFAULT_MODEL and provider is None:
            models.provider = None
    if eval_model:
        models.evaluation = eval_model
    # For both provider overrides, "" means "clear it" and is stored as None.
    if eval_provider is not None:
        models.eval_provider = eval_provider or None
    if provider is not None:
        models.provider = provider or None

    if temperature is not None:
        models.temperature = temperature
    if schema_version:
        config.settings.schema_version = schema_version
    if output_format:
        config.output.format = output_format

    if mode:
        if mode not in ("api", "standalone"):
            raise ValueError(f"Invalid mode: {mode}. Must be 'api' or 'standalone'")
        config.execution.mode = mode

    if user_id:
        config.settings.user_id = user_id

    return config, effective_key

update_config(key, value)

Update a specific config value.

Parameters:

Name Type Description Default
key str

Dot-notation key (e.g., "models.default", "models.temperature")

required
value Any

New value

required
Source code in hedit/src/cli/config.py
def update_config(key: str, value: Any) -> None:
    """Update a specific config value and persist the result.

    The new value is validated against the config model before anything is
    written, so a value the model would reject (out of range, wrong type) is
    reported here instead of being saved to a file that no longer loads.

    Args:
        key: Dot-notation key of the form "section.field"
            (e.g., "models.default", "models.temperature")
        value: New value; coerced to bool/int/float when the field currently
            holds a value of that type

    Raises:
        ValueError: If the key is malformed, names an unknown section or
            field, or the value cannot be parsed or fails validation
    """
    # Parse dot notation
    parts = key.split(".")
    if len(parts) == 1:
        # Top-level key not supported for safety
        raise ValueError(f"Invalid config key: {key}")
    if len(parts) != 2:
        raise ValueError(f"Invalid config key format: {key}")
    section, field = parts

    config = load_config()
    data = config.model_dump()

    # Only real config sections (which dump to dicts) and their declared
    # fields are addressable.
    if not isinstance(data.get(section), dict):
        raise ValueError(f"Unknown section: {section}")
    if field not in data[section]:
        raise ValueError(f"Unknown field: {field} in {section}")

    # Type coercion for common types; bool is tested before int because
    # bool is a subclass of int.
    current = data[section][field]
    if isinstance(current, bool):
        text = str(value).strip().lower()
        if text in ("true", "1", "yes", "on"):
            value = True
        elif text in ("false", "0", "no", "off"):
            value = False
        else:
            # Unrecognised text must not silently turn into False.
            raise ValueError(f"Invalid boolean value for {key}: {value!r} (use true or false)")
    elif isinstance(current, int):
        value = int(value)
    elif isinstance(current, float):
        value = float(value)
    data[section][field] = value

    # Re-validate the whole config so field constraints are enforced.
    # pydantic's ValidationError is a ValueError subclass.
    try:
        validated = type(config).model_validate(data)
    except ValueError as err:
        raise ValueError(f"Invalid value for {key}: {err}") from err

    save_config(validated)

clear_credentials()

Remove stored credentials.

Source code in hedit/src/cli/config.py
def clear_credentials() -> None:
    """Remove stored credentials.

    Does nothing when the credentials file is already absent.
    """
    # missing_ok avoids the check-then-delete race: a file removed between
    # an exists() test and unlink() would otherwise raise FileNotFoundError.
    CREDENTIALS_FILE.unlink(missing_ok=True)

reset_config(preserve_credentials=True)

Reset configuration to defaults.

Parameters:

Name Type Description Default
preserve_credentials bool

If True, keep BYOK API key intact (default: True)

True

Returns:

Type Description
CLIConfig

The new default configuration

Source code in hedit/src/cli/config.py
def reset_config(preserve_credentials: bool = True) -> CLIConfig:
    """Reset configuration to defaults.

    Args:
        preserve_credentials: If True, keep BYOK API key intact (default: True);
            if False, the stored credentials are removed as well

    Returns:
        The new default configuration
    """
    # Create fresh default config
    config = CLIConfig()

    # Save the default config
    save_config(config)

    # Honour the flag: it was previously accepted but never acted on, so
    # passing False left the stored key in place.
    if not preserve_credentials:
        clear_credentials()

    return config

get_machine_id()

Get or generate a stable machine ID for cache optimization.

This ID is used by OpenRouter for sticky cache routing to reduce costs. It is NOT used for telemetry and is never transmitted except to OpenRouter.

The ID is generated once and persists across pip updates.

Returns:

Type Description
str

16-character hexadecimal machine ID

Source code in hedit/src/cli/config.py
def get_machine_id() -> str:
    """Return the persisted machine ID, creating one when none is usable.

    The ID is used by OpenRouter for sticky cache routing to reduce costs.
    It is NOT used for telemetry and is never transmitted except to OpenRouter.

    The ID is generated once and persists across pip updates.

    Returns:
        16-character hexadecimal machine ID
    """
    ensure_config_dir()

    if MACHINE_ID_FILE.exists():
        try:
            stored = MACHINE_ID_FILE.read_text().strip()
        except (OSError, UnicodeDecodeError):
            stored = None  # Unreadable/corrupted file: fall through and regenerate
        # Accept only exactly 16 lowercase hex characters
        if (
            stored is not None
            and len(stored) == 16
            and not (set(stored) - set("0123456789abcdef"))
        ):
            return stored

    # No valid ID on disk: mint a new one
    fresh_id = uuid.uuid4().hex[:16]

    try:
        MACHINE_ID_FILE.write_text(fresh_id)
    except OSError:
        # Could not persist; the ID is still usable for this session
        return fresh_id

    # Restrict to owner read/write (Unix)
    try:
        os.chmod(MACHINE_ID_FILE, 0o600)
    except (OSError, AttributeError):
        pass  # Windows doesn't support chmod the same way

    return fresh_id

is_first_run()

Check if this is the first time HEDit is run.

Returns:

Type Description
bool

True if first run, False otherwise

Source code in hedit/src/cli/config.py
def is_first_run() -> bool:
    """Report whether HEDit has not been run before.

    The answer is based solely on the presence of the ``FIRST_RUN_FILE`` marker.

    Returns:
        True if first run, False otherwise
    """
    marker_present = FIRST_RUN_FILE.exists()
    return not marker_present

mark_first_run_complete()

Mark first run as complete by creating the marker file.

Source code in hedit/src/cli/config.py
def mark_first_run_complete() -> None:
    """Record that the first run has happened by creating the marker file.

    Failure to create the marker is deliberately not reported to the caller.
    """
    ensure_config_dir()
    try:
        FIRST_RUN_FILE.touch()
    except OSError:
        # Best effort only: a missing marker is not worth failing over
        return

get_config_paths()

Get paths to config files for debugging.

Source code in hedit/src/cli/config.py
def get_config_paths() -> dict[str, Path]:
    """Return the locations of the config files, keyed by name, for debugging."""
    paths: dict[str, Path] = {}
    paths["config_dir"] = CONFIG_DIR
    paths["config_file"] = CONFIG_FILE
    paths["credentials_file"] = CREDENTIALS_FILE
    paths["machine_id_file"] = MACHINE_ID_FILE
    return paths

API Client

src.cli.client

HTTP client for HEDit API.

Handles all API communication with proper error handling and timeout management.

APIError

Bases: Exception

API request error.

Source code in hedit/src/cli/client.py
class APIError(Exception):
    """API request error."""

    def __init__(self, message: str, status_code: int | None = None, detail: str | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail

HEDitClient

Client for HEDit API.

Source code in hedit/src/cli/client.py
class HEDitClient:
    """Client for HEDit API.

    Every public call opens a short-lived ``httpx.Client``. BYOK settings
    (API key, model, provider, temperature, ...) are sent as request headers
    built by ``_get_headers``; non-200 responses are converted to ``APIError``
    by ``_handle_response``.
    """

    def __init__(
        self,
        api_url: str,
        api_key: str | None = None,
        model: str | None = None,
        eval_model: str | None = None,
        eval_provider: str | None = None,
        provider: str | None = None,
        temperature: float | None = None,
        timeout: httpx.Timeout = DEFAULT_TIMEOUT,
        user_id: str | None = None,
    ):
        """Initialize client.

        Args:
            api_url: Base API URL (trailing slashes are stripped)
            api_key: OpenRouter API key for BYOK mode
            model: Model to use for annotation
            eval_model: Model for evaluation/assessment agents (for fair benchmarking)
            eval_provider: Provider for evaluation model (e.g., Cerebras for qwen models)
            provider: Provider preference (e.g., "Cerebras")
            temperature: LLM temperature (0.0-1.0); not range-checked here
            timeout: Request timeout settings
            user_id: Custom user ID for cache optimization. It is only sent when
                set; NOTE(review): any fallback derived from the API key is not
                implemented in this class -- presumably done server-side, confirm.
        """
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key
        self.model = model
        self.eval_model = eval_model
        self.eval_provider = eval_provider
        self.provider = provider
        self.temperature = temperature
        self.timeout = timeout
        self.user_id = user_id

    def _get_headers(self) -> dict[str, str]:
        """Get request headers with BYOK configuration.

        Returns:
            Base headers plus one header per BYOK setting that is set.
        """
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "hedit-cli",
        }
        # Optional BYOK overrides; empty/None values are simply not sent.
        optional_headers = {
            "X-OpenRouter-Key": self.api_key,
            "X-OpenRouter-Model": self.model,
            "X-OpenRouter-Eval-Model": self.eval_model,
            "X-OpenRouter-Eval-Provider": self.eval_provider,
            "X-OpenRouter-Provider": self.provider,
        }
        headers.update({name: value for name, value in optional_headers.items() if value})
        # Compared against None (not truthiness) so a temperature of 0.0 is sent.
        if self.temperature is not None:
            headers["X-OpenRouter-Temperature"] = str(self.temperature)
        # Custom user ID for cache optimization
        if self.user_id:
            headers["X-User-Id"] = self.user_id
        return headers

    def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
        """Handle API response and errors.

        Args:
            response: HTTP response (its body must already be readable)

        Returns:
            Response JSON data

        Raises:
            APIError: If the status code is anything other than 200
        """
        if response.status_code == 200:
            return response.json()  # type: ignore[no-any-return]

        # Best-effort extraction of the error detail: the body may not be JSON,
        # or may be JSON that is not an object, so fall back to the raw text.
        try:
            error_data = response.json()
            detail = error_data.get("detail", str(error_data))
        except Exception:
            detail = response.text

        status = response.status_code
        if status == 401:
            raise APIError(
                "Authentication required",
                status_code=401,
                detail="Please provide an OpenRouter API key with --api-key or run 'hedit init'",
            )
        if status == 422:
            raise APIError("Invalid request", status_code=422, detail=detail)
        if status == 500:
            raise APIError("Server error", status_code=500, detail=detail)
        if status == 503:
            raise APIError(
                "Service unavailable",
                status_code=503,
                detail=detail or "The API is temporarily unavailable. Please try again later.",
            )
        if status == 504:
            raise APIError(
                "Gateway timeout",
                status_code=504,
                detail="The server took too long to respond. Try a faster model/provider "
                "or use --standalone mode.",
            )
        raise APIError(
            f"Request failed with status {status}",
            status_code=status,
            detail=detail,
        )

    @staticmethod
    def _annotation_options(
        schema_version: str,
        max_validation_attempts: int,
        run_assessment: bool,
        no_extend: bool,
    ) -> dict[str, Any]:
        """Build the request fields shared by all annotation endpoints."""
        return {
            "schema_version": schema_version,
            "max_validation_attempts": max_validation_attempts,
            "run_assessment": run_assessment,
            "no_extend": no_extend,
        }

    def _post_json(self, path: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST a JSON payload to ``path`` and return the decoded response.

        Raises:
            APIError: If the response status is not 200
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.post(
                f"{self.api_url}{path}",
                headers=self._get_headers(),
                json=payload,
            )
            return self._handle_response(response)

    def _get_json(self, path: str) -> dict[str, Any]:
        """GET ``path`` with a fixed 10 second timeout and no BYOK headers.

        Raises:
            APIError: If the response status is not 200
        """
        with httpx.Client(timeout=httpx.Timeout(10.0)) as client:
            response = client.get(f"{self.api_url}{path}")
            return self._handle_response(response)

    @staticmethod
    def _iter_sse_events(
        response: httpx.Response,
    ) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """Parse an SSE response body into (event_type, data) tuples.

        Only ``event: `` lines followed by a ``data: `` line are recognised;
        a data line without a pending event name is ignored, and a data line
        whose payload is not valid JSON is skipped.
        """
        current_event = None
        for line in response.iter_lines():
            if line.startswith("event: "):
                current_event = line[len("event: "):]
            elif line.startswith("data: ") and current_event:
                event_name, current_event = current_event, None
                try:
                    data = json.loads(line[len("data: "):])
                except json.JSONDecodeError:
                    continue  # Skip malformed data
                yield (event_name, data)

    def _stream_events(
        self, path: str, payload: dict[str, Any]
    ) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """POST ``payload`` to a streaming endpoint and yield its SSE events.

        Raises:
            APIError: If the response status is not 200
        """
        with httpx.Client(timeout=self.timeout) as client:
            with client.stream(
                "POST",
                f"{self.api_url}{path}",
                headers=self._get_headers(),
                json=payload,
            ) as response:
                if response.status_code != 200:
                    # A streamed body must be read before it can be inspected;
                    # _handle_response then raises APIError for the failure.
                    response.read()
                    self._handle_response(response)
                    return

                yield from self._iter_sse_events(response)

    def annotate(
        self,
        description: str,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
        no_extend: bool = False,
    ) -> dict[str, Any]:
        """Generate HED annotation from text description.

        Args:
            description: Natural language event description
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment
            no_extend: If True, prohibit tag extensions

        Returns:
            Annotation response dictionary

        Raises:
            APIError: If the API responds with a non-200 status
        """
        options = self._annotation_options(
            schema_version, max_validation_attempts, run_assessment, no_extend
        )
        return self._post_json("/annotate", {"description": description, **options})

    def annotate_stream(
        self,
        description: str,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
        no_extend: bool = False,
    ) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """Generate HED annotation with streaming progress.

        Yields SSE events as (event_type, data) tuples.

        Args:
            description: Natural language event description
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment
            no_extend: If True, prohibit tag extensions

        Yields:
            Tuple of (event_type, event_data) for each SSE event.
            Event types: "progress", "validation", "result", "error", "done"

        Raises:
            APIError: If the API responds with a non-200 status
        """
        options = self._annotation_options(
            schema_version, max_validation_attempts, run_assessment, no_extend
        )
        yield from self._stream_events(
            "/annotate/stream", {"description": description, **options}
        )

    def annotate_image(
        self,
        image_path: Path | str,
        prompt: str | None = None,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
        no_extend: bool = False,
    ) -> dict[str, Any]:
        """Generate HED annotation from image.

        Args:
            image_path: Path to image file
            prompt: Optional custom prompt for vision model
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment
            no_extend: If True, prohibit tag extensions

        Returns:
            Annotation response dictionary

        Raises:
            APIError: If the image is not a file or the API responds with a
                non-200 status
        """
        image_uri = self._encode_image(image_path)
        options = self._annotation_options(
            schema_version, max_validation_attempts, run_assessment, no_extend
        )
        return self._post_json(
            "/annotate-from-image", {"image": image_uri, "prompt": prompt, **options}
        )

    def annotate_image_stream(
        self,
        image_path: Path | str,
        prompt: str | None = None,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
        no_extend: bool = False,
    ) -> Generator[tuple[str, dict[str, Any]], None, None]:
        """Generate HED annotation from image with streaming progress.

        Yields SSE events as (event_type, data) tuples.

        Args:
            image_path: Path to image file
            prompt: Optional custom prompt for vision model
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment
            no_extend: If True, prohibit tag extensions

        Yields:
            Tuple of (event_type, event_data) for each SSE event.
            Event types: "progress", "image_description", "validation", "result", "error", "done"

        Raises:
            APIError: If the image is not a file or the API responds with a
                non-200 status
        """
        image_uri = self._encode_image(image_path)
        options = self._annotation_options(
            schema_version, max_validation_attempts, run_assessment, no_extend
        )
        yield from self._stream_events(
            "/annotate-from-image/stream",
            {"image": image_uri, "prompt": prompt, **options},
        )

    def _encode_image(self, image_path: Path | str) -> str:
        """Encode an image file to base64 data URI.

        Args:
            image_path: Path to image file

        Returns:
            Base64-encoded data URI string

        Raises:
            APIError: If image_path does not point to an existing file
        """
        image_path = Path(image_path)
        # is_file() rather than exists(): a directory would pass exists() and
        # then fail in the read below with a raw OSError instead of APIError.
        if not image_path.is_file():
            raise APIError(f"Image file not found: {image_path}")

        # Detect MIME type from the extension; unknown extensions are sent as PNG
        mime_types = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        mime_type = mime_types.get(image_path.suffix.lower(), "image/png")

        # Read and encode
        image_data = base64.b64encode(image_path.read_bytes()).decode("utf-8")

        return f"data:{mime_type};base64,{image_data}"

    def validate(
        self,
        hed_string: str,
        schema_version: str = "8.4.0",
    ) -> dict[str, Any]:
        """Validate HED string.

        Args:
            hed_string: HED annotation to validate
            schema_version: HED schema version

        Returns:
            Validation response dictionary

        Raises:
            APIError: If the API responds with a non-200 status
        """
        return self._post_json(
            "/validate",
            {"hed_string": hed_string, "schema_version": schema_version},
        )

    def health(self) -> dict[str, Any]:
        """Check API health.

        Returns:
            Health status dictionary

        Raises:
            APIError: If the API responds with a non-200 status
        """
        return self._get_json("/health")

    def version(self) -> dict[str, Any]:
        """Get API version info.

        Returns:
            Version information dictionary

        Raises:
            APIError: If the API responds with a non-200 status
        """
        return self._get_json("/version")

__init__(api_url, api_key=None, model=None, eval_model=None, eval_provider=None, provider=None, temperature=None, timeout=DEFAULT_TIMEOUT, user_id=None)

Initialize client.

Parameters:

Name Type Description Default
api_url str

Base API URL

required
api_key str | None

OpenRouter API key for BYOK mode

None
model str | None

Model to use for annotation

None
eval_model str | None

Model for evaluation/assessment agents (for fair benchmarking)

None
eval_provider str | None

Provider for evaluation model (e.g., Cerebras for qwen models)

None
provider str | None

Provider preference (e.g., "Cerebras")

None
temperature float | None

LLM temperature (0.0-1.0)

None
timeout Timeout

Request timeout settings

DEFAULT_TIMEOUT
user_id str | None

Custom user ID for cache optimization (default: derived from API key)

None
Source code in hedit/src/cli/client.py
def __init__(
    self,
    api_url: str,
    api_key: str | None = None,
    model: str | None = None,
    eval_model: str | None = None,
    eval_provider: str | None = None,
    provider: str | None = None,
    temperature: float | None = None,
    timeout: httpx.Timeout = DEFAULT_TIMEOUT,
    user_id: str | None = None,
):
    """Store the connection and BYOK settings on the client.

    Args:
        api_url: Base API URL (trailing slashes are stripped)
        api_key: OpenRouter API key for BYOK mode
        model: Model to use for annotation
        eval_model: Model for evaluation/assessment agents (for fair benchmarking)
        eval_provider: Provider for evaluation model (e.g., Cerebras for qwen models)
        provider: Provider preference (e.g., "Cerebras")
        temperature: LLM temperature (0.0-1.0)
        timeout: Request timeout settings
        user_id: Custom user ID for cache optimization (default: derived from API key)
    """
    # Connection settings
    self.api_url = api_url.rstrip("/")
    self.timeout = timeout

    # Credentials and identity
    self.api_key, self.user_id = api_key, user_id

    # Model selection and sampling
    self.model, self.provider = model, provider
    self.eval_model, self.eval_provider = eval_model, eval_provider
    self.temperature = temperature

annotate(description, schema_version='8.4.0', max_validation_attempts=5, run_assessment=False, no_extend=False)

Generate HED annotation from text description.

Parameters:

Name Type Description Default
description str

Natural language event description

required
schema_version str

HED schema version

'8.4.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False
no_extend bool

If True, prohibit tag extensions

False

Returns:

Type Description
dict[str, Any]

Annotation response dictionary

Source code in hedit/src/cli/client.py
def annotate(
    self,
    description: str,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
    no_extend: bool = False,
) -> dict[str, Any]:
    """Request a HED annotation for a text description.

    Args:
        description: Natural language event description
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment
        no_extend: If True, prohibit tag extensions

    Returns:
        Annotation response dictionary
    """
    endpoint = f"{self.api_url}/annotate"
    payload = {
        "description": description,
        "schema_version": schema_version,
        "max_validation_attempts": max_validation_attempts,
        "run_assessment": run_assessment,
        "no_extend": no_extend,
    }
    with httpx.Client(timeout=self.timeout) as http:
        reply = http.post(endpoint, headers=self._get_headers(), json=payload)
        return self._handle_response(reply)

annotate_stream(description, schema_version='8.4.0', max_validation_attempts=5, run_assessment=False, no_extend=False)

Generate HED annotation with streaming progress.

Yields SSE events as (event_type, data) tuples.

Parameters:

Name Type Description Default
description str

Natural language event description

required
schema_version str

HED schema version

'8.4.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False
no_extend bool

If True, prohibit tag extensions

False

Yields:

Type Description
tuple[str, dict[str, Any]]

Tuple of (event_type, event_data) for each SSE event. Event types: "progress", "validation", "result", "error", "done"

Source code in hedit/src/cli/client.py
def annotate_stream(
    self,
    description: str,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
    no_extend: bool = False,
) -> Generator[tuple[str, dict[str, Any]], None, None]:
    """Stream the progress of a HED annotation as SSE events.

    Each event is yielded as an (event_type, data) tuple.

    Args:
        description: Natural language event description
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment
        no_extend: If True, prohibit tag extensions

    Yields:
        Tuple of (event_type, event_data) for each SSE event.
        Event types: "progress", "validation", "result", "error", "done"
    """
    event_prefix = "event: "
    data_prefix = "data: "
    endpoint = f"{self.api_url}/annotate/stream"
    payload = {
        "description": description,
        "schema_version": schema_version,
        "max_validation_attempts": max_validation_attempts,
        "run_assessment": run_assessment,
        "no_extend": no_extend,
    }

    with httpx.Client(timeout=self.timeout) as http, http.stream(
        "POST", endpoint, headers=self._get_headers(), json=payload
    ) as reply:
        if reply.status_code != 200:
            # The streamed body has to be loaded before the error can be parsed
            reply.read()
            self._handle_response(reply)
            return

        # Walk the SSE stream: an event line names the next data line
        pending_event = None
        for raw_line in reply.iter_lines():
            if raw_line.startswith(event_prefix):
                pending_event = raw_line[len(event_prefix):]
                continue
            if not (raw_line.startswith(data_prefix) and pending_event):
                continue
            try:
                yield (pending_event, json.loads(raw_line[len(data_prefix):]))
            except json.JSONDecodeError:
                pass  # Malformed payloads are dropped
            pending_event = None

annotate_image(image_path, prompt=None, schema_version='8.4.0', max_validation_attempts=5, run_assessment=False, no_extend=False)

Generate HED annotation from image.

Parameters:

Name Type Description Default
image_path Path | str

Path to image file

required
prompt str | None

Optional custom prompt for vision model

None
schema_version str

HED schema version

'8.4.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False
no_extend bool

If True, prohibit tag extensions

False

Returns:

Type Description
dict[str, Any]

Annotation response dictionary

Source code in hedit/src/cli/client.py
def annotate_image(
    self,
    image_path: Path | str,
    prompt: str | None = None,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
    no_extend: bool = False,
) -> dict[str, Any]:
    """Request a HED annotation for an image file.

    The image is encoded with ``_encode_image`` before the request is sent.

    Args:
        image_path: Path to image file
        prompt: Optional custom prompt for vision model
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment
        no_extend: If True, prohibit tag extensions

    Returns:
        Annotation response dictionary
    """
    payload = {
        "image": self._encode_image(image_path),
        "prompt": prompt,
        "schema_version": schema_version,
        "max_validation_attempts": max_validation_attempts,
        "run_assessment": run_assessment,
        "no_extend": no_extend,
    }
    endpoint = f"{self.api_url}/annotate-from-image"

    with httpx.Client(timeout=self.timeout) as http:
        reply = http.post(endpoint, headers=self._get_headers(), json=payload)
        return self._handle_response(reply)

annotate_image_stream(image_path, prompt=None, schema_version='8.4.0', max_validation_attempts=5, run_assessment=False, no_extend=False)

Generate HED annotation from image with streaming progress.

Yields SSE events as (event_type, data) tuples.

Parameters:

Name Type Description Default
image_path Path | str

Path to image file

required
prompt str | None

Optional custom prompt for vision model

None
schema_version str

HED schema version

'8.4.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False
no_extend bool

If True, prohibit tag extensions

False

Yields:

Type Description
tuple[str, dict[str, Any]]

Tuple of (event_type, event_data) for each SSE event. Event types: "progress", "image_description", "validation", "result", "error", "done"

Source code in hedit/src/cli/client.py
def annotate_image_stream(
    self,
    image_path: Path | str,
    prompt: str | None = None,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
    no_extend: bool = False,
) -> Generator[tuple[str, dict[str, Any]], None, None]:
    """Stream the progress of an image-based HED annotation as SSE events.

    Each event is yielded as an (event_type, data) tuple.

    Args:
        image_path: Path to image file
        prompt: Optional custom prompt for vision model
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment
        no_extend: If True, prohibit tag extensions

    Yields:
        Tuple of (event_type, event_data) for each SSE event.
        Event types: "progress", "image_description", "validation", "result", "error", "done"
    """
    event_prefix = "event: "
    data_prefix = "data: "
    payload = {
        "image": self._encode_image(image_path),
        "prompt": prompt,
        "schema_version": schema_version,
        "max_validation_attempts": max_validation_attempts,
        "run_assessment": run_assessment,
        "no_extend": no_extend,
    }
    endpoint = f"{self.api_url}/annotate-from-image/stream"

    with httpx.Client(timeout=self.timeout) as http, http.stream(
        "POST", endpoint, headers=self._get_headers(), json=payload
    ) as reply:
        if reply.status_code != 200:
            # The streamed body has to be loaded before the error can be parsed
            reply.read()
            self._handle_response(reply)
            return

        # Walk the SSE stream: an event line names the next data line
        pending_event = None
        for raw_line in reply.iter_lines():
            if raw_line.startswith(event_prefix):
                pending_event = raw_line[len(event_prefix):]
                continue
            if not (raw_line.startswith(data_prefix) and pending_event):
                continue
            try:
                yield (pending_event, json.loads(raw_line[len(data_prefix):]))
            except json.JSONDecodeError:
                pass  # Malformed payloads are dropped
            pending_event = None

validate(hed_string, schema_version='8.4.0')

Validate HED string.

Parameters:

Name Type Description Default
hed_string str

HED annotation to validate

required
schema_version str

HED schema version

'8.4.0'

Returns:

Type Description
dict[str, Any]

Validation response dictionary

Source code in hedit/src/cli/client.py
def validate(
    self,
    hed_string: str,
    schema_version: str = "8.4.0",
) -> dict[str, Any]:
    """Ask the API to validate a HED string.

    Args:
        hed_string: HED annotation to validate
        schema_version: HED schema version

    Returns:
        Validation response dictionary
    """
    endpoint = f"{self.api_url}/validate"
    payload = {"hed_string": hed_string, "schema_version": schema_version}
    with httpx.Client(timeout=self.timeout) as http:
        reply = http.post(endpoint, headers=self._get_headers(), json=payload)
        return self._handle_response(reply)

health()

Check API health.

Returns:

Type Description
dict[str, Any]

Health status dictionary

Source code in hedit/src/cli/client.py
def health(self) -> dict[str, Any]:
    """Query the API health endpoint.

    Uses a fixed 10 second timeout rather than the client's configured one.

    Returns:
        Health status dictionary
    """
    short_timeout = httpx.Timeout(10.0)
    with httpx.Client(timeout=short_timeout) as http:
        reply = http.get(f"{self.api_url}/health")
        return self._handle_response(reply)

version()

Get API version info.

Returns:

Type Description
dict[str, Any]

Version information dictionary

Source code in hedit/src/cli/client.py
def version(self) -> dict[str, Any]:
    """Query the API version endpoint.

    Uses a fixed 10 second timeout rather than the client's configured one.

    Returns:
        Version information dictionary
    """
    short_timeout = httpx.Timeout(10.0)
    with httpx.Client(timeout=short_timeout) as http:
        reply = http.get(f"{self.api_url}/version")
        return self._handle_response(reply)

create_client(config, api_key=None)

Create API client from config.

Parameters:

Name Type Description Default
config CLIConfig

CLI configuration

required
api_key str | None

API key (overrides config)

None

Returns:

Type Description
HEDitClient

Configured HEDitClient

Source code in hedit/src/cli/client.py
def create_client(config: CLIConfig, api_key: str | None = None) -> HEDitClient:
    """Build a HEDitClient from CLI configuration.

    Args:
        config: CLI configuration (supplies API URL and model settings)
        api_key: API key (overrides config); forwarded to the client as given

    Returns:
        Configured HEDitClient
    """
    # Collect the constructor arguments first, then hand them over in one go.
    client_kwargs = {
        "api_url": config.api.url,
        "api_key": api_key,
        "model": config.models.default,
        "provider": config.models.provider,
        "temperature": config.models.temperature,
    }
    return HEDitClient(**client_kwargs)

Workflow

The multi-agent annotation workflow:

src.agents.workflow

LangGraph workflow for HED annotation generation.

This module defines the multi-agent workflow that orchestrates annotation, validation, evaluation, and assessment.

HedAnnotationWorkflow

Multi-agent workflow for HED annotation generation and validation.

The workflow follows this pattern:

1. Annotation: Generate HED tags from natural language
2. Validation: Check HED compliance
3. If errors and attempts < max: Return to annotation with feedback
4. If valid: Proceed to evaluation
5. Evaluation: Assess faithfulness to original description
6. If needs refinement: Return to annotation
7. If faithful: Proceed to assessment
8. Assessment: Final comparison for completeness
9. End: Return final annotation with feedback

Source code in hedit/src/agents/workflow.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
class HedAnnotationWorkflow:
    """Multi-agent workflow for HED annotation generation and validation.

    The workflow follows this pattern:
    1. Annotation: Generate HED tags from natural language
    2. Validation: Check HED compliance
    3. If errors and attempts < max: Return to annotation with feedback
    4. If valid: Proceed to evaluation
    5. Evaluation: Assess faithfulness to original description
    6. If needs refinement: Return to annotation
    7. If faithful: Proceed to assessment
    8. Assessment: Final comparison for completeness
    9. End: Return final annotation with feedback
    """

    def __init__(
        self,
        llm: BaseChatModel,
        evaluation_llm: BaseChatModel | None = None,
        assessment_llm: BaseChatModel | None = None,
        feedback_llm: BaseChatModel | None = None,
        schema_dir: Path | str | None = None,
        validator_path: Path | None = None,
        use_js_validator: bool = True,
        enable_semantic_search: bool = True,
    ) -> None:
        """Initialize the workflow.

        Creates the annotation, validation, evaluation and assessment agents
        plus the feedback summarizer, optionally sets up the hed-lsp client,
        and finally builds the workflow graph.

        Args:
            llm: Language model for annotation agent
            evaluation_llm: Language model for evaluation agent (defaults to llm)
            assessment_llm: Language model for assessment agent (defaults to llm)
            feedback_llm: Language model for feedback summarization (defaults to llm)
            schema_dir: Directory containing JSON schemas
            validator_path: Path to hed-javascript for validation
            use_js_validator: Whether to use JavaScript validator
            enable_semantic_search: Whether to use hed-lsp CLI for tag suggestions
        """
        # Store schema directory (None means use HED library to fetch from GitHub)
        self.schema_dir = schema_dir
        # Keyword extraction always runs; LSP enrichment requires hed-lsp CLI
        self.enable_semantic_search = enable_semantic_search

        # Initialize legacy schema loader for validation
        self.schema_loader = HedSchemaLoader()

        # Use provided LLMs or default to main llm
        eval_llm = evaluation_llm or llm
        assess_llm = assessment_llm or llm
        feed_llm = feedback_llm or llm

        # Store feedback LLM for keyword extraction (cheap/fast model)
        self.feedback_llm = feed_llm

        # Initialize agents with JSON schema support and per-agent LLMs
        self.annotation_agent = AnnotationAgent(llm, schema_dir=self.schema_dir)
        self.validation_agent = ValidationAgent(
            self.schema_loader,
            use_javascript=use_js_validator,
            validator_path=validator_path,
        )
        self.evaluation_agent = EvaluationAgent(eval_llm, schema_dir=self.schema_dir)
        self.assessment_agent = AssessmentAgent(assess_llm, schema_dir=self.schema_dir)
        self.feedback_summarizer = FeedbackSummarizer(feed_llm)

        # Initialize hed-lsp client for semantic search (optional enrichment).
        # The client stays None when the CLI is missing or fails to start.
        self.hed_lsp_client: HedLspClient | None = None
        if self.enable_semantic_search and is_hed_lsp_available():
            try:
                self.hed_lsp_client = HedLspClient()
                logger.info("[WORKFLOW] hed-lsp CLI available for semantic tag suggestions")
            except RuntimeError as e:
                # Lazy %-style arguments, matching the other logger calls in this class
                logger.warning("[WORKFLOW] hed-lsp CLI not available: %s", e)
        elif self.enable_semantic_search:
            logger.info(
                "[WORKFLOW] hed-lsp CLI not in PATH; "
                "keyword extraction will run without LSP enrichment"
            )

        # Build graph
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        """Assemble the nodes and edges of the workflow and compile it.

        Returns:
            Compiled StateGraph
        """
        graph = StateGraph(HedAnnotationState)  # type: ignore[arg-type]  # LangGraph typing limitation
        with_preprocess = self.enable_semantic_search

        # Register nodes; the preprocessing node only exists when semantic
        # search is enabled.
        if with_preprocess:
            graph.add_node("semantic_preprocess", self._semantic_preprocess_node)
        for node_name, handler in (
            ("annotate", self._annotate_node),
            ("validate", self._validate_node),
            ("summarize_feedback", self._summarize_feedback_node),
            ("evaluate", self._evaluate_node),
            ("assess", self._assess_node),
        ):
            graph.add_node(node_name, handler)

        # Entry point: preprocessing (when enabled) feeds into annotation
        graph.set_entry_point("semantic_preprocess" if with_preprocess else "annotate")
        if with_preprocess:
            graph.add_edge("semantic_preprocess", "annotate")

        # Annotation is always followed by validation
        graph.add_edge("annotate", "validate")

        # Validation outcome decides the next step
        validation_targets = {
            "summarize_feedback": "summarize_feedback",  # Summarize feedback if invalid
            "evaluate": "evaluate",  # Proceed if valid
            "end": END,  # End if max attempts reached
        }
        graph.add_conditional_edges("validate", self._route_after_validation, validation_targets)

        # Summarized feedback always loops back into annotation
        graph.add_edge("summarize_feedback", "annotate")

        # Evaluation outcome decides the next step
        evaluation_targets = {
            "summarize_feedback": "summarize_feedback",  # Summarize feedback if not faithful
            "assess": "assess",  # Proceed to assessment if needed
            "end": END,  # Skip assessment if valid and faithful
        }
        graph.add_conditional_edges("evaluate", self._route_after_evaluation, evaluation_targets)

        # Assessment is the last step
        graph.add_edge("assess", END)

        return graph.compile()  # type: ignore[return-value]

    async def _extract_keywords(self, description: str) -> list[str]:
        """Extract HED-relevant keywords from a natural language description.

        Uses the feedback LLM (cheap/fast model) to identify key concepts
        that can be mapped to HED tags via the LSP suggest tool. Extraction is
        best-effort: any failure is logged and an empty list is returned.

        Args:
            description: Natural language event or image description

        Returns:
            List of extracted keywords (max 20); empty if extraction failed
        """
        system_prompt = (
            "You are a keyword extractor for neuroscience event descriptions. "
            "Extract the most important concepts that could map to HED "
            "(Hierarchical Event Descriptors) tags.\n\n"
            "Extract:\n"
            "- Objects/entities (person, car, button, screen, face, etc.)\n"
            "- Actions/events (pressing, flashing, appearing, moving, etc.)\n"
            "- Properties/attributes (red, large, fast, loud, etc.)\n"
            "- Spatial relationships (left, center, above, etc.)\n"
            "- Temporal aspects (onset, offset, duration, etc.)\n"
            "- Sensory modalities (visual, auditory, tactile, etc.)\n\n"
            "Return ONLY a comma-separated list of single words or short phrases "
            "(2-3 words max). Return at most 20 keywords. "
            "Do not include any other text, explanation, or formatting."
        )

        try:
            response = await self.feedback_llm.ainvoke(
                [
                    SystemMessage(content=system_prompt),
                    HumanMessage(content=f"Description: {description}"),
                ]
            )
            raw_text = extract_text_content(response.content)
            # Parse comma-separated keywords, strip whitespace, filter empty
            keywords = [kw.strip() for kw in raw_text.split(",") if kw.strip()]
            # Limit to 20 keywords
            keywords = keywords[:20]
            # Lazy %-style arguments, matching the other logger calls in this class
            logger.info("[WORKFLOW] Extracted %d keywords: %s", len(keywords), keywords)
            return keywords
        except Exception as e:
            # Deliberately broad: a failed extraction must not abort the workflow
            logger.warning("[WORKFLOW] Keyword extraction failed: %s", e, exc_info=True)
            return []

    async def _semantic_preprocess_node(self, state: HedAnnotationState) -> dict:
        """Preprocessing node: derive keywords and HED tag hints from the input.

        Runs ahead of annotation. Keywords are extracted with the feedback LLM
        and, when a hed-lsp client is present, each keyword is sent to it for
        tag suggestions. Does nothing after the first iteration.

        Args:
            state: Current workflow state

        Returns:
            State update with extracted_keywords and semantic_hints
            (an empty update when this is not the first iteration)
        """
        # Nothing to do once the workflow has already iterated
        if state.get("total_iterations", 0) > 0:
            logger.debug("[WORKFLOW] Skipping semantic preprocessing (not first iteration)")
            return {}

        logger.info("[WORKFLOW] Entering semantic_preprocess node")

        # Step 1: keywords from the description (LLM)
        keywords = await self._extract_keywords(state["input_description"])

        # Step 2: tag suggestions per keyword (hed-lsp CLI)
        hints: list[dict] = []

        if keywords and not self.hed_lsp_client:
            # LSP not available; still store keywords for the annotation agent
            logger.info(
                "[WORKFLOW] hed-lsp not available; storing %d extracted keywords",
                len(keywords),
            )
        elif keywords:
            # One query per keyword; the blocking call runs in a worker thread
            for term in keywords:
                try:
                    reply = await asyncio.to_thread(self.hed_lsp_client.suggest, term)
                except Exception as exc:
                    logger.warning("[WORKFLOW] hed-lsp error for '%s': %s", term, exc)
                    continue
                if not reply.success:
                    logger.debug(
                        "[WORKFLOW] hed-lsp suggestion failed for '%s': %s",
                        term,
                        reply.error,
                    )
                    continue
                hints.extend(
                    {
                        "tag": suggestion.tag,
                        "keyword": term,
                        "score": suggestion.score or 0.0,
                        "source": "hed-lsp",
                    }
                    for suggestion in reply.suggestions
                )

            # Keep only the best-scoring hint per tag, then order by score (descending)
            if hints:
                best_by_tag: dict[str, dict] = {}
                for candidate in hints:
                    current = best_by_tag.get(candidate["tag"])
                    if current is None or candidate["score"] > current["score"]:
                        best_by_tag[candidate["tag"]] = candidate
                hints = sorted(best_by_tag.values(), key=lambda h: h["score"], reverse=True)

            logger.info(
                "[WORKFLOW] hed-lsp suggested %d unique tags from %d keywords",
                len(hints),
                len(keywords),
            )

        return {
            "extracted_keywords": keywords,
            "semantic_hints": hints,
        }

    async def _annotate_node(self, state: HedAnnotationState) -> dict:
        """Annotation node: Generate or refine HED annotation.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        total_iters = state.get("total_iterations", 0) + 1
        print(
            f"[WORKFLOW] Entering annotate node (validation attempt {state['validation_attempts']}, total iteration {total_iters})"
        )
        t0 = time.monotonic()
        result = await self.annotation_agent.annotate(state)
        elapsed = time.monotonic() - t0
        result["total_iterations"] = total_iters  # Increment counter
        print(
            f"[WORKFLOW] Annotation generated in {elapsed:.1f}s: {result.get('current_annotation', '')[:100]}..."
        )
        return result

    async def _validate_node(self, state: HedAnnotationState) -> dict:
        """Validation node: Validate HED annotation.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        print("[WORKFLOW] Entering validate node")
        t0 = time.monotonic()
        result = await self.validation_agent.validate(state)
        elapsed = time.monotonic() - t0
        print(
            f"[WORKFLOW] Validation result in {elapsed:.1f}s: {result.get('validation_status')}, is_valid: {result.get('is_valid')}"
        )
        if not result.get("is_valid"):
            print(f"[WORKFLOW] Validation errors: {result.get('validation_errors', [])}")
        return result

    async def _evaluate_node(self, state: HedAnnotationState) -> dict:
        """Evaluation node: Evaluate annotation faithfulness.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        print("[WORKFLOW] Entering evaluate node")
        t0 = time.monotonic()
        result = await self.evaluation_agent.evaluate(state)
        elapsed = time.monotonic() - t0
        print(
            f"[WORKFLOW] Evaluation result in {elapsed:.1f}s: is_faithful={result.get('is_faithful')}"
        )

        # Set default assessment values if assessment will be skipped
        run_assessment = state.get("run_assessment", False)
        if not run_assessment:
            result["is_complete"] = result.get("is_faithful", False) and state.get(
                "is_valid", False
            )
            if result["is_complete"]:
                result["assessment_feedback"] = (
                    "Annotation is valid and faithful to the original description."
                )
            else:
                result["assessment_feedback"] = ""

        return result

    async def _assess_node(self, state: HedAnnotationState) -> dict:
        """Assessment node: Final assessment.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        print("[WORKFLOW] Entering assess node")
        t0 = time.monotonic()
        result = await self.assessment_agent.assess(state)
        elapsed = time.monotonic() - t0
        print(f"[WORKFLOW] Assessment completed in {elapsed:.1f}s")
        return result

    async def _summarize_feedback_node(self, state: HedAnnotationState) -> dict:
        """Summarize feedback node: Condense errors and feedback.

        Args:
            state: Current workflow state

        Returns:
            State update with summarized feedback
        """
        print("[WORKFLOW] Entering summarize_feedback node")
        t0 = time.monotonic()
        result = await self.feedback_summarizer.summarize(state)
        elapsed = time.monotonic() - t0
        print(
            f"[WORKFLOW] Feedback summarized in {elapsed:.1f}s: {result.get('validation_errors_augmented', [''])[0][:100] if result.get('validation_errors_augmented') else 'No feedback'}..."
        )
        return result

    def _route_after_validation(
        self,
        state: HedAnnotationState,
    ) -> str:
        """Route after validation based on result.

        Args:
            state: Current workflow state

        Returns:
            Next node name
        """
        if state["validation_status"] == "valid":
            print("[WORKFLOW] Routing to evaluate (validation passed)")
            return "evaluate"
        elif state["validation_status"] == "max_attempts_reached":
            print("[WORKFLOW] Routing to end (max validation attempts reached)")
            return "end"
        else:
            print(
                f"[WORKFLOW] Routing to summarize_feedback (validation failed, attempts: {state['validation_attempts']}/{state['max_validation_attempts']})"
            )
            return "summarize_feedback"

    def _route_after_evaluation(
        self,
        state: HedAnnotationState,
    ) -> str:
        """Route after evaluation based on faithfulness and assessment mode.

        When run_assessment=False (default), evaluation is informational only;
        the result is reported but never triggers refinement loops.
        When run_assessment=True, evaluation can trigger refinement and the
        assessment node runs at the end.

        Args:
            state: Current workflow state

        Returns:
            Next node name
        """
        run_assessment = state.get("run_assessment", False)

        # When assessment is off, evaluation is informational -- always end
        if not run_assessment:
            print(
                f"[WORKFLOW] Evaluation complete (informational, is_faithful={state['is_faithful']}) - routing to END"
            )
            return "end"

        # Assessment mode: allow refinement loops with iteration cap
        total_iters = state.get("total_iterations", 0)
        max_iters = state.get("max_total_iterations", 4)

        if total_iters >= max_iters:
            print(f"[WORKFLOW] Routing to assess (max total iterations {max_iters} reached)")
            return "assess"

        if state["is_faithful"]:
            print("[WORKFLOW] Routing to assess (annotation is faithful)")
            return "assess"
        else:
            print(
                f"[WORKFLOW] Routing to summarize_feedback (annotation needs refinement, iteration {total_iters}/{max_iters})"
            )
            return "summarize_feedback"

    async def run(
        self,
        input_description: str,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 3,
        max_total_iterations: int | None = None,
        run_assessment: bool = False,
        no_extend: bool = False,
        config: dict | None = None,
    ) -> HedAnnotationState:
        """Execute the workflow graph for one description.

        Args:
            input_description: Natural language event description
            schema_version: HED schema version to use
            max_validation_attempts: Maximum validation retry attempts
            max_total_iterations: Maximum total iterations (default: max_validation_attempts + 1)
            run_assessment: Whether to run final assessment (default: False)
            no_extend: If True, prohibit tag extensions (use only existing vocabulary)
            config: Optional LangGraph config (e.g., recursion_limit)

        Returns:
            Final workflow state with annotation and feedback
        """
        from src.agents.state import create_initial_state

        # Fall back to one more iteration than the validation retry budget
        iteration_cap = (
            max_validation_attempts + 1 if max_total_iterations is None else max_total_iterations
        )

        seed_state = create_initial_state(
            input_description,
            schema_version,
            max_validation_attempts,
            iteration_cap,
            run_assessment,
            no_extend=no_extend,
        )

        # Drive the compiled graph to completion and hand back its final state
        return await self.graph.ainvoke(seed_state, config=config)  # type: ignore[attr-defined]

__init__(llm, evaluation_llm=None, assessment_llm=None, feedback_llm=None, schema_dir=None, validator_path=None, use_js_validator=True, enable_semantic_search=True)

Initialize the workflow.

Parameters:

Name Type Description Default
llm BaseChatModel

Language model for annotation agent

required
evaluation_llm BaseChatModel | None

Language model for evaluation agent (defaults to llm)

None
assessment_llm BaseChatModel | None

Language model for assessment agent (defaults to llm)

None
feedback_llm BaseChatModel | None

Language model for feedback summarization (defaults to llm)

None
schema_dir Path | str | None

Directory containing JSON schemas

None
validator_path Path | None

Path to hed-javascript for validation

None
use_js_validator bool

Whether to use JavaScript validator

True
enable_semantic_search bool

Whether to use hed-lsp CLI for tag suggestions

True
Source code in hedit/src/agents/workflow.py
def __init__(
    self,
    llm: BaseChatModel,
    evaluation_llm: BaseChatModel | None = None,
    assessment_llm: BaseChatModel | None = None,
    feedback_llm: BaseChatModel | None = None,
    schema_dir: Path | str | None = None,
    validator_path: Path | None = None,
    use_js_validator: bool = True,
    enable_semantic_search: bool = True,
) -> None:
    """Initialize the workflow.

    Creates the annotation, validation, evaluation and assessment agents
    plus the feedback summarizer, optionally sets up the hed-lsp client,
    and finally builds the workflow graph.

    Args:
        llm: Language model for annotation agent
        evaluation_llm: Language model for evaluation agent (defaults to llm)
        assessment_llm: Language model for assessment agent (defaults to llm)
        feedback_llm: Language model for feedback summarization (defaults to llm)
        schema_dir: Directory containing JSON schemas
        validator_path: Path to hed-javascript for validation
        use_js_validator: Whether to use JavaScript validator
        enable_semantic_search: Whether to use hed-lsp CLI for tag suggestions
    """
    # Store schema directory (None means use HED library to fetch from GitHub)
    self.schema_dir = schema_dir
    # Keyword extraction always runs; LSP enrichment requires hed-lsp CLI
    self.enable_semantic_search = enable_semantic_search

    # Initialize legacy schema loader for validation
    self.schema_loader = HedSchemaLoader()

    # Use provided LLMs or default to main llm
    eval_llm = evaluation_llm or llm
    assess_llm = assessment_llm or llm
    feed_llm = feedback_llm or llm

    # Store feedback LLM for keyword extraction (cheap/fast model)
    self.feedback_llm = feed_llm

    # Initialize agents with JSON schema support and per-agent LLMs
    self.annotation_agent = AnnotationAgent(llm, schema_dir=self.schema_dir)
    self.validation_agent = ValidationAgent(
        self.schema_loader,
        use_javascript=use_js_validator,
        validator_path=validator_path,
    )
    self.evaluation_agent = EvaluationAgent(eval_llm, schema_dir=self.schema_dir)
    self.assessment_agent = AssessmentAgent(assess_llm, schema_dir=self.schema_dir)
    self.feedback_summarizer = FeedbackSummarizer(feed_llm)

    # Initialize hed-lsp client for semantic search (optional enrichment).
    # The client stays None when the CLI is missing or fails to start.
    self.hed_lsp_client: HedLspClient | None = None
    if self.enable_semantic_search and is_hed_lsp_available():
        try:
            self.hed_lsp_client = HedLspClient()
            logger.info("[WORKFLOW] hed-lsp CLI available for semantic tag suggestions")
        except RuntimeError as e:
            # Lazy %-style arguments instead of an f-string in the log call
            logger.warning("[WORKFLOW] hed-lsp CLI not available: %s", e)
    elif self.enable_semantic_search:
        logger.info(
            "[WORKFLOW] hed-lsp CLI not in PATH; "
            "keyword extraction will run without LSP enrichment"
        )

    # Build graph
    self.graph = self._build_graph()

run(input_description, schema_version='8.4.0', max_validation_attempts=3, max_total_iterations=None, run_assessment=False, no_extend=False, config=None) async

Run the complete annotation workflow.

Parameters:

Name Type Description Default
input_description str

Natural language event description

required
schema_version str

HED schema version to use

'8.4.0'
max_validation_attempts int

Maximum validation retry attempts

3
max_total_iterations int | None

Maximum total iterations (default: max_validation_attempts + 1)

None
run_assessment bool

Whether to run final assessment (default: False)

False
no_extend bool

If True, prohibit tag extensions (use only existing vocabulary)

False
config dict | None

Optional LangGraph config (e.g., recursion_limit)

None

Returns:

Type Description
HedAnnotationState

Final workflow state with annotation and feedback

Source code in hedit/src/agents/workflow.py
async def run(
    self,
    input_description: str,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 3,
    max_total_iterations: int | None = None,
    run_assessment: bool = False,
    no_extend: bool = False,
    config: dict | None = None,
) -> HedAnnotationState:
    """Execute the workflow graph for one description.

    Args:
        input_description: Natural language event description
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        max_total_iterations: Maximum total iterations (default: max_validation_attempts + 1)
        run_assessment: Whether to run final assessment (default: False)
        no_extend: If True, prohibit tag extensions (use only existing vocabulary)
        config: Optional LangGraph config (e.g., recursion_limit)

    Returns:
        Final workflow state with annotation and feedback
    """
    from src.agents.state import create_initial_state

    # Fall back to one more iteration than the validation retry budget
    iteration_cap = (
        max_validation_attempts + 1 if max_total_iterations is None else max_total_iterations
    )

    seed_state = create_initial_state(
        input_description,
        schema_version,
        max_validation_attempts,
        iteration_cap,
        run_assessment,
        no_extend=no_extend,
    )

    # Drive the compiled graph to completion and hand back its final state
    return await self.graph.ainvoke(seed_state, config=config)  # type: ignore[attr-defined]

Validation

src.validation.hed_validator

HED validation using Python and JavaScript validators.

This module provides integration with HED validation tools. The validator factory supports JavaScript (most detailed) and Python (always available fallback) backends.

ValidationIssue dataclass

Represents a single validation issue (error or warning).

Attributes:

Name Type Description
code str

Issue code (e.g., 'TAG_INVALID')

level Literal['error', 'warning']

Severity level ('error' or 'warning')

message str

Human-readable error message

tag str | None

The problematic tag (if applicable)

context dict | None

Additional context information

Source code in hedit/src/validation/hed_validator.py
@dataclass
class ValidationIssue:
    """One finding (error or warning) reported for a HED string.

    Attributes:
        code: Issue code (e.g., 'TAG_INVALID')
        level: Severity of the finding, either 'error' or 'warning'
        message: Human-readable description of the finding
        tag: The offending tag, when one applies
        context: Optional extra context for the finding
    """

    # Required fields
    code: str
    level: Literal["error", "warning"]
    message: str
    # Optional details; both default to None
    tag: str | None = None
    context: dict | None = None

ValidationResult dataclass

Result of HED string validation.

Attributes:

Name Type Description
is_valid bool

Whether the HED string is valid

errors list[ValidationIssue]

List of error issues

warnings list[ValidationIssue]

List of warning issues

parsed_string str | None

Successfully parsed HED string (if valid)

Source code in hedit/src/validation/hed_validator.py
@dataclass
class ValidationResult:
    """Outcome of validating one HED string.

    Attributes:
        is_valid: Whether the HED string is valid
        errors: Issues reported at error level
        warnings: Issues reported at warning level
        parsed_string: Successfully parsed HED string (if valid)
    """

    # Required fields
    is_valid: bool
    errors: list[ValidationIssue]
    warnings: list[ValidationIssue]
    # Optional; defaults to None
    parsed_string: str | None = None

HedPythonValidator

Validates HED strings using the Python HED tools.

Source code in hedit/src/validation/hed_validator.py
class HedPythonValidator:
    """Validates HED strings using the Python HED tools."""

    # Severity values classified as errors. "error" is the string form this
    # class has always accepted. 1 is assumed to be hedtools'
    # ErrorSeverity.ERROR, since hedtools appears to report issue severities
    # as integers (ERROR = 1, WARNING = 10) -- TODO confirm against the
    # installed hedtools version.
    _ERROR_SEVERITIES = ("error", 1)

    def __init__(self, schema: HedSchema) -> None:
        """Initialize validator with a HED schema.

        Args:
            schema: HedSchema object to validate against
        """
        self.schema = schema
        self.validator = HedValidator(schema)

    @staticmethod
    def _is_error_severity(severity: object) -> bool:
        """Return True when an issue's severity value denotes an error."""
        return severity in HedPythonValidator._ERROR_SEVERITIES

    def validate(self, hed_string: str) -> ValidationResult:
        """Validate a HED string.

        Issues whose severity denotes an error are collected as errors; all
        other issues are collected as warnings. The string is valid when no
        errors were collected. Any exception raised while parsing or
        validating is reported as a single PARSE_ERROR issue.

        Args:
            hed_string: HED annotation string to validate

        Returns:
            ValidationResult with errors and warnings
        """
        errors = []
        warnings = []

        try:
            # Parse and validate HED string
            hed_string_obj = HedString(hed_string, self.schema)
            issues = hed_string_obj.validate(self.validator)

            # Process issues
            for issue in issues:
                issue_str = get_printable_issue_string([issue])
                # Comparing only against the string "error" would classify
                # integer-coded errors as warnings and report is_valid=True
                severity: Literal["error", "warning"] = (
                    "error" if self._is_error_severity(issue["severity"]) else "warning"
                )

                validation_issue = ValidationIssue(
                    code=issue.get("code", "UNKNOWN"),
                    level=severity,
                    message=issue_str,
                    tag=issue.get("tag", None),
                )

                if severity == "error":
                    errors.append(validation_issue)
                else:
                    warnings.append(validation_issue)

            is_valid = len(errors) == 0
            # The parsed form is only reported for valid strings
            parsed = str(hed_string_obj) if is_valid else None

            return ValidationResult(
                is_valid=is_valid,
                errors=errors,
                warnings=warnings,
                parsed_string=parsed,
            )

        except Exception as e:
            # Deliberately broad: any failure becomes a PARSE_ERROR result
            # instead of propagating to the caller
            logger.warning("HED string validation failed: %s", e, exc_info=True)
            errors.append(
                ValidationIssue(
                    code="PARSE_ERROR",
                    level="error",
                    message=str(e),
                )
            )
            return ValidationResult(is_valid=False, errors=errors, warnings=warnings)

__init__(schema)

Initialize validator with a HED schema.

Parameters:

Name Type Description Default
schema HedSchema

HedSchema object to validate against

required
Source code in hedit/src/validation/hed_validator.py
def __init__(self, schema: HedSchema) -> None:
    """Bind this validator to a HED schema.

    Args:
        schema: HedSchema object to validate against
    """
    self.schema = schema
    # The HedValidator is built once here from the same schema and reused
    # for every later validate() call.
    hed_validator = HedValidator(self.schema)
    self.validator = hed_validator

validate(hed_string)

Validate a HED string.

Parameters:

Name Type Description Default
hed_string str

HED annotation string to validate

required

Returns:

Type Description
ValidationResult

ValidationResult with errors and warnings

Source code in hedit/src/validation/hed_validator.py
def validate(self, hed_string: str) -> ValidationResult:
    """Check a HED annotation string against the configured schema.

    Args:
        hed_string: HED annotation string to validate

    Returns:
        ValidationResult holding one ValidationIssue per reported issue,
        split into errors and warnings. ``parsed_string`` is populated only
        when no errors were collected. If parsing or validation raises, the
        exception is logged and reported as a ``PARSE_ERROR`` issue with
        ``is_valid=False``.
    """
    found_errors = []
    found_warnings = []

    try:
        parsed_obj = HedString(hed_string, self.schema)
        reported = parsed_obj.validate(self.validator)

        for raw in reported:
            text = get_printable_issue_string([raw])

            # Anything not explicitly marked "error" is filed as a warning.
            # NOTE(review): this compares severity against the string
            # "error" -- confirm the HED library reports severity in that
            # form; if it uses numeric levels, every issue lands in warnings.
            level: Literal["error", "warning"] = "warning"
            if raw["severity"] == "error":
                level = "error"

            entry = ValidationIssue(
                code=raw.get("code", "UNKNOWN"),
                level=level,
                message=text,
                tag=raw.get("tag", None),
            )

            bucket = found_errors if level == "error" else found_warnings
            bucket.append(entry)

        ok = not found_errors
        return ValidationResult(
            is_valid=ok,
            errors=found_errors,
            warnings=found_warnings,
            # Only hand back the parsed form when the string validated cleanly
            parsed_string=str(parsed_obj) if ok else None,
        )

    except Exception as e:
        logger.warning("HED string validation failed: %s", e, exc_info=True)
        # Issues gathered before the failure are kept; the exception is
        # appended as one more error.
        found_errors.append(
            ValidationIssue(
                code="PARSE_ERROR",
                level="error",
                message=str(e),
            )
        )
        return ValidationResult(
            is_valid=False,
            errors=found_errors,
            warnings=found_warnings,
        )

HedJavaScriptValidator

Validates HED strings using the JavaScript HED validator.

This provides more detailed feedback than the Python validator. Requires Node.js and the hed-javascript package.

Source code in hedit/src/validation/hed_validator.py
class HedJavaScriptValidator:
    """Validates HED strings using the JavaScript HED validator.

    This provides more detailed feedback than the Python validator.
    Requires Node.js and the hed-javascript package.

    Each call to :meth:`validate` generates a small Node.js script, runs it
    with ``node -e`` and parses the JSON document the script prints.
    """

    def __init__(
        self,
        validator_path: Path,
        schema_version: str = "8.4.0",
    ) -> None:
        """Initialize JavaScript validator.

        Args:
            validator_path: Path to hed-javascript repository
            schema_version: HED schema version to use

        Raises:
            RuntimeError: If Node.js cannot be run or ``validator_path``
                does not exist.
        """
        self.validator_path = Path(validator_path)
        self.schema_version = schema_version
        self._check_installation()

    def _check_installation(self) -> None:
        """Verify that Node.js and hed-validator are available.

        Raises:
            RuntimeError: If ``node --version`` fails or cannot be started,
                or if ``self.validator_path`` does not exist.
        """
        # Check Node.js
        try:
            subprocess.run(
                ["node", "--version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            raise RuntimeError("Node.js is not installed or not in PATH") from e

        # Check validator path
        if not self.validator_path.exists():
            raise RuntimeError(f"HED JavaScript validator not found at {self.validator_path}")

    def _build_script(self, hed_string: str) -> str:
        """Return the Node.js source that validates ``hed_string``.

        Every Python value placed into the script is first encoded with
        ``json.dumps``. A JSON string is a valid double-quoted JavaScript
        string literal, so quotes, backslashes, newlines, backticks and
        ``${...}`` sequences in the input are passed to the validator as
        data instead of being interpreted as JavaScript.

        Args:
            hed_string: HED annotation string to embed in the script

        Returns:
            JavaScript source suitable for ``node -e``
        """
        module_js = json.dumps(f"{self.validator_path}/dist/commonjs/index.js")
        version_js = json.dumps(self.schema_version)
        hed_js = json.dumps(hed_string)

        return f"""
        const {{ parseHedString, buildSchemasFromVersion }} = require({module_js});

        async function validate() {{
            try {{
                const schemas = await buildSchemasFromVersion({version_js});
                const hedString = {hed_js};
                const [parsed, errors, warnings] = parseHedString(
                    hedString,
                    schemas,
                    false,  // no definitions
                    false,  // no placeholders
                    true    // full validation
                );

                // Reclassify warnings that should actually be errors
                // Based on HED validator source: these indicate invalid/malformed HED
                const errorCodes = [
                    'TAG_INVALID',                    // Invalid tag - doesn't exist in schema
                    'TAG_NAMESPACE_PREFIX_INVALID',   // Invalid tag prefix
                    'TAG_NOT_UNIQUE',                 // Multiple unique tags
                    'TAG_REQUIRES_CHILD',             // Child/value required
                    'TAG_EXTENSION_INVALID',          // Invalid extension
                    'TAG_EMPTY',                      // Empty tag
                    'UNITS_INVALID',                  // Invalid units
                    'VALUE_INVALID',                  // Invalid value
                ];
                const actualErrors = [];
                const actualWarnings = [];

                // Process errors
                errors.forEach(e => {{
                    actualErrors.push({{
                        code: e.hedCode || e.internalCode,
                        message: e.message,
                        tag: e.parameters?.tag,
                        level: 'error'
                    }});
                }});

                // Process warnings - promote critical ones to errors
                warnings.forEach(w => {{
                    const code = w.hedCode || w.internalCode;
                    const issue = {{
                        code: code,
                        message: w.message,
                        tag: w.parameters?.tag,
                        level: errorCodes.includes(code) ? 'error' : 'warning'
                    }};

                    if (errorCodes.includes(code)) {{
                        actualErrors.push(issue);
                    }} else {{
                        actualWarnings.push(issue);
                    }}
                }});

                const result = {{
                    isValid: actualErrors.length === 0,
                    parsed: parsed ? parsed.toString() : null,
                    errors: actualErrors,
                    warnings: actualWarnings
                }};

                console.log(JSON.stringify(result));
            }} catch (error) {{
                console.log(JSON.stringify({{
                    isValid: false,
                    errors: [{{ code: 'VALIDATOR_ERROR', message: error.message, level: 'error' }}],
                    warnings: []
                }}));
            }}
        }}

        validate();
        """

    def validate(self, hed_string: str) -> ValidationResult:
        """Validate a HED string using JavaScript validator.

        Args:
            hed_string: HED annotation string to validate

        Returns:
            ValidationResult with detailed errors and warnings. Failures of
            the validation run itself are reported as a result with
            ``is_valid=False`` and a single issue: ``TIMEOUT`` when Node.js
            exceeds 30 seconds, ``VALIDATION_ERROR`` when the process exits
            non-zero, prints invalid JSON, or anything else goes wrong.
        """
        # hed_string is caller-supplied; _build_script embeds it as an
        # escaped string literal so it cannot alter the script.
        script = self._build_script(hed_string)

        try:
            # Run Node.js validation
            result = subprocess.run(
                ["node", "-e", script],
                capture_output=True,
                text=True,
                timeout=30,
                check=True,
            )

            # Parse result
            output = json.loads(result.stdout)

            errors = [
                ValidationIssue(
                    code=e["code"],
                    level="error",
                    message=e["message"],
                    tag=e.get("tag"),
                )
                for e in output["errors"]
            ]

            warnings = [
                ValidationIssue(
                    code=w["code"],
                    level="warning",
                    message=w["message"],
                    tag=w.get("tag"),
                )
                for w in output["warnings"]
            ]

            return ValidationResult(
                is_valid=output["isValid"],
                errors=errors,
                warnings=warnings,
                parsed_string=output.get("parsed"),
            )

        except subprocess.TimeoutExpired:
            logger.warning("JavaScript validation timed out after 30s")
            return ValidationResult(
                is_valid=False,
                errors=[
                    ValidationIssue(
                        code="TIMEOUT",
                        level="error",
                        message="Validation timed out",
                    )
                ],
                warnings=[],
            )
        except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
            logger.error("JavaScript validator failed: %s", e)
            return ValidationResult(
                is_valid=False,
                errors=[
                    ValidationIssue(
                        code="VALIDATION_ERROR",
                        level="error",
                        message=f"Validation failed: {e}",
                    )
                ],
                warnings=[],
            )
        except Exception as e:
            # Catch-all (e.g. unexpected JSON shape) so callers always get a
            # ValidationResult; the traceback is logged for diagnosis.
            logger.error("Unexpected error in JavaScript validation: %s", e, exc_info=True)
            return ValidationResult(
                is_valid=False,
                errors=[
                    ValidationIssue(
                        code="VALIDATION_ERROR",
                        level="error",
                        message=f"Validation failed: {e}",
                    )
                ],
                warnings=[],
            )

__init__(validator_path, schema_version='8.4.0')

Initialize JavaScript validator.

Parameters:

Name Type Description Default
validator_path Path

Path to hed-javascript repository

required
schema_version str

HED schema version to use

'8.4.0'
Source code in hedit/src/validation/hed_validator.py
def __init__(
    self,
    validator_path: Path,
    schema_version: str = "8.4.0",
) -> None:
    """Set up a JavaScript-backed validator and check its prerequisites.

    Args:
        validator_path: Path to hed-javascript repository
        schema_version: HED schema version to use
    """
    self.schema_version = schema_version
    # Normalise to a Path so later code can rely on Path methods
    self.validator_path = Path(validator_path)
    # Must run after the attributes above are stored
    self._check_installation()

validate(hed_string)

Validate a HED string using JavaScript validator.

Parameters:

Name Type Description Default
hed_string str

HED annotation string to validate

required

Returns:

Type Description
ValidationResult

ValidationResult with detailed errors and warnings

Source code in hedit/src/validation/hed_validator.py
def validate(self, hed_string: str) -> ValidationResult:
    """Validate a HED string using JavaScript validator.

    A Node.js script is generated, run with ``node -e`` and its JSON output
    is converted into a ValidationResult.

    Args:
        hed_string: HED annotation string to validate

    Returns:
        ValidationResult with detailed errors and warnings. Failures of the
        validation run itself are reported as a result with
        ``is_valid=False`` and a single issue: ``TIMEOUT`` when Node.js
        exceeds 30 seconds, ``VALIDATION_ERROR`` when the process exits
        non-zero, prints invalid JSON, or anything else goes wrong.
    """
    # Encode every interpolated value as a JSON string. A JSON string is a
    # valid double-quoted JavaScript literal, so quotes, backslashes,
    # newlines, backticks and ${...} in the input are treated as data and
    # cannot break or inject code into the generated script.
    module_js = json.dumps(f"{self.validator_path}/dist/commonjs/index.js")
    version_js = json.dumps(self.schema_version)
    hed_js = json.dumps(hed_string)

    # Create validation script
    script = f"""
    const {{ parseHedString, buildSchemasFromVersion }} = require({module_js});

    async function validate() {{
        try {{
            const schemas = await buildSchemasFromVersion({version_js});
            const hedString = {hed_js};
            const [parsed, errors, warnings] = parseHedString(
                hedString,
                schemas,
                false,  // no definitions
                false,  // no placeholders
                true    // full validation
            );

            // Reclassify warnings that should actually be errors
            // Based on HED validator source: these indicate invalid/malformed HED
            const errorCodes = [
                'TAG_INVALID',                    // Invalid tag - doesn't exist in schema
                'TAG_NAMESPACE_PREFIX_INVALID',   // Invalid tag prefix
                'TAG_NOT_UNIQUE',                 // Multiple unique tags
                'TAG_REQUIRES_CHILD',             // Child/value required
                'TAG_EXTENSION_INVALID',          // Invalid extension
                'TAG_EMPTY',                      // Empty tag
                'UNITS_INVALID',                  // Invalid units
                'VALUE_INVALID',                  // Invalid value
            ];
            const actualErrors = [];
            const actualWarnings = [];

            // Process errors
            errors.forEach(e => {{
                actualErrors.push({{
                    code: e.hedCode || e.internalCode,
                    message: e.message,
                    tag: e.parameters?.tag,
                    level: 'error'
                }});
            }});

            // Process warnings - promote critical ones to errors
            warnings.forEach(w => {{
                const code = w.hedCode || w.internalCode;
                const issue = {{
                    code: code,
                    message: w.message,
                    tag: w.parameters?.tag,
                    level: errorCodes.includes(code) ? 'error' : 'warning'
                }};

                if (errorCodes.includes(code)) {{
                    actualErrors.push(issue);
                }} else {{
                    actualWarnings.push(issue);
                }}
            }});

            const result = {{
                isValid: actualErrors.length === 0,
                parsed: parsed ? parsed.toString() : null,
                errors: actualErrors,
                warnings: actualWarnings
            }};

            console.log(JSON.stringify(result));
        }} catch (error) {{
            console.log(JSON.stringify({{
                isValid: false,
                errors: [{{ code: 'VALIDATOR_ERROR', message: error.message, level: 'error' }}],
                warnings: []
            }}));
        }}
    }}

    validate();
    """

    try:
        # Run Node.js validation
        result = subprocess.run(
            ["node", "-e", script],
            capture_output=True,
            text=True,
            timeout=30,
            check=True,
        )

        # Parse result
        output = json.loads(result.stdout)

        errors = [
            ValidationIssue(
                code=e["code"],
                level="error",
                message=e["message"],
                tag=e.get("tag"),
            )
            for e in output["errors"]
        ]

        warnings = [
            ValidationIssue(
                code=w["code"],
                level="warning",
                message=w["message"],
                tag=w.get("tag"),
            )
            for w in output["warnings"]
        ]

        return ValidationResult(
            is_valid=output["isValid"],
            errors=errors,
            warnings=warnings,
            parsed_string=output.get("parsed"),
        )

    except subprocess.TimeoutExpired:
        logger.warning("JavaScript validation timed out after 30s")
        return ValidationResult(
            is_valid=False,
            errors=[
                ValidationIssue(
                    code="TIMEOUT",
                    level="error",
                    message="Validation timed out",
                )
            ],
            warnings=[],
        )
    except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
        logger.error("JavaScript validator failed: %s", e)
        return ValidationResult(
            is_valid=False,
            errors=[
                ValidationIssue(
                    code="VALIDATION_ERROR",
                    level="error",
                    message=f"Validation failed: {e}",
                )
            ],
            warnings=[],
        )
    except Exception as e:
        # Catch-all (e.g. unexpected JSON shape) so callers always get a
        # ValidationResult; the traceback is logged for diagnosis.
        logger.error("Unexpected error in JavaScript validation: %s", e, exc_info=True)
        return ValidationResult(
            is_valid=False,
            errors=[
                ValidationIssue(
                    code="VALIDATION_ERROR",
                    level="error",
                    message=f"Validation failed: {e}",
                )
            ],
            warnings=[],
        )

is_js_validator_available(validator_path=None)

Check if JavaScript validator is available.

Parameters:

Name Type Description Default
validator_path Path | str | None

Path to hed-javascript. If None, uses HED_VALIDATOR_PATH env var.

None

Returns:

Type Description
bool

True if Node.js is installed and hed-javascript is available.

Source code in hedit/src/validation/hed_validator.py
def is_js_validator_available(validator_path: Path | str | None = None) -> bool:
    """Check if JavaScript validator is available.

    Args:
        validator_path: Path to hed-javascript. If None, uses HED_VALIDATOR_PATH env var.

    Returns:
        True if Node.js is installed and hed-javascript is available.
    """
    # Check Node.js
    if not shutil.which("node"):
        return False

    # Check validator path
    if validator_path is None:
        validator_path = os.environ.get("HED_VALIDATOR_PATH")
    if validator_path is None:
        return False

    path = Path(validator_path)
    return path.exists() and (path / "dist" / "commonjs" / "index.js").exists()

get_validator(schema_version='8.4.0', prefer_js=True, require_js=False, validator_path=None)

Get the appropriate HED validator based on availability and preferences.

Parameters:

Name Type Description Default
schema_version str

HED schema version (e.g., "8.3.0", "8.4.0")

'8.4.0'
prefer_js bool

If True, prefer JavaScript validator when available.

True
require_js bool

If True, raise error if JavaScript validator unavailable (no fallback)

False
validator_path Path | str | None

Path to hed-javascript. If None, uses HED_VALIDATOR_PATH env var.

None

Returns:

Type Description
HedPythonValidator | HedJavaScriptValidator

Configured validator instance

Raises:

Type Description
RuntimeError

If require_js=True and JavaScript validator is unavailable

Source code in hedit/src/validation/hed_validator.py
def get_validator(
    schema_version: str = "8.4.0",
    prefer_js: bool = True,
    require_js: bool = False,
    validator_path: Path | str | None = None,
) -> HedPythonValidator | HedJavaScriptValidator:
    """Get the appropriate HED validator based on availability and preferences.

    Args:
        schema_version: HED schema version (e.g., "8.3.0", "8.4.0")
        prefer_js: If True, prefer JavaScript validator when available.
        require_js: If True, raise error if JavaScript validator unavailable (no fallback).
            Implies the JavaScript validator is selected, even when
            ``prefer_js`` is False.
        validator_path: Path to hed-javascript. If None, uses HED_VALIDATOR_PATH env var.

    Returns:
        Configured validator instance

    Raises:
        RuntimeError: If require_js=True and JavaScript validator is unavailable
    """
    # Resolve validator_path once from env if not provided
    if validator_path is None:
        validator_path = os.environ.get("HED_VALIDATOR_PATH")

    js_unavailable_msg = (
        "JavaScript validator required but unavailable. "
        "Ensure Node.js is installed and HED_VALIDATOR_PATH is set."
    )

    # An unset or empty path can never yield a usable JS validator. Folding
    # that into js_available keeps the require_js check and the selection
    # below in agreement.
    js_available = bool(validator_path) and is_js_validator_available(validator_path)

    if require_js and not js_available:
        raise RuntimeError(js_unavailable_msg)

    # require_js promises "no fallback", so it must select the JS validator
    # even if the caller passed prefer_js=False.
    if (prefer_js or require_js) and js_available:
        logger.info("Using JavaScript HED validator at %s", validator_path)
        return HedJavaScriptValidator(
            validator_path=Path(validator_path),
            schema_version=schema_version,
        )

    if prefer_js and not js_available:
        logger.warning(
            "JavaScript validator preferred but unavailable (path=%s), "
            "falling back to Python HED validator",
            validator_path,
        )

    logger.info("Using Python HED validator (schema=%s)", schema_version)
    schema = load_schema_version(schema_version)
    return HedPythonValidator(schema=schema)