diff --git a/koucha/migrations/20260115003047_initial_schema.sql b/koucha/migrations/20260115003047_initial_schema.sql index 25122e7..63d3591 100644 --- a/koucha/migrations/20260115003047_initial_schema.sql +++ b/koucha/migrations/20260115003047_initial_schema.sql @@ -19,8 +19,8 @@ CREATE TABLE items ( id INTEGER PRIMARY KEY, channel_id INTEGER NOT NULL, guid TEXT NOT NULL, - fetched_at TEXT NOT NULL, + fetched_at TEXT, title TEXT, description TEXT, content TEXT, diff --git a/koucha/src/db/channel.rs b/koucha/src/db/channel.rs index 931eb3d..c837475 100644 --- a/koucha/src/db/channel.rs +++ b/koucha/src/db/channel.rs @@ -6,6 +6,7 @@ use crate::{ db::{ ChannelId, Item, + FeedId, item::UnparsedItem, }, fetch::FetchedRSSChannel, @@ -33,6 +34,40 @@ impl UnparsedChannel { } } +pub struct UnparsedFeedChannel { + pub channel_id: i64, + pub feed_id: i64, +} +impl UnparsedFeedChannel { + pub fn parse(self) -> Result { + Ok(FeedChannel { + channel_id: ChannelId(self.channel_id), + feed_id: FeedId(self.feed_id) + }) + } +} + +pub struct FeedChannel { + channel_id: ChannelId, + feed_id: FeedId, +} + +impl FeedChannel { + pub async fn add_item( + &self, pool: &AdapterPool, item: &Item + ) -> Result<()> { + let int_item_id = i64::from(item.id()); + let int_feed_id = i64::from(self.feed_id); + + sqlx::query!( + "INSERT OR IGNORE INTO feed_items (feed_id, item_id, score) + VALUES (?, ?, 5)", // TODO: Add in scoring featuress + int_feed_id, int_item_id + ).execute(&pool.0).await?; + Ok(()) + } +} + pub struct Channel { id: ChannelId, title: String, @@ -108,24 +143,34 @@ impl Channel { Ok(()) } + async fn get_feed_channels( + &self, pool: &AdapterPool + ) -> Result> { + let feeds: Result> = sqlx::query_as!( + UnparsedFeedChannel, + "SELECT channel_id, feed_id + FROM feed_channels + WHERE channel_id = ?", + self.id.0 + ).fetch_all(&pool.0).await?.into_iter() + .map(UnparsedFeedChannel::parse).collect(); + + feeds + } + pub async fn update_items( &self, pool: &AdapterPool, fetched: FetchedRSSChannel ) -> Result<()> { - let fetched_at = fetched.fetched_at().to_rfc2822(); + let fetched_at = fetched.fetched_at(); - for item in fetched.items() { - let guid = item.guid(); - let title = item.title(); - let description = item.description(); - let content = item.content(); - sqlx::query!( - "INSERT OR IGNORE INTO items - (channel_id, guid, fetched_at, title, description, content) - VALUES (?, ?, ?, ?, ?, ?)", - self.id.0, guid, fetched_at, title, description, content - ) - .execute(&pool.0) - .await?; + let feed_channels = self.get_feed_channels(pool).await?; + + for rss_item in fetched.items() { + let new_item = Item::get_or_create(pool, self.id, rss_item.guid()).await?; + new_item.update_content(pool, rss_item, &fetched_at).await?; + for feed_channel in &feed_channels { + feed_channel.add_item(pool, &new_item).await?; + } } Ok(()) @@ -134,9 +179,9 @@ impl Channel { pub async fn get_items(&self, pool: &AdapterPool) -> Result> { let items: Result> = sqlx::query_as!( UnparsedItem, - "SELECT id as `id!`, channel_id, guid, fetched_at, title, description, + "SELECT id as `id!`, channel_id, fetched_at, title, description, content - FROM items + FROM items WHERE channel_id = ?", self.id.0 ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect(); diff --git a/koucha/src/db/feed.rs b/koucha/src/db/feed.rs index 1d081e6..bb4783d 100644 --- a/koucha/src/db/feed.rs +++ b/koucha/src/db/feed.rs @@ -90,8 +90,8 @@ impl Feed { ) -> Result> { let items: Result> = sqlx::query_as!( UnparsedItem, - "SELECT i.id as `id!`, i.channel_id, i.guid, i.fetched_at, i.title, - i.description, i.content + "SELECT i.id as `id!`, i.channel_id, i.fetched_at, i.title, i.description, + i.content FROM items i JOIN feed_items fi on i.id = fi.item_id WHERE feed_id = ? AND archived = FALSE diff --git a/koucha/src/db/item.rs b/koucha/src/db/item.rs index 62fa919..1cc7f6c 100644 --- a/koucha/src/db/item.rs +++ b/koucha/src/db/item.rs @@ -4,16 +4,16 @@ use crate::{ db::{ ChannelId, ItemId, - } + }, + fetch::FetchedRSSItem, }; use chrono::{DateTime, Utc}; pub struct UnparsedItem { pub id: i64, pub channel_id: i64, - pub guid: String, - pub fetched_at: String, - + pub fetched_at: Option, + pub title: Option, pub description: Option, pub content: Option, @@ -24,8 +24,10 @@ impl UnparsedItem { Ok(Item { id: ItemId(self.id), channel_id: ChannelId(self.channel_id), - guid: self.guid, - fetched_at: DateTime::parse_from_rfc2822(&self.fetched_at)?.with_timezone(&Utc), + fetched_at: self.fetched_at.as_deref() + .map(DateTime::parse_from_rfc2822) + .transpose()? + .map(|dt| dt.with_timezone(&Utc)), title: self.title, description: self.description, @@ -37,32 +39,55 @@ impl UnparsedItem { pub struct Item { id: ItemId, channel_id: ChannelId, - guid: String, - fetched_at: DateTime, + fetched_at: Option>, title: Option, description: Option, content: Option, } impl Item { + pub fn id(&self) -> ItemId { self.id } + pub fn channel(&self) -> ChannelId { self.channel_id } + pub fn title(&self) -> Option<&str> { self.title.as_deref() } + pub fn description(&self) -> Option<&str> { self.description.as_deref() } + pub fn content(&self) -> Option<&str> { self.content.as_deref() } + pub async fn get_or_create( - pool: &AdapterPool, from_channel: ChannelId, guid: &str, - fetched_at: DateTime + pool: &AdapterPool, from_channel: ChannelId, guid: &str ) -> Result { let int_channel_id = i64::from(from_channel); - let last_fetched = fetched_at.to_rfc2822(); let item = sqlx::query_as!( UnparsedItem, - "INSERT INTO items (channel_id, guid, fetched_at) - VALUES(?, ?, ?) + "INSERT INTO items (channel_id, guid) + VALUES(?, ?) ON CONFLICT(id) DO UPDATE SET id = id - RETURNING id as `id!`, channel_id, guid, fetched_at, title, description, + RETURNING id as `id!`, channel_id, fetched_at, title, description, content", - int_channel_id, guid, last_fetched + int_channel_id, guid ).fetch_one(&pool.0).await?.parse(); item } + + pub async fn update_content( + &self, pool: &AdapterPool, fetched: &FetchedRSSItem, fetched_at: &DateTime + ) -> Result<()> { + let title = fetched.title(); + let description = fetched.description(); + let content = fetched.content(); + let string_fetched_at = fetched_at.to_rfc2822(); + + sqlx::query!( + "UPDATE items + SET title = ?, description = ?, content = ?, + fetched_at = ? + WHERE id = ?", + title, description, content, string_fetched_at, + self.id.0 + ).execute(&pool.0).await?; + + Ok(()) + } }