use reqwest::Url; use chrono::{DateTime, Utc}; use crate::{ Result, AdapterPool, db::{ ChannelId, Item, FeedChannel, feed_channel::UnparsedFeedChannel, item::UnparsedItem, }, fetch::FetchedRSSChannel, }; pub struct UnparsedChannel { pub id: i64, pub title: String, pub link: String, pub description: Option, pub last_fetched: Option, } impl UnparsedChannel { pub fn parse(self) -> Result { Ok(Channel { id: ChannelId(self.id), title: self.title, link: Url::parse(&self.link)?, description: self.description, last_fetched: self.last_fetched.as_deref() .map(DateTime::parse_from_rfc2822) .transpose()? .map(|dt| dt.with_timezone(&Utc)), }) } } pub struct Channel { id: ChannelId, title: String, link: Url, description: Option, last_fetched: Option>, } impl Channel { pub fn id(&self) -> ChannelId { self.id } pub fn title(&self) -> &str { &self.title } pub fn link(&self) -> &Url { &self.link } pub fn description(&self) -> Option<&str> { self.description.as_deref() } pub fn last_fetched(&self) -> Option> { self.last_fetched } pub async fn get_all(pool: &AdapterPool) -> Result> { let channels: Result> = sqlx::query_as!( UnparsedChannel, "SELECT id, title, link, description, last_fetched FROM channels" ).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect(); channels } pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result { let channel: Result = sqlx::query_as!( UnparsedChannel, "SELECT id, title, link, description, last_fetched FROM channels WHERE id = ?", id.0 ).fetch_one(&pool.0).await?.parse(); channel } pub async fn get_or_create( pool: &AdapterPool, link: Url ) -> Result { let link_str = link.as_str(); let channel = sqlx::query_as!( UnparsedChannel, "INSERT INTO channels (title, link) VALUES(?, ?) ON CONFLICT(link) DO UPDATE SET link = link RETURNING id, title, link, description, last_fetched", link_str, link_str // We use the url as a placeholder title ).fetch_one(&pool.0).await?.parse(); channel } // TODO implement fetch skipping pub fn should_skip_fetch(&self) -> bool { false } pub async fn update_metadata( &self, pool: &AdapterPool, fetched: FetchedRSSChannel ) -> Result<()> { let title = fetched.title(); let description = fetched.description(); let link = fetched.link().as_str(); let fetched_at = fetched.fetched_at().to_rfc2822(); sqlx::query!( "UPDATE channels SET title = ?, link = ?, description = ?, last_fetched = ? WHERE id = ?", title, link, description, fetched_at, self.id.0 ).execute(&pool.0).await?; Ok(()) } async fn get_feed_channels( &self, pool: &AdapterPool ) -> Result> { let feeds: Result> = sqlx::query_as!( UnparsedFeedChannel, "SELECT channel_id, feed_id FROM feed_channels WHERE channel_id = ?", self.id.0 ).fetch_all(&pool.0).await?.into_iter() .map(UnparsedFeedChannel::parse).collect(); feeds } pub async fn update_items( &self, pool: &AdapterPool, fetched: FetchedRSSChannel ) -> Result<()> { let fetched_at = fetched.fetched_at(); let feed_channels = self.get_feed_channels(pool).await?; for rss_item in fetched.items() { let new_item = Item::get_or_create(pool, self.id, rss_item.guid()).await?; new_item.update_content(pool, rss_item, &fetched_at).await?; for feed_channel in &feed_channels { feed_channel.add_item(pool, &new_item).await?; } } Ok(()) } pub async fn get_items(&self, pool: &AdapterPool) -> Result> { let items: Result> = sqlx::query_as!( UnparsedItem, "SELECT id as `id!`, channel_id, fetched_at, title, description, content FROM items WHERE channel_id = ?", self.id.0 ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect(); items } } #[cfg(test)] mod tests { use super::*; use crate::{ db::{ Feed, User, }, test_utils::{ FEED1, FEED2, CHANNEL_TITLE, CHANNEL_DESC, USERNAME, FEED_TITLE, FEED_TITLE2, ITEM_GUID, ITEM_GUID2, setup_adapter, setup_channel, }, }; use chrono::TimeZone; #[test] fn parse_unparsed_item() { const CHANNEL_ID: i64 = 1; let date: DateTime = Utc.with_ymd_and_hms(2020,1,1,0,0,0).unwrap(); let raw_channel = UnparsedChannel { id: CHANNEL_ID, title: CHANNEL_TITLE.to_string(), link: FEED1.to_string(), description: Some(CHANNEL_DESC.to_string()), last_fetched: Some(date.to_rfc2822()), }; let channel = raw_channel.parse().unwrap(); assert_eq!(channel.id.0, CHANNEL_ID); assert_eq!(channel.title, CHANNEL_TITLE); assert_eq!(channel.link.as_str(), FEED1); assert_eq!(channel.description, Some(CHANNEL_DESC.to_string())); assert_eq!(channel.last_fetched, Some(date)); } #[tokio::test] async fn get_all() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let url1 = Url::parse(FEED1).unwrap(); let url2 = Url::parse(FEED2).unwrap(); Channel::get_or_create(pool, url1).await.unwrap(); Channel::get_or_create(pool, url2).await.unwrap(); let channels = Channel::get_all(pool).await.unwrap(); assert_eq!(channels.len(), 2); } #[tokio::test] async fn get() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let channel_a = setup_channel(pool).await; let channel_b = Channel::get(pool, channel_a.id()).await.unwrap(); assert_eq!(channel_a.id, channel_b.id); assert_eq!(channel_a.title, channel_b.title); assert_eq!(channel_a.link, channel_b.link); assert_eq!(channel_a.last_fetched, channel_b.last_fetched); assert_eq!(channel_a.description, channel_b.description); } #[tokio::test] async fn create() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let url_feed = Url::parse(FEED1).unwrap(); let channel = Channel::get_or_create(pool, url_feed).await.unwrap(); assert!(channel.id().0 > 0); assert_eq!(channel.link().as_str(), FEED1); assert!(channel.title().len() > 0); } #[tokio::test] async fn create_duplicate_returns_existing() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let url_feed = Url::parse(FEED1).unwrap(); let channel1 = Channel::get_or_create(pool, url_feed.clone()).await.unwrap(); let channel2 = Channel::get_or_create(pool, url_feed).await.unwrap(); assert_eq!( i64::from(channel1.id()), i64::from(channel2.id()) ); } #[tokio::test] async fn get_all_channels() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let url_feed1 = Url::parse(FEED1).unwrap(); let url_feed2 = Url::parse(FEED2).unwrap(); Channel::get_or_create(pool, url_feed1).await.unwrap(); Channel::get_or_create(pool, url_feed2).await.unwrap(); let channels = Channel::get_all(pool).await.unwrap(); assert_eq!(channels.len(), 2); } #[tokio::test] async fn get_feed_channels() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let channel = setup_channel(pool).await; let user = User::create(pool, USERNAME).await.unwrap(); let feed1 = Feed::create(pool, user.id(), FEED_TITLE).await.unwrap(); let feed2 = Feed::create(pool, user.id(), FEED_TITLE2).await.unwrap(); feed1.add_channel(pool, channel.id).await.unwrap(); feed2.add_channel(pool, channel.id).await.unwrap(); let fc_list = channel.get_feed_channels(pool).await.unwrap(); assert_eq!(fc_list.len(), 2); } #[tokio::test] async fn get_channels() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let channel = setup_channel(pool).await; let user = User::create(pool, USERNAME).await.unwrap(); let feed1 = Feed::create(pool, user.id(), FEED_TITLE).await.unwrap(); let feed2 = Feed::create(pool, user.id(), FEED_TITLE2).await.unwrap(); feed1.add_channel(pool, channel.id).await.unwrap(); feed2.add_channel(pool, channel.id).await.unwrap(); let fc_list = channel.get_feed_channels(pool).await.unwrap(); assert_eq!(fc_list.len(), 2); } #[tokio::test] async fn get_items() { let adapter = setup_adapter().await; let pool = adapter.get_pool(); let channel = setup_channel(pool).await; Item::get_or_create(pool, channel.id(), ITEM_GUID).await.unwrap(); Item::get_or_create(pool, channel.id(), ITEM_GUID2).await.unwrap(); let items = channel.get_items(pool).await.unwrap(); assert_eq!(items.len(), 2); } }