Move to DB module for visibility; separate db and client code

This commit is contained in:
Julia Lange 2026-01-22 13:49:49 -08:00
parent f5fc83a471
commit 3748606e21
Signed by: Julia
SSH key fingerprint: SHA256:5DJcfxa5/fKCYn57dcabJa2vN2e6eT0pBerYi5SUbto
9 changed files with 340 additions and 399 deletions

View file

@ -1,337 +0,0 @@
use reqwest::Url;
use chrono::{DateTime, Utc};
use crate::{
Result,
AdapterPool,
AdapterClient,
Item,
channel::fetch::FetchedRSSChannel,
item::UnparsedItem,
};
/// Opaque database identifier for a channel row.
#[derive(Copy, Clone)]
pub struct ChannelId(i64);

impl From<ChannelId> for i64 {
    /// Expose the raw row id for use in queries and comparisons.
    fn from(id: ChannelId) -> Self {
        id.0
    }
}
pub mod fetch;
/// Raw channel row as read from SQLite, before string columns are parsed
/// into typed values (see `UnparsedChannel::parse`).
pub struct UnparsedChannel {
    // Row id of the channel.
    pub id: i64,
    pub title: String,
    // URL string; parsed into `reqwest::Url`.
    pub link: String,
    pub description: Option<String>,
    // RFC 2822 timestamp string; parsed into `DateTime<Utc>`.
    pub last_fetched: Option<String>,
}
impl UnparsedChannel {
    /// Convert raw row data into a typed `Channel`.
    ///
    /// Fails if `link` is not a valid URL or `last_fetched` is present but
    /// is not a valid RFC 2822 timestamp.
    pub fn parse(self) -> Result<Channel> {
        let link = Url::parse(&self.link)?;
        // Parse the optional timestamp, propagating any parse error, and
        // normalize the stored offset to UTC.
        let last_fetched = match self.last_fetched.as_deref() {
            Some(raw) => Some(DateTime::parse_from_rfc2822(raw)?.with_timezone(&Utc)),
            None => None,
        };
        Ok(Channel {
            id: ChannelId(self.id),
            title: self.title,
            link,
            description: self.description,
            last_fetched,
        })
    }
}
/// A parsed RSS channel row with typed fields.
///
/// Fields are private; read access goes through the accessor methods.
pub struct Channel {
    id: ChannelId,
    title: String,
    link: Url,
    description: Option<String>,
    last_fetched: Option<DateTime<Utc>>,
}
impl Channel {
    // Accessors: fields are private, so reads go through these.
    pub fn id(&self) -> ChannelId { self.id }
    pub fn title(&self) -> &str { &self.title }
    pub fn link(&self) -> &Url { &self.link }
    pub fn description(&self) -> Option<&str> { self.description.as_deref() }

    /// Load every channel row and parse each one into a typed `Channel`.
    /// Errors if any row fails to parse (bad URL or timestamp).
    pub async fn get_all(pool: &AdapterPool) -> Result<Vec<Self>> {
        let channels: Result<Vec<Channel>> = sqlx::query_as!(
            UnparsedChannel,
            "SELECT id, title, link, description, last_fetched FROM channels"
        ).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect();
        channels
    }

    /// Load a single channel by id. Errors if the row does not exist.
    pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result<Self> {
        let channel: Result<Self> = sqlx::query_as!(
            UnparsedChannel,
            "SELECT id, title, link, description, last_fetched
            FROM channels
            WHERE id = ?",
            id.0
        ).fetch_one(&pool.0).await?.parse();
        channel
    }

    /// Return the existing channel with this link, or insert a new row,
    /// using the URL itself as a placeholder title.
    ///
    /// NOTE(review): this is check-then-insert. Two concurrent callers can
    /// both miss the SELECT, after which the loser's INSERT would trip the
    /// uniqueness on `link` (implied by the WHERE link = ? lookup) instead
    /// of returning the existing row. An `ON CONFLICT(link)` upsert would
    /// close the race — confirm against the schema.
    pub async fn create(
        pool: &AdapterPool, link: Url
    ) -> Result<Self> {
        let link_str = link.as_str();
        if let Ok(existing_channel) = sqlx::query_as!(
            UnparsedChannel,
            "SELECT id as `id!`, title, link, description, last_fetched
            FROM channels
            WHERE link = ?",
            link_str
        ).fetch_one(&pool.0).await {
            return existing_channel.parse();
        }
        // Both placeholders are the URL: the link column and the initial title.
        let new_channel = sqlx::query_as!(
            UnparsedChannel,
            "INSERT INTO channels (title, link)
            VALUES (?, ?)
            RETURNING id, title, link, description, last_fetched",
            link_str, link_str
        ).fetch_one(&pool.0).await?.parse();
        new_channel
    }

