Channel Trait
TheChannel trait defines the interface for all messaging platforms in Corvus. Every channel (Telegram, Discord, Slack, etc.) implements this trait.
Source: src/channels/traits.rs:47-103
Trait Definition
use async_trait::async_trait;
use tokio::sync::mpsc;
#[async_trait]
pub trait Channel: Send + Sync {
/// Human-readable channel name
fn name(&self) -> &str;
/// Send a message through this channel
async fn send(&self, message: &SendMessage) -> anyhow::Result<()>;
/// Start listening for incoming messages (long-running)
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()>;
/// Check if channel is healthy
async fn health_check(&self) -> bool {
true
}
/// Signal that the bot is processing a response
async fn start_typing(&self, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
/// Stop any active typing indicator
async fn stop_typing(&self, _recipient: &str) -> anyhow::Result<()> {
Ok(())
}
/// Whether this channel supports progressive message updates
fn supports_draft_updates(&self) -> bool {
false
}
/// Send an initial draft message
async fn send_draft(&self, _message: &SendMessage) -> anyhow::Result<Option<String>> {
Ok(None)
}
/// Update a previously sent draft message
async fn update_draft(
&self,
_recipient: &str,
_message_id: &str,
_text: &str,
) -> anyhow::Result<()> {
Ok(())
}
/// Finalize a draft with the complete response
async fn finalize_draft(
&self,
_recipient: &str,
_message_id: &str,
_text: &str,
) -> anyhow::Result<()> {
Ok(())
}
}
Core Types
ChannelMessage (Incoming)
Fromsrc/channels/traits.rs:4-12:
pub struct ChannelMessage {
pub id: String,
pub sender: String,
pub reply_target: String, // recipient for replies
pub content: String,
pub channel: String,
pub timestamp: u64,
}
SendMessage (Outgoing)
Fromsrc/channels/traits.rs:15-44:
pub struct SendMessage {
pub content: String,
pub recipient: String,
pub subject: Option<String>, // for email channels
}
impl SendMessage {
pub fn new(content: impl Into<String>, recipient: impl Into<String>) -> Self {
Self {
content: content.into(),
recipient: recipient.into(),
subject: None,
}
}
pub fn with_subject(
content: impl Into<String>,
recipient: impl Into<String>,
subject: impl Into<String>,
) -> Self {
Self {
content: content.into(),
recipient: recipient.into(),
subject: Some(subject.into()),
}
}
}
Implementing a Channel
Minimal example fromexamples/custom_channel.rs:29-119:
use async_trait::async_trait;
use tokio::sync::mpsc;
pub struct TelegramChannel {
bot_token: String,
allowed_users: Vec<String>,
client: reqwest::Client,
}
impl TelegramChannel {
pub fn new(bot_token: &str, allowed_users: Vec<String>) -> Self {
Self {
bot_token: bot_token.to_string(),
allowed_users,
client: reqwest::Client::new(),
}
}
fn api_url(&self, method: &str) -> String {
format!("https://api.telegram.org/bot{}/{}", self.bot_token, method)
}
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
async fn send(&self, message: &SendMessage) -> anyhow::Result<()> {
self.client
.post(self.api_url("sendMessage"))
.json(&serde_json::json!({
"chat_id": message.recipient,
"text": message.content,
"parse_mode": "Markdown",
}))
.send()
.await?;
Ok(())
}
async fn listen(&self, tx: mpsc::Sender<ChannelMessage>) -> anyhow::Result<()> {
let mut offset: i64 = 0;
loop {
let resp = self.client
.get(self.api_url("getUpdates"))
.query(&[("offset", offset.to_string()), ("timeout", "30".into())])
.send()
.await?
.json::<serde_json::Value>()
.await?;
if let Some(updates) = resp["result"].as_array() {
for update in updates {
if let Some(msg) = update.get("message") {
let sender = msg["from"]["username"]
.as_str()
.unwrap_or("unknown")
.to_string();
// Check allowlist
if !self.allowed_users.is_empty()
&& !self.allowed_users.contains(&sender) {
continue;
}
let channel_msg = ChannelMessage {
id: msg["message_id"].to_string(),
sender: sender.clone(),
reply_target: msg["chat"]["id"].to_string(),
content: msg["text"].as_str().unwrap_or("").to_string(),
channel: "telegram".into(),
timestamp: msg["date"].as_u64().unwrap_or(0),
};
if tx.send(channel_msg).await.is_err() {
return Ok(());
}
}
offset = update["update_id"].as_i64().unwrap_or(offset) + 1;
}
}
}
}
async fn health_check(&self) -> bool {
self.client
.get(self.api_url("getMe"))
.send()
.await
.map(|r| r.status().is_success())
.unwrap_or(false)
}
}
Draft Updates (Progressive Streaming)
Channels that support editing messages can implement draft updates:impl Channel for TelegramChannel {
fn supports_draft_updates(&self) -> bool {
true
}
async fn send_draft(&self, message: &SendMessage) -> anyhow::Result<Option<String>> {
let resp = self.client
.post(self.api_url("sendMessage"))
.json(&serde_json::json!({
"chat_id": message.recipient,
"text": "...", // placeholder
}))
.send()
.await?
.json::<serde_json::Value>()
.await?;
let message_id = resp["result"]["message_id"].to_string();
Ok(Some(message_id))
}
async fn update_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
self.client
.post(self.api_url("editMessageText"))
.json(&serde_json::json!({
"chat_id": recipient,
"message_id": message_id,
"text": text,
}))
.send()
.await?;
Ok(())
}
async fn finalize_draft(
&self,
recipient: &str,
message_id: &str,
text: &str,
) -> anyhow::Result<()> {
// Apply final formatting (Markdown)
self.client
.post(self.api_url("editMessageText"))
.json(&serde_json::json!({
"chat_id": recipient,
"message_id": message_id,
"text": text,
"parse_mode": "Markdown",
}))
.send()
.await?;
Ok(())
}
}
Registration
Register your channel insrc/channels/mod.rs:
pub async fn create_channel(config: &ChannelConfig) -> Result<Box<dyn Channel>> {
match config.channel_type.as_str() {
"telegram" => Ok(Box::new(telegram::TelegramChannel::new(config))),
"discord" => Ok(Box::new(discord::DiscordChannel::new(config))),
"slack" => Ok(Box::new(slack::SlackChannel::new(config))),
_ => Err(anyhow::anyhow!("Unknown channel type: {}", config.channel_type)),
}
}
Built-in Channels
Corvus ships with:- Telegram (Bot API, long polling)
- Discord (Gateway + REST)
- Slack (Events API + Web API)
- iMessage (AppleScript bridge, macOS only)
- Matrix (Matrix protocol)
- WhatsApp (Meta Cloud API, webhooks)
- IRC (Direct socket connection)
- Email (SMTP/IMAP)
- Webhook (HTTP POST endpoint)
- CLI (stdin/stdout)
Security: Allowlists
All channels implement sender validation:if !self.allowed_users.is_empty() && !self.allowed_users.contains(&sender) {
tracing::warn!("Ignoring message from unauthorized user: {}", sender);
continue;
}
[channels_config.telegram]
bot_token = "..."
allowed_users = ["alice", "bob"] # or ["*"] for all
Best Practices
Implement
health_check() to enable proactive monitoringUse
start_typing() for long-running responses to improve UXAlways validate senders against allowlists to prevent unauthorized access