diff --git a/koucha/src/channel.rs b/koucha/src/channel.rs deleted file mode 100644 index da61531..0000000 --- a/koucha/src/channel.rs +++ /dev/null @@ -1,337 +0,0 @@ -use reqwest::Url; -use chrono::{DateTime, Utc}; -use crate::{ - Result, - AdapterPool, - AdapterClient, - Item, - channel::fetch::FetchedRSSChannel, - item::UnparsedItem, -}; - -#[derive(Copy, Clone)] -pub struct ChannelId(i64); -impl From for i64 { fn from(id: ChannelId) -> Self { id.0 } } - -pub mod fetch; - -pub struct UnparsedChannel { - pub id: i64, - pub title: String, - pub link: String, - pub description: Option, - pub last_fetched: Option, -} -impl UnparsedChannel { - pub fn parse(self) -> Result { - Ok(Channel { - id: ChannelId(self.id), - title: self.title, - link: Url::parse(&self.link)?, - description: self.description, - last_fetched: self.last_fetched.as_deref() - .map(DateTime::parse_from_rfc2822) - .transpose()? - .map(|dt| dt.with_timezone(&Utc)), - }) - } -} - -pub struct Channel { - id: ChannelId, - title: String, - link: Url, - description: Option, - last_fetched: Option>, -} - -impl Channel { - pub fn id(&self) -> ChannelId { self.id } - pub fn title(&self) -> &str { &self.title } - pub fn link(&self) -> &Url { &self.link } - pub fn description(&self) -> Option<&str> { self.description.as_deref() } - - pub async fn get_all(pool: &AdapterPool) -> Result> { - let channels: Result> = sqlx::query_as!( - UnparsedChannel, - "SELECT id, title, link, description, last_fetched FROM channels" - ).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect(); - - channels - } - - pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result { - let channel: Result = sqlx::query_as!( - UnparsedChannel, - "SELECT id, title, link, description, last_fetched - FROM channels - WHERE id = ?", - id.0 - ).fetch_one(&pool.0).await?.parse(); - - channel - } - - pub async fn create( - pool: &AdapterPool, link: Url - ) -> Result { - let link_str = link.as_str(); - - if let Ok(existing_channel) = sqlx::query_as!( - UnparsedChannel, - "SELECT id as `id!`, title, link, description, last_fetched - FROM channels - WHERE link = ?", - link_str - ).fetch_one(&pool.0).await { - return existing_channel.parse(); - } - - let new_channel = sqlx::query_as!( - UnparsedChannel, - "INSERT INTO channels (title, link) - VALUES (?, ?) - RETURNING id, title, link, description, last_fetched", - link_str, link_str - ).fetch_one(&pool.0).await?.parse(); - - new_channel - } - - // TODO implement fetch skipping - fn should_skip_fetch(&self) -> bool { false } - - // TODO implement conditional fetching - pub async fn fetch_rss( - &self, client: &AdapterClient - ) -> Result> { - if self.should_skip_fetch() { - return Ok(None); - } - let bytestream = client.0.get(self.link.clone()) - .send().await? - .bytes().await?; - - let rss_channel = rss::Channel::read_from(&bytestream[..])?; - - Ok(Some(FetchedRSSChannel::parse(rss_channel)?)) - } - - pub async fn update_metadata( - &self, pool: &AdapterPool, fetched: FetchedRSSChannel - ) -> Result<()> { - let title = fetched.title(); - let description = fetched.description(); - let link = fetched.link().as_str(); - let fetched_at = fetched.fetched_at().to_rfc2822(); - sqlx::query!( - "UPDATE channels - SET title = ?, link = ?, description = ?, - last_fetched = ? - WHERE id = ?", - title, link, description, fetched_at, - self.id.0 - ).execute(&pool.0).await?; - - Ok(()) - } - - pub async fn update_items( - &self, pool: &AdapterPool, fetched: FetchedRSSChannel - ) -> Result<()> { - let fetched_at = fetched.fetched_at().to_rfc2822(); - - for item in fetched.items() { - let guid = item.guid(); - let title = item.title(); - let description = item.description(); - let content = item.content(); - sqlx::query!( - "INSERT OR IGNORE INTO items - (channel_id, guid, fetched_at, title, description, content) - VALUES (?, ?, ?, ?, ?, ?)", - self.id.0, guid, fetched_at, title, description, content - ) - .execute(&pool.0) - .await?; - } - - Ok(()) - } - - pub async fn get_items(&self, pool: &AdapterPool) -> Result> { - let items: Result> = sqlx::query_as!( - UnparsedItem, - "SELECT id as `id!` FROM items WHERE channel_id = ?", - self.id.0 - ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect(); - - items - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{Adapter, AdapterBuilder}; - use rss::{ - Guid as RSSGuid, - Item as RSSItem, - ItemBuilder as RSSItemBuilder, - Channel as RSSChannel, - ChannelBuilder as RSSChannelBuilder, - }; - - const ITEM_TITLE: &str = "My Item"; - const ITEM_GUID1: &str = "https://mycontent.com/blog/1"; - const ITEM_GUID2: &str = "something-else!"; - const ITEM_DESC: &str = "A test item"; - const ITEM_CONT: &str = "some rss content baby"; - const CHAN_TITLE: &str = "My Feed"; - const CHAN_DESC: &str = "A test feed"; - const FEED1: &str = "https://example.com/feed"; - const FEED2: &str = "https://example2.com/feed"; - - async fn setup_adapter() -> Adapter { - AdapterBuilder::new() - .database_url("sqlite::memory:") - .create().await.unwrap() - } - - #[tokio::test] - async fn create_channel() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed = Url::parse(FEED1).unwrap(); - - let channel = Channel::create(pool, url_feed).await.unwrap(); - - assert!(channel.id().0 > 0); - assert_eq!(channel.link().as_str(), FEED1); - assert!(channel.title().len() > 0); - } - - #[tokio::test] - async fn create_duplicate_returns_existing() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed = Url::parse(FEED1).unwrap(); - - let channel1 = Channel::create(pool, url_feed.clone()).await.unwrap(); - let channel2 = Channel::create(pool, url_feed).await.unwrap(); - - assert_eq!( - i64::from(channel1.id()), - i64::from(channel2.id()) - ); - } - - #[tokio::test] - async fn get_all_channels() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed1 = Url::parse(FEED1).unwrap(); - let url_feed2 = Url::parse(FEED2).unwrap(); - - Channel::create(pool, url_feed1).await.unwrap(); - Channel::create(pool, url_feed2).await.unwrap(); - - let channels = Channel::get_all(pool).await.unwrap(); - - assert_eq!(channels.len(), 2); - } - - #[tokio::test] - async fn update_metadata() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed = Url::parse(FEED1).unwrap(); - - let channel = Channel::create(pool, url_feed).await.unwrap(); - - let fake_rss: RSSChannel = RSSChannelBuilder::default() - .title(CHAN_TITLE) - .link(FEED2) - .description(CHAN_DESC) - .build(); - - let fetched = FetchedRSSChannel::parse(fake_rss).unwrap(); - - channel.update_metadata(pool, fetched).await.unwrap(); - - let updated = Channel::get(pool, channel.id()).await.unwrap(); - assert_eq!(updated.title(), CHAN_TITLE); - assert_eq!(updated.link().as_str(), FEED2); - assert_eq!(updated.description(), Some(CHAN_DESC)); - } - - #[tokio::test] - async fn update_items() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed = Url::parse(FEED1).unwrap(); - - let channel = Channel::create(pool, url_feed).await.unwrap(); - - let item1: RSSItem = RSSItemBuilder::default() - .title(ITEM_TITLE.to_string()) - .description(ITEM_DESC.to_string()) - .content(ITEM_CONT.to_string()) - .guid(RSSGuid { value: ITEM_GUID1.to_string(), permalink: false }) - .build(); - let item2: RSSItem = RSSItemBuilder::default() - .title(ITEM_TITLE.to_string()) - .description(ITEM_DESC.to_string()) - .content(ITEM_CONT.to_string()) - .guid(RSSGuid { value: ITEM_GUID2.to_string(), permalink: false }) - .build(); - - let fake_rss: RSSChannel = RSSChannelBuilder::default() - .title(CHAN_TITLE) - .link(FEED2) - .description(CHAN_DESC) - .item(item1) - .item(item2) - .build(); - - let fetched = FetchedRSSChannel::parse(fake_rss).unwrap(); - - channel.update_items(pool, fetched).await.unwrap(); - - let items = channel.get_items(pool).await.unwrap(); - assert_eq!(items.len(), 2); - } - - #[tokio::test] - async fn update_items_ignores_duplicates() { - let adapter = setup_adapter().await; - let pool = adapter.get_pool(); - let url_feed = Url::parse(FEED1).unwrap(); - - let channel = Channel::create(pool, url_feed).await.unwrap(); - - let item1: RSSItem = RSSItemBuilder::default() - .title(ITEM_TITLE.to_string()) - .description(ITEM_DESC.to_string()) - .content(ITEM_CONT.to_string()) - .guid(RSSGuid { value: ITEM_GUID1.to_string(), permalink: false }) - .build(); - - let fake_rss: RSSChannel = RSSChannelBuilder::default() - .title(CHAN_TITLE) - .link(FEED2) - .description(CHAN_DESC) - .item(item1) - .build(); - - let fetched = FetchedRSSChannel::parse(fake_rss.clone()).unwrap(); - - channel.update_items(pool, fetched).await.unwrap(); - let fetched = FetchedRSSChannel::parse(fake_rss).unwrap(); - - channel.update_items(pool, fetched).await.unwrap(); - - let items = channel.get_items(pool).await.unwrap(); - assert_eq!(items.len(), 1); - } -} diff --git a/koucha/src/db.rs b/koucha/src/db.rs new file mode 100644 index 0000000..f5e1372 --- /dev/null +++ b/koucha/src/db.rs @@ -0,0 +1,22 @@ +mod user; +pub use user::User; +mod feed; +pub use feed::Feed; +mod channel; +pub use channel::Channel; +mod item; +pub use item::Item; + + +macro_rules! define_id { + ($name:ident) => { + #[derive(Copy, Clone)] + pub struct $name(i64); + impl From<$name> for i64 { fn from(id: $name) -> Self { id.0 } } + }; +} + +define_id!(UserId); +define_id!(FeedId); +define_id!(ChannelId); +define_id!(ItemId); diff --git a/koucha/src/db/channel.rs b/koucha/src/db/channel.rs new file mode 100644 index 0000000..931eb3d --- /dev/null +++ b/koucha/src/db/channel.rs @@ -0,0 +1,204 @@ +use reqwest::Url; +use chrono::{DateTime, Utc}; +use crate::{ + Result, + AdapterPool, + db::{ + ChannelId, + Item, + item::UnparsedItem, + }, + fetch::FetchedRSSChannel, +}; + +pub struct UnparsedChannel { + pub id: i64, + pub title: String, + pub link: String, + pub description: Option, + pub last_fetched: Option, +} +impl UnparsedChannel { + pub fn parse(self) -> Result { + Ok(Channel { + id: ChannelId(self.id), + title: self.title, + link: Url::parse(&self.link)?, + description: self.description, + last_fetched: self.last_fetched.as_deref() + .map(DateTime::parse_from_rfc2822) + .transpose()? + .map(|dt| dt.with_timezone(&Utc)), + }) + } +} + +pub struct Channel { + id: ChannelId, + title: String, + link: Url, + description: Option, + last_fetched: Option>, +} + +impl Channel { + pub fn id(&self) -> ChannelId { self.id } + pub fn title(&self) -> &str { &self.title } + pub fn link(&self) -> &Url { &self.link } + pub fn description(&self) -> Option<&str> { self.description.as_deref() } + pub fn last_fetched(&self) -> Option> { self.last_fetched } + + pub async fn get_all(pool: &AdapterPool) -> Result> { + let channels: Result> = sqlx::query_as!( + UnparsedChannel, + "SELECT id, title, link, description, last_fetched FROM channels" + ).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect(); + + channels + } + + pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result { + let channel: Result = sqlx::query_as!( + UnparsedChannel, + "SELECT id, title, link, description, last_fetched + FROM channels + WHERE id = ?", + id.0 + ).fetch_one(&pool.0).await?.parse(); + + channel + } + + pub async fn get_or_create( + pool: &AdapterPool, link: Url + ) -> Result { + let link_str = link.as_str(); + + let channel = sqlx::query_as!( + UnparsedChannel, + "INSERT INTO channels (title, link) + VALUES(?, ?) + ON CONFLICT(link) DO UPDATE SET link = link + RETURNING id, title, link, description, last_fetched", + link_str, link_str // We use the url as a placeholder title + ).fetch_one(&pool.0).await?.parse(); + + channel + } + + // TODO implement fetch skipping + pub fn should_skip_fetch(&self) -> bool { false } + + pub async fn update_metadata( + &self, pool: &AdapterPool, fetched: FetchedRSSChannel + ) -> Result<()> { + let title = fetched.title(); + let description = fetched.description(); + let link = fetched.link().as_str(); + let fetched_at = fetched.fetched_at().to_rfc2822(); + sqlx::query!( + "UPDATE channels + SET title = ?, link = ?, description = ?, + last_fetched = ? + WHERE id = ?", + title, link, description, fetched_at, + self.id.0 + ).execute(&pool.0).await?; + + Ok(()) + } + + pub async fn update_items( + &self, pool: &AdapterPool, fetched: FetchedRSSChannel + ) -> Result<()> { + let fetched_at = fetched.fetched_at().to_rfc2822(); + + for item in fetched.items() { + let guid = item.guid(); + let title = item.title(); + let description = item.description(); + let content = item.content(); + sqlx::query!( + "INSERT OR IGNORE INTO items + (channel_id, guid, fetched_at, title, description, content) + VALUES (?, ?, ?, ?, ?, ?)", + self.id.0, guid, fetched_at, title, description, content + ) + .execute(&pool.0) + .await?; + } + + Ok(()) + } + + pub async fn get_items(&self, pool: &AdapterPool) -> Result> { + let items: Result> = sqlx::query_as!( + UnparsedItem, + "SELECT id as `id!`, channel_id, guid, fetched_at, title, description, + content + FROM items + WHERE channel_id = ?", + self.id.0 + ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect(); + + items + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{Adapter, AdapterBuilder}; + + const FEED1: &str = "https://example.com/feed"; + const FEED2: &str = "https://example2.com/feed"; + + async fn setup_adapter() -> Adapter { + AdapterBuilder::new() + .database_url("sqlite::memory:") + .create().await.unwrap() + } + + #[tokio::test] + async fn create_channel() { + let adapter = setup_adapter().await; + let pool = adapter.get_pool(); + let url_feed = Url::parse(FEED1).unwrap(); + + let channel = Channel::get_or_create(pool, url_feed).await.unwrap(); + + assert!(channel.id().0 > 0); + assert_eq!(channel.link().as_str(), FEED1); + assert!(channel.title().len() > 0); + } + + #[tokio::test] + async fn create_duplicate_returns_existing() { + let adapter = setup_adapter().await; + let pool = adapter.get_pool(); + let url_feed = Url::parse(FEED1).unwrap(); + + let channel1 = Channel::get_or_create(pool, url_feed.clone()).await.unwrap(); + let channel2 = Channel::get_or_create(pool, url_feed).await.unwrap(); + + assert_eq!( + i64::from(channel1.id()), + i64::from(channel2.id()) + ); + } + + #[tokio::test] + async fn get_all_channels() { + let adapter = setup_adapter().await; + let pool = adapter.get_pool(); + let url_feed1 = Url::parse(FEED1).unwrap(); + let url_feed2 = Url::parse(FEED2).unwrap(); + + Channel::get_or_create(pool, url_feed1).await.unwrap(); + Channel::get_or_create(pool, url_feed2).await.unwrap(); + + let channels = Channel::get_all(pool).await.unwrap(); + + assert_eq!(channels.len(), 2); + } +} diff --git a/koucha/src/feed.rs b/koucha/src/db/feed.rs similarity index 90% rename from koucha/src/feed.rs rename to koucha/src/db/feed.rs index 5e8092d..1d081e6 100644 --- a/koucha/src/feed.rs +++ b/koucha/src/db/feed.rs @@ -1,20 +1,17 @@ use crate::{ - Result, AdapterPool, - Item, - item::UnparsedItem, - Channel, - channel::{ - UnparsedChannel, + Result, + db::{ + Channel, ChannelId, + FeedId, + Item, + UserId, + channel::UnparsedChannel, + item::UnparsedItem, }, - user::UserId, }; -#[derive(Copy, Clone)] -pub struct FeedId(i64); -impl From for i64 { fn from(id: FeedId) -> Self { id.0 } } - pub struct UnparsedFeed { pub id: i64, pub title: String, @@ -93,7 +90,10 @@ impl Feed { ) -> Result> { let items: Result> = sqlx::query_as!( UnparsedItem, - "SELECT item_id as id FROM feed_items + "SELECT i.id as `id!`, i.channel_id, i.guid, i.fetched_at, i.title, + i.description, i.content + FROM items i + JOIN feed_items fi on i.id = fi.item_id WHERE feed_id = ? AND archived = FALSE ORDER BY score DESC LIMIT ? OFFSET ?", @@ -123,7 +123,11 @@ impl Feed { #[cfg(test)] mod tests { use super::*; - use crate::{User, Adapter, AdapterBuilder}; + use crate::{ + Adapter, + AdapterBuilder, + db::User, + }; async fn setup_adapter() -> Adapter { AdapterBuilder::new() diff --git a/koucha/src/db/item.rs b/koucha/src/db/item.rs new file mode 100644 index 0000000..62fa919 --- /dev/null +++ b/koucha/src/db/item.rs @@ -0,0 +1,68 @@ +use crate::{ + Result, + AdapterPool, + db::{ + ChannelId, + ItemId, + } +}; +use chrono::{DateTime, Utc}; + +pub struct UnparsedItem { + pub id: i64, + pub channel_id: i64, + pub guid: String, + pub fetched_at: String, + + pub title: Option, + pub description: Option, + pub content: Option, +} + +impl UnparsedItem { + pub fn parse(self) -> Result { + Ok(Item { + id: ItemId(self.id), + channel_id: ChannelId(self.channel_id), + guid: self.guid, + fetched_at: DateTime::parse_from_rfc2822(&self.fetched_at)?.with_timezone(&Utc), + + title: self.title, + description: self.description, + content: self.content, + }) + } +} + +pub struct Item { + id: ItemId, + channel_id: ChannelId, + guid: String, + fetched_at: DateTime, + + title: Option, + description: Option, + content: Option, +} + +impl Item { + pub async fn get_or_create( + pool: &AdapterPool, from_channel: ChannelId, guid: &str, + fetched_at: DateTime + ) -> Result { + let int_channel_id = i64::from(from_channel); + let last_fetched = fetched_at.to_rfc2822(); + + let item = sqlx::query_as!( + UnparsedItem, + "INSERT INTO items (channel_id, guid, fetched_at) + VALUES(?, ?, ?) + ON CONFLICT(id) DO UPDATE SET id = id + RETURNING id as `id!`, channel_id, guid, fetched_at, title, description, + content", + int_channel_id, guid, last_fetched + ).fetch_one(&pool.0).await?.parse(); + + item + } +} diff --git a/koucha/src/user.rs b/koucha/src/db/user.rs similarity index 96% rename from koucha/src/user.rs rename to koucha/src/db/user.rs index c8ea50b..05b7a4b 100644 --- a/koucha/src/user.rs +++ b/koucha/src/db/user.rs @@ -1,14 +1,13 @@ use crate::{ Result, AdapterPool, - Feed, - feed::UnparsedFeed, + db::{ + UserId, + Feed, + feed::UnparsedFeed, + }, }; -#[derive(Copy, Clone)] -pub struct UserId(i64); -impl From for i64 { fn from(id: UserId) -> Self { id.0 } } - pub struct UnparsedUser { pub id: i64, pub name: String, diff --git a/koucha/src/channel/fetch.rs b/koucha/src/fetch.rs similarity index 78% rename from koucha/src/channel/fetch.rs rename to koucha/src/fetch.rs index b660cfe..b71b30c 100644 --- a/koucha/src/channel/fetch.rs +++ b/koucha/src/fetch.rs @@ -1,4 +1,8 @@ -use crate::Result; +use crate::{ + Result, + db::Channel, + AdapterClient, +}; use reqwest::Url; use chrono::{DateTime, Utc}; use std::hash::{Hash, Hasher}; @@ -53,7 +57,23 @@ impl FetchedRSSChannel { pub fn items(&self) -> &[FetchedRSSItem] { &self.items } pub fn fetched_at(&self) -> &DateTime { &self.fetched_at } - pub fn parse(rss: rss::Channel) -> Result { + pub async fn fetch_channel( + client: &AdapterClient, channel: Channel + ) -> Result> { + if channel.should_skip_fetch() { + return Ok(None); + } + + let bytestream = client.0.get(channel.link().clone()) + .send().await? + .bytes().await?; + + let rss_channel = rss::Channel::read_from(&bytestream[..])?; + + Ok(Some(FetchedRSSChannel::parse(rss_channel)?)) + } + + fn parse(rss: rss::Channel) -> Result { Ok(FetchedRSSChannel { title: rss.title, link: Url::parse(&rss.link)?, diff --git a/koucha/src/item.rs b/koucha/src/item.rs deleted file mode 100644 index 4d6af9f..0000000 --- a/koucha/src/item.rs +++ /dev/null @@ -1,20 +0,0 @@ -use crate::Result; - -#[derive(Copy, Clone)] -pub struct ItemId(i64); -impl From for i64 { fn from(id: ItemId) -> Self { id.0 } } - -pub struct UnparsedItem { - pub id: i64, -} -impl UnparsedItem { - pub fn parse(self) -> Result { - Ok(Item { - id: ItemId(self.id), - }) - } -} - -pub struct Item { - id: ItemId, -} diff --git a/koucha/src/lib.rs b/koucha/src/lib.rs index 227939a..1e29303 100644 --- a/koucha/src/lib.rs +++ b/koucha/src/lib.rs @@ -2,27 +2,8 @@ use std::error::Error; type Result = std::result::Result>; -mod user; -pub use user::{ - User, - UserId -}; -mod feed; -pub use feed::{ - Feed, - FeedId, -}; -mod channel; -pub use channel::{ - Channel, - ChannelId, - fetch::FetchedRSSChannel, -}; -mod item; -pub use item::{ - Item, - ItemId, -}; +pub mod db; +pub mod fetch; pub struct AdapterPool(sqlx::SqlitePool); pub struct AdapterClient(reqwest::Client);