    // TODO implement fetch skipping
    // Always false for now, so fetch_rss never skips.
    fn should_skip_fetch(&self) -> bool { false }

    // TODO implement conditional fetching
    /// Download and parse this channel's RSS feed over HTTP.
    /// Returns `Ok(None)` when the fetch is skipped (currently never —
    /// see `should_skip_fetch`).
    pub async fn fetch_rss(
        &self, client: &AdapterClient
    ) -> Result<Option<FetchedRSSChannel>> {
        if self.should_skip_fetch() {
            return Ok(None);
        }
        let bytestream = client.0.get(self.link.clone())
            .send().await?
            .bytes().await?;
        let rss_channel = rss::Channel::read_from(&bytestream[..])?;
        Ok(Some(FetchedRSSChannel::parse(rss_channel)?))
    }

    /// Overwrite this channel's title/link/description from a fetched feed
    /// and record the fetch time (stored as an RFC 2822 string).
    pub async fn update_metadata(
        &self, pool: &AdapterPool, fetched: FetchedRSSChannel
    ) -> Result<()> {
        let title = fetched.title();
        let description = fetched.description();
        let link = fetched.link().as_str();
        let fetched_at = fetched.fetched_at().to_rfc2822();
        sqlx::query!(
            "UPDATE channels
            SET title = ?, link = ?, description = ?,
            last_fetched = ?
            WHERE id = ?",
            title, link, description, fetched_at,
            self.id.0
        ).execute(&pool.0).await?;
        Ok(())
    }

    /// Insert each fetched item, silently skipping rows that already exist
    /// (`INSERT OR IGNORE`), so repeated fetches do not duplicate items.
    pub async fn update_items(
        &self, pool: &AdapterPool, fetched: FetchedRSSChannel
    ) -> Result<()> {
        let fetched_at = fetched.fetched_at().to_rfc2822();
        for item in fetched.items() {
            let guid = item.guid();
            let title = item.title();
            let description = item.description();
            let content = item.content();
            sqlx::query!(
                "INSERT OR IGNORE INTO items
                (channel_id, guid, fetched_at, title, description, content)
                VALUES (?, ?, ?, ?, ?, ?)",
                self.id.0, guid, fetched_at, title, description, content
            )
            .execute(&pool.0)
            .await?;
        }
        Ok(())
    }

    /// Load all items belonging to this channel (ids only at this stage).
    pub async fn get_items(&self, pool: &AdapterPool) -> Result<Vec<Item>> {
        let items: Result<Vec<Item>> = sqlx::query_as!(
            UnparsedItem,
            "SELECT id as `id!` FROM items WHERE channel_id = ?",
            self.id.0
        ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect();
        items
    }
}
// Integration-style tests against an in-memory SQLite database; RSS input
// is synthesized with the `rss` crate's builders, so no network is needed.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Adapter, AdapterBuilder};
    use rss::{
        Guid as RSSGuid,
        Item as RSSItem,
        ItemBuilder as RSSItemBuilder,
        Channel as RSSChannel,
        ChannelBuilder as RSSChannelBuilder,
    };

    // Shared fixture values for items and channels.
    const ITEM_TITLE: &str = "My Item";
    const ITEM_GUID1: &str = "https://mycontent.com/blog/1";
    const ITEM_GUID2: &str = "something-else!";
    const ITEM_DESC: &str = "A test item";
    const ITEM_CONT: &str = "some rss content baby";
    const CHAN_TITLE: &str = "My Feed";
    const CHAN_DESC: &str = "A test feed";
    const FEED1: &str = "https://example.com/feed";
    const FEED2: &str = "https://example2.com/feed";

    // Fresh in-memory SQLite database per test.
    async fn setup_adapter() -> Adapter {
        AdapterBuilder::new()
            .database_url("sqlite::memory:")
            .create().await.unwrap()
    }

    // A new link gets a row with a positive id and a non-empty placeholder title.
    #[tokio::test]
    async fn create_channel() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel = Channel::create(pool, url_feed).await.unwrap();
        assert!(channel.id().0 > 0);
        assert_eq!(channel.link().as_str(), FEED1);
        assert!(channel.title().len() > 0);
    }

    // Creating the same link twice returns the same row id.
    #[tokio::test]
    async fn create_duplicate_returns_existing() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel1 = Channel::create(pool, url_feed.clone()).await.unwrap();
        let channel2 = Channel::create(pool, url_feed).await.unwrap();
        assert_eq!(
            i64::from(channel1.id()),
            i64::from(channel2.id())
        );
    }

    // get_all returns one entry per distinct link.
    #[tokio::test]
    async fn get_all_channels() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed1 = Url::parse(FEED1).unwrap();
        let url_feed2 = Url::parse(FEED2).unwrap();
        Channel::create(pool, url_feed1).await.unwrap();
        Channel::create(pool, url_feed2).await.unwrap();
        let channels = Channel::get_all(pool).await.unwrap();
        assert_eq!(channels.len(), 2);
    }

    // update_metadata overwrites title, link and description from a fetched feed.
    #[tokio::test]
    async fn update_metadata() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel = Channel::create(pool, url_feed).await.unwrap();
        let fake_rss: RSSChannel = RSSChannelBuilder::default()
            .title(CHAN_TITLE)
            .link(FEED2)
            .description(CHAN_DESC)
            .build();
        let fetched = FetchedRSSChannel::parse(fake_rss).unwrap();
        channel.update_metadata(pool, fetched).await.unwrap();
        let updated = Channel::get(pool, channel.id()).await.unwrap();
        assert_eq!(updated.title(), CHAN_TITLE);
        assert_eq!(updated.link().as_str(), FEED2);
        assert_eq!(updated.description(), Some(CHAN_DESC));
    }

    // Items with distinct guids are both stored.
    #[tokio::test]
    async fn update_items() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel = Channel::create(pool, url_feed).await.unwrap();
        let item1: RSSItem = RSSItemBuilder::default()
            .title(ITEM_TITLE.to_string())
            .description(ITEM_DESC.to_string())
            .content(ITEM_CONT.to_string())
            .guid(RSSGuid { value: ITEM_GUID1.to_string(), permalink: false })
            .build();
        let item2: RSSItem = RSSItemBuilder::default()
            .title(ITEM_TITLE.to_string())
            .description(ITEM_DESC.to_string())
            .content(ITEM_CONT.to_string())
            .guid(RSSGuid { value: ITEM_GUID2.to_string(), permalink: false })
            .build();
        let fake_rss: RSSChannel = RSSChannelBuilder::default()
            .title(CHAN_TITLE)
            .link(FEED2)
            .description(CHAN_DESC)
            .item(item1)
            .item(item2)
            .build();
        let fetched = FetchedRSSChannel::parse(fake_rss).unwrap();
        channel.update_items(pool, fetched).await.unwrap();
        let items = channel.get_items(pool).await.unwrap();
        assert_eq!(items.len(), 2);
    }

    // Re-inserting the same guid is a no-op (INSERT OR IGNORE).
    #[tokio::test]
    async fn update_items_ignores_duplicates() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel = Channel::create(pool, url_feed).await.unwrap();
        let item1: RSSItem = RSSItemBuilder::default()
            .title(ITEM_TITLE.to_string())
            .description(ITEM_DESC.to_string())
            .content(ITEM_CONT.to_string())
            .guid(RSSGuid { value: ITEM_GUID1.to_string(), permalink: false })
            .build();
        let fake_rss: RSSChannel = RSSChannelBuilder::default()
            .title(CHAN_TITLE)
            .link(FEED2)
            .description(CHAN_DESC)
            .item(item1)
            .build();
        let fetched = FetchedRSSChannel::parse(fake_rss.clone()).unwrap();
        channel.update_items(pool, fetched).await.unwrap();
        let fetched = FetchedRSSChannel::parse(fake_rss).unwrap();
        channel.update_items(pool, fetched).await.unwrap();
        let items = channel.get_items(pool).await.unwrap();
        assert_eq!(items.len(), 1);
    }
}

22
koucha/src/db.rs Normal file
View file

@ -0,0 +1,22 @@
mod user;
pub use user::User;
mod feed;
pub use feed::Feed;
mod channel;
pub use channel::Channel;
mod item;
pub use item::Item;
/// Generate a strongly-typed id newtype wrapping a SQLite row id.
///
/// In addition to `Copy`/`Clone`, each id derives `Debug`, `PartialEq`,
/// `Eq`, and `Hash`, so ids can be compared directly and used as map keys.
/// (The tests in this crate currently compare ids by converting through
/// `i64::from` — direct `==` is now available and backward-compatible.)
macro_rules! define_id {
    ($name:ident) => {
        #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
        pub struct $name(i64);

        impl From<$name> for i64 {
            /// Expose the raw row id for use in queries and comparisons.
            fn from(id: $name) -> Self {
                id.0
            }
        }
    };
}

define_id!(UserId);
define_id!(FeedId);
define_id!(ChannelId);
define_id!(ItemId);

204
koucha/src/db/channel.rs Normal file
View file

@ -0,0 +1,204 @@
use reqwest::Url;
use chrono::{DateTime, Utc};
use crate::{
Result,
AdapterPool,
db::{
ChannelId,
Item,
item::UnparsedItem,
},
fetch::FetchedRSSChannel,
};
/// Raw channel row as read from SQLite, before string columns are parsed
/// into typed values (see `UnparsedChannel::parse`).
pub struct UnparsedChannel {
    // Row id of the channel.
    pub id: i64,
    pub title: String,
    // URL string; parsed into `reqwest::Url`.
    pub link: String,
    pub description: Option<String>,
    // RFC 2822 timestamp string; parsed into `DateTime<Utc>`.
    pub last_fetched: Option<String>,
}
impl UnparsedChannel {
    /// Convert raw row data into a typed `Channel`.
    ///
    /// # Errors
    /// Fails if `link` is not a valid URL or `last_fetched` is present but
    /// is not a valid RFC 2822 timestamp.
    pub fn parse(self) -> Result<Channel> {
        Ok(Channel {
            id: ChannelId(self.id),
            title: self.title,
            link: Url::parse(&self.link)?,
            description: self.description,
            // Parse the optional timestamp, propagating any parse error,
            // and normalize the stored offset to UTC.
            last_fetched: self.last_fetched.as_deref()
                .map(DateTime::parse_from_rfc2822)
                .transpose()?
                .map(|dt| dt.with_timezone(&Utc)),
        })
    }
}
/// A parsed RSS channel row with typed fields.
///
/// Fields are private; read access goes through the accessor methods.
pub struct Channel {
    id: ChannelId,
    title: String,
    link: Url,
    description: Option<String>,
    last_fetched: Option<DateTime<Utc>>,
}
impl Channel {
    // Accessors: fields are private, so reads go through these.
    pub fn id(&self) -> ChannelId { self.id }
    pub fn title(&self) -> &str { &self.title }
    pub fn link(&self) -> &Url { &self.link }
    pub fn description(&self) -> Option<&str> { self.description.as_deref() }
    pub fn last_fetched(&self) -> Option<DateTime<Utc>> { self.last_fetched }

    /// Load every channel row and parse each one into a typed `Channel`.
    /// Errors if any row fails to parse (bad URL or timestamp).
    pub async fn get_all(pool: &AdapterPool) -> Result<Vec<Self>> {
        let channels: Result<Vec<Channel>> = sqlx::query_as!(
            UnparsedChannel,
            "SELECT id, title, link, description, last_fetched FROM channels"
        ).fetch_all(&pool.0).await?.into_iter().map(UnparsedChannel::parse).collect();
        channels
    }

    /// Load a single channel by id. Errors if the row does not exist.
    pub async fn get(pool: &AdapterPool, id: ChannelId) -> Result<Self> {
        let channel: Result<Self> = sqlx::query_as!(
            UnparsedChannel,
            "SELECT id, title, link, description, last_fetched
            FROM channels
            WHERE id = ?",
            id.0
        ).fetch_one(&pool.0).await?.parse();
        channel
    }

    /// Atomically return the channel with this link, inserting a new row
    /// first if needed. On conflict, the no-op `DO UPDATE SET link = link`
    /// makes `RETURNING` yield the existing row instead of failing.
    pub async fn get_or_create(
        pool: &AdapterPool, link: Url
    ) -> Result<Self> {
        let link_str = link.as_str();
        let channel = sqlx::query_as!(
            UnparsedChannel,
            "INSERT INTO channels (title, link)
            VALUES(?, ?)
            ON CONFLICT(link) DO UPDATE SET link = link
            RETURNING id, title, link, description, last_fetched",
            link_str, link_str // We use the url as a placeholder title
        ).fetch_one(&pool.0).await?.parse();
        channel
    }

    // TODO implement fetch skipping
    // Always false for now, so callers never skip a fetch.
    pub fn should_skip_fetch(&self) -> bool { false }

    /// Overwrite this channel's title/link/description from a fetched feed
    /// and record the fetch time (stored as an RFC 2822 string).
    pub async fn update_metadata(
        &self, pool: &AdapterPool, fetched: FetchedRSSChannel
    ) -> Result<()> {
        let title = fetched.title();
        let description = fetched.description();
        let link = fetched.link().as_str();
        let fetched_at = fetched.fetched_at().to_rfc2822();
        sqlx::query!(
            "UPDATE channels
            SET title = ?, link = ?, description = ?,
            last_fetched = ?
            WHERE id = ?",
            title, link, description, fetched_at,
            self.id.0
        ).execute(&pool.0).await?;
        Ok(())
    }

    /// Insert each fetched item, silently skipping rows that already exist
    /// (`INSERT OR IGNORE`), so repeated fetches do not duplicate items.
    pub async fn update_items(
        &self, pool: &AdapterPool, fetched: FetchedRSSChannel
    ) -> Result<()> {
        let fetched_at = fetched.fetched_at().to_rfc2822();
        for item in fetched.items() {
            let guid = item.guid();
            let title = item.title();
            let description = item.description();
            let content = item.content();
            sqlx::query!(
                "INSERT OR IGNORE INTO items
                (channel_id, guid, fetched_at, title, description, content)
                VALUES (?, ?, ?, ?, ?, ?)",
                self.id.0, guid, fetched_at, title, description, content
            )
            .execute(&pool.0)
            .await?;
        }
        Ok(())
    }

    /// Load all items belonging to this channel, fully parsed.
    pub async fn get_items(&self, pool: &AdapterPool) -> Result<Vec<Item>> {
        let items: Result<Vec<Item>> = sqlx::query_as!(
            UnparsedItem,
            "SELECT id as `id!`, channel_id, guid, fetched_at, title, description,
            content
            FROM items
            WHERE channel_id = ?",
            self.id.0
        ).fetch_all(&pool.0).await?.into_iter().map(UnparsedItem::parse).collect();
        items
    }
}
// Integration-style tests against an in-memory SQLite database.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Adapter, AdapterBuilder};

    const FEED1: &str = "https://example.com/feed";
    const FEED2: &str = "https://example2.com/feed";

    // Fresh in-memory SQLite database per test.
    async fn setup_adapter() -> Adapter {
        AdapterBuilder::new()
            .database_url("sqlite::memory:")
            .create().await.unwrap()
    }

    // A new link gets a row with a positive id and a non-empty placeholder title.
    #[tokio::test]
    async fn create_channel() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel = Channel::get_or_create(pool, url_feed).await.unwrap();
        assert!(channel.id().0 > 0);
        assert_eq!(channel.link().as_str(), FEED1);
        assert!(channel.title().len() > 0);
    }

    // Upserting the same link twice returns the same row id.
    #[tokio::test]
    async fn create_duplicate_returns_existing() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed = Url::parse(FEED1).unwrap();
        let channel1 = Channel::get_or_create(pool, url_feed.clone()).await.unwrap();
        let channel2 = Channel::get_or_create(pool, url_feed).await.unwrap();
        assert_eq!(
            i64::from(channel1.id()),
            i64::from(channel2.id())
        );
    }

    // get_all returns one entry per distinct link.
    #[tokio::test]
    async fn get_all_channels() {
        let adapter = setup_adapter().await;
        let pool = adapter.get_pool();
        let url_feed1 = Url::parse(FEED1).unwrap();
        let url_feed2 = Url::parse(FEED2).unwrap();
        Channel::get_or_create(pool, url_feed1).await.unwrap();
        Channel::get_or_create(pool, url_feed2).await.unwrap();
        let channels = Channel::get_all(pool).await.unwrap();
        assert_eq!(channels.len(), 2);
    }
}

View file

@ -1,20 +1,17 @@
use crate::{ use crate::{
Result,
AdapterPool, AdapterPool,
Item, Result,
item::UnparsedItem, db::{
Channel, Channel,
channel::{
UnparsedChannel,
ChannelId, ChannelId,
FeedId,
Item,
UserId,
channel::UnparsedChannel,
item::UnparsedItem,
}, },
user::UserId,
}; };
#[derive(Copy, Clone)]
pub struct FeedId(i64);
impl From<FeedId> for i64 { fn from(id: FeedId) -> Self { id.0 } }
pub struct UnparsedFeed { pub struct UnparsedFeed {
pub id: i64, pub id: i64,
pub title: String, pub title: String,
@ -93,7 +90,10 @@ impl Feed {
) -> Result<Vec<Item>> { ) -> Result<Vec<Item>> {
let items: Result<Vec<Item>> = sqlx::query_as!( let items: Result<Vec<Item>> = sqlx::query_as!(
UnparsedItem, UnparsedItem,
"SELECT item_id as id FROM feed_items "SELECT i.id as `id!`, i.channel_id, i.guid, i.fetched_at, i.title,
i.description, i.content
FROM items i
JOIN feed_items fi on i.id = fi.item_id
WHERE feed_id = ? AND archived = FALSE WHERE feed_id = ? AND archived = FALSE
ORDER BY score DESC ORDER BY score DESC
LIMIT ? OFFSET ?", LIMIT ? OFFSET ?",
@ -123,7 +123,11 @@ impl Feed {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{User, Adapter, AdapterBuilder}; use crate::{
Adapter,
AdapterBuilder,
db::User,
};
async fn setup_adapter() -> Adapter { async fn setup_adapter() -> Adapter {
AdapterBuilder::new() AdapterBuilder::new()

68
koucha/src/db/item.rs Normal file
View file

@ -0,0 +1,68 @@
use crate::{
Result,
AdapterPool,
db::{
ChannelId,
ItemId,
}
};
use chrono::{DateTime, Utc};
/// Raw item row as read from SQLite, before string columns are parsed
/// into typed values (see `UnparsedItem::parse`).
pub struct UnparsedItem {
    pub id: i64,
    pub channel_id: i64,
    pub guid: String,
    // RFC 2822 timestamp string; parsed into `DateTime<Utc>`.
    pub fetched_at: String,
    pub title: Option<String>,
    pub description: Option<String>,
    pub content: Option<String>,
}
impl UnparsedItem {
    /// Convert raw row data into a typed `Item`.
    ///
    /// # Errors
    /// Fails if `fetched_at` is not a valid RFC 2822 timestamp.
    pub fn parse(self) -> Result<Item> {
        Ok(Item {
            id: ItemId(self.id),
            channel_id: ChannelId(self.channel_id),
            guid: self.guid,
            // Normalize the stored offset to UTC.
            fetched_at: DateTime::parse_from_rfc2822(&self.fetched_at)?.with_timezone(&Utc),
            title: self.title,
            description: self.description,
            content: self.content,
        })
    }
}
/// A parsed item row with typed fields.
///
/// Fields are private; no accessor methods are visible in this file yet.
pub struct Item {
    id: ItemId,
    channel_id: ChannelId,
    guid: String,
    fetched_at: DateTime<Utc>,
    title: Option<String>,
    description: Option<String>,
    content: Option<String>,
}
impl Item {
    /// Return the item identified by (`from_channel`, `guid`), inserting a
    /// stub row (title/description/content left NULL) if it does not exist.
    ///
    /// # Errors
    /// Fails if the query errors or the returned row cannot be parsed
    /// (invalid stored timestamp).
    pub async fn get_or_create(
        pool: &AdapterPool, from_channel: ChannelId, guid: &str,
        fetched_at: DateTime<Utc>
    ) -> Result<Self> {
        let int_channel_id = i64::from(from_channel);
        // Timestamps are stored as RFC 2822 strings (see UnparsedItem::parse).
        let last_fetched = fetched_at.to_rfc2822();
        // The conflict target must be the (channel_id, guid) uniqueness that
        // Channel::update_items relies on via `INSERT OR IGNORE`. The previous
        // `ON CONFLICT(id)` could never fire — `id` is auto-assigned, never
        // supplied — so a duplicate guid surfaced as a constraint error
        // instead of returning the existing row. The no-op
        // `DO UPDATE SET id = id` makes RETURNING yield the existing row.
        let item = sqlx::query_as!(
            UnparsedItem,
            "INSERT INTO items (channel_id, guid, fetched_at)
            VALUES(?, ?, ?)
            ON CONFLICT(channel_id, guid) DO UPDATE SET id = id
            RETURNING id as `id!`, channel_id, guid, fetched_at, title, description,
            content",
            int_channel_id, guid, last_fetched
        ).fetch_one(&pool.0).await?.parse();
        item
    }
}

View file

@ -1,14 +1,13 @@
use crate::{ use crate::{
Result, Result,
AdapterPool, AdapterPool,
db::{
UserId,
Feed, Feed,
feed::UnparsedFeed, feed::UnparsedFeed,
},
}; };
#[derive(Copy, Clone)]
pub struct UserId(i64);
impl From<UserId> for i64 { fn from(id: UserId) -> Self { id.0 } }
pub struct UnparsedUser { pub struct UnparsedUser {
pub id: i64, pub id: i64,
pub name: String, pub name: String,

View file

@ -1,4 +1,8 @@
use crate::Result; use crate::{
Result,
db::Channel,
AdapterClient,
};
use reqwest::Url; use reqwest::Url;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
@ -53,7 +57,23 @@ impl FetchedRSSChannel {
pub fn items(&self) -> &[FetchedRSSItem] { &self.items } pub fn items(&self) -> &[FetchedRSSItem] { &self.items }
pub fn fetched_at(&self) -> &DateTime<Utc> { &self.fetched_at } pub fn fetched_at(&self) -> &DateTime<Utc> { &self.fetched_at }
pub fn parse(rss: rss::Channel) -> Result<Self> { pub async fn fetch_channel(
client: &AdapterClient, channel: Channel
) -> Result<Option<Self>> {
if channel.should_skip_fetch() {
return Ok(None);
}
let bytestream = client.0.get(channel.link().clone())
.send().await?
.bytes().await?;
let rss_channel = rss::Channel::read_from(&bytestream[..])?;
Ok(Some(FetchedRSSChannel::parse(rss_channel)?))
}
fn parse(rss: rss::Channel) -> Result<Self> {
Ok(FetchedRSSChannel { Ok(FetchedRSSChannel {
title: rss.title, title: rss.title,
link: Url::parse(&rss.link)?, link: Url::parse(&rss.link)?,

View file

@ -1,20 +0,0 @@
use crate::Result;
/// Opaque database identifier for an item row.
#[derive(Copy, Clone)]
pub struct ItemId(i64);

impl From<ItemId> for i64 {
    /// Expose the raw row id for use in queries and comparisons.
    fn from(id: ItemId) -> Self {
        id.0
    }
}
/// Raw item row as read from SQLite (id only at this stage).
pub struct UnparsedItem {
    pub id: i64,
}
impl UnparsedItem {
    /// Wrap the raw row id in a typed `Item`. Infallible today; the `Result`
    /// signature keeps it consistent with the other `parse` methods.
    pub fn parse(self) -> Result<Item> {
        Ok(Item {
            id: ItemId(self.id),
        })
    }
}
/// A parsed item; currently carries only its typed id.
pub struct Item {
    id: ItemId,
}

View file

@ -2,27 +2,8 @@ use std::error::Error;
type Result<T> = std::result::Result<T, Box<dyn Error>>; type Result<T> = std::result::Result<T, Box<dyn Error>>;
mod user; pub mod db;
pub use user::{ pub mod fetch;
User,
UserId
};
mod feed;
pub use feed::{
Feed,
FeedId,
};
mod channel;
pub use channel::{
Channel,
ChannelId,
fetch::FetchedRSSChannel,
};
mod item;
pub use item::{
Item,
ItemId,
};
pub struct AdapterPool(sqlx::SqlitePool); pub struct AdapterPool(sqlx::SqlitePool);
pub struct AdapterClient(reqwest::Client); pub struct AdapterClient(reqwest::Client);