Skip to content

Plugin Security and Scopes Guide

Warning

Development is moving fast, and this document may not reflect the latest changes. Once updated, we will remove this warning.

This guide provides comprehensive information for plugin maintainers on implementing security, authentication scopes, and authorization in AgentUp plugins.

Note: The security system in AgentUp is currently under development. Some of the security features described in this document represent planned functionality and may not be fully implemented in the current version. The plugin system currently focuses on basic capability registration and execution.

Overview

AgentUp uses scope-based authentication and context-aware middleware. Plugins must define their security requirements and implement proper authorization checks to ensure secure operation.

Table of Contents

  1. Authentication Scopes
  2. Security Context
  3. Implementing Security in Plugins
  4. Scope Hierarchy
  5. Best Practices
  6. Examples
  7. Migration Guide

Authentication Scopes

What are Scopes?

Scopes are permission strings that define what operations a user is authorized to perform. They follow a hierarchical structure and support inheritance.

Scope Naming Convention

<domain>:<action>[:<resource>]

Examples: - files:read - Permission to read files - files:write - Permission to write files - files:sensitive - Permission to access sensitive files - system:read - Permission to read system information - api:external - Permission to call external APIs - admin - Administrative access (inherits all permissions)

Standard Scope Domains

Domain Description Example Scopes
files File system operations files:read, files:write, files:sensitive
system System information and control system:read, system:write, system:admin
network Network operations network:access, network:admin
api External API access api:external, api:restricted
data Data processing operations data:read, data:process, data:export
ai AI model operations ai:execute, ai:train, ai:admin
admin Administrative functions admin (grants all permissions)

Plugin Identity and Security

Plugin ID vs Package Name Security Model

AgentUp requires both plugin_id and package fields in plugin configuration for critical security reasons:

Security Threat: Namespace Hijacking Without package name verification, malicious actors could: 1. Create a PyPI package with a similar name to a legitimate plugin 2. Register entry points that match expected plugin IDs 3. Trick users into installing malicious code instead of legitimate plugins

Example Attack Scenario:

# Legitimate plugin configuration
plugins:
  - plugin_id: file_manager
    # Without package field, system accepts any package claiming this ID

# Attacker creates PyPI package "file-managr" (typo) with:
# Entry point: file_manager = "malicious.plugin:MaliciousPlugin"

AgentUp's Protection:

# Secure configuration requires explicit package verification
plugins:
  - plugin_id: file_manager
    package: legitimate-file-manager  # Exact PyPI package name required
    # System rejects packages that don't match this exact name

Why Both Fields Are Necessary:

  1. plugin_id: Internal identifier for capability routing and function calls
  2. package: Cryptographic verification against PyPI package name
  3. Security Enforcement: Plugin registry validates entry_point.dist.name == expected_package
  4. Namespace Protection: Prevents malicious packages from impersonating legitimate plugins

Implementation Details: - The plugin allowlist matches plugin_id to expected package name - Entry point discovery validates actual package name against expected name - Mismatches are rejected with security warnings logged - This creates a secure binding between logical plugin identity and distribution package

This dual-field requirement is essential for preventing supply chain attacks and ensuring plugin authenticity in production environments.

Plugin Classification

Plugin Characteristics

class PluginCharacteristics:
    plugin_type: PluginType
    network_dependent: bool = False
    cacheable: bool = True
    cache_ttl: Optional[int] = None
    retry_suitable: bool = False
    rate_limit_required: bool = True
    auth_scopes: List[str] = []
    performance_critical: bool = False

Security Context

Capability Context

The CapabilityContext provides plugins with comprehensive security information:

class CapabilityContext:
    # Core fields
    task: Task
    config: dict[str, Any]
    services: dict[str, Any]
    state: dict[str, Any]
    metadata: dict[str, Any]

    # Note: Security features like auth, user_scopes, etc. are available
    # through the AgentUp security system and middleware

Using Security Context

@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    # Check basic authentication
    if not context.auth:
        raise UnauthorizedError("Authentication required")

    # Check specific scope
    context.require_scope("files:read")

    # Access user information
    user_id = context.get_user_id()
    user_scopes = context.user_scopes

    # Conditional logic based on permissions
    if context.has_scope("files:sensitive"):
        # Allow access to sensitive files
        pass
    else:
        # Restrict to public files only
        pass

    return CapabilityResult(
        content=f"Operation completed for user: {user_id}",
        success=True
    )

Implementing Security in Plugins

Available and Planned Hook Methods

Implementation Status: The methods below show the planned security architecture. Currently, only basic plugin registration and execution hooks are implemented. Advanced security features are planned for future releases.

Currently Available Hooks

These hooks are currently implemented and available for use:

@hookimpl
def register_capability(self) -> CapabilityDefinition:
    """Register the capability with AgentUp (AVAILABLE)."""
    return CapabilityDefinition(
        id="my_capability",
        name="My Capability",
        version="1.0.0",
        description="My capability description",
        capabilities=[CapabilityType.TEXT],
        required_scopes=["api:read"],  # Basic scope definition
    )

@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    """Execute the capability (AVAILABLE)."""
    # Basic security can be implemented here manually
    return CapabilityResult(content="Result", success=True)

@hookimpl
def can_handle_task(self, context: CapabilityContext) -> bool | float:
    """Check if capability can handle task (AVAILABLE)."""
    return 0.8

@hookimpl
def get_ai_functions(self) -> list[AIFunction]:
    """Get AI functions for LLM integration (AVAILABLE)."""
    return []

Planned Security Hooks

The following hooks represent the planned security architecture:

1. Plugin Characteristics (Planned)

@hookimpl
def get_plugin_characteristics(self) -> PluginCharacteristics:
    """Define plugin operational characteristics."""
    return PluginCharacteristics(
        plugin_type=PluginType.LOCAL,  # or NETWORK, HYBRID, AI_FUNCTION, CORE
        network_dependent=False,
        cacheable=True,
        cache_ttl=300,
        retry_suitable=False,
        rate_limit_required=False,
        auth_scopes=["your_domain:read"],
        performance_critical=False
    )

2. Required Scopes (Planned)

@hookimpl
def get_required_scopes(self, capability_id: str) -> list[str]:
    """Define required scopes per capability."""
    scope_map = {
        "read_file": ["files:read"],
        "write_file": ["files:write"],
        "delete_file": ["files:write", "files:admin"],
        "read_sensitive": ["files:read", "files:sensitive"],
    }
    return scope_map.get(capability_id, ["default:access"])

3. Custom Authorization (Planned)

@hookimpl
def validate_access(self, context: CapabilityContext) -> bool:
    """Custom authorization logic beyond scope checking."""

    # Example: Time-based access control
    import datetime
    current_hour = datetime.datetime.now().hour
    if 22 <= current_hour or current_hour <= 6:  # 10 PM to 6 AM
        if not context.has_scope("system:24hour"):
            return False

    # Example: User attribute-based access
    if context.auth.metadata.get("user_type") == "restricted":
        restricted_capabilities = ["basic_read", "basic_write"]
        return context.metadata.get("capability_id") in restricted_capabilities

    # Example: Rate limiting based on user tier
    user_tier = context.auth.metadata.get("tier", "basic")
    if user_tier == "basic" and context.metadata.get("operation_complexity") == "high":
        return False

    return True

4. Middleware Preferences (Planned)

@hookimpl
def get_middleware_preferences(self, capability_id: str) -> dict[str, Any]:
    """Define preferred middleware configuration."""

    # Different preferences per capability
    if capability_id == "cpu_info":
        return {
            "cached": {
                "enabled": True,
                "ttl": 60,  # CPU info changes frequently
                "key_strategy": "global"  # Same for all users
            },
            "rate_limited": {
                "enabled": False,
                "reason": "Local system call, very fast"
            }
        }

    elif capability_id == "external_api_call":
        return {
            "cached": {
                "enabled": True,
                "ttl": 1800,  # 30 minutes
                "key_strategy": "user_aware"  # Different per user
            },
            "rate_limited": {
                "enabled": True,
                "requests_per_minute": 20,
                "reason": "Respect external API limits"
            },
            "retryable": {
                "enabled": True,
                "max_attempts": 3,
                "backoff_factor": 2.0
            }
        }

    return {}  # Use defaults

Current Security Implementation Approach

Since the advanced security hooks are not yet implemented, plugins can implement basic security manually in their execute_capability method:

@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    """Execute capability with basic security checks."""

    try:
        # Basic validation using available context fields
        # Note: Advanced auth fields may not be available yet

        # 1. Check if this is a sensitive operation
        operation = context.metadata.get("operation", "")
        if operation in ["admin", "delete", "sensitive"]:
            # Implement your own auth checks here
            # This is plugin-specific until framework auth is available
            if not self._check_admin_permission(context):
                return CapabilityResult(
                    content="Access denied: insufficient permissions",
                    success=False,
                    error="PERMISSION_DENIED"
                )

        # 2. Execute the actual operation
        result = self._execute_operation(context)

        # 3. Add metadata for audit purposes
        result.metadata.update({
            "executed_at": datetime.now(timezone.utc).isoformat(),
            "operation": operation,
        })

        return result

    except Exception as e:
        return CapabilityResult(
            content=f"Operation failed: {str(e)}",
            success=False,
            error=str(e)
        )

def _check_admin_permission(self, context: CapabilityContext) -> bool:
    """Plugin-specific permission check until framework auth is available."""
    # Implement your security logic here
    # This could check API keys, user roles, etc.
    return True  # Placeholder

def _execute_operation(self, context: CapabilityContext) -> CapabilityResult:
    """Execute the actual capability operation."""
    return CapabilityResult(content="Operation completed", success=True)

def _log_access(self, context: CapabilityContext):
    """Log successful access for audit trail."""
    logger.info(
        "Capability access granted",
        user_id=context.get_user_id(),
        capability_id=context.metadata.get("capability_id"),
        scopes=context.user_scopes,
        request_id=context.request_id
    )

def _log_security_violation(self, context: CapabilityContext, error: str):
    """Log security violations for monitoring."""
    logger.warning(
        "Security violation detected",
        user_id=context.get_user_id(),
        capability_id=context.metadata.get("capability_id"),
        error=error,
        request_id=context.request_id
    )

Scope Hierarchy

Hierarchical Permissions

Scopes support inheritance where higher-level scopes automatically grant lower-level permissions:

scope_hierarchy = {
    "admin": ["*"],  # Admin has all permissions
    "files:admin": ["files:write", "files:read", "files:sensitive"],
    "files:write": ["files:read"],
    "system:admin": ["system:write", "system:read"],
    "system:write": ["system:read"],
    "api:admin": ["api:external", "api:restricted"],
}

Scope Validation

def validate_scope_hierarchy(user_scopes: list[str], required_scope: str) -> bool:
    """Check if user has required scope including hierarchy."""

    # Direct scope match
    if required_scope in user_scopes:
        return True

    # Admin override
    if "admin" in user_scopes:
        return True

    # Check hierarchy
    for user_scope in user_scopes:
        if user_scope in scope_hierarchy:
            inherited_scopes = scope_hierarchy[user_scope]
            if required_scope in inherited_scopes or "*" in inherited_scopes:
                return True

    return False

Custom Scope Hierarchies

Plugins can define their own scope hierarchies:

@hookimpl
def get_scope_hierarchy(self) -> dict[str, list[str]]:
    """Define custom scope hierarchy for this plugin."""
    return {
        "myapp:admin": ["myapp:write", "myapp:read", "myapp:config"],
        "myapp:write": ["myapp:read"],
        "myapp:poweruser": ["myapp:advanced", "myapp:read"],
    }

Best Practices

1. Principle of Least Privilege

Always request the minimum scopes necessary for your plugin to function:

# ✓ Good: Specific scopes
@hookimpl
def get_required_scopes(self, capability_id: str) -> list[str]:
    return {
        "read_config": ["config:read"],
        "update_config": ["config:write"],
        "backup_config": ["config:read", "files:write"]
    }.get(capability_id, [])

# ✗ Bad: Overly broad scopes
@hookimpl
def get_required_scopes(self, capability_id: str) -> list[str]:
    return ["admin"]  # Too broad for most operations

2. Scope Granularity

Design scopes that are neither too broad nor too narrow:

# ✓ Good: Appropriate granularity
"files:read"      # Read any file
"files:write"     # Write any file
"files:sensitive" # Access sensitive files

# ✗ Too broad
"files:all"       # Unclear what this includes

# ✗ Too narrow
"files:read:config"        # Too specific
"files:read:logs"          # Creates too many scopes
"files:read:user_data"     # Hard to manage

3. Security Context Usage

Always validate security context before performing operations:

@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    # ✓ Good: Comprehensive security checks
    if not context.auth:
        raise UnauthorizedError("Authentication required")

    context.require_scope("files:read")

    if sensitive_operation:
        context.require_scope("files:sensitive")

    # ✗ Bad: No security validation
    # Just execute without checking permissions

4. Error Handling

Provide clear, secure error messages:

try:
    context.require_scope("files:admin")
    # ... perform operation
except ForbiddenError:
    # ✓ Good: Clear but not revealing
    return CapabilityResult(
        content="Permission denied: insufficient privileges",
        success=False,
        error="PERMISSION_DENIED"
    )

# ✗ Bad: Reveals internal information
except ForbiddenError:
    return CapabilityResult(
        content="Access denied: user lacks files:admin scope for /etc/passwd",
        success=False
    )

5. Audit Logging

Implement comprehensive audit logging:

def _log_operation(self, context: EnhancedCapabilityContext, operation: str, result: str):
    """Log operations for audit trail."""
    logger.info(
        "Plugin operation completed",
        extra={
            "user_id": context.get_user_id(),
            "plugin_name": self.__class__.__name__,
            "operation": operation,
            "result": result,
            "scopes": context.user_scopes,
            "request_id": context.request_id,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
    )

Examples

Example 1: Current File System Plugin Implementation

class FileSystemPlugin:
    """Plugin for file system operations with current security approach."""

    @hookimpl
    def register_capability(self) -> CapabilityDefinition:
        """Register file system capability."""
        return CapabilityDefinition(
            id="file_system",
            name="File System Operations",
            version="1.0.0",
            description="Read and write files with security checks",
            capabilities=[CapabilityType.TEXT],
            required_scopes=["files:read", "files:write"],  # Basic scope definition
        )

    @hookimpl
    def can_handle_task(self, context: CapabilityContext) -> float:
        """Check if this capability can handle file operations."""
        # Simple keyword-based routing for now
        if hasattr(context.task, 'history') and context.task.history:
            last_msg = context.task.history[-1]
            if hasattr(last_msg, 'parts') and last_msg.parts:
                content = last_msg.parts[0].text if hasattr(last_msg.parts[0], 'text') else ""
                if any(word in content.lower() for word in ['file', 'read', 'write', 'save']):
                    return 0.8
        return 0.0

    @hookimpl
    def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
        """Execute file operations with current security approach."""
        try:
            # Basic security checks using available context
            operation = context.metadata.get("operation", "")
            file_path = context.metadata.get("file_path", "")

            # Manual security validation since advanced auth hooks not available
            if not self._validate_file_access(file_path, operation):
                return CapabilityResult(
                    content="Access denied: insufficient file permissions",
                    success=False,
                    error="PERMISSION_DENIED"
                )

            if operation == "read_file":
                return self._read_file(context, file_path)
            elif operation == "write_file":
                return self._write_file(context, file_path)
            else:
                return CapabilityResult(
                    content="Unknown file operation",
                    success=False,
                    error="INVALID_OPERATION"
                )

        except Exception as e:
            return CapabilityResult(
                content=f"File operation failed: {str(e)}",
                success=False,
                error=str(e)
            )

    def _validate_file_access(self, file_path: str, operation: str) -> bool:
        """Manual file access validation."""
        # Restrict access to system files
        if file_path.startswith(("/etc/", "/sys/", "/proc/")):
            return False  # Would check admin permissions when available

        # Restrict access to sensitive directories
        sensitive_dirs = ["/home/", "/Users/", "/.ssh/"]
        if any(file_path.startswith(d) for d in sensitive_dirs):
            return False  # Would check sensitive file permissions when available

        return True

    def _read_file(self, context: CapabilityContext, file_path: str) -> CapabilityResult:
        """Read file with basic error handling."""
        try:
            with open(file_path, 'r') as f:
                content = f.read()

            return CapabilityResult(
                content=content,
                success=True,
                metadata={
                    "file_path": file_path,
                    "operation": "read_file",
                    "size": len(content),
                    "executed_at": datetime.utcnow().isoformat()
                }
            )
        except FileNotFoundError:
            return CapabilityResult(
                content="File not found",
                success=False,
                error="FILE_NOT_FOUND"
            )
        except PermissionError:
            return CapabilityResult(
                content="Permission denied",
                success=False,
                error="PERMISSION_DENIED"
            )

    def _write_file(self, context: CapabilityContext, file_path: str) -> CapabilityResult:
        """Write file with basic error handling."""
        content_to_write = context.metadata.get("content", "")
        try:
            with open(file_path, 'w') as f:
                f.write(content_to_write)

            return CapabilityResult(
                content=f"Successfully wrote {len(content_to_write)} characters to {file_path}",
                success=True,
                metadata={
                    "file_path": file_path,
                    "operation": "write_file",
                    "bytes_written": len(content_to_write),
                    "executed_at": datetime.utcnow().isoformat()
                }
            )
        except PermissionError:
            return CapabilityResult(
                content="Permission denied",
                success=False,
                error="PERMISSION_DENIED"
            )

Example 2: Current External API Plugin Implementation

class WeatherAPIPlugin:
    """Plugin for weather API with current implementation approach."""

    @hookimpl
    def register_capability(self) -> CapabilityDefinition:
        """Register weather API capability."""
        return CapabilityDefinition(
            id="weather_api",
            name="Weather API",
            version="1.0.0",
            description="Get weather information from external APIs",
            capabilities=[CapabilityType.TEXT, CapabilityType.AI_FUNCTION],
            required_scopes=["api:external", "weather:read"],
        )

    @hookimpl
    def can_handle_task(self, context: CapabilityContext) -> float:
        """Check if this capability can handle weather requests."""
        if hasattr(context.task, 'history') and context.task.history:
            last_msg = context.task.history[-1]
            if hasattr(last_msg, 'parts') and last_msg.parts:
                content = last_msg.parts[0].text if hasattr(last_msg.parts[0], 'text') else ""
                if any(word in content.lower() for word in ['weather', 'temperature', 'forecast', 'rain']):
                    return 0.9
        return 0.0

    @hookimpl
    def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
        """Execute weather API operations with current security approach."""
        try:
            operation = context.metadata.get("operation", "get_weather")
            location = context.metadata.get("location", "")

            # Basic validation
            if not location:
                return CapabilityResult(
                    content="Location is required for weather queries",
                    success=False,
                    error="MISSING_LOCATION"
                )

            # Manual rate limiting check (would be framework-handled in future)
            if not self._check_rate_limit():
                return CapabilityResult(
                    content="Rate limit exceeded. Please try again later.",
                    success=False,
                    error="RATE_LIMIT_EXCEEDED"
                )

            # Execute weather API call
            weather_data = self._get_weather_data(location)

            return CapabilityResult(
                content=f"Weather in {location}: {weather_data}",
                success=True,
                metadata={
                    "operation": operation,
                    "location": location,
                    "executed_at": datetime.utcnow().isoformat()
                }
            )

        except Exception as e:
            return CapabilityResult(
                content=f"Weather API error: {str(e)}",
                success=False,
                error=str(e)
            )

    def _check_rate_limit(self) -> bool:
        """Basic rate limiting check."""
        # Plugin-specific rate limiting until framework support
        return True  # Placeholder

    def _get_weather_data(self, location: str) -> str:
        """Get weather data from external API."""
        # Actual API implementation would go here
        return f"Sunny, 22°C"  # Placeholder

Migration Guide

Migrating from Legacy Plugins

Step 1: Add Security Hooks

Add the required security hook methods to your existing plugin:

# Add to existing plugin class
@hookimpl
def get_plugin_characteristics(self) -> PluginCharacteristics:
    # Define characteristics based on your plugin's behavior
    return PluginCharacteristics(
        plugin_type=PluginType.LOCAL,  # or appropriate type
        # ... other characteristics
    )

@hookimpl
def get_required_scopes(self, capability_id: str) -> list[str]:
    # Define minimum required scopes
    return ["default:access"]  # Start minimal, refine later

Step 2: Update Capability Execution

Update your execute method to use the enhanced context:

# Before (legacy)
@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    # No security validation
    return self._do_operation(context)

# After (enhanced)
@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    # Add security validation
    if not context.auth:
        raise UnauthorizedError("Authentication required")

    # Your existing operation
    return self._do_operation(context)

Step 3: Gradual Security Enhancement

Start with minimal security and gradually enhance:

# Phase 1: Basic authentication check
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    if not context.auth:
        raise UnauthorizedError("Authentication required")
    return self._do_operation(context)

# Phase 2: Add scope checking
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    if not context.auth:
        raise UnauthorizedError("Authentication required")
    context.require_scope("your_domain:read")
    return self._do_operation(context)

# Phase 3: Add custom authorization
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    if not context.auth:
        raise UnauthorizedError("Authentication required")
    context.require_scope("your_domain:read")
    if not self.validate_access(context):
        raise ForbiddenError("Access denied")
    return self._do_operation(context)

Step 4: Test and Refine

Test your plugin with different users and scope combinations to ensure proper security enforcement.

Trusted Publishing System

Overview

AgentUp's trusted publishing system provides cryptographic verification of plugin authenticity through PyPI's trusted publishing feature. This system ensures that plugins come from verified sources and haven't been tampered with during distribution.

How Trusted Publishing Works

Traditional PyPI Upload:

# Old way - requires manual API key management
python -m build
twine upload dist/* --username __token__ --password pypi-xxx

Trusted Publishing (New Way):

# Secure way - no API keys needed
git push origin main --tags
# GitHub Actions automatically publishes with OIDC tokens

Key Benefits

  1. No API Key Management: Eliminates the need to store PyPI API keys
  2. Cryptographic Verification: Uses OIDC tokens for identity verification
  3. Tamper-Proof Distribution: Cryptographic attestations verify package integrity
  4. Publisher Identity: Verifies who published each plugin version
  5. Automatic Security Scanning: Integrated security checks during publishing

Plugin Trust Levels

Trust Level Description Requirements
official Official AgentUp plugins Published by agentup-official with strict verification
community Community-verified plugins Published via trusted publishing by known contributors
unknown Standard PyPI uploads Traditional upload method without trusted publishing

Setting Up Trusted Publishing

1. Configure PyPI Trusted Publisher

Visit https://pypi.org/manage/account/publishing/ and add: - Publisher: GitHub - Owner: your-username - Repository: your-repo-name - Workflow: publish.yml

2. Configure Your Plugin

Update pyproject.toml:

[tool.agentup.trusted-publishing]
publisher = "your-github-username"
repository = "your-username/plugin-repo"
workflow = "publish.yml"
trust_level = "community"

[tool.agentup.plugin]
min_agentup_version = "2.0.0"
plugin_api_version = "1.0"
security_hash = "sha256:abc123..."

3. GitHub Actions Workflow

Create .github/workflows/publish.yml:

name: Publish Plugin
on:
  release:
    types: [published]

jobs:
  publish:
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      - name: Build package
        run: |
          python -m pip install --upgrade pip build
          python -m build

      - name: Publish to PyPI
        uses: pypa/gh-action-pypi-publish@release/v1

Plugin Installation Security

Secure Installation Commands:

# Require trusted publishing
agentup plugin install weather-plugin --require-trusted

# Set minimum trust level
agentup plugin install weather-plugin --trust-level community

# Verify after installation
agentup plugin verify weather-plugin

Installation Safety Checks: - ✅ Publisher identity verification - ✅ Cryptographic attestation validation - ✅ Trust level compliance - ✅ Security scanning results - ✅ Interactive approval prompts

Publisher Trust Management

Add Trusted Publishers:

# Add a community publisher
agentup plugin trust add awesome-contributor \
  github.com/awesome-contributor/weather-plugin \
  --trust-level community \
  --description "Weather plugin specialist"

# List trusted publishers
agentup plugin trust list

Publisher Configuration:

# In agent configuration
plugins:
  trust_settings:
    require_trusted_publishing: true
    minimum_trust_level: "community"
    trusted_publishers:
      - publisher_id: "awesome-contributor"
        repositories: ["github.com/awesome-contributor/*"]
        trust_level: "community"


Troubleshooting

Common Issues

1. "Authentication required" errors

  • Ensure your plugin properly handles the enhanced context
  • Check that authentication middleware is properly configured

2. "Permission denied" errors

  • Verify required scopes are correctly defined
  • Check scope hierarchy configuration
  • Ensure users have appropriate scopes assigned

3. Middleware not applied

  • Check plugin characteristics are properly defined
  • Verify middleware preferences are correctly specified
  • Ensure plugin classification is accurate

Debugging Security Issues

Enable debug logging for security operations:

import logging
logging.getLogger("agent.security").setLevel(logging.DEBUG)
logging.getLogger("agent.plugins").setLevel(logging.DEBUG)

Check the request context:

@hookimpl
def execute_capability(self, context: CapabilityContext) -> CapabilityResult:
    logger.debug(f"User: {context.get_user_id()}")
    logger.debug(f"Scopes: {context.user_scopes}")
    logger.debug(f"Required scopes: {self.get_required_scopes(context.metadata.get('capability_id'))}")
    # ... rest of implementation

Plugin Visibility Control

Overview

AgentUp supports plugin visibility control through the A2A Authenticated Extended Card system. This allows you to control which plugins are advertised to public versus authenticated clients.

Visibility Levels

Configure plugin visibility in your agent configuration:

plugins:
  # Public plugins - visible to everyone
  - plugin_id: "general_help"
    name: "General Help"
    description: "Basic assistance"
    visibility: "public"  # default

  # Extended plugins - only visible to authenticated clients
  - plugin_id: "admin_tools"
    name: "Admin Tools"
    description: "Administrative functions"
    visibility: "extended"

Behavior

Visibility Public Agent Card Extended Agent Card Execution
"public" ✓ Visible ✓ Visible ✓ Available
"extended" ✗ Hidden ✓ Visible ✓ Available

Important: Visibility only controls Agent Card advertisement, not plugin execution. All configured plugins are available for execution regardless of visibility setting.

Use Cases

Enterprise Deployments:

plugins:
  # Public information
  - plugin_id: "company_info"
    name: "Company Information"
    description: "Basic company details"
    visibility: "public"

  # Customer features
  - plugin_id: "order_status"
    name: "Order Status"
    description: "Check order status"
    visibility: "extended"
    required_scopes: ["customer:read"]

  # Admin features
  - plugin_id: "user_management"
    name: "User Management"
    description: "Manage users"
    visibility: "extended"
    required_scopes: ["admin:users"]

Development vs Production:

plugins:
  # Always visible
  - plugin_id: "core_features"
    name: "Core Features"
    visibility: "public"

  # Debug tools - hidden from public
  - plugin_id: "debug_tools"
    name: "Debug Tools"
    description: "Development debugging tools"
    visibility: "extended"
    required_scopes: ["debug:access"]

Security Considerations

  1. Discovery Control: Extended plugins are hidden from public Agent Card discovery
  2. Execution Security: Use required_scopes to control actual plugin execution
  3. Combined Approach: Use both visibility and scopes for comprehensive control
plugins:
  - plugin_id: "sensitive_data"
    name: "Sensitive Data Access"
    description: "Access confidential information"
    visibility: "extended"        # Hidden from public discovery
    required_scopes: ["data:sensitive"]  # Execution requires scope

A2A Protocol Integration

The visibility system integrates with the A2A protocol:

  • Public Agent Card (/.well-known/agent.json): Shows only public plugins
  • Extended Agent Card (/agent/authenticatedExtendedCard): Shows all plugins
  • supportsAuthenticatedExtendedCard: Automatically set based on extended plugin presence

For complete details, see the A2A Extended Card documentation.


Important: LLM Native Capabilities vs Plugin Tools

Understanding the Security Model

AgentUp's security system protects plugin capabilities but allows native LLM capabilities to continue functioning. This is by design and creates two execution paths:

Secured Path (Plugin Tools Available)

When users have appropriate scopes: - AI receives function schemas for plugin capabilities - AI can call specific plugin functions (e.g., analyze_image) - Plugin functions execute with full security enforcement - Users access enhanced, plugin-specific functionality

Fallback Path (No Plugin Tools Available)

When users lack required scopes: - AI receives no function schemas (tools filtered by security) - AI falls back to native LLM capabilities (OpenAI vision, etc.) - No plugin functions are called - pure LLM processing - Users get basic LLM functionality without plugin enhancements

Example: Image Analysis

With image:read scope:

User: "What's in this image?"
→ AI calls analyze_image plugin function
→ Plugin processes image with custom logic
→ Enhanced analysis with metadata, confidence scores, etc.

Without image:read scope:

User: "What's in this image?"
→ No plugin tools available to AI
→ AI uses OpenAI's native vision capabilities
→ Basic image analysis without plugin enhancements

Why This Design?

  1. Graceful Degradation: Users still get basic functionality even without plugin permissions
  2. Clear Separation: Plugin security vs native LLM capabilities are distinct layers
  3. User Experience: Requests don't fail completely - they fall back to basic LLM processing
  4. Transparency: Security logs clearly show when plugin tools are denied vs when fallback occurs

Security Considerations

  • Plugin tools are properly secured - scope enforcement works correctly
  • Native LLM capabilities remain available - this is intentional, not a bypass
  • Audit logs track both scenarios - plugin denials and fallback usage are logged
  • Users understand the difference - enhanced plugin features vs basic LLM functionality

Current State and Future Roadmap

What's Available Now

  • Basic Plugin Registration: Use register_capability() to define capabilities with basic scope requirements
  • Capability Execution: Implement security checks manually in execute_capability()
  • Task Routing: Use can_handle_task() for simple keyword-based routing
  • AI Functions: Integrate with LLMs through get_ai_functions()

Planned Security Features

The following features are planned for future releases:

  • Advanced Authentication: Comprehensive auth context with user information and token validation
  • Automatic Scope Validation: Framework-level scope checking based on required_scopes
  • Plugin Characteristics: Detailed plugin metadata for middleware selection
  • Middleware Integration: Automatic rate limiting, caching, and retry logic
  • Custom Authorization Hooks: Plugin-specific access control beyond scopes
  • Audit Logging: Built-in security event logging and monitoring

Migration Path

When advanced security features become available:

  1. Current plugins will continue to work - no breaking changes
  2. Add security hooks gradually - implement new hooks as they become available
  3. Remove manual security code - replace plugin-specific auth with framework features
  4. Enhanced configuration - use declarative security configuration

Best Practices for Current Implementation

  1. Implement basic validation in execute_capability()
  2. Use required_scopes in CapabilityDefinition to document intended permissions
  3. Return clear error messages for security violations
  4. Log security events for audit purposes
  5. Validate input parameters thoroughly
  6. Follow principle of least privilege in your security checks

Conclusion

While AgentUp's advanced security system is under development, plugin maintainers can still implement secure plugins using the current hooks and manual security validation. The planned security features will provide a more comprehensive and automated approach to plugin security.

For additional support or questions, refer to the main AgentUp documentation or reach out to the development team.