2026-01-22 13:49:49 -08:00
|
|
|
use reqwest::Url;
|
|
|
|
|
use chrono::{DateTime, Utc};
|
|
|
|
|
use crate::{
|
|
|
|
|
Result,
|
|
|
|
|
AdapterPool,
|
|
|
|
|
db::{
|
|
|
|
|
ChannelId,
|
|
|
|
|
Item,
|
2026-01-23 16:35:43 -08:00
|
|
|
FeedChannel,
|
|
|
|
|
feed_channel::UnparsedFeedChannel,
|
2026-01-22 13:49:49 -08:00
|
|
|
item::UnparsedItem,
|
|
|
|
|
},
|
|
|
|
|
fetch::FetchedRSSChannel,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/// Raw `channels` row exactly as sqlx returns it, before field-level parsing.
///
/// `link` and `last_fetched` are stored as TEXT in the database and are
/// converted to `Url` / `DateTime<Utc>` by [`UnparsedChannel::parse`].
pub struct UnparsedChannel {
    // Primary key of the `channels` row.
    pub id: i64,
    // Channel title (the feed URL is inserted as a placeholder title on
    // creation — see `Channel::get_or_create`).
    pub title: String,
    // Feed URL as stored; validated into a `Url` by `parse`.
    pub link: String,
    // Optional channel description from feed metadata.
    pub description: Option<String>,
    // RFC 2822 timestamp of the last fetch, if the channel has been fetched.
    pub last_fetched: Option<String>,
}
|
|
|
|
|
impl UnparsedChannel {
|
|
|
|
|
pub fn parse(self) -> Result<Channel> {
|
|
|
|
|
Ok(Channel {
|
|
|
|
|
id: ChannelId(self.id),
|
|
|
|
|
title: self.title,
|
|
|
|
|
link: Url::parse(&self.link)?,
|
|
|
|
|
description: self.description,
|
|
|
|
|
last_fetched: self.last_fetched.as_deref()
|
|
|
|
|
.map(DateTime::parse_from_rfc2822)
|
|
|
|
|
.transpose()?
|
|
|
|
|
.map(|dt| dt.with_timezone(&Utc)),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// A parsed, validated RSS channel persisted in the `channels` table.
///
/// Fields are private; read access goes through the accessor methods on
/// the `impl Channel` block.
pub struct Channel {
    // Database primary key, wrapped in the `ChannelId` newtype.
    id: ChannelId,
    // Channel title (initially the feed URL, until metadata is fetched).
    title: String,
    // Validated feed URL.
    link: Url,
    // Optional channel description from feed metadata.
    description: Option<String>,
    // When the feed was last fetched (UTC), if ever.
    last_fetched: Option<DateTime<Utc>>,
}
|
|
|
|
|
|
|
|
|
|
impl Channel {
|
|
|
|
|
pub fn id(&self) -> ChannelId { self.id }
|
|
|
|
|
pub fn title(&self) -> &str { &self.title }
|
|
|
|
|
pub fn link(&self) -> &Url { &self.link }
|
|
|
|
|
pub fn description(&self) -> Option<&str> { self.description.as_deref() }
|
|
|
|
|
pub fn last_fetched(&self) -> Option<DateTime<Utc>> { self.last_fetched }
|
|
|
|
|
|
|
|
|
|
pub async fn get_all(pool: &AdapterPool) -> Result<Vec<Self>> {
|
|
|
|
|
let channels: Result<Vec<Channel>> = sqlx::query_as!(
|
|
|
|
|
UnparsedChannel,
|
|
|
|
|
"SELECT id, title, link, description, last_fetched FROM channels"
|
|
|
|
|
).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect();
|
|
|
|
|
|
|
|
|
|
channels
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result<Self> {
|
|
|
|
|
let channel: Result<Self> = sqlx::query_as!(
|
|
|
|
|
UnparsedChannel,
|
|
|
|
|
"SELECT id, title, link, description, last_fetched
|
|
|
|
|
FROM channels
|
|
|
|
|
WHERE id = ?",
|
|
|
|
|
id.0
|
|
|
|
|
).fetch_one(&pool.0).await?.parse();
|
|
|
|
|
|
|
|
|
|
channel
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn get_or_create(
|
|
|
|
|
pool: &AdapterPool, link: Url
|
|
|
|
|
) -> Result<Self> {
|
|
|
|
|
let link_str = link.as_str();
|
|
|
|
|
|
|
|
|
|
let channel = sqlx::query_as!(
|
|
|
|
|
UnparsedChannel,
|
|
|
|
|
"INSERT INTO channels (title, link)
|
|
|
|
|
VALUES(?, ?)
|
|
|
|
|
ON CONFLICT(link) DO UPDATE SET link = link
|
|
|
|
|
RETURNING id, title, link, description, last_fetched",
|
|
|
|
|
link_str, link_str // We use the url as a placeholder title
|
|
|
|
|
).fetch_one(&pool.0).await?.parse();
|
|
|
|
|
|
|
|
|
|
channel
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO implement fetch skipping
|
|
|
|
|
pub fn should_skip_fetch(&self) -> bool { false }
|
|
|
|
|
|
|
|
|
|
pub async fn update_metadata(
|
|
|
|
|
&self, pool: &AdapterPool, fetched: FetchedRSSChannel
|
|
|
|
|
) -> Result<()> {
|
|
|
|
|
let title = fetched.title();
|
|
|
|
|
let description = fetched.description();
|
|
|
|
|
let link = fetched.link().as_str();
|
|
|
|
|
let fetched_at = fetched.fetched_at().to_rfc2822();
|
|
|
|
|
sqlx::query!(
|
|
|
|
|
"UPDATE channels
|
|
|
|
|
SET title = ?, link = ?, description = ?,
|
|
|
|
|
last_fetched = ?
|
|
|
|
|
WHERE id = ?",
|
|
|
|
|
title, link, description, fetched_at,
|
|
|
|
|
self.id.0
|
|
|
|
|
).execute(&pool.0).await?;
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-22 15:55:31 -08:00
|
|
|
async fn get_feed_channels(
|
|
|
|
|
&self, pool: &AdapterPool
|
|
|
|
|
) -> Result<Vec<FeedChannel>> {
|
|
|
|
|
let feeds: Result<Vec<FeedChannel>> = sqlx::query_as!(
|
|
|
|
|
UnparsedFeedChannel,
|
|
|
|
|
"SELECT channel_id, feed_id
|
|
|
|
|
FROM feed_channels
|
|
|
|
|
WHERE channel_id = ?",
|
|
|
|
|
self.id.0
|
|
|
|
|
).fetch_all(&pool.0).await?.into_iter()
|
|
|
|
|
.map(UnparsedFeedChannel::parse).collect();
|
|
|
|
|
|
|
|
|
|
feeds
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-22 13:49:49 -08:00
|
|
|
pub async fn update_items(
|
|
|
|
|
&self, pool: &AdapterPool, fetched: FetchedRSSChannel
|
|
|
|
|
) -> Result<()> {
|
2026-01-22 15:55:31 -08:00
|
|
|
let fetched_at = fetched.fetched_at();
|
|
|
|
|
|
|
|
|
|
let feed_channels = self.get_feed_channels(pool).await?;
|
2026-01-22 13:49:49 -08:00
|
|
|
|
2026-01-22 15:55:31 -08:00
|
|
|
for rss_item in fetched.items() {
|
|
|
|
|
let new_item = Item::get_or_create(pool, self.id, rss_item.guid()).await?;
|
|
|
|
|
new_item.update_content(pool, rss_item, &fetched_at).await?;
|
|
|
|
|
for feed_channel in &feed_channels {
|
|
|
|
|
feed_channel.add_item(pool, &new_item).await?;
|
|
|
|
|
}
|
2026-01-22 13:49:49 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn get_items(&self, pool: &AdapterPool) -> Result<Vec<Item>> {
|
|
|
|
|
let items: Result<Vec<Item>> = sqlx::query_as!(
|
|
|
|
|
UnparsedItem,
|
2026-01-22 15:55:31 -08:00
|
|
|
"SELECT id as `id!`, channel_id, fetched_at, title, description,
|
2026-01-22 13:49:49 -08:00
|
|
|
content
|
2026-01-22 15:55:31 -08:00
|
|
|
FROM items
|
2026-01-22 13:49:49 -08:00
|
|
|
WHERE channel_id = ?",
|
|
|
|
|
self.id.0
|
|
|
|
|
).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect();
|
|
|
|
|
|
|
|
|
|
items
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Adapter, AdapterBuilder};

    const FEED1: &str = "https://example.com/feed";
    const FEED2: &str = "https://example2.com/feed";

    /// Builds a fresh in-memory SQLite adapter so each test is isolated.
    async fn setup_adapter() -> Adapter {
        AdapterBuilder::new()
            .database_url("sqlite::memory:")
            .create().await.unwrap()
    }

    #[tokio::test]
    async fn create_channel() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();

        let channel = Channel::get_or_create(pool, url_feed).await.unwrap();

        assert!(channel.id().0 > 0);
        assert_eq!(channel.link().as_str(), FEED1);
        // The URL is used as a placeholder title, so the title is non-empty.
        // (`!is_empty()` replaces the non-idiomatic `len() > 0`.)
        assert!(!channel.title().is_empty());
    }

    #[tokio::test]
    async fn create_duplicate_returns_existing() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();

        let channel1 = Channel::get_or_create(pool, url_feed.clone()).await.unwrap();
        let channel2 = Channel::get_or_create(pool, url_feed).await.unwrap();

        // The upsert must hand back the same row, not insert a second one.
        assert_eq!(
            i64::from(channel1.id()),
            i64::from(channel2.id())
        );
    }

    #[tokio::test]
    async fn get_all_channels() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed1 = Url::parse(FEED1).unwrap();
        let url_feed2 = Url::parse(FEED2).unwrap();

        Channel::get_or_create(pool, url_feed1).await.unwrap();
        Channel::get_or_create(pool, url_feed2).await.unwrap();

        let channels = Channel::get_all(pool).await.unwrap();

        assert_eq!(channels.len(), 2);
    }
}
|