diff --git a/koucha/migrations/20260115003047_initial_schema.sql b/koucha/migrations/20260115003047_initial_schema.sql
index d437df3..25122e7 100644
--- a/koucha/migrations/20260115003047_initial_schema.sql
+++ b/koucha/migrations/20260115003047_initial_schema.sql
@@ -32,6 +32,7 @@ CREATE TABLE items (
 CREATE TABLE feeds (
     id INTEGER PRIMARY KEY,
     user_id INTEGER NOT NULL,
+    title TEXT NOT NULL,
     FOREIGN KEY (user_id) REFERENCES users(id)
 );
diff --git a/koucha/src/bin/server.rs b/koucha/src/bin/server.rs
index d4a8d34..4a05eed 100644
--- a/koucha/src/bin/server.rs
+++ b/koucha/src/bin/server.rs
@@ -1,11 +1,14 @@
 use std::error::Error;
 use reqwest::Client;
-use koucha::fetch_channel;
+use koucha::{
+    AdapterOptions,
+};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
-    let client = Client::new();
-    let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?;
+    // let adapter = AdapterOptions::new().create().await?;
+    //
+    // let _channel = fetch_channel(&client, "https://lorem-rss.herokuapp.com/feed?unit=year").await?;
 
     Ok(())
 }
diff --git a/koucha/src/channel.rs b/koucha/src/channel.rs
new file mode 100644
index 0000000..6d130ff
--- /dev/null
+++ b/koucha/src/channel.rs
@@ -0,0 +1,166 @@
+use reqwest::{Url, Client};
+use sqlx::SqlitePool;
+use chrono::{DateTime, Utc};
+use crate::{Result};
+use std::hash::{Hash, Hasher};
+
+pub struct ChannelId(pub i64);
+impl From<i64> for ChannelId { fn from(id: i64) -> Self { ChannelId(id) } }
+impl From<ChannelId> for i64 { fn from(id: ChannelId) -> Self { id.0 } }
+
+
+struct FetchedRSSItem {
+    guid: String,
+    title: String,
+    description: String,
+    content: String,
+}
+impl FetchedRSSItem {
+    fn parse(item: rss::Item) -> Self {
+        FetchedRSSItem {
+            guid: Self::get_or_create_guid(&item),
+            title: item.title().unwrap_or("").to_string(),
+            description: item.description().unwrap_or("").to_string(),
+            content: item.content().unwrap_or("").to_string(),
+        }
+    }
+
+    fn get_or_create_guid(item: &rss::Item) ->
String {
+        if let Some(guid) = item.guid() {
+            return guid.value().to_string();
+        }
+
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        item.link().unwrap_or("").hash(&mut hasher);
+        item.title().unwrap_or("").hash(&mut hasher);
+        item.description().unwrap_or("").hash(&mut hasher);
+
+        format!("gen-{:x}", hasher.finish())
+    }
+}
+pub struct FetchedRSSChannel {
+    title: String,
+    link: Url,
+    description: String,
+
+    items: Vec<FetchedRSSItem>,
+
+    fetched_at: DateTime<Utc>,
+}
+impl FetchedRSSChannel {
+    pub fn fetched_at(&self) -> &DateTime<Utc> {
+        &self.fetched_at
+    }
+
+    fn parse(rss: rss::Channel) -> Result<Self> {
+        Ok(FetchedRSSChannel {
+            title: rss.title,
+            link: Url::parse(&rss.link)?,
+            description: rss.description,
+
+            items: rss.items.into_iter().map(FetchedRSSItem::parse).collect(),
+
+            fetched_at: Utc::now(),
+        })
+    }
+}
+
+pub struct UnparsedChannel {
+    pub id: i64,
+    pub title: String,
+    pub link: String,
+    pub description: Option<String>,
+    pub last_fetched: Option<String>,
+}
+impl UnparsedChannel {
+    pub fn parse(self) -> Result<Channel> {
+        Ok(Channel {
+            id: ChannelId(self.id),
+            title: self.title,
+            link: Url::parse(&self.link)?,
+            description: self.description,
+            last_fetched: self.last_fetched.as_deref()
+                .map(DateTime::parse_from_rfc2822)
+                .transpose()?
+                .map(|dt| dt.with_timezone(&Utc)),
+        })
+    }
+}
+
+pub struct Channel {
+    pub id: ChannelId,
+    pub title: String,
+    pub link: Url,
+    pub description: Option<String>,
+    pub last_fetched: Option<DateTime<Utc>>,
+}
+
+impl Channel {
+
+
+    pub async fn get_all(pool: &SqlitePool) -> Result<Vec<Channel>> {
+        let channels: Result<Vec<Channel>> = sqlx::query_as!(
+            UnparsedChannel,
+            "SELECT id, title, link, description, last_fetched FROM channels"
+        )
+        .fetch_all(pool).await?.into_iter()
+        .map(UnparsedChannel::parse).collect();
+
+        channels
+    }
+
+    // TODO implement fetch skipping
+    fn should_skip_fetch(&self) -> bool { false }
+
+    // TODO implement conditional fetching
+    pub async fn fetch_rss(
+        client: &Client, channel: &Channel
+    ) -> Result<Option<FetchedRSSChannel>> {
+        if channel.should_skip_fetch() {
+            return Ok(None);
+        }
+        let bytestream = client.get(channel.link.clone())
+            .send().await?
+            .bytes().await?;
+
+        let rss_channel = rss::Channel::read_from(&bytestream[..])?;
+
+        Ok(Some(FetchedRSSChannel::parse(rss_channel)?))
+    }
+
+    pub async fn update_metadata(
+        pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel
+    ) -> Result<()> {
+        let link = fetched.link.as_str();
+        let fetched_at = fetched.fetched_at.to_rfc2822();
+        sqlx::query!(
+            "UPDATE channels
+            SET title = ?, link = ?, description = ?,
+            last_fetched = ?
+            WHERE id = ?",
+            fetched.title, link, fetched.description, fetched_at,
+            id.0
+        ).execute(pool).await?;
+
+        Ok(())
+    }
+
+    pub async fn update_items(
+        pool: &SqlitePool, id: ChannelId, fetched: FetchedRSSChannel
+    ) -> Result<()> {
+        let fetched_at = fetched.fetched_at.to_rfc2822();
+
+        for item in fetched.items {
+            sqlx::query!(
+                "INSERT OR IGNORE INTO items
+                (channel_id, guid, fetched_at, title, description, content)
+                VALUES (?, ?, ?, ?, ?, ?)",
+                id.0, item.guid, fetched_at, item.title, item.description, item.content
+            )
+            .execute(pool)
+            .await?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/koucha/src/feed.rs b/koucha/src/feed.rs
new file mode 100644
index 0000000..8e04a0d
--- /dev/null
+++ b/koucha/src/feed.rs
@@ -0,0 +1,96 @@
+use crate::{
+    Result,
+    Item,
+    Channel,
+    channel::{
+        UnparsedChannel,
+        ChannelId,
+    },
+    user::UserId,
+};
+use sqlx::SqlitePool;
+
+pub struct FeedId(pub i64);
+impl From<i64> for FeedId { fn from(id: i64) -> Self { FeedId(id) } }
+impl From<FeedId> for i64 { fn from(id: FeedId) -> Self { id.0 } }
+
+pub struct UnparsedFeed {
+    pub id: i64,
+    pub title: String,
+    pub user_id: i64
+}
+impl UnparsedFeed {
+    pub fn parse(self) -> Result<Feed> {
+        Ok(Feed {
+            id: FeedId(self.id),
+            title: self.title,
+            user_id: UserId(self.user_id),
+        })
+    }
+}
+
+pub struct Feed {
+    pub id: FeedId,
+    pub title: String,
+    pub user_id: UserId,
+}
+
+impl Feed {
+    pub async fn create(
+        pool: &SqlitePool, id: UserId, name: &str
+    ) -> Result<Feed> {
+        let new_feed = sqlx::query_as!(
+            UnparsedFeed,
+            "INSERT INTO feeds (user_id, title)
+            VALUES (?, ?)
+            RETURNING id as `id!`, user_id, title",
+            id.0, name
+        ).fetch_one(pool).await?.parse();
+
+        new_feed
+    }
+
+    pub async fn add_channel(
+        pool: &SqlitePool, id: FeedId, channel_id: ChannelId
+    ) -> Result<()> {
+        sqlx::query!(
+            "INSERT INTO feed_channels (feed_id, channel_id)
+            VALUES (?, ?)",
+            id.0, channel_id.0
+        ).execute(pool).await?;
+
+        Ok(())
+    }
+
+    pub async fn get_items(
+        pool: &SqlitePool, id: FeedId, limit: u8, offset: u32
+    ) -> Result<Vec<Item>> {
+        let items = sqlx::query_as!(
+            Item,
+            "SELECT item_id as id FROM feed_items
+            WHERE feed_id = ? AND archived = FALSE
+            ORDER BY score DESC
+            LIMIT ? OFFSET ?",
+            id.0, limit, offset
+        ).fetch_all(pool).await?;
+
+        Ok(items)
+    }
+
+    pub async fn get_channels(
+        pool: &SqlitePool, id: FeedId
+    ) -> Result<Vec<Channel>> {
+        let channels: Result<Vec<Channel>> = sqlx::query_as!(
+            UnparsedChannel,
+            "SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched
+            FROM channels c
+            JOIN feed_channels fc on c.id = fc.channel_id
+            WHERE fc.feed_id = ?",
+            id.0
+        ).fetch_all(pool).await?.into_iter()
+        .map(UnparsedChannel::parse).collect();
+
+        channels
+    }
+}
+
diff --git a/koucha/src/item.rs b/koucha/src/item.rs
new file mode 100644
index 0000000..0d2a7ab
--- /dev/null
+++ b/koucha/src/item.rs
@@ -0,0 +1,3 @@
+pub struct Item {
+    pub id: i64,
+}
diff --git a/koucha/src/lib.rs b/koucha/src/lib.rs
index 961ad1d..d12c88e 100644
--- a/koucha/src/lib.rs
+++ b/koucha/src/lib.rs
@@ -1,15 +1,16 @@
-use std::{
-    error::Error,
-    hash::{Hash, Hasher},
-};
-use reqwest::Url;
-use chrono::{
-    Utc,
-    DateTime,
-};
+use std::error::Error;
 
 type Result<T> = std::result::Result<T, Box<dyn Error>>;
 
+mod user;
+pub use user::User;
+mod feed;
+pub use feed::Feed;
+mod channel;
+pub use channel::Channel;
+mod item;
+pub use item::Item;
+
 pub struct AdapterOptions {
     database_url: String,
 }
@@ -41,187 +42,6 @@ pub struct Adapter {
 }
 
 impl Adapter {
-    pub async fn get_all_users(&self) -> Result<Vec<User>> {
-        let users = sqlx::query_as!(
-            User,
-            "SELECT id, name FROM users"
-        ).fetch_all(&self.db).await?;
-
-        Ok(users)
-    }
-
-    // pub async fn update_channels(&self) -> Result<()> {
-    //
-    // }
-    //
-    // async fn get_all_channels(&self) -> Result<Vec<Channel>> {
-    //     let users = sqlx::query_as!(
-    //         Channel,
-    //         "SELECT id FROM channels"
-    //     ).fetch_all(&self.db).await?;
-    //
-    //     Ok(users)
-    // }
-
-    fn get_pool(&self) -> &sqlx::SqlitePool { &self.db }
-    fn get_client(&self) -> &reqwest::client { &self.client }
-}
-
-pub struct User {
-    id: i64,
-    name: String,
-}
-
-impl User {
-    // async fn get_by_id(adapter: &Adapter, id: i64) -> Result<Self> {
-    //     let user = sqlx::query!("SELECT name FROM users WHERE id = ?", id)
-    //         .fetch_one(adapter.get_pool()).await?;
-    //
-    //     Ok(Self {
-    //         id: id,
-    //         name: user.name,
-    //     })
-    // }
-
-    pub async fn create(adapter: &Adapter, name: &str) -> Result<Self> {
-        let result = sqlx::query!("INSERT INTO users (name) VALUES (?)", name)
-            .execute(adapter.get_pool()).await?;
-
-        Ok(Self {
-            id: result.last_insert_rowid(),
-            name: name.to_string()
-        })
-    }
-
-    pub async fn change_name(
-        &mut self, adapter: &Adapter, new_name: &str) -> Result<()> {
-        sqlx::query!(
-            "UPDATE users SET name = ? WHERE id = ?",
-            new_name, self.id
-        ).execute(adapter.get_pool()).await?;
-
-        self.name = new_name.to_string();
-
-        Ok(())
-    }
-
-    pub async fn get_feeds(&self, adapter: &Adapter) -> Result<Vec<Feed>> {
-        let feeds = sqlx::query_as!(
-            Feed,
-            "SELECT id FROM feeds WHERE user_id = ?",
-            self.id
-        ).fetch_all(adapter.get_pool()).await?;
-
-        Ok(feeds)
-    }
-
-    pub fn name(&self) -> &str { &self.name }
-}
-
-pub struct Feed {
-    id: i64,
-}
-
-impl Feed {
-    pub async fn get_items(
-        &self, adapter: &Adapter, limit: u8, offset: u32) -> Result<Vec<Item>> {
-        let items = sqlx::query_as!(
-            Item,
-            "SELECT item_id as id FROM feed_items
-            WHERE feed_id = ? AND archived = FALSE
-            ORDER BY score DESC
-            LIMIT ?
OFFSET ?",
-            self.id, limit, offset
-        ).fetch_all(adapter.get_pool()).await?;
-
-        Ok(items)
-    }
-
-    pub async fn get_channels(&self, adapter: &Adapter) -> Result<Vec<Channel>> {
-        let db_channels = sqlx::query!(
-            "SELECT c.id as `id!`, c.title, c.link, c.description, c.last_fetched
-            FROM channels c
-            JOIN feed_channels fc on c.id = fc.channel_id
-            WHERE fc.feed_id = ?",
-            self.id
-        ).fetch_all(adapter.get_pool()).await?;
-        let mut channels = Vec::with_capacity(db_channels.len());
-        for db_channel in db_channels {
-            channels.push(Channel {
-                id: db_channel.id,
-                title: db_channel.title,
-                link: Url::parse(&db_channel.link)?,
-                description: db_channel.description,
-                last_fetched: db_channel.last_fetched.as_deref()
-                    .map(DateTime::parse_from_rfc2822)
-                    .transpose()?
-                    .map(|dt| dt.with_timezone(&Utc)),
-            })
-        }
-        Ok(channels)
-    }
-}
-
-pub struct Channel {
-    id: i64,
-    title: String,
-    link: Url,
-    description: Option<String>,
-    last_fetched: Option<DateTime<Utc>>,
-}
-
-impl Channel {
-    pub async fn fetch(mut self, adapter: &Adapter) -> Result<Self> {
-        let bytestream = adapter.get_client().get(self.link.clone())
-            .send().await?
-            .bytes().await?;
-
-        let rss_channel = rss::Channel::read_from(&bytestream[..])?;
-        self.title = rss_channel.title;
-        self.link = Url::parse(&rss_channel.link)?;
-        self.description = Some(rss_channel.description);
-        let now = Utc::now();
-        self.last_fetched = Some(now);
-
-        sqlx::query!(
-            "UPDATE channels
-            SET title = ?, link = ?, description = ?,
-            last_fetched = ?
WHERE id = ?",
-            self.title, self.link.as_str(), self.description, now.to_rfc2822(),
-            self.id
-        ).execute(adapter.get_pool()).await?;
-
-        fn get_or_create_guid(item: &rss::Item) -> String {
-            if let Some(guid) = item.guid() {
-                return guid.value().to_string();
-            }
-
-            let mut hasher = std::collections::hash_map::DefaultHasher::new();
-            item.link().unwrap_or("").hash(&mut hasher);
-            item.title().unwrap_or("").hash(&mut hasher);
-            item.description().unwrap_or("").hash(&mut hasher);
-
-            format!("gen-{:x}", hasher.finish())
-        }
-
-        for item in rss_channel.items {
-            sqlx::query!(
-                "INSERT OR IGNORE INTO items
-                (channel_id, guid, fetched_at, title, description, content)
-                VALUES (?, ?, ?, ?, ?, ?)",
-                self.id, get_or_create_guid(&item), now.to_rfc2822(),
-                item.title().unwrap_or(""), item.description().unwrap_or(""),
-                item.content().unwrap_or("")
-            )
-            .execute(adapter.get_pool())
-            .await?;
-        }
-
-        Ok(self)
-    }
-}
-
-pub struct Item {
-    id: i64,
+    pub fn get_pool(&self) -> &sqlx::SqlitePool { &self.db }
+    pub fn get_client(&self) -> &reqwest::Client { &self.client }
 }
diff --git a/koucha/src/user.rs b/koucha/src/user.rs
new file mode 100644
index 0000000..5a038a0
--- /dev/null
+++ b/koucha/src/user.rs
@@ -0,0 +1,62 @@
+use sqlx::SqlitePool;
+use crate::{
+    Result,
+    Feed
+};
+
+pub struct UserId(pub i64);
+impl From<i64> for UserId { fn from(id: i64) -> Self { UserId(id) } }
+impl From<UserId> for i64 { fn from(id: UserId) -> Self { id.0 } }
+
+pub struct User {
+    pub id: UserId,
+    pub name: String,
+}
+
+impl User {
+    pub async fn get_all(pool: &SqlitePool) -> Result<Vec<User>> {
+        let users = sqlx::query_as!(
+            User,
+            "SELECT id, name FROM users"
+        ).fetch_all(pool).await?;
+
+        Ok(users)
+    }
+
+    pub async fn create(pool: &SqlitePool, name: &str) -> Result<Self> {
+        let result = sqlx::query!(
+            "INSERT INTO users (name)
+            VALUES (?)
+            RETURNING id, name",
+            name
+        ).fetch_one(pool).await?;
+
+        Ok(Self {
+            id: UserId(result.id),
+            name: result.name,
+        })
+    }
+
+    pub async fn update_name(
+        pool: &SqlitePool, id: UserId, new_name: &str
+    ) -> Result<()> {
+        sqlx::query!(
+            "UPDATE users SET name = ? WHERE id = ?",
+            new_name, id.0
+        ).execute(pool).await?;
+
+        Ok(())
+    }
+
+    pub async fn get_feeds(pool: &SqlitePool, id: UserId) -> Result<Vec<Feed>> {
+        let feeds = sqlx::query_as!(
+            Feed,
+            "SELECT id FROM feeds WHERE user_id = ?",
+            id.0
+        ).fetch_all(pool).await?;
+
+        Ok(feeds)
+    }
+
+    pub fn name(&self) -> &str { &self.name }
+}