HEDit Python API

This page documents the Python modules and classes in HEDit.

API Models

The API request/response models are defined using Pydantic:

src.api.models

Pydantic models for API requests and responses.

AnnotationRequest

Bases: BaseModel

Request model for HED annotation generation.

Attributes:

description (str): Natural language event description to annotate
schema_version (str): HED schema version to use
max_validation_attempts (int): Maximum validation retry attempts
run_assessment (bool): Whether to run final assessment (adds extra time)

Source code in hedit/src/api/models.py
class AnnotationRequest(BaseModel):
    """Request model for HED annotation generation.

    Attributes:
        description: Natural language event description to annotate
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        run_assessment: Whether to run final assessment (adds extra time)
    """

    description: str = Field(
        ...,
        description="Natural language event description",
        min_length=1,
        examples=["A red circle appears on the left side of the screen"],
    )
    schema_version: str = Field(
        default="8.3.0",
        description="HED schema version",
        examples=["8.3.0", "8.4.0"],
    )
    max_validation_attempts: int = Field(
        default=5,
        description="Maximum validation retry attempts",
        ge=1,
        le=10,
    )
    run_assessment: bool = Field(
        default=False,
        description="Run final assessment for completeness (adds extra processing time)",
    )
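
As a rough illustration of how these field constraints behave, the sketch below redeclares a trimmed copy of `AnnotationRequest` locally (so it runs without HEDit installed, though it does need Pydantic) and checks that an empty description or an out-of-range retry count is rejected. The helper `try_build` is a hypothetical name used only for this example.

```python
from pydantic import BaseModel, Field, ValidationError


class AnnotationRequest(BaseModel):
    """Trimmed local mirror of the model above, for illustration only."""

    description: str = Field(..., min_length=1)
    schema_version: str = Field(default="8.3.0")
    max_validation_attempts: int = Field(default=5, ge=1, le=10)
    run_assessment: bool = Field(default=False)


def try_build(**kwargs):
    """Return a request, or None if Pydantic rejects the input (hypothetical helper)."""
    try:
        return AnnotationRequest(**kwargs)
    except ValidationError:
        return None


# Only `description` is required; the other fields fall back to their defaults.
ok = try_build(description="A red circle appears on the left side of the screen")

# min_length=1 rejects an empty description.
empty = try_build(description="")

# le=10 rejects more than ten validation attempts.
too_many = try_build(description="Audio beep plays", max_validation_attempts=11)
```

Returning `None` keeps the example short; a real caller would more likely surface the `ValidationError` details to the user.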

AnnotationResponse

Bases: BaseModel

Response model for HED annotation generation.

Attributes:

annotation (str): Generated HED annotation string
is_valid (bool): Whether the annotation passed validation
is_faithful (bool): Whether the annotation is faithful to description
is_complete (bool): Whether the annotation is complete
validation_attempts (int): Number of validation attempts made
validation_errors (list[str]): List of validation errors (if any)
validation_warnings (list[str]): List of validation warnings (if any)
evaluation_feedback (str): Evaluation agent feedback
assessment_feedback (str): Assessment agent feedback
status (str): Overall workflow status

Source code in hedit/src/api/models.py
class AnnotationResponse(BaseModel):
    """Response model for HED annotation generation.

    Attributes:
        annotation: Generated HED annotation string
        is_valid: Whether the annotation passed validation
        is_faithful: Whether the annotation is faithful to description
        is_complete: Whether the annotation is complete
        validation_attempts: Number of validation attempts made
        validation_errors: List of validation errors (if any)
        validation_warnings: List of validation warnings (if any)
        evaluation_feedback: Evaluation agent feedback
        assessment_feedback: Assessment agent feedback
        status: Overall workflow status
    """

    annotation: str = Field(..., description="Generated HED annotation string")
    is_valid: bool = Field(..., description="Validation status")
    is_faithful: bool = Field(..., description="Faithfulness to original description")
    is_complete: bool = Field(..., description="Completeness status")
    validation_attempts: int = Field(..., description="Number of validation attempts")
    validation_errors: list[str] = Field(default_factory=list)
    validation_warnings: list[str] = Field(default_factory=list)
    evaluation_feedback: str = Field(default="")
    assessment_feedback: str = Field(default="")
    status: str = Field(..., description="Workflow status", examples=["success", "failed"])

ValidationRequest

Bases: BaseModel

Request model for HED validation only.

Attributes:

hed_string (str): HED annotation string to validate
schema_version (str): HED schema version to use

Source code in hedit/src/api/models.py
class ValidationRequest(BaseModel):
    """Request model for HED validation only.

    Attributes:
        hed_string: HED annotation string to validate
        schema_version: HED schema version to use
    """

    hed_string: str = Field(
        ...,
        description="HED annotation string",
        min_length=1,
    )
    schema_version: str = Field(
        default="8.3.0",
        description="HED schema version",
    )

ValidationResponse

Bases: BaseModel

Response model for HED validation.

Attributes:

is_valid (bool): Whether the HED string is valid
errors (list[str]): List of validation errors
warnings (list[str]): List of validation warnings
parsed_string (str | None): Normalized HED string (if valid)

Source code in hedit/src/api/models.py
class ValidationResponse(BaseModel):
    """Response model for HED validation.

    Attributes:
        is_valid: Whether the HED string is valid
        errors: List of validation errors
        warnings: List of validation warnings
        parsed_string: Normalized HED string (if valid)
    """

    is_valid: bool = Field(..., description="Validation status")
    errors: list[str] = Field(default_factory=list)
    warnings: list[str] = Field(default_factory=list)
    parsed_string: str | None = Field(default=None)

ImageAnnotationRequest

Bases: BaseModel

Request model for image-based HED annotation generation.

Attributes:

image (str): Base64 encoded image or data URI
prompt (str | None): Optional custom prompt for vision model (uses default if not provided)
schema_version (str): HED schema version to use
max_validation_attempts (int): Maximum validation retry attempts
run_assessment (bool): Whether to run final assessment (adds extra time)

Source code in hedit/src/api/models.py
class ImageAnnotationRequest(BaseModel):
    """Request model for image-based HED annotation generation.

    Attributes:
        image: Base64 encoded image or data URI
        prompt: Optional custom prompt for vision model (uses default if not provided)
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        run_assessment: Whether to run final assessment (adds extra time)
    """

    image: str = Field(
        ...,
        description="Base64 encoded image or data URI (data:image/png;base64,...)",
        min_length=1,
    )
    prompt: str | None = Field(
        default=None,
        description="Optional custom prompt for vision model",
        examples=["Describe the visual elements in this image"],
    )
    schema_version: str = Field(
        default="8.4.0",
        description="HED schema version",
        examples=["8.3.0", "8.4.0"],
    )
    max_validation_attempts: int = Field(
        default=5,
        description="Maximum validation retry attempts",
        ge=1,
        le=10,
    )
    run_assessment: bool = Field(
        default=False,
        description="Run final assessment for completeness (adds extra processing time)",
    )
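
The `image` field accepts either a bare Base64 string or a data URI of the form `data:image/png;base64,...`. The following is a minimal sketch of producing such a data URI with the standard library. The function name `to_data_uri` is hypothetical, and whether the HEDit client builds the value this way is an assumption, not something this page confirms.

```python
import base64
import mimetypes


def to_data_uri(image_bytes: bytes, filename: str) -> str:
    """Encode raw image bytes as a data URI, guessing the MIME type from the file name."""
    mime, _ = mimetypes.guess_type(filename)
    if mime is None or not mime.startswith("image/"):
        raise ValueError(f"Not a recognized image type: {filename}")
    encoded = base64.b64encode(image_bytes).decode("ascii")
    return f"data:{mime};base64,{encoded}"


# The 8-byte PNG signature stands in for the contents of a real image file.
uri = to_data_uri(b"\x89PNG\r\n\x1a\n", "stimulus.png")

# Shape of a request body using the field names documented above.
payload = {"image": uri, "schema_version": "8.4.0"}
```

In practice the bytes would come from something like `Path("stimulus.png").read_bytes()`.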

ImageAnnotationResponse

Bases: BaseModel

Response model for image-based HED annotation generation.

Attributes:

image_description (str): Generated description from vision model
annotation (str): Generated HED annotation string
is_valid (bool): Whether the annotation passed validation
is_faithful (bool): Whether the annotation is faithful to description
is_complete (bool): Whether the annotation is complete
validation_attempts (int): Number of validation attempts made
validation_errors (list[str]): List of validation errors (if any)
validation_warnings (list[str]): List of validation warnings (if any)
evaluation_feedback (str): Evaluation agent feedback
assessment_feedback (str): Assessment agent feedback
status (str): Overall workflow status
image_metadata (dict): Metadata about the processed image

Source code in hedit/src/api/models.py
class ImageAnnotationResponse(BaseModel):
    """Response model for image-based HED annotation generation.

    Attributes:
        image_description: Generated description from vision model
        annotation: Generated HED annotation string
        is_valid: Whether the annotation passed validation
        is_faithful: Whether the annotation is faithful to description
        is_complete: Whether the annotation is complete
        validation_attempts: Number of validation attempts made
        validation_errors: List of validation errors (if any)
        validation_warnings: List of validation warnings (if any)
        evaluation_feedback: Evaluation agent feedback
        assessment_feedback: Assessment agent feedback
        status: Overall workflow status
        image_metadata: Metadata about the processed image
    """

    image_description: str = Field(..., description="Generated image description")
    annotation: str = Field(..., description="Generated HED annotation string")
    is_valid: bool = Field(..., description="Validation status")
    is_faithful: bool = Field(..., description="Faithfulness to description")
    is_complete: bool = Field(..., description="Completeness status")
    validation_attempts: int = Field(..., description="Number of validation attempts")
    validation_errors: list[str] = Field(default_factory=list)
    validation_warnings: list[str] = Field(default_factory=list)
    evaluation_feedback: str = Field(default="")
    assessment_feedback: str = Field(default="")
    status: str = Field(..., description="Workflow status", examples=["success", "failed"])
    image_metadata: dict = Field(default_factory=dict, description="Image metadata")

HealthResponse

Bases: BaseModel

Response model for health check.

Attributes:

status (str): Service status
version (str): API version
llm_available (bool): Whether LLM is available
validator_available (bool): Whether HED validator is available

Source code in hedit/src/api/models.py
class HealthResponse(BaseModel):
    """Response model for health check.

    Attributes:
        status: Service status
        version: API version
        llm_available: Whether LLM is available
        validator_available: Whether HED validator is available
    """

    status: str = Field(..., examples=["healthy", "degraded"])
    version: str = Field(..., examples=["0.1.0"])
    llm_available: bool
    validator_available: bool

FeedbackRequest

Bases: BaseModel

Request model for submitting user feedback.

Attributes:

type (str): Feedback type (text or image annotation)
version (str | None): App version that generated the annotation
description (str | None): Original input description (for text mode)
image_description (str | None): Image description (for image mode)
annotation (str): Generated HED annotation
is_valid (bool): Whether the annotation was valid
is_faithful (bool | None): Whether the annotation was faithful
is_complete (bool | None): Whether the annotation was complete
validation_errors (list[str]): List of validation errors
validation_warnings (list[str]): List of validation warnings
evaluation_feedback (str): Evaluation agent feedback
assessment_feedback (str): Assessment agent feedback
user_comment (str | None): Optional user comment about the annotation

Source code in hedit/src/api/models.py
class FeedbackRequest(BaseModel):
    """Request model for submitting user feedback.

    Attributes:
        type: Feedback type (text or image annotation)
        version: App version that generated the annotation
        description: Original input description (for text mode)
        image_description: Image description (for image mode)
        annotation: Generated HED annotation
        is_valid: Whether the annotation was valid
        is_faithful: Whether the annotation was faithful
        is_complete: Whether the annotation was complete
        validation_errors: List of validation errors
        validation_warnings: List of validation warnings
        evaluation_feedback: Evaluation agent feedback
        assessment_feedback: Assessment agent feedback
        user_comment: Optional user comment about the annotation
    """

    type: str = Field(
        default="text",
        description="Feedback type",
        examples=["text", "image"],
    )
    version: str | None = Field(
        default=None,
        description="App version that generated the annotation",
    )
    description: str | None = Field(
        default=None,
        description="Original input description (for text mode)",
    )
    image_description: str | None = Field(
        default=None,
        description="Image description (for image mode)",
    )
    annotation: str = Field(
        ...,
        description="Generated HED annotation",
        min_length=1,
    )
    is_valid: bool = Field(
        default=False,
        description="Whether the annotation was valid",
    )
    is_faithful: bool | None = Field(
        default=None,
        description="Whether the annotation was faithful",
    )
    is_complete: bool | None = Field(
        default=None,
        description="Whether the annotation was complete",
    )
    validation_errors: list[str] = Field(default_factory=list)
    validation_warnings: list[str] = Field(default_factory=list)
    evaluation_feedback: str = Field(default="")
    assessment_feedback: str = Field(default="")
    user_comment: str | None = Field(
        default=None,
        description="Optional user comment about the annotation",
    )

FeedbackResponse

Bases: BaseModel

Response model for feedback submission.

Attributes:

success (bool): Whether feedback was saved successfully
feedback_id (str): Unique identifier for the feedback
message (str): Status message

Source code in hedit/src/api/models.py
class FeedbackResponse(BaseModel):
    """Response model for feedback submission.

    Attributes:
        success: Whether feedback was saved successfully
        feedback_id: Unique identifier for the feedback
        message: Status message
    """

    success: bool = Field(..., description="Whether feedback was saved")
    feedback_id: str = Field(..., description="Unique identifier for the feedback")
    message: str = Field(..., description="Status message")

CLI Module

The CLI is built with Typer:

src.cli.main

HEDit CLI - Main entry point.

Command-line interface for generating HED annotations from natural language. Uses the HEDit API (api.annotation.garden/hedit) with bring-your-own-key (BYOK) support.

version_callback(value)

Print version and exit.

Source code in hedit/src/cli/main.py
def version_callback(value: bool) -> None:
    """Print version and exit."""
    if value:
        console.print(f"hedit version {__version__}")
        raise typer.Exit()

main(version=False)

HEDit CLI - Generate HED annotations from natural language.

Convert event descriptions to valid HED (Hierarchical Event Descriptors) annotations using an AI-powered multi-agent system.

Get started

    hedit init --api-key YOUR_OPENROUTER_KEY
    hedit annotate "A red circle appears on screen"

Source code in hedit/src/cli/main.py
@app.callback()
def main(
    version: Annotated[
        bool,
        typer.Option(
            "--version",
            "-V",
            callback=version_callback,
            is_eager=True,
            help="Show version and exit",
        ),
    ] = False,
) -> None:
    """HEDit CLI - Generate HED annotations from natural language.

    Convert event descriptions to valid HED (Hierarchical Event Descriptors)
    annotations using an AI-powered multi-agent system.

    Get started:
        hedit init --api-key YOUR_OPENROUTER_KEY
        hedit annotate "A red circle appears on screen"
    """
    pass

init(api_key=None, api_url=None, model=None, provider=None, temperature=None)

Initialize HEDit CLI with your API key and preferences.

This saves your configuration to ~/.config/hedit/ so you don't need to provide the API key for every command.

Get an OpenRouter API key at: https://openrouter.ai/keys

Source code in hedit/src/cli/main.py
@app.command()
def init(
    api_key: Annotated[
        str | None,
        typer.Option(
            "--api-key",
            "-k",
            help="OpenRouter API key (get one at https://openrouter.ai/keys)",
            prompt="OpenRouter API key",
            hide_input=True,
        ),
    ] = None,
    api_url: ApiUrlOption = None,
    model: Annotated[
        str | None,
        typer.Option(
            "--model",
            "-m",
            help="Default model for annotation",
        ),
    ] = None,
    provider: Annotated[
        str | None,
        typer.Option(
            "--provider",
            help="Provider preference (e.g., Cerebras for fast inference)",
        ),
    ] = None,
    temperature: Annotated[
        float | None,
        typer.Option(
            "--temperature",
            "-t",
            help="LLM temperature (0.0-1.0, lower = more consistent)",
        ),
    ] = None,
) -> None:
    """Initialize HEDit CLI with your API key and preferences.

    This saves your configuration to ~/.config/hedit/ so you don't need
    to provide the API key for every command.

    Get an OpenRouter API key at: https://openrouter.ai/keys
    """
    # Load existing config
    config = load_config()
    creds = load_credentials()

    # Update with provided values
    if api_key:
        creds.openrouter_api_key = api_key
    if api_url:
        config.api.url = api_url
    if model:
        config.models.default = model
    if provider:
        config.models.provider = provider
    if temperature is not None:
        config.models.temperature = temperature

    # Save
    save_credentials(creds)
    save_config(config)

    output.print_success("Configuration saved!")
    output.print_info(f"Config file: {CONFIG_FILE}")
    output.print_info(f"Credentials: {CREDENTIALS_FILE}")

    # Test connection
    if creds.openrouter_api_key:
        output.print_progress("Testing API connection")
        try:
            client = create_client(config, creds.openrouter_api_key)
            health = client.health()
            if health.get("status") == "healthy":
                output.print_success("API connection successful!")
            else:
                output.print_info(f"API status: {health.get('status', 'unknown')}")
        except APIError as e:
            output.print_error(f"Could not connect to API: {e}", hint="Check your API key and URL")
        except Exception as e:
            output.print_error(f"Connection test failed: {e}")

annotate(description, api_key=None, api_url=None, model=None, provider=None, temperature=None, schema_version=None, output_format='text', max_attempts=5, assessment=False, verbose=False)

Generate HED annotation from a text description.

Examples:

    hedit annotate "A red circle appears on the left side of the screen"
    hedit annotate "Participant pressed the spacebar" --schema 8.4.0
    hedit annotate "Audio beep plays" -o json > result.json
    hedit annotate "..." --model gpt-4o-mini --temperature 0.2

Source code in hedit/src/cli/main.py
@app.command()
def annotate(
    description: Annotated[
        str,
        typer.Argument(help="Natural language event description"),
    ],
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    model: ModelOption = None,
    provider: ProviderOption = None,
    temperature: TemperatureOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
    max_attempts: Annotated[
        int,
        typer.Option(
            "--max-attempts",
            help="Maximum validation attempts",
        ),
    ] = 5,
    assessment: Annotated[
        bool,
        typer.Option(
            "--assessment/--no-assessment",
            help="Run completeness assessment",
        ),
    ] = False,
    verbose: VerboseOption = False,
) -> None:
    """Generate HED annotation from a text description.

    Examples:
        hedit annotate "A red circle appears on the left side of the screen"
        hedit annotate "Participant pressed the spacebar" --schema 8.4.0
        hedit annotate "Audio beep plays" -o json > result.json
        hedit annotate "..." --model gpt-4o-mini --temperature 0.2
    """
    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        model=model,
        provider=provider,
        temperature=temperature,
        schema_version=schema_version,
        output_format=output_format,
    )

    if not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key",
        )
        raise typer.Exit(1)

    # Show progress if not piped
    if not output.is_piped():
        output.print_progress("Generating HED annotation")

    try:
        client = create_client(config, effective_key)
        result = client.annotate(
            description=description,
            schema_version=schema_version or config.settings.schema_version,
            max_validation_attempts=max_attempts,
            run_assessment=assessment,
        )
        output.print_annotation_result(result, output_format, verbose)

        # Exit with error code if annotation failed
        if result.get("status") != "success" or not result.get("is_valid"):
            raise typer.Exit(1)

    except APIError as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None
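
The exit-code rule at the end of `annotate` can be isolated as a small pure function, which may help when scripting around the CLI. This is only a sketch of the same condition. The name `annotation_exit_code` is hypothetical and does not exist in HEDit.

```python
def annotation_exit_code(result: dict) -> int:
    """Return 0 only when the workflow succeeded AND the annotation is valid."""
    if result.get("status") != "success" or not result.get("is_valid"):
        return 1
    return 0


# A successful workflow that still produced an invalid annotation counts as a failure.
codes = [
    annotation_exit_code({"status": "success", "is_valid": True}),
    annotation_exit_code({"status": "success", "is_valid": False}),
    annotation_exit_code({"status": "failed", "is_valid": True}),
    annotation_exit_code({}),  # missing keys are treated as failure
]
```

Because both conditions must hold, a shell pipeline can rely on a zero exit status to mean that a valid annotation was produced.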

annotate_image(image, prompt=None, api_key=None, api_url=None, model=None, provider=None, temperature=None, schema_version=None, output_format='text', max_attempts=5, assessment=False, verbose=False)

Generate HED annotation from an image.

First generates a description using a vision model, then annotates it.

Examples:

    hedit annotate-image stimulus.png
    hedit annotate-image photo.jpg --prompt "Describe the experimental setup"
    hedit annotate-image screen.png -o json > result.json

Source code in hedit/src/cli/main.py
@app.command("annotate-image")
def annotate_image(
    image: Annotated[
        Path,
        typer.Argument(help="Path to image file (PNG, JPG, etc.)"),
    ],
    prompt: Annotated[
        str | None,
        typer.Option(
            "--prompt",
            help="Custom prompt for vision model",
        ),
    ] = None,
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    model: ModelOption = None,
    provider: ProviderOption = None,
    temperature: TemperatureOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
    max_attempts: Annotated[
        int,
        typer.Option(
            "--max-attempts",
            help="Maximum validation attempts",
        ),
    ] = 5,
    assessment: Annotated[
        bool,
        typer.Option(
            "--assessment/--no-assessment",
            help="Run completeness assessment",
        ),
    ] = False,
    verbose: VerboseOption = False,
) -> None:
    """Generate HED annotation from an image.

    First generates a description using a vision model, then annotates it.

    Examples:
        hedit annotate-image stimulus.png
        hedit annotate-image photo.jpg --prompt "Describe the experimental setup"
        hedit annotate-image screen.png -o json > result.json
    """
    # Validate image exists
    if not image.exists():
        output.print_error(f"Image file not found: {image}")
        raise typer.Exit(1)

    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        model=model,
        provider=provider,
        temperature=temperature,
        schema_version=schema_version,
        output_format=output_format,
    )

    if not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key",
        )
        raise typer.Exit(1)

    if not output.is_piped():
        output.print_progress("Analyzing image and generating HED annotation")

    try:
        client = create_client(config, effective_key)
        result = client.annotate_image(
            image_path=image,
            prompt=prompt,
            schema_version=schema_version or config.settings.schema_version,
            max_validation_attempts=max_attempts,
            run_assessment=assessment,
        )
        output.print_image_annotation_result(result, output_format, verbose)

        if result.get("status") != "success" or not result.get("is_valid"):
            raise typer.Exit(1)

    except APIError as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None
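
Both annotation commands suppress the progress message when output is piped, so that redirected JSON stays clean. The implementation of `output.is_piped()` is not shown on this page. The sketch below assumes it amounts to checking whether standard output is attached to a terminal, which is a common approach but not confirmed here. `FakeTerminal` is a hypothetical stand-in used to exercise both branches.

```python
import io
import sys


def is_piped(stream=None) -> bool:
    """Return True when the stream is not attached to an interactive terminal."""
    stream = sys.stdout if stream is None else stream
    return not stream.isatty()


class FakeTerminal(io.StringIO):
    """In-memory stream that reports itself as a terminal."""

    def isatty(self) -> bool:
        return True


# A plain in-memory buffer behaves like a pipe or a redirected file.
piped = is_piped(io.StringIO())

# A stream that claims to be a TTY behaves like an interactive session.
interactive = is_piped(FakeTerminal())
```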

validate(hed_string, api_key=None, api_url=None, schema_version=None, output_format='text')

Validate a HED annotation string.

Checks if the HED string is syntactically correct and semantically valid according to the HED schema.

Examples:

    hedit validate "Sensory-event, Visual-presentation"
    hedit validate "(Red, Circle)" --schema 8.4.0
    hedit validate "Event" -o json

Source code in hedit/src/cli/main.py
@app.command()
def validate(
    hed_string: Annotated[
        str,
        typer.Argument(help="HED annotation string to validate"),
    ],
    api_key: ApiKeyOption = None,
    api_url: ApiUrlOption = None,
    schema_version: SchemaVersionOption = None,
    output_format: OutputFormatOption = "text",
) -> None:
    """Validate a HED annotation string.

    Checks if the HED string is syntactically correct and semantically valid
    according to the HED schema.

    Examples:
        hedit validate "Sensory-event, Visual-presentation"
        hedit validate "(Red, Circle)" --schema 8.4.0
        hedit validate "Event" -o json
    """
    config, effective_key = get_effective_config(
        api_key=api_key,
        api_url=api_url,
        schema_version=schema_version,
        output_format=output_format,
    )

    if not effective_key:
        output.print_error(
            "No API key configured",
            hint="Run 'hedit init' or provide --api-key",
        )
        raise typer.Exit(1)

    if not output.is_piped():
        output.print_progress("Validating HED string")

    try:
        client = create_client(config, effective_key)
        result = client.validate(
            hed_string=hed_string,
            schema_version=schema_version or config.settings.schema_version,
        )
        output.print_validation_result(result, output_format)

        if not result.get("is_valid"):
            raise typer.Exit(1)

    except APIError as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None

config_show(show_key=False)

Show current configuration.

Source code in hedit/src/cli/main.py
@config_app.command("show")
def config_show(
    show_key: Annotated[
        bool,
        typer.Option(
            "--show-key",
            help="Show full API key (default: masked)",
        ),
    ] = False,
) -> None:
    """Show current configuration."""
    config = load_config()
    creds = load_credentials()

    # Merge for display
    config_dict = config.model_dump()
    config_dict["credentials"] = {"openrouter_api_key": creds.openrouter_api_key}

    output.print_config(config_dict, show_key)

    # Show file paths
    paths = get_config_paths()
    output.print_info(f"\nConfig directory: {paths['config_dir']}")
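
The `--show-key` help text says the API key is masked by default, but the masking itself happens inside `output.print_config`, which is not shown here. One plausible way to mask a secret for display is sketched below. The function `mask_key` and its exact format are assumptions for illustration, not HEDit's actual behavior.

```python
def mask_key(key, visible: int = 4) -> str:
    """Hide all but the last few characters of a secret for display."""
    if not key:
        return "(not set)"
    if len(key) <= visible:
        # Too short to reveal anything safely; mask every character.
        return "*" * len(key)
    return "*" * (len(key) - visible) + key[-visible:]


masked = mask_key("sk-or-v1-abcdef1234")  # made-up key, not a real credential
unset = mask_key(None)
short = mask_key("abc")
```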

config_set(key, value)

Set a configuration value.

Examples:

    hedit config set models.default gpt-4o
    hedit config set settings.temperature 0.2
    hedit config set api.url https://api.example.com/hedit

Source code in hedit/src/cli/main.py
@config_app.command("set")
def config_set(
    key: Annotated[
        str,
        typer.Argument(help="Config key (e.g., models.default, settings.temperature)"),
    ],
    value: Annotated[
        str,
        typer.Argument(help="New value"),
    ],
) -> None:
    """Set a configuration value.

    Examples:
        hedit config set models.default gpt-4o
        hedit config set settings.temperature 0.2
        hedit config set api.url https://api.example.com/hedit
    """
    try:
        update_config(key, value)
        output.print_success(f"Set {key} = {value}")
    except ValueError as e:
        output.print_error(str(e))
        raise typer.Exit(1) from None
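
`config_set` delegates to `update_config`, whose source is not included on this page. To illustrate the general idea of dotted keys such as `models.default`, here is a minimal sketch over a plain nested dictionary that raises `ValueError` for unknown keys, matching the exception that `config_set` catches. The function `set_dotted` is hypothetical, and the real `update_config` may also coerce or validate values.

```python
def set_dotted(config: dict, key: str, value) -> None:
    """Set config[a][b] = value for key 'a.b', rejecting unknown sections and keys."""
    *parents, leaf = key.split(".")
    node = config
    for part in parents:
        if not isinstance(node.get(part), dict):
            raise ValueError(f"Unknown config section: {part}")
        node = node[part]
    if leaf not in node or isinstance(node[leaf], dict):
        # Refuse to create new keys or overwrite a whole section.
        raise ValueError(f"Unknown config key: {key}")
    node[leaf] = value


config = {
    "api": {"url": "https://api.example.com/hedit"},
    "models": {"default": "gpt-4o-mini"},
}
set_dotted(config, "models.default", "gpt-4o")
```

Rejecting unknown keys up front means a typo in the key name fails loudly instead of silently adding a setting that nothing reads.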

config_path()

Show configuration file paths.

Source code in hedit/src/cli/main.py
@config_app.command("path")
def config_path() -> None:
    """Show configuration file paths."""
    paths = get_config_paths()
    console.print(f"Config directory: {paths['config_dir']}")
    console.print(f"Config file: {paths['config_file']}")
    console.print(f"Credentials file: {paths['credentials_file']}")

config_clear_credentials(force=False)

Remove stored API credentials.

Source code in hedit/src/cli/main.py
@config_app.command("clear-credentials")
def config_clear_credentials(
    force: Annotated[
        bool,
        typer.Option(
            "--force",
            "-f",
            help="Skip confirmation",
        ),
    ] = False,
) -> None:
    """Remove stored API credentials."""
    if not force:
        confirm = typer.confirm("Are you sure you want to remove stored credentials?")
        if not confirm:
            raise typer.Abort()

    clear_credentials()
    output.print_success("Credentials removed")

health(api_url=None)

Check API health status.

Source code in hedit/src/cli/main.py
@app.command()
def health(
    api_url: ApiUrlOption = None,
) -> None:
    """Check API health status."""
    config, _ = get_effective_config(api_url=api_url)

    try:
        client = create_client(config)
        result = client.health()

        status = result.get("status", "unknown")
        version = result.get("version", "unknown")
        llm = "[green][x][/]" if result.get("llm_available") else "[red][ ][/]"
        validator = "[green][x][/]" if result.get("validator_available") else "[red][ ][/]"

        console.print(f"API: {config.api.url}")
        console.print(f"Status: [bold]{status}[/]")
        console.print(f"Version: {version}")
        console.print(f"LLM: {llm}")
        console.print(f"Validator: {validator}")

    except APIError as e:
        output.print_error(str(e), hint=e.detail)
        raise typer.Exit(1) from None
    except Exception as e:
        output.print_error(f"Could not connect to API: {e}")
        raise typer.Exit(1) from None

cli()

Entry point for CLI.

Source code in hedit/src/cli/main.py
def cli() -> None:
    """Entry point for CLI."""
    app()

Configuration

src.cli.config

Configuration management for HEDit CLI.

Handles persistent storage of API keys and settings in a cross-platform config directory. Supports environment variables as fallback/override.

CredentialsConfig

Bases: BaseModel

Credentials stored separately with restricted permissions.

Source code in hedit/src/cli/config.py
class CredentialsConfig(BaseModel):
    """Credentials stored separately with restricted permissions."""

    openrouter_api_key: str | None = Field(default=None, description="OpenRouter API key")

ModelsConfig

Bases: BaseModel

Model configuration for different agents.

Source code in hedit/src/cli/config.py
class ModelsConfig(BaseModel):
    """Model configuration for different agents."""

    default: str = Field(default=DEFAULT_MODEL, description="Default model for annotation")
    vision: str = Field(default=DEFAULT_VISION_MODEL, description="Vision model for images")
    provider: str | None = Field(default=DEFAULT_PROVIDER, description="Provider preference")
    temperature: float = Field(default=0.1, ge=0.0, le=1.0, description="Model temperature")

SettingsConfig

Bases: BaseModel

General settings.

Source code in hedit/src/cli/config.py
class SettingsConfig(BaseModel):
    """General settings."""

    schema_version: str = Field(default="8.4.0", description="HED schema version")
    max_validation_attempts: int = Field(default=5, ge=1, le=10, description="Max retries")
    run_assessment: bool = Field(default=False, description="Run assessment by default")

OutputConfig

Bases: BaseModel

Output formatting settings.

Source code in hedit/src/cli/config.py
class OutputConfig(BaseModel):
    """Output formatting settings."""

    format: str = Field(default="text", description="Output format (text, json)")
    color: bool = Field(default=True, description="Enable colored output")
    verbose: bool = Field(default=False, description="Verbose output")

APIConfig

Bases: BaseModel

API endpoint configuration.

Source code in hedit/src/cli/config.py
class APIConfig(BaseModel):
    """API endpoint configuration."""

    url: str = Field(default=DEFAULT_API_URL, description="API endpoint URL")

CLIConfig

Bases: BaseModel

Complete CLI configuration.

Source code in hedit/src/cli/config.py
class CLIConfig(BaseModel):
    """Complete CLI configuration."""

    api: APIConfig = Field(default_factory=APIConfig)
    models: ModelsConfig = Field(default_factory=ModelsConfig)
    settings: SettingsConfig = Field(default_factory=SettingsConfig)
    output: OutputConfig = Field(default_factory=OutputConfig)

ensure_config_dir()

Create config directory if it doesn't exist.

Source code in hedit/src/cli/config.py
def ensure_config_dir() -> None:
    """Create config directory if it doesn't exist."""
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)

load_credentials()

Load credentials from file or environment.

Environment variables take precedence over stored credentials.

Source code in hedit/src/cli/config.py
def load_credentials() -> CredentialsConfig:
    """Load credentials from file or environment.

    Environment variables take precedence over stored credentials.
    """
    creds = CredentialsConfig()

    # Try loading from file first
    if CREDENTIALS_FILE.exists():
        try:
            with open(CREDENTIALS_FILE) as f:
                data = yaml.safe_load(f) or {}
                creds = CredentialsConfig(**data)
        except (yaml.YAMLError, ValueError):
            pass  # Use defaults if file is corrupted

    # Environment variables override file
    env_key = os.environ.get("OPENROUTER_API_KEY")
    if env_key:
        creds.openrouter_api_key = env_key

    return creds

save_credentials(creds)

Save credentials to file with restricted permissions.

Source code in hedit/src/cli/config.py
def save_credentials(creds: CredentialsConfig) -> None:
    """Save credentials to file with restricted permissions."""
    ensure_config_dir()

    # Write credentials
    with open(CREDENTIALS_FILE, "w") as f:
        yaml.dump(creds.model_dump(exclude_none=True), f, default_flow_style=False)

    # Restrict permissions (Unix only)
    try:
        os.chmod(CREDENTIALS_FILE, 0o600)
    except (OSError, AttributeError):
        pass  # Windows doesn't support chmod the same way
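
The function above writes the file first and restricts its permissions afterwards. As a minimal sketch of a stricter variant, assuming a POSIX system, the file can be created with owner-only permissions from the start. `write_private_file` is a hypothetical helper for illustration and is not part of HEDit; the key value is a placeholder.

```python
import os
import stat
import tempfile
from pathlib import Path


def write_private_file(path: Path, text: str) -> None:
    """Write text to path, creating the file with owner-only permissions."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # The mode argument only applies when the file is newly created,
    # and it is further masked by the process umask.
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(text)
    # Also tighten a file that already existed with looser permissions.
    # On Windows, os.chmod only affects the read-only flag.
    try:
        os.chmod(path, 0o600)
    except OSError:
        pass


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "credentials.yaml"
    write_private_file(target, "openrouter_api_key: placeholder-key\n")
    mode = stat.S_IMODE(target.stat().st_mode)
    content = target.read_text()
```

On a POSIX system `mode` is `0o600` after the call, so other users cannot read the file at any point after it is created.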

load_config()

Load configuration from file.

Source code in hedit/src/cli/config.py
def load_config() -> CLIConfig:
    """Load configuration from file."""
    if not CONFIG_FILE.exists():
        return CLIConfig()

    try:
        with open(CONFIG_FILE) as f:
            data = yaml.safe_load(f) or {}
            return CLIConfig(**data)
    except (yaml.YAMLError, ValueError):
        return CLIConfig()

save_config(config)

Save configuration to file.

Source code in hedit/src/cli/config.py
def save_config(config: CLIConfig) -> None:
    """Save configuration to file."""
    ensure_config_dir()

    with open(CONFIG_FILE, "w") as f:
        yaml.dump(config.model_dump(), f, default_flow_style=False)

get_api_key(override=None)

Get API key with priority: override > env > stored.

Parameters:

Name Type Description Default
override str | None

Explicit API key from command line

None

Returns:

Type Description
str | None

API key or None if not configured

Source code in hedit/src/cli/config.py
def get_api_key(override: str | None = None) -> str | None:
    """Get API key with priority: override > env > stored.

    Args:
        override: Explicit API key from command line

    Returns:
        API key or None if not configured
    """
    if override:
        return override

    creds = load_credentials()
    return creds.openrouter_api_key
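
The priority order `override > env > stored` can be sketched as a pure function. This is an illustration only: `resolve_api_key` is a hypothetical name, and the environment and stored key are passed in as arguments so the example runs without HEDit or a credentials file.

```python
def resolve_api_key(override, env, stored_key):
    """Return the first configured key: override, then environment, then stored."""
    if override:
        return override
    # Mirrors load_credentials(): a non-empty environment value
    # replaces whatever was read from the credentials file.
    env_key = env.get("OPENROUTER_API_KEY")
    if env_key:
        return env_key
    return stored_key


from_override = resolve_api_key("cli-key", {"OPENROUTER_API_KEY": "env-key"}, "file-key")
from_env = resolve_api_key(None, {"OPENROUTER_API_KEY": "env-key"}, "file-key")
from_file = resolve_api_key(None, {}, "file-key")
unconfigured = resolve_api_key(None, {}, None)
```

An empty-string override is falsy, so it falls through to the environment and stored key, as it does in the `if override:` check above.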

get_effective_config(api_key=None, api_url=None, model=None, provider=None, temperature=None, schema_version=None, output_format=None)

Get effective config with command-line overrides applied.

Parameters:

Name Type Description Default
api_key str | None

Override API key

None
api_url str | None

Override API URL

None
model str | None

Override model (if non-default, clears provider unless explicitly set)

None
provider str | None

Override provider preference (e.g., "Cerebras")

None
temperature float | None

Override temperature

None
schema_version str | None

Override schema version

None
output_format str | None

Override output format

None

Returns:

Type Description
tuple[CLIConfig, str | None]

Tuple of (effective config, effective API key)

Note

When a custom model is specified without an explicit provider, the provider is cleared. This is because the default provider (Cerebras) only supports specific models.

Source code in hedit/src/cli/config.py
def get_effective_config(
    api_key: str | None = None,
    api_url: str | None = None,
    model: str | None = None,
    provider: str | None = None,
    temperature: float | None = None,
    schema_version: str | None = None,
    output_format: str | None = None,
) -> tuple[CLIConfig, str | None]:
    """Get effective config with command-line overrides applied.

    Args:
        api_key: Override API key
        api_url: Override API URL
        model: Override model (if non-default, clears provider unless explicitly set)
        provider: Override provider preference (e.g., "Cerebras")
        temperature: Override temperature
        schema_version: Override schema version
        output_format: Override output format

    Returns:
        Tuple of (effective config, effective API key)

    Note:
        When a custom model is specified without an explicit provider, the provider
        is cleared. This is because the default provider (Cerebras) only supports
        specific models.
    """
    config = load_config()
    effective_key = get_api_key(api_key)

    # Apply overrides
    if api_url:
        config.api.url = api_url

    # Handle model/provider interaction:
    # If user specifies a model different from default but doesn't specify provider,
    # clear the provider (since Cerebras only supports specific models)
    if model:
        config.models.default = model
        # Clear provider if model changed and provider not explicitly set
        if provider is None and model != DEFAULT_MODEL:
            config.models.provider = None
    if provider is not None:  # Allow empty string to clear provider
        config.models.provider = provider if provider else None

    if temperature is not None:
        config.models.temperature = temperature
    if schema_version:
        config.settings.schema_version = schema_version
    if output_format:
        config.output.format = output_format

    return config, effective_key
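
The model/provider interaction is the least obvious part of this function, so here is a minimal sketch that isolates it. `resolve_provider` is a hypothetical helper, and the `DEFAULT_MODEL` value and the provider name `"ExampleProvider"` are placeholders; the real default model is defined elsewhere in HEDit.

```python
DEFAULT_MODEL = "example/default-model"  # placeholder, not HEDit's actual default


def resolve_provider(configured, model, provider, default_model=DEFAULT_MODEL):
    """Return the provider that would be in effect after applying overrides."""
    result = configured
    # A non-default model without an explicit provider clears the provider.
    if model and provider is None and model != default_model:
        result = None
    # An explicit provider always wins; an empty string clears it.
    if provider is not None:
        result = provider if provider else None
    return result


kept = resolve_provider("Cerebras", None, None)
cleared_by_model = resolve_provider("Cerebras", "other/model", None)
explicit = resolve_provider("Cerebras", "other/model", "ExampleProvider")
default_model_kept = resolve_provider("Cerebras", DEFAULT_MODEL, None)
cleared_by_empty = resolve_provider("Cerebras", None, "")
```

Passing `provider=""` is therefore the way to clear a stored provider without changing the model.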

update_config(key, value)

Update a specific config value.

Parameters:

Name Type Description Default
key str

Dot-notation key (e.g., "models.default", "models.temperature")

required
value Any

New value

required

Source code in hedit/src/cli/config.py
def update_config(key: str, value: Any) -> None:
    """Update a specific config value.

    Args:
        key: Dot-notation key (e.g., "models.default", "models.temperature")
        value: New value
    """
    config = load_config()

    # Parse dot notation
    parts = key.split(".")
    if len(parts) == 1:
        # Top-level key not supported for safety
        raise ValueError(f"Invalid config key: {key}")
    elif len(parts) == 2:
        section, field = parts
        if hasattr(config, section):
            section_obj = getattr(config, section)
            if hasattr(section_obj, field):
                # Type coercion for common types
                current = getattr(section_obj, field)
                if isinstance(current, bool):
                    value = str(value).lower() in ("true", "1", "yes")
                elif isinstance(current, int):
                    value = int(value)
                elif isinstance(current, float):
                    value = float(value)
                setattr(section_obj, field, value)
            else:
                raise ValueError(f"Unknown field: {field} in {section}")
        else:
            raise ValueError(f"Unknown section: {section}")
    else:
        raise ValueError(f"Invalid config key format: {key}")

    save_config(config)
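
The dot-notation lookup and type coercion can be tried in isolation. The sketch below uses plain dataclasses as stand-ins for the Pydantic config models and skips saving to disk; `set_by_dotted_key` and the field defaults are assumptions made for the example.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Models:
    default: str = "example/model"  # placeholder value
    temperature: float = 0.1


@dataclass
class Settings:
    max_validation_attempts: int = 5
    run_assessment: bool = False


@dataclass
class Config:
    models: Models = field(default_factory=Models)
    settings: Settings = field(default_factory=Settings)


def set_by_dotted_key(config: Config, key: str, value: Any) -> None:
    """Set config.<section>.<field>, coercing value to the field's current type."""
    parts = key.split(".")
    if len(parts) != 2:
        raise ValueError(f"Invalid config key format: {key}")
    section, name = parts
    if not hasattr(config, section):
        raise ValueError(f"Unknown section: {section}")
    section_obj = getattr(config, section)
    if not hasattr(section_obj, name):
        raise ValueError(f"Unknown field: {name} in {section}")
    current = getattr(section_obj, name)
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(current, bool):
        value = str(value).lower() in ("true", "1", "yes")
    elif isinstance(current, int):
        value = int(value)
    elif isinstance(current, float):
        value = float(value)
    setattr(section_obj, name, value)


config = Config()
set_by_dotted_key(config, "models.temperature", "0.3")
set_by_dotted_key(config, "settings.run_assessment", "yes")
set_by_dotted_key(config, "settings.max_validation_attempts", "3")
```

Values arrive as strings from the command line, which is why the coercion is driven by the type of the existing value rather than the type of the new one.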

clear_credentials()

Remove stored credentials.

Source code in hedit/src/cli/config.py
def clear_credentials() -> None:
    """Remove stored credentials."""
    if CREDENTIALS_FILE.exists():
        CREDENTIALS_FILE.unlink()

get_config_paths()

Get paths to config files for debugging.

Source code in hedit/src/cli/config.py
def get_config_paths() -> dict[str, Path]:
    """Get paths to config files for debugging."""
    return {
        "config_dir": CONFIG_DIR,
        "config_file": CONFIG_FILE,
        "credentials_file": CREDENTIALS_FILE,
    }

API Client

src.cli.client

HTTP client for HEDit API.

Handles all API communication with proper error handling and timeout management.

APIError

Bases: Exception

API request error.

Source code in hedit/src/cli/client.py
class APIError(Exception):
    """API request error."""

    def __init__(self, message: str, status_code: int | None = None, detail: str | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail

HEDitClient

Client for HEDit API.

Source code in hedit/src/cli/client.py
class HEDitClient:
    """Client for HEDit API."""

    def __init__(
        self,
        api_url: str,
        api_key: str | None = None,
        model: str | None = None,
        provider: str | None = None,
        temperature: float | None = None,
        timeout: httpx.Timeout = DEFAULT_TIMEOUT,
    ):
        """Initialize client.

        Args:
            api_url: Base API URL
            api_key: OpenRouter API key for BYOK mode
            model: Model to use for annotation
            provider: Provider preference (e.g., "Cerebras")
            temperature: LLM temperature (0.0-1.0)
            timeout: Request timeout settings
        """
        self.api_url = api_url.rstrip("/")
        self.api_key = api_key
        self.model = model
        self.provider = provider
        self.temperature = temperature
        self.timeout = timeout

    def _get_headers(self) -> dict[str, str]:
        """Get request headers with BYOK configuration."""
        headers = {
            "Content-Type": "application/json",
            "User-Agent": "hedit-cli",
        }
        if self.api_key:
            # Use X-OpenRouter-Key header for BYOK mode
            headers["X-OpenRouter-Key"] = self.api_key
        # Include model configuration in headers for BYOK
        if self.model:
            headers["X-OpenRouter-Model"] = self.model
        if self.provider:
            headers["X-OpenRouter-Provider"] = self.provider
        if self.temperature is not None:
            headers["X-OpenRouter-Temperature"] = str(self.temperature)
        return headers

    def _handle_response(self, response: httpx.Response) -> dict[str, Any]:
        """Handle API response and errors.

        Args:
            response: HTTP response

        Returns:
            Response JSON data

        Raises:
            APIError: If request failed
        """
        if response.status_code == 200:
            return response.json()

        # Parse error detail
        try:
            error_data = response.json()
            detail = error_data.get("detail", str(error_data))
        except Exception:
            detail = response.text

        if response.status_code == 401:
            raise APIError(
                "Authentication required",
                status_code=401,
                detail="Please provide an OpenRouter API key with --api-key or run 'hedit init'",
            )
        elif response.status_code == 422:
            raise APIError(
                "Invalid request",
                status_code=422,
                detail=detail,
            )
        elif response.status_code == 500:
            raise APIError(
                "Server error",
                status_code=500,
                detail=detail,
            )
        elif response.status_code == 503:
            raise APIError(
                "Service unavailable",
                status_code=503,
                detail="The API is temporarily unavailable. Please try again later.",
            )
        else:
            raise APIError(
                f"Request failed with status {response.status_code}",
                status_code=response.status_code,
                detail=detail,
            )

    def annotate(
        self,
        description: str,
        schema_version: str = "8.3.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
    ) -> dict[str, Any]:
        """Generate HED annotation from text description.

        Args:
            description: Natural language event description
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment

        Returns:
            Annotation response dictionary
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.post(
                f"{self.api_url}/annotate",
                headers=self._get_headers(),
                json={
                    "description": description,
                    "schema_version": schema_version,
                    "max_validation_attempts": max_validation_attempts,
                    "run_assessment": run_assessment,
                },
            )
            return self._handle_response(response)

    def annotate_image(
        self,
        image_path: Path | str,
        prompt: str | None = None,
        schema_version: str = "8.4.0",
        max_validation_attempts: int = 5,
        run_assessment: bool = False,
    ) -> dict[str, Any]:
        """Generate HED annotation from image.

        Args:
            image_path: Path to image file
            prompt: Optional custom prompt for vision model
            schema_version: HED schema version
            max_validation_attempts: Maximum validation retries
            run_assessment: Whether to run assessment

        Returns:
            Annotation response dictionary
        """
        # Read and encode image
        image_path = Path(image_path)
        if not image_path.exists():
            raise APIError(f"Image file not found: {image_path}")

        # Detect MIME type
        suffix = image_path.suffix.lower()
        mime_types = {
            ".png": "image/png",
            ".jpg": "image/jpeg",
            ".jpeg": "image/jpeg",
            ".gif": "image/gif",
            ".webp": "image/webp",
        }
        mime_type = mime_types.get(suffix, "image/png")

        # Read and encode
        with open(image_path, "rb") as f:
            image_data = base64.b64encode(f.read()).decode("utf-8")

        image_uri = f"data:{mime_type};base64,{image_data}"

        with httpx.Client(timeout=self.timeout) as client:
            response = client.post(
                f"{self.api_url}/annotate-from-image",
                headers=self._get_headers(),
                json={
                    "image": image_uri,
                    "prompt": prompt,
                    "schema_version": schema_version,
                    "max_validation_attempts": max_validation_attempts,
                    "run_assessment": run_assessment,
                },
            )
            return self._handle_response(response)

    def validate(
        self,
        hed_string: str,
        schema_version: str = "8.3.0",
    ) -> dict[str, Any]:
        """Validate HED string.

        Args:
            hed_string: HED annotation to validate
            schema_version: HED schema version

        Returns:
            Validation response dictionary
        """
        with httpx.Client(timeout=self.timeout) as client:
            response = client.post(
                f"{self.api_url}/validate",
                headers=self._get_headers(),
                json={
                    "hed_string": hed_string,
                    "schema_version": schema_version,
                },
            )
            return self._handle_response(response)

    def health(self) -> dict[str, Any]:
        """Check API health.

        Returns:
            Health status dictionary
        """
        with httpx.Client(timeout=httpx.Timeout(10.0)) as client:
            response = client.get(f"{self.api_url}/health")
            return self._handle_response(response)

    def version(self) -> dict[str, Any]:
        """Get API version info.

        Returns:
            Version information dictionary
        """
        with httpx.Client(timeout=httpx.Timeout(10.0)) as client:
            response = client.get(f"{self.api_url}/version")
            return self._handle_response(response)

__init__(api_url, api_key=None, model=None, provider=None, temperature=None, timeout=DEFAULT_TIMEOUT)

Initialize client.

Parameters:

Name Type Description Default
api_url str

Base API URL

required
api_key str | None

OpenRouter API key for BYOK mode

None
model str | None

Model to use for annotation

None
provider str | None

Provider preference (e.g., "Cerebras")

None
temperature float | None

LLM temperature (0.0-1.0)

None
timeout Timeout

Request timeout settings

DEFAULT_TIMEOUT

Source code in hedit/src/cli/client.py
def __init__(
    self,
    api_url: str,
    api_key: str | None = None,
    model: str | None = None,
    provider: str | None = None,
    temperature: float | None = None,
    timeout: httpx.Timeout = DEFAULT_TIMEOUT,
):
    """Initialize client.

    Args:
        api_url: Base API URL
        api_key: OpenRouter API key for BYOK mode
        model: Model to use for annotation
        provider: Provider preference (e.g., "Cerebras")
        temperature: LLM temperature (0.0-1.0)
        timeout: Request timeout settings
    """
    self.api_url = api_url.rstrip("/")
    self.api_key = api_key
    self.model = model
    self.provider = provider
    self.temperature = temperature
    self.timeout = timeout

annotate(description, schema_version='8.3.0', max_validation_attempts=5, run_assessment=False)

Generate HED annotation from text description.

Parameters:

Name Type Description Default
description str

Natural language event description

required
schema_version str

HED schema version

'8.3.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False

Returns:

Type Description
dict[str, Any]

Annotation response dictionary

Source code in hedit/src/cli/client.py
def annotate(
    self,
    description: str,
    schema_version: str = "8.3.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
) -> dict[str, Any]:
    """Generate HED annotation from text description.

    Args:
        description: Natural language event description
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment

    Returns:
        Annotation response dictionary
    """
    with httpx.Client(timeout=self.timeout) as client:
        response = client.post(
            f"{self.api_url}/annotate",
            headers=self._get_headers(),
            json={
                "description": description,
                "schema_version": schema_version,
                "max_validation_attempts": max_validation_attempts,
                "run_assessment": run_assessment,
            },
        )
        return self._handle_response(response)

annotate_image(image_path, prompt=None, schema_version='8.4.0', max_validation_attempts=5, run_assessment=False)

Generate HED annotation from image.

Parameters:

Name Type Description Default
image_path Path | str

Path to image file

required
prompt str | None

Optional custom prompt for vision model

None
schema_version str

HED schema version

'8.4.0'
max_validation_attempts int

Maximum validation retries

5
run_assessment bool

Whether to run assessment

False

Returns:

Type Description
dict[str, Any]

Annotation response dictionary

Source code in hedit/src/cli/client.py
def annotate_image(
    self,
    image_path: Path | str,
    prompt: str | None = None,
    schema_version: str = "8.4.0",
    max_validation_attempts: int = 5,
    run_assessment: bool = False,
) -> dict[str, Any]:
    """Generate HED annotation from image.

    Args:
        image_path: Path to image file
        prompt: Optional custom prompt for vision model
        schema_version: HED schema version
        max_validation_attempts: Maximum validation retries
        run_assessment: Whether to run assessment

    Returns:
        Annotation response dictionary
    """
    # Read and encode image
    image_path = Path(image_path)
    if not image_path.exists():
        raise APIError(f"Image file not found: {image_path}")

    # Detect MIME type
    suffix = image_path.suffix.lower()
    mime_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".gif": "image/gif",
        ".webp": "image/webp",
    }
    mime_type = mime_types.get(suffix, "image/png")

    # Read and encode
    with open(image_path, "rb") as f:
        image_data = base64.b64encode(f.read()).decode("utf-8")

    image_uri = f"data:{mime_type};base64,{image_data}"

    with httpx.Client(timeout=self.timeout) as client:
        response = client.post(
            f"{self.api_url}/annotate-from-image",
            headers=self._get_headers(),
            json={
                "image": image_uri,
                "prompt": prompt,
                "schema_version": schema_version,
                "max_validation_attempts": max_validation_attempts,
                "run_assessment": run_assessment,
            },
        )
        return self._handle_response(response)
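
The image is sent as a base64 data URI. The encoding step can be sketched on its own, without a file on disk or a running API; `to_data_uri` is a hypothetical helper that takes the bytes and a file name directly.

```python
import base64
from pathlib import Path

MIME_TYPES = {
    ".png": "image/png",
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".gif": "image/gif",
    ".webp": "image/webp",
}


def to_data_uri(data: bytes, filename: str) -> str:
    """Encode raw image bytes as a data URI, choosing the MIME type by extension."""
    suffix = Path(filename).suffix.lower()
    # Unknown extensions fall back to image/png, as in the method above.
    mime_type = MIME_TYPES.get(suffix, "image/png")
    encoded = base64.b64encode(data).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"


png_uri = to_data_uri(b"\x89PNG", "stimulus.PNG")
jpeg_uri = to_data_uri(b"\xff\xd8\xff", "photo.jpeg")
fallback_uri = to_data_uri(b"BM", "scan.bmp")
```

Because the MIME type comes from the file extension rather than the file contents, a file with an unlisted extension such as `.bmp` is labelled `image/png`.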

validate(hed_string, schema_version='8.3.0')

Validate HED string.

Parameters:

Name Type Description Default
hed_string str

HED annotation to validate

required
schema_version str

HED schema version

'8.3.0'

Returns:

Type Description
dict[str, Any]

Validation response dictionary

Source code in hedit/src/cli/client.py
def validate(
    self,
    hed_string: str,
    schema_version: str = "8.3.0",
) -> dict[str, Any]:
    """Validate HED string.

    Args:
        hed_string: HED annotation to validate
        schema_version: HED schema version

    Returns:
        Validation response dictionary
    """
    with httpx.Client(timeout=self.timeout) as client:
        response = client.post(
            f"{self.api_url}/validate",
            headers=self._get_headers(),
            json={
                "hed_string": hed_string,
                "schema_version": schema_version,
            },
        )
        return self._handle_response(response)

health()

Check API health.

Returns:

Type Description
dict[str, Any]

Health status dictionary

Source code in hedit/src/cli/client.py
def health(self) -> dict[str, Any]:
    """Check API health.

    Returns:
        Health status dictionary
    """
    with httpx.Client(timeout=httpx.Timeout(10.0)) as client:
        response = client.get(f"{self.api_url}/health")
        return self._handle_response(response)

version()

Get API version info.

Returns:

Type Description
dict[str, Any]

Version information dictionary

Source code in hedit/src/cli/client.py
def version(self) -> dict[str, Any]:
    """Get API version info.

    Returns:
        Version information dictionary
    """
    with httpx.Client(timeout=httpx.Timeout(10.0)) as client:
        response = client.get(f"{self.api_url}/version")
        return self._handle_response(response)

create_client(config, api_key=None)

Create API client from config.

Parameters:

Name Type Description Default
config CLIConfig

CLI configuration

required
api_key str | None

API key (overrides config)

None

Returns:

Type Description
HEDitClient

Configured HEDitClient

Source code in hedit/src/cli/client.py
def create_client(config: CLIConfig, api_key: str | None = None) -> HEDitClient:
    """Create API client from config.

    Args:
        config: CLI configuration
        api_key: API key (overrides config)

    Returns:
        Configured HEDitClient
    """
    return HEDitClient(
        api_url=config.api.url,
        api_key=api_key,
        model=config.models.default,
        provider=config.models.provider,
        temperature=config.models.temperature,
    )

Workflow

The multi-agent annotation workflow:

src.agents.workflow

LangGraph workflow for HED annotation generation.

This module defines the multi-agent workflow that orchestrates annotation, validation, evaluation, and assessment.

HedAnnotationWorkflow

Multi-agent workflow for HED annotation generation and validation.

The workflow follows this pattern:

1. Annotation: Generate HED tags from natural language
2. Validation: Check HED compliance
3. If errors and attempts < max: Return to annotation with feedback
4. If valid: Proceed to evaluation
5. Evaluation: Assess faithfulness to original description
6. If needs refinement: Return to annotation
7. If faithful: Proceed to assessment
8. Assessment: Final comparison for completeness
9. End: Return final annotation with feedback
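
The control flow of this pattern can be sketched as a plain Python loop, with no LangGraph and no LLM calls. Everything here is an assumption made for illustration: `run_workflow`, the stub agents, the `max_total_iterations` cap, and the annotation strings, which are not validated HED.

```python
def run_workflow(annotate, validate, evaluate, assess,
                 max_validation_attempts=5, max_total_iterations=10,
                 run_assessment=False):
    """Loop annotate -> validate -> evaluate, feeding problems back as feedback."""
    feedback, annotation, errors = [], "", []
    validation_attempts = 0
    for _ in range(max_total_iterations):
        annotation = annotate(feedback)
        errors = validate(annotation)
        if errors:
            validation_attempts += 1
            if validation_attempts >= max_validation_attempts:
                # Give up: return the last annotation, marked invalid.
                return {"annotation": annotation, "is_valid": False, "is_faithful": False}
            feedback = errors
            continue
        faithful, notes = evaluate(annotation)
        if not faithful:
            feedback = notes
            continue
        result = {"annotation": annotation, "is_valid": True, "is_faithful": True}
        if run_assessment:
            result["assessment"] = assess(annotation)
        return result
    # Iteration cap reached without a valid and faithful annotation.
    return {"annotation": annotation, "is_valid": not errors, "is_faithful": False}


# Stub agents: the first draft has a misspelled tag, the second is accepted.
drafts = iter(["Red, Circl", "Sensory-event, Red, Circle"])


def annotate(feedback):
    return next(drafts)


def validate(annotation):
    return ["Unknown tag: Circl"] if annotation.endswith("Circl") else []


def evaluate(annotation):
    return True, []


def assess(annotation):
    return "complete"


result = run_workflow(annotate, validate, evaluate, assess, run_assessment=True)
```

In the sketch, validation errors and evaluation notes share one feedback channel back to the annotation step, which corresponds to steps 3 and 6 of the pattern.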

Source code in hedit/src/agents/workflow.py
class HedAnnotationWorkflow:
    """Multi-agent workflow for HED annotation generation and validation.

    The workflow follows this pattern:
    1. Annotation: Generate HED tags from natural language
    2. Validation: Check HED compliance
    3. If errors and attempts < max: Return to annotation with feedback
    4. If valid: Proceed to evaluation
    5. Evaluation: Assess faithfulness to original description
    6. If needs refinement: Return to annotation
    7. If faithful: Proceed to assessment
    8. Assessment: Final comparison for completeness
    9. End: Return final annotation with feedback
    """

    def __init__(
        self,
        llm: BaseChatModel,
        evaluation_llm: BaseChatModel | None = None,
        assessment_llm: BaseChatModel | None = None,
        feedback_llm: BaseChatModel | None = None,
        schema_dir: Path | str | None = None,
        validator_path: Path | None = None,
        use_js_validator: bool = True,
    ) -> None:
        """Initialize the workflow.

        Args:
            llm: Language model for annotation agent
            evaluation_llm: Language model for evaluation agent (defaults to llm)
            assessment_llm: Language model for assessment agent (defaults to llm)
            feedback_llm: Language model for feedback summarization (defaults to llm)
            schema_dir: Directory containing JSON schemas
            validator_path: Path to hed-javascript for validation
            use_js_validator: Whether to use JavaScript validator
        """
        # Store schema directory (None means use HED library to fetch from GitHub)
        self.schema_dir = schema_dir

        # Initialize legacy schema loader for validation
        self.schema_loader = HedSchemaLoader()

        # Use provided LLMs or default to main llm
        eval_llm = evaluation_llm or llm
        assess_llm = assessment_llm or llm
        feed_llm = feedback_llm or llm

        # Initialize agents with JSON schema support and per-agent LLMs
        self.annotation_agent = AnnotationAgent(llm, schema_dir=self.schema_dir)
        self.validation_agent = ValidationAgent(
            self.schema_loader,
            use_javascript=use_js_validator,
            validator_path=validator_path,
        )
        self.evaluation_agent = EvaluationAgent(eval_llm, schema_dir=self.schema_dir)
        self.assessment_agent = AssessmentAgent(assess_llm, schema_dir=self.schema_dir)
        self.feedback_summarizer = FeedbackSummarizer(feed_llm)

        # Build graph
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        """Build the LangGraph workflow.

        Returns:
            Compiled StateGraph
        """
        # Create graph
        workflow = StateGraph(HedAnnotationState)

        # Add nodes
        workflow.add_node("annotate", self._annotate_node)
        workflow.add_node("validate", self._validate_node)
        workflow.add_node("summarize_feedback", self._summarize_feedback_node)
        workflow.add_node("evaluate", self._evaluate_node)
        workflow.add_node("assess", self._assess_node)

        # Add edges
        workflow.set_entry_point("annotate")

        # After annotation, always validate
        workflow.add_edge("annotate", "validate")

        # After validation, route based on result
        workflow.add_conditional_edges(
            "validate",
            self._route_after_validation,
            {
                "summarize_feedback": "summarize_feedback",  # Summarize feedback if invalid
                "evaluate": "evaluate",  # Proceed if valid
                "end": END,  # End if max attempts reached
            },
        )

        # After feedback summarization, go to annotation
        workflow.add_edge("summarize_feedback", "annotate")

        # After evaluation, route based on faithfulness
        workflow.add_conditional_edges(
            "evaluate",
            self._route_after_evaluation,
            {
                "summarize_feedback": "summarize_feedback",  # Summarize feedback if not faithful
                "assess": "assess",  # Proceed to assessment if needed
                "end": END,  # Skip assessment if valid and faithful
            },
        )

        # After assessment, always end
        workflow.add_edge("assess", END)

        return workflow.compile()

    async def _annotate_node(self, state: HedAnnotationState) -> dict:
        """Annotation node: Generate or refine HED annotation.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        total_iters = state.get("total_iterations", 0) + 1
        print(
            f"[WORKFLOW] Entering annotate node (validation attempt {state['validation_attempts']}, total iteration {total_iters})"
        )
        result = await self.annotation_agent.annotate(state)
        result["total_iterations"] = total_iters  # Increment counter
        print(f"[WORKFLOW] Annotation generated: {result.get('current_annotation', '')[:100]}...")
        return result

    async def _validate_node(self, state: HedAnnotationState) -> dict:
        """Validation node: Validate HED annotation.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        print("[WORKFLOW] Entering validate node")
        result = await self.validation_agent.validate(state)
        print(
            f"[WORKFLOW] Validation result: {result.get('validation_status')}, is_valid: {result.get('is_valid')}"
        )
        if not result.get("is_valid"):
            print(f"[WORKFLOW] Validation errors: {result.get('validation_errors', [])}")
        return result

    async def _evaluate_node(self, state: HedAnnotationState) -> dict:
        """Evaluation node: Evaluate annotation faithfulness.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        print("[WORKFLOW] Entering evaluate node")
        result = await self.evaluation_agent.evaluate(state)
        print(f"[WORKFLOW] Evaluation result: is_faithful={result.get('is_faithful')}")

        # Set default assessment values if assessment will be skipped
        run_assessment = state.get("run_assessment", False)
        if not run_assessment:
            result["is_complete"] = result.get("is_faithful", False) and state.get(
                "is_valid", False
            )
            if result["is_complete"]:
                result["assessment_feedback"] = (
                    "Annotation is valid and faithful to the original description."
                )
            else:
                result["assessment_feedback"] = ""

        return result

    async def _assess_node(self, state: HedAnnotationState) -> dict:
        """Assessment node: Final assessment.

        Args:
            state: Current workflow state

        Returns:
            State update
        """
        return await self.assessment_agent.assess(state)

    async def _summarize_feedback_node(self, state: HedAnnotationState) -> dict:
        """Summarize feedback node: Condense errors and feedback.

        Args:
            state: Current workflow state

        Returns:
            State update with summarized feedback
        """
        print("[WORKFLOW] Entering summarize_feedback node")
        result = await self.feedback_summarizer.summarize(state)
        # Preview the first augmented error, if any, truncated to 100 characters
        augmented = result.get("validation_errors_augmented") or []
        preview = augmented[0][:100] if augmented else "No feedback"
        print(f"[WORKFLOW] Feedback summarized: {preview}...")
        return result

    def _route_after_validation(
        self,
        state: HedAnnotationState,
    ) -> str:
        """Route after validation based on result.

        Args:
            state: Current workflow state

        Returns:
            Next node name
        """
        if state["validation_status"] == "valid":
            print("[WORKFLOW] Routing to evaluate (validation passed)")
            return "evaluate"
        elif state["validation_status"] == "max_attempts_reached":
            print("[WORKFLOW] Routing to end (max validation attempts reached)")
            return "end"
        else:
            print(
                f"[WORKFLOW] Routing to summarize_feedback (validation failed, attempts: {state['validation_attempts']}/{state['max_validation_attempts']})"
            )
            return "summarize_feedback"

    def _route_after_evaluation(
        self,
        state: HedAnnotationState,
    ) -> str:
        """Route after evaluation based on faithfulness.

        Args:
            state: Current workflow state

        Returns:
            Next node name
        """
        # Check if max total iterations reached
        total_iters = state.get("total_iterations", 0)
        max_iters = state.get("max_total_iterations", 10)
        run_assessment = state.get("run_assessment", False)

        if total_iters >= max_iters:
            # Only run assessment at max iterations if explicitly requested
            if run_assessment:
                print(f"[WORKFLOW] Routing to assess (max total iterations {max_iters} reached)")
                return "assess"
            else:
                print(
                    "[WORKFLOW] Skipping assessment (max iterations reached, assessment not requested) - routing to END"
                )
                return "end"

        if state["is_faithful"]:
            # Only run assessment if explicitly requested
            if state.get("is_valid") and run_assessment:
                print(
                    "[WORKFLOW] Routing to assess (annotation is valid and faithful, assessment requested)"
                )
                return "assess"
            elif state.get("is_valid"):
                print(
                    "[WORKFLOW] Skipping assessment (annotation is valid and faithful, assessment not requested) - routing to END"
                )
                return "end"
            elif run_assessment:
                print(
                    "[WORKFLOW] Routing to assess (annotation is faithful but has validation issues)"
                )
                return "assess"
            else:
                print(
                    "[WORKFLOW] Skipping assessment (has validation issues, assessment not requested) - routing to END"
                )
                return "end"
        else:
            print(
                f"[WORKFLOW] Routing to summarize_feedback (annotation needs refinement, iteration {total_iters}/{max_iters})"
            )
            return "summarize_feedback"

    async def run(
        self,
        input_description: str,
        schema_version: str = "8.3.0",
        max_validation_attempts: int = 5,
        max_total_iterations: int = 10,
        run_assessment: bool = False,
        config: dict | None = None,
    ) -> HedAnnotationState:
        """Run the complete annotation workflow.

        Args:
            input_description: Natural language event description
            schema_version: HED schema version to use
            max_validation_attempts: Maximum validation retry attempts
            max_total_iterations: Maximum total iterations to prevent infinite loops
            run_assessment: Whether to run final assessment (default: False)
            config: Optional LangGraph config (e.g., recursion_limit)

        Returns:
            Final workflow state with annotation and feedback
        """
        from src.agents.state import create_initial_state

        # Create initial state
        initial_state = create_initial_state(
            input_description,
            schema_version,
            max_validation_attempts,
            max_total_iterations,
            run_assessment,
        )

        # Run workflow
        final_state = await self.graph.ainvoke(initial_state, config=config)

        return final_state
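
The branches of `_route_after_evaluation` reduce to a small decision: once the iteration cap is hit or the annotation is faithful, the only remaining question is whether assessment was requested. The sketch below restates that logic as a standalone function over a plain `dict`. The name `route_after_evaluation` is an illustrative assumption, not part of HEDit, and the logging calls are omitted.

```python
def route_after_evaluation(state: dict) -> str:
    """Return the next node name, mirroring the branches of _route_after_evaluation."""
    total_iters = state.get("total_iterations", 0)
    max_iters = state.get("max_total_iterations", 10)
    run_assessment = state.get("run_assessment", False)

    # Stop refining when the iteration cap is reached or the annotation is faithful.
    # In both cases is_valid does not change the route; only run_assessment does.
    if total_iters >= max_iters or state["is_faithful"]:
        return "assess" if run_assessment else "end"

    # Otherwise the annotation needs another refinement pass.
    return "summarize_feedback"


print(route_after_evaluation({"is_faithful": False, "total_iterations": 3}))
```

Note that `is_valid` never affects the returned route: in the original method it only selects which log message is printed.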

__init__(llm, evaluation_llm=None, assessment_llm=None, feedback_llm=None, schema_dir=None, validator_path=None, use_js_validator=True)

Initialize the workflow.

Parameters:

llm (BaseChatModel, required): Language model for annotation agent
evaluation_llm (BaseChatModel | None, default None): Language model for evaluation agent (defaults to llm)
assessment_llm (BaseChatModel | None, default None): Language model for assessment agent (defaults to llm)
feedback_llm (BaseChatModel | None, default None): Language model for feedback summarization (defaults to llm)
schema_dir (Path | str | None, default None): Directory containing JSON schemas
validator_path (Path | None, default None): Path to hed-javascript for validation
use_js_validator (bool, default True): Whether to use JavaScript validator

Source code in hedit/src/agents/workflow.py
def __init__(
    self,
    llm: BaseChatModel,
    evaluation_llm: BaseChatModel | None = None,
    assessment_llm: BaseChatModel | None = None,
    feedback_llm: BaseChatModel | None = None,
    schema_dir: Path | str | None = None,
    validator_path: Path | None = None,
    use_js_validator: bool = True,
) -> None:
    """Initialize the workflow.

    Args:
        llm: Language model for annotation agent
        evaluation_llm: Language model for evaluation agent (defaults to llm)
        assessment_llm: Language model for assessment agent (defaults to llm)
        feedback_llm: Language model for feedback summarization (defaults to llm)
        schema_dir: Directory containing JSON schemas
        validator_path: Path to hed-javascript for validation
        use_js_validator: Whether to use JavaScript validator
    """
    # Store schema directory (None means use HED library to fetch from GitHub)
    self.schema_dir = schema_dir

    # Initialize legacy schema loader for validation
    self.schema_loader = HedSchemaLoader()

    # Use provided LLMs or default to main llm
    eval_llm = evaluation_llm or llm
    assess_llm = assessment_llm or llm
    feed_llm = feedback_llm or llm

    # Initialize agents with JSON schema support and per-agent LLMs
    self.annotation_agent = AnnotationAgent(llm, schema_dir=self.schema_dir)
    self.validation_agent = ValidationAgent(
        self.schema_loader,
        use_javascript=use_js_validator,
        validator_path=validator_path,
    )
    self.evaluation_agent = EvaluationAgent(eval_llm, schema_dir=self.schema_dir)
    self.assessment_agent = AssessmentAgent(assess_llm, schema_dir=self.schema_dir)
    self.feedback_summarizer = FeedbackSummarizer(feed_llm)

    # Build graph
    self.graph = self._build_graph()

run(input_description, schema_version='8.3.0', max_validation_attempts=5, max_total_iterations=10, run_assessment=False, config=None) async

Run the complete annotation workflow.

Parameters:

input_description (str, required): Natural language event description
schema_version (str, default '8.3.0'): HED schema version to use
max_validation_attempts (int, default 5): Maximum validation retry attempts
max_total_iterations (int, default 10): Maximum total iterations to prevent infinite loops
run_assessment (bool, default False): Whether to run final assessment
config (dict | None, default None): Optional LangGraph config (e.g., recursion_limit)

Returns:

HedAnnotationState: Final workflow state with annotation and feedback

Source code in hedit/src/agents/workflow.py
async def run(
    self,
    input_description: str,
    schema_version: str = "8.3.0",
    max_validation_attempts: int = 5,
    max_total_iterations: int = 10,
    run_assessment: bool = False,
    config: dict | None = None,
) -> HedAnnotationState:
    """Run the complete annotation workflow.

    Args:
        input_description: Natural language event description
        schema_version: HED schema version to use
        max_validation_attempts: Maximum validation retry attempts
        max_total_iterations: Maximum total iterations to prevent infinite loops
        run_assessment: Whether to run final assessment (default: False)
        config: Optional LangGraph config (e.g., recursion_limit)

    Returns:
        Final workflow state with annotation and feedback
    """
    from src.agents.state import create_initial_state

    # Create initial state
    initial_state = create_initial_state(
        input_description,
        schema_version,
        max_validation_attempts,
        max_total_iterations,
        run_assessment,
    )

    # Run workflow
    final_state = await self.graph.ainvoke(initial_state, config=config)

    return final_state
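
The state returned by `run` behaves like a dictionary, so callers can read results by key. The sketch below summarizes a final state using only keys that appear in the workflow source (`current_annotation`, `is_valid`, `is_faithful`, `is_complete`, `validation_errors`). The helper name `summarize_final_state` and the sample state are illustrative assumptions.

```python
def summarize_final_state(state: dict) -> str:
    """Build a short, human-readable summary of a final workflow state."""
    flags = ", ".join(
        f"{name}={state.get(key, False)}"
        for name, key in (
            ("valid", "is_valid"),
            ("faithful", "is_faithful"),
            ("complete", "is_complete"),
        )
    )
    lines = [f"annotation: {state.get('current_annotation', '')}", flags]
    # List any validation errors left over from the last validation pass.
    lines += [f"  error: {err}" for err in state.get("validation_errors", [])]
    return "\n".join(lines)


# Hypothetical final state, shaped like the dict returned by `await workflow.run(...)`.
example_state = {
    "current_annotation": "Sensory-event, Visual-presentation",
    "is_valid": True,
    "is_faithful": True,
    "is_complete": True,
    "validation_errors": [],
}
print(summarize_final_state(example_state))
```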

Validation

src.validation.hed_validator

HED validation using both Python and JavaScript validators.

This module integrates the HED validation tools. It primarily uses the JavaScript validator, which gives more comprehensive feedback, and falls back to the Python validator.
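
One way to express the JavaScript-first, Python-fallback policy is to try constructing the JavaScript validator and fall back when its installation check raises `RuntimeError`. This is a minimal sketch under that assumption: `pick_validator` and the factory arguments are hypothetical, and this section does not show how HEDit wires the fallback itself.

```python
from typing import Callable, Protocol


class Validator(Protocol):
    """Anything with a validate(hed_string) method."""

    def validate(self, hed_string: str): ...


def pick_validator(
    make_js: Callable[[], Validator],
    make_python: Callable[[], Validator],
) -> Validator:
    """Prefer the JavaScript validator; fall back to Python if it cannot start."""
    try:
        # HedJavaScriptValidator raises RuntimeError when Node.js or the
        # validator path is missing.
        return make_js()
    except RuntimeError:
        return make_python()
```

Passing factories rather than instances keeps construction lazy, so the Python validator is only built when the JavaScript one is unavailable.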

ValidationIssue dataclass

Represents a single validation issue (error or warning).

Attributes:

code (str): Issue code (e.g., 'TAG_INVALID')
level (Literal['error', 'warning']): Severity level ('error' or 'warning')
message (str): Human-readable error message
tag (str | None): The problematic tag (if applicable)
context (dict | None): Additional context information

Source code in hedit/src/validation/hed_validator.py
@dataclass
class ValidationIssue:
    """Represents a single validation issue (error or warning).

    Attributes:
        code: Issue code (e.g., 'TAG_INVALID')
        level: Severity level ('error' or 'warning')
        message: Human-readable error message
        tag: The problematic tag (if applicable)
        context: Additional context information
    """

    code: str
    level: Literal["error", "warning"]
    message: str
    tag: str | None = None
    context: dict | None = None

ValidationResult dataclass

Result of HED string validation.

Attributes:

is_valid (bool): Whether the HED string is valid
errors (list[ValidationIssue]): List of error issues
warnings (list[ValidationIssue]): List of warning issues
parsed_string (str | None): Successfully parsed HED string (if valid)

Source code in hedit/src/validation/hed_validator.py
@dataclass
class ValidationResult:
    """Result of HED string validation.

    Attributes:
        is_valid: Whether the HED string is valid
        errors: List of error issues
        warnings: List of warning issues
        parsed_string: Successfully parsed HED string (if valid)
    """

    is_valid: bool
    errors: list[ValidationIssue]
    warnings: list[ValidationIssue]
    parsed_string: str | None = None
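
A short sketch of how the two dataclasses fit together when reporting issues. The classes are redefined locally, with the same fields, only so the snippet runs without HEDit installed; real code would import them from `src.validation.hed_validator`. The `format_report` helper is a hypothetical addition.

```python
from dataclasses import dataclass
from typing import Literal, Optional


@dataclass
class ValidationIssue:
    # Local mirror of the HEDit dataclass, for illustration only
    code: str
    level: Literal["error", "warning"]
    message: str
    tag: Optional[str] = None
    context: Optional[dict] = None


@dataclass
class ValidationResult:
    # Local mirror of the HEDit dataclass, for illustration only
    is_valid: bool
    errors: list
    warnings: list
    parsed_string: Optional[str] = None


def format_report(result: ValidationResult) -> list:
    """Render errors first, then warnings, one line per issue."""
    lines = []
    for issue in result.errors + result.warnings:
        location = f" (tag: {issue.tag})" if issue.tag else ""
        lines.append(f"[{issue.level.upper()}] {issue.code}: {issue.message}{location}")
    return lines


result = ValidationResult(
    is_valid=False,
    errors=[ValidationIssue("TAG_INVALID", "error", "Unknown tag", tag="Foo")],
    warnings=[],
)
print("\n".join(format_report(result)))
```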

HedPythonValidator

Validates HED strings using the Python HED tools.

Source code in hedit/src/validation/hed_validator.py
class HedPythonValidator:
    """Validates HED strings using the Python HED tools."""

    def __init__(self, schema: HedSchema) -> None:
        """Initialize validator with a HED schema.

        Args:
            schema: HedSchema object to validate against
        """
        self.schema = schema
        self.validator = HedValidator(schema)

    def validate(self, hed_string: str) -> ValidationResult:
        """Validate a HED string.

        Args:
            hed_string: HED annotation string to validate

        Returns:
            ValidationResult with errors and warnings
        """
        errors = []
        warnings = []

        try:
            # Parse and validate HED string
            hed_string_obj = HedString(hed_string, self.schema)
            issues = hed_string_obj.validate(self.validator)

            # Process issues
            for issue in issues:
                issue_str = get_printable_issue_string([issue])
                severity = "error" if issue["severity"] == "error" else "warning"

                validation_issue = ValidationIssue(
                    code=issue.get("code", "UNKNOWN"),
                    level=severity,
                    message=issue_str,
                    tag=issue.get("tag", None),
                )

                if severity == "error":
                    errors.append(validation_issue)
                else:
                    warnings.append(validation_issue)

            is_valid = len(errors) == 0
            parsed = str(hed_string_obj) if is_valid else None

            return ValidationResult(
                is_valid=is_valid,
                errors=errors,
                warnings=warnings,
                parsed_string=parsed,
            )

        except Exception as e:
            # Catch parsing errors
            errors.append(
                ValidationIssue(
                    code="PARSE_ERROR",
                    level="error",
                    message=str(e),
                )
            )
            return ValidationResult(is_valid=False, errors=errors, warnings=warnings)

__init__(schema)

Initialize validator with a HED schema.

Parameters:

schema (HedSchema, required): HedSchema object to validate against

Source code in hedit/src/validation/hed_validator.py
def __init__(self, schema: HedSchema) -> None:
    """Initialize validator with a HED schema.

    Args:
        schema: HedSchema object to validate against
    """
    self.schema = schema
    self.validator = HedValidator(schema)

validate(hed_string)

Validate a HED string.

Parameters:

hed_string (str, required): HED annotation string to validate

Returns:

ValidationResult: ValidationResult with errors and warnings

Source code in hedit/src/validation/hed_validator.py
def validate(self, hed_string: str) -> ValidationResult:
    """Validate a HED string.

    Args:
        hed_string: HED annotation string to validate

    Returns:
        ValidationResult with errors and warnings
    """
    errors = []
    warnings = []

    try:
        # Parse and validate HED string
        hed_string_obj = HedString(hed_string, self.schema)
        issues = hed_string_obj.validate(self.validator)

        # Process issues
        for issue in issues:
            issue_str = get_printable_issue_string([issue])
            severity = "error" if issue["severity"] == "error" else "warning"

            validation_issue = ValidationIssue(
                code=issue.get("code", "UNKNOWN"),
                level=severity,
                message=issue_str,
                tag=issue.get("tag", None),
            )

            if severity == "error":
                errors.append(validation_issue)
            else:
                warnings.append(validation_issue)

        is_valid = len(errors) == 0
        parsed = str(hed_string_obj) if is_valid else None

        return ValidationResult(
            is_valid=is_valid,
            errors=errors,
            warnings=warnings,
            parsed_string=parsed,
        )

    except Exception as e:
        # Catch parsing errors
        errors.append(
            ValidationIssue(
                code="PARSE_ERROR",
                level="error",
                message=str(e),
            )
        )
        return ValidationResult(is_valid=False, errors=errors, warnings=warnings)

HedJavaScriptValidator

Validates HED strings using the JavaScript HED validator.

This provides more detailed feedback than the Python validator. Requires Node.js and the hed-javascript package.
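
Because the constructor raises if Node.js is missing, callers may want to probe for it first. The following is a minimal sketch of such a pre-check using only the standard library. The function name `node_available` is an assumption and is not part of HEDit.

```python
import shutil
import subprocess


def node_available(timeout: float = 5.0) -> bool:
    """Return True if a working `node` executable is on PATH."""
    if shutil.which("node") is None:
        return False
    try:
        # Same probe the validator uses: `node --version` must exit cleanly.
        subprocess.run(
            ["node", "--version"],
            check=True,
            capture_output=True,
            timeout=timeout,
        )
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        return False
    return True


print("Node.js available:", node_available())
```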

Source code in hedit/src/validation/hed_validator.py
class HedJavaScriptValidator:
    """Validates HED strings using the JavaScript HED validator.

    This provides more detailed feedback than the Python validator.
    Requires Node.js and the hed-javascript package.
    """

    def __init__(
        self,
        validator_path: Path,
        schema_version: str = "8.3.0",
    ) -> None:
        """Initialize JavaScript validator.

        Args:
            validator_path: Path to hed-javascript repository
            schema_version: HED schema version to use
        """
        self.validator_path = Path(validator_path)
        self.schema_version = schema_version
        self._check_installation()

    def _check_installation(self) -> None:
        """Verify that Node.js and hed-validator are available."""
        # Check Node.js
        try:
            subprocess.run(
                ["node", "--version"],
                check=True,
                capture_output=True,
                timeout=5,
            )
        except (
            subprocess.CalledProcessError,
            subprocess.TimeoutExpired,  # raised when the 5-second timeout elapses
            FileNotFoundError,
        ) as e:
            raise RuntimeError("Node.js is not installed, not in PATH, or not responding") from e

        # Check validator path
        if not self.validator_path.exists():
            raise RuntimeError(f"HED JavaScript validator not found at {self.validator_path}")

    def validate(self, hed_string: str) -> ValidationResult:
        """Validate a HED string using JavaScript validator.

        Args:
            hed_string: HED annotation string to validate

        Returns:
            ValidationResult with detailed errors and warnings
        """
        # Create validation script
        script = f"""
        const {{ parseHedString, buildSchemasFromVersion }} = require('{self.validator_path}/dist/commonjs/index.js');

        async function validate() {{
            try {{
                const schemas = await buildSchemasFromVersion('{self.schema_version}');
                // JSON-encoded so backticks, quotes, and backslashes in the input stay literal
                const hedString = {json.dumps(hed_string)};
                const [parsed, errors, warnings] = parseHedString(
                    hedString,
                    schemas,
                    false,  // no definitions
                    false,  // no placeholders
                    true    // full validation
                );

                // Reclassify warnings that should actually be errors
                // Based on HED validator source: these indicate invalid/malformed HED
                const errorCodes = [
                    'TAG_INVALID',                    // Invalid tag - doesn't exist in schema
                    'TAG_NAMESPACE_PREFIX_INVALID',   // Invalid tag prefix
                    'TAG_NOT_UNIQUE',                 // Multiple unique tags
                    'TAG_REQUIRES_CHILD',             // Child/value required
                    'TAG_EXTENSION_INVALID',          // Invalid extension
                    'TAG_EMPTY',                      // Empty tag
                    'UNITS_INVALID',                  // Invalid units
                    'VALUE_INVALID',                  // Invalid value
                ];
                const actualErrors = [];
                const actualWarnings = [];

                // Process errors
                errors.forEach(e => {{
                    actualErrors.push({{
                        code: e.hedCode || e.internalCode,
                        message: e.message,
                        tag: e.parameters?.tag,
                        level: 'error'
                    }});
                }});

                // Process warnings - promote critical ones to errors
                warnings.forEach(w => {{
                    const code = w.hedCode || w.internalCode;
                    const issue = {{
                        code: code,
                        message: w.message,
                        tag: w.parameters?.tag,
                        level: errorCodes.includes(code) ? 'error' : 'warning'
                    }};

                    if (errorCodes.includes(code)) {{
                        actualErrors.push(issue);
                    }} else {{
                        actualWarnings.push(issue);
                    }}
                }});

                const result = {{
                    isValid: actualErrors.length === 0,
                    parsed: parsed ? parsed.toString() : null,
                    errors: actualErrors,
                    warnings: actualWarnings
                }};

                console.log(JSON.stringify(result));
            }} catch (error) {{
                console.log(JSON.stringify({{
                    isValid: false,
                    errors: [{{ code: 'VALIDATOR_ERROR', message: error.message, level: 'error' }}],
                    warnings: []
                }}));
            }}
        }}

        validate();
        """

        try:
            # Run Node.js validation
            result = subprocess.run(
                ["node", "-e", script],
                capture_output=True,
                text=True,
                timeout=30,
                check=True,
            )

            # Parse result
            output = json.loads(result.stdout)

            errors = [
                ValidationIssue(
                    code=e["code"],
                    level="error",
                    message=e["message"],
                    tag=e.get("tag"),
                )
                for e in output["errors"]
            ]

            warnings = [
                ValidationIssue(
                    code=w["code"],
                    level="warning",
                    message=w["message"],
                    tag=w.get("tag"),
                )
                for w in output["warnings"]
            ]

            return ValidationResult(
                is_valid=output["isValid"],
                errors=errors,
                warnings=warnings,
                parsed_string=output.get("parsed"),
            )

        except subprocess.TimeoutExpired:
            return ValidationResult(
                is_valid=False,
                errors=[
                    ValidationIssue(
                        code="TIMEOUT",
                        level="error",
                        message="Validation timed out",
                    )
                ],
                warnings=[],
            )
        except Exception as e:
            return ValidationResult(
                is_valid=False,
                errors=[
                    ValidationIssue(
                        code="VALIDATION_ERROR",
                        level="error",
                        message=f"Validation failed: {e}",
                    )
                ],
                warnings=[],
            )
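
When Python text is interpolated into generated JavaScript source, JSON encoding is a safe way to produce the string literal, since a JSON string is also a valid JavaScript string literal. The sketch below illustrates the idea in isolation. The helper names `js_string_literal` and `build_snippet` are illustrative assumptions.

```python
import json


def js_string_literal(text: str) -> str:
    """Encode text as a double-quoted JavaScript string literal via JSON."""
    return json.dumps(text)


def build_snippet(hed_string: str) -> str:
    """Embed a HED string into a one-line JavaScript declaration."""
    return f"const hedString = {js_string_literal(hed_string)};"


# Backticks, "${", quotes, and backslashes are all escaped or left inert.
tricky = 'Label/`x`, ${y}, "quoted", back\\slash'
print(build_snippet(tricky))
```

A template literal delimited by backticks would instead treat `${...}` in the input as an expression and end at the first backtick.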

__init__(validator_path, schema_version='8.3.0')

Initialize JavaScript validator.

Parameters:

validator_path (Path, required): Path to hed-javascript repository
schema_version (str, default '8.3.0'): HED schema version to use

Source code in hedit/src/validation/hed_validator.py
def __init__(
    self,
    validator_path: Path,
    schema_version: str = "8.3.0",
) -> None:
    """Initialize JavaScript validator.

    Args:
        validator_path: Path to hed-javascript repository
        schema_version: HED schema version to use
    """
    self.validator_path = Path(validator_path)
    self.schema_version = schema_version
    self._check_installation()

validate(hed_string)

Validate a HED string using JavaScript validator.

Parameters:

hed_string (str, required): HED annotation string to validate

Returns:

ValidationResult: ValidationResult with detailed errors and warnings

Source code in hedit/src/validation/hed_validator.py
def validate(self, hed_string: str) -> ValidationResult:
    """Validate a HED string using JavaScript validator.

    Args:
        hed_string: HED annotation string to validate

    Returns:
        ValidationResult with detailed errors and warnings
    """
    # Create validation script
    script = f"""
    const {{ parseHedString, buildSchemasFromVersion }} = require('{self.validator_path}/dist/commonjs/index.js');

    async function validate() {{
        try {{
            const schemas = await buildSchemasFromVersion('{self.schema_version}');
            // JSON-encoded so backticks, quotes, and backslashes in the input stay literal
            const hedString = {json.dumps(hed_string)};
            const [parsed, errors, warnings] = parseHedString(
                hedString,
                schemas,
                false,  // no definitions
                false,  // no placeholders
                true    // full validation
            );

            // Reclassify warnings that should actually be errors
            // Based on HED validator source: these indicate invalid/malformed HED
            const errorCodes = [
                'TAG_INVALID',                    // Invalid tag - doesn't exist in schema
                'TAG_NAMESPACE_PREFIX_INVALID',   // Invalid tag prefix
                'TAG_NOT_UNIQUE',                 // Multiple unique tags
                'TAG_REQUIRES_CHILD',             // Child/value required
                'TAG_EXTENSION_INVALID',          // Invalid extension
                'TAG_EMPTY',                      // Empty tag
                'UNITS_INVALID',                  // Invalid units
                'VALUE_INVALID',                  // Invalid value
            ];
            const actualErrors = [];
            const actualWarnings = [];

            // Process errors
            errors.forEach(e => {{
                actualErrors.push({{
                    code: e.hedCode || e.internalCode,
                    message: e.message,
                    tag: e.parameters?.tag,
                    level: 'error'
                }});
            }});

            // Process warnings - promote critical ones to errors
            warnings.forEach(w => {{
                const code = w.hedCode || w.internalCode;
                const issue = {{
                    code: code,
                    message: w.message,
                    tag: w.parameters?.tag,
                    level: errorCodes.includes(code) ? 'error' : 'warning'
                }};

                if (errorCodes.includes(code)) {{
                    actualErrors.push(issue);
                }} else {{
                    actualWarnings.push(issue);
                }}
            }});

            const result = {{
                isValid: actualErrors.length === 0,
                parsed: parsed ? parsed.toString() : null,
                errors: actualErrors,
                warnings: actualWarnings
            }};

            console.log(JSON.stringify(result));
        }} catch (error) {{
            console.log(JSON.stringify({{
                isValid: false,
                errors: [{{ code: 'VALIDATOR_ERROR', message: error.message, level: 'error' }}],
                warnings: []
            }}));
        }}
    }}

    validate();
    """

    try:
        # Run Node.js validation
        result = subprocess.run(
            ["node", "-e", script],
            capture_output=True,
            text=True,
            timeout=30,
            check=True,
        )

        # Parse result
        output = json.loads(result.stdout)

        errors = [
            ValidationIssue(
                code=e["code"],
                level="error",
                message=e["message"],
                tag=e.get("tag"),
            )
            for e in output["errors"]
        ]

        warnings = [
            ValidationIssue(
                code=w["code"],
                level="warning",
                message=w["message"],
                tag=w.get("tag"),
            )
            for w in output["warnings"]
        ]

        return ValidationResult(
            is_valid=output["isValid"],
            errors=errors,
            warnings=warnings,
            parsed_string=output.get("parsed"),
        )

    except subprocess.TimeoutExpired:
        return ValidationResult(
            is_valid=False,
            errors=[
                ValidationIssue(
                    code="TIMEOUT",
                    level="error",
                    message="Validation timed out",
                )
            ],
            warnings=[],
        )
    except Exception as e:
        return ValidationResult(
            is_valid=False,
            errors=[
                ValidationIssue(
                    code="VALIDATION_ERROR",
                    level="error",
                    message=f"Validation failed: {e}",
                )
            ],
            warnings=[],
        )
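
The embedded script promotes certain warning codes to errors before computing `isValid`. The same reclassification can be sketched in Python over plain dictionaries. The code list is copied from the script above, while the names `PROMOTED_CODES` and `reclassify` are assumptions made for illustration.

```python
PROMOTED_CODES = frozenset(
    {
        "TAG_INVALID",
        "TAG_NAMESPACE_PREFIX_INVALID",
        "TAG_NOT_UNIQUE",
        "TAG_REQUIRES_CHILD",
        "TAG_EXTENSION_INVALID",
        "TAG_EMPTY",
        "UNITS_INVALID",
        "VALUE_INVALID",
    }
)


def reclassify(errors: list, warnings: list) -> tuple:
    """Split issues into (errors, warnings), promoting critical warning codes."""
    actual_errors = [{**e, "level": "error"} for e in errors]
    actual_warnings = []
    for w in warnings:
        if w["code"] in PROMOTED_CODES:
            actual_errors.append({**w, "level": "error"})
        else:
            actual_warnings.append({**w, "level": "warning"})
    return actual_errors, actual_warnings


errs, warns = reclassify([], [{"code": "TAG_INVALID", "message": "Unknown tag"}])
is_valid = len(errs) == 0  # False: the promoted warning now counts as an error
print(is_valid, len(errs), len(warns))
```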