Skip to main content

Channel Trait

The Channel trait defines the interface for all messaging platforms in Corvus. Every channel (Telegram, Discord, Slack, etc.) implements this trait. Source: src/channels/traits.rs:47-103

Trait Definition

use async_trait::async_trait;
use tokio::sync::mpsc;

#[async_trait]
pub trait Channel: Send + Sync {
    /// Human-readable channel name
    fn name(&self) -> &str;
    
    /// Send a message through this channel
    async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
    
    /// Start listening for incoming messages (long-running)
    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
    
    /// Check if channel is healthy
    async fn health_check(&self) -> bool {
        true
    }
    
    /// Signal that the bot is processing a response
    async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
        Ok(())
    }
    
    /// Stop any active typing indicator
    async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
        Ok(())
    }
    
    /// Whether this channel supports progressive message updates
    fn supports_draft_updates(&self) -> bool {
        false
    }
    
    /// Send an initial draft message
    async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
        Ok(None)
    }
    
    /// Update a previously sent draft message
    async fn update_draft(
        &self,
        _recipient: &str,
        _message_id: &str,
        _text: &str,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    
    /// Finalize a draft with the complete response
    async fn finalize_draft(
        &self,
        _recipient: &str,
        _message_id: &str,
        _text: &str,
    ) -> anyhow::Result<()> {
        Ok(())
    }
}

Core Types

ChannelMessage (Incoming)

From src/channels/traits.rs:4-12:
pub struct ChannelMessage {
    pub id: String,
    pub sender: String,
    pub reply_target: String,  // recipient for replies
    pub content: String,
    pub channel: String,
    pub timestamp: u64,
}

SendMessage (Outgoing)

From src/channels/traits.rs:15-44:
pub struct SendMessage {
    pub content: String,
    pub recipient: String,
    pub subject: Option<String>,  // for email channels
}

impl SendMessage {
    pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
        Self {
            content: content.into(),
            recipient: recipient.into(),
            subject: None,
        }
    }
    
    pub fn with_subject(
        content: impl Into<String>,
        recipient: impl Into<String>,
        subject: impl Into<String>,
    ) -> Self {
        Self {
            content: content.into(),
            recipient: recipient.into(),
            subject: Some(subject.into()),
        }
    }
}

Implementing a Channel

Minimal example from examples/custom_channel.rs:29-119:
use async_trait::async_trait;
use tokio::sync::mpsc;

pub struct TelegramChannel {
    bot_token: String,
    allowed_users: Vec<String>,
    client: reqwest::Client,
}

impl TelegramChannel {
    pub fn new(bot_token: &str, allowed_users: Vec<String>) -> Self {
        Self {
            bot_token: bot_token.to_string(),
            allowed_users,
            client: reqwest::Client::new(),
        }
    }
    
    fn api_url(&self, method: &str) -> String {
        format!("https://api.telegram.org/bot{}/{}", self.bot_token, method)
    }
}

#[async_trait]
impl Channel for TelegramChannel {
    fn name(&self) -> &str {
        "telegram"
    }
    
    async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
        self.client
            .post(self.api_url("sendMessage"))
            .json(&serde_json::json!({
                "chat_id": message.recipient,
                "text": message.content,
                "parse_mode": "Markdown",
            }))
            .send()
            .await?;
        Ok(())
    }
    
    async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
        let mut offset: i64 = 0;
        
        loop {
            let resp = self.client
                .get(self.api_url("getUpdates"))
                .query(&[("offset", offset.to_string()), ("timeout", "30".into())])
                .send()
                .await?
                .json::<serde_json::Value>()
                .await?;
            
            if let Some(updates) = resp["result"].as_array() {
                for update in updates {
                    if let Some(msg) = update.get("message") {
                        let sender = msg["from"]["username"]
                            .as_str()
                            .unwrap_or("unknown")
                            .to_string();
                        
                        // Check allowlist
                        if !self.allowed_users.is_empty() 
                            && !self.allowed_users.contains(&sender) {
                            continue;
                        }
                        
                        let channel_msg = ChannelMessage {
                            id: msg["message_id"].to_string(),
                            sender: sender.clone(),
                            reply_target: msg["chat"]["id"].to_string(),
                            content: msg["text"].as_str().unwrap_or("").to_string(),
                            channel: "telegram".into(),
                            timestamp: msg["date"].as_u64().unwrap_or(0),
                        };
                        
                        if tx.send(channel_msg).await.is_err() {
                            return Ok(());
                        }
                    }
                    offset = update["update_id"].as_i64().unwrap_or(offset) + 1;
                }
            }
        }
    }
    
    async fn health_check(&self) -> bool {
        self.client
            .get(self.api_url("getMe"))
            .send()
            .await
            .map(|r| r.status().is_success())
            .unwrap_or(false)
    }
}

Draft Updates (Progressive Streaming)

Channels that support editing messages can implement draft updates:
impl Channel for TelegramChannel {
    fn supports_draft_updates(&self) -> bool {
        true
    }
    
    async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
        let resp = self.client
            .post(self.api_url("sendMessage"))
            .json(&serde_json::json!({
                "chat_id": message.recipient,
                "text": "...",  // placeholder
            }))
            .send()
            .await?
            .json::<serde_json::Value>()
            .await?;
        
        let message_id = resp["result"]["message_id"].to_string();
        Ok(Some(message_id))
    }
    
    async fn update_draft(
        &self,
        recipient: &str,
        message_id: &str,
        text: &str,
    ) -> anyhow::Result<()> {
        self.client
            .post(self.api_url("editMessageText"))
            .json(&serde_json::json!({
                "chat_id": recipient,
                "message_id": message_id,
                "text": text,
            }))
            .send()
            .await?;
        Ok(())
    }
    
    async fn finalize_draft(
        &self,
        recipient: &str,
        message_id: &str,
        text: &str,
    ) -> anyhow::Result<()> {
        // Apply final formatting (Markdown)
        self.client
            .post(self.api_url("editMessageText"))
            .json(&serde_json::json!({
                "chat_id": recipient,
                "message_id": message_id,
                "text": text,
                "parse_mode": "Markdown",
            }))
            .send()
            .await?;
        Ok(())
    }
}

Registration

Register your channel in src/channels/mod.rs:
pub async fn create_channel(config: &ChannelConfig) -> Result<Box<dyn Channel>> {
    match config.channel_type.as_str() {
        "telegram" => Ok(Box::new(telegram::TelegramChannel::new(config))),
        "discord" => Ok(Box::new(discord::DiscordChannel::new(config))),
        "slack" => Ok(Box::new(slack::SlackChannel::new(config))),
        _ => Err(anyhow::anyhow!("Unknown channel type: {}", config.channel_type)),
    }
}

Built-in Channels

Corvus ships with:
  • Telegram (Bot API, long polling)
  • Discord (Gateway + REST)
  • Slack (Events API + Web API)
  • iMessage (AppleScript bridge, macOS only)
  • Matrix (Matrix protocol)
  • WhatsApp (Meta Cloud API, webhooks)
  • IRC (Direct socket connection)
  • Email (SMTP/IMAP)
  • Webhook (HTTP POST endpoint)
  • CLI (stdin/stdout)
See Channels Guide for details.

Security: Allowlists

All channels implement sender validation:
if !self.allowed_users.is_empty() && !self.allowed_users.contains(&sender) {
    tracing::warn!("Ignoring message from unauthorized user: {}", sender);
    continue;
}
From config:
[channels_config.telegram]
bot_token = "..."
allowed_users = ["alice", "bob"]  # or ["*"] for all

Best Practices

Implement health_check() to enable proactive monitoring
Use start_typing() for long-running responses to improve UX
Always validate senders against allowlists to prevent unauthorized access