From 9c55a15b8a14427b8bf9f52aeb3c28113db7ccbf Mon Sep 17 00:00:00 2001 From: Marcin Kulik Date: Wed, 9 Jul 2025 16:01:54 +0200 Subject: [PATCH] Use new API v1 paths for streaming --- src/api.rs | 114 ++++++++++++++++++++++++++++++++++++--------- src/cmd/session.rs | 33 ++++++++++++- 2 files changed, 123 insertions(+), 24 deletions(-) diff --git a/src/api.rs b/src/api.rs index aac7c03..c424b0f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -2,9 +2,10 @@ use std::env; use std::fmt::Debug; use anyhow::{bail, Context, Result}; -use reqwest::header; +use reqwest::{header, Response}; use reqwest::{multipart::Form, Client, RequestBuilder}; -use serde::Deserialize; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; use url::Url; use crate::config::Config; @@ -16,11 +17,20 @@ pub struct RecordingResponse { } #[derive(Debug, Deserialize)] -pub struct GetUserStreamResponse { +pub struct StreamResponse { + pub id: u64, pub ws_producer_url: String, pub url: String, } +#[derive(Default, Serialize)] +pub struct StreamChangeset { + #[serde(skip_serializing_if = "Option::is_none")] + pub live: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub title: Option>, +} + #[derive(Debug, Deserialize)] struct ErrorResponse { reason: String, @@ -69,16 +79,90 @@ async fn create_recording_request( .header(header::ACCEPT, "application/json")) } -pub async fn create_user_stream(stream_id: &str, config: &Config) -> Result { +pub async fn list_user_streams(prefix: &str, config: &Config) -> Result> { let server_url = config.get_server_url()?; - let server_hostname = server_url.host().unwrap(); let install_id = config.get_install_id()?; - let response = user_stream_request(&server_url, stream_id, &install_id) + let response = list_user_streams_request(&server_url, prefix, &install_id) .send() .await .context("cannot obtain stream producer endpoint - is the server down?")?; + parse_stream_response(response, &server_url).await +} + +fn list_user_streams_request(server_url: &Url, prefix: &str, install_id: &str) -> RequestBuilder { + let client = Client::new(); + let mut url = server_url.clone(); + url.set_path("api/v1/user/streams"); + url.set_query(Some(&format!("prefix={prefix}&limit=10"))); + + add_headers(client.get(url), install_id) +} + +pub async fn create_stream(changeset: StreamChangeset, config: &Config) -> Result { + let server_url = config.get_server_url()?; + let install_id = config.get_install_id()?; + + let response = create_stream_request(&server_url, &install_id, changeset) + .send() + .await + .context("cannot obtain stream producer endpoint - is the server down?")?; + + parse_stream_response(response, &server_url).await +} + +fn create_stream_request( + server_url: &Url, + install_id: &str, + changeset: StreamChangeset, +) -> RequestBuilder { + let client = Client::new(); + let mut url = server_url.clone(); + url.set_path("api/v1/streams"); + let builder = client.post(url); + let builder = add_headers(builder, install_id); + + builder.json(&changeset) +} + +pub async fn update_stream( + stream_id: u64, + changeset: StreamChangeset, + config: &Config, +) -> Result { + let server_url = config.get_server_url()?; + let install_id = config.get_install_id()?; + + let response = update_stream_request(&server_url, &install_id, stream_id, changeset) + .send() + .await + .context("cannot obtain stream producer endpoint - is the server down?")?; + + parse_stream_response(response, &server_url).await +} + +fn update_stream_request( + server_url: &Url, + install_id: &str, + stream_id: u64, + changeset: StreamChangeset, +) -> RequestBuilder { + let client = Client::new(); + let mut url = server_url.clone(); + url.set_path(&format!("api/v1/streams/{stream_id}")); + let builder = client.patch(url); + let builder = add_headers(builder, install_id); + + builder.json(&changeset) +} + +async fn parse_stream_response( + response: Response, + server_url: &Url, +) -> Result { + let server_hostname = server_url.host().unwrap(); + match response.status().as_u16() { 401 => bail!( "this CLI hasn't been authenticated with {server_hostname} - run `asciinema auth` first" @@ -99,24 +183,10 @@ pub async fn create_user_stream(stream_id: &str, config: &Config) -> Result() - .await - .map_err(|e| e.into()) + response.json::().await.map_err(|e| e.into()) } -fn user_stream_request(server_url: &Url, stream_id: &str, install_id: &str) -> RequestBuilder { - let client = Client::new(); - let mut url = server_url.clone(); - - let builder = if stream_id.is_empty() { - url.set_path("api/streams"); - client.post(url) - } else { - url.set_path(&format!("api/user/streams/{stream_id}")); - client.get(url) - }; - +fn add_headers(builder: RequestBuilder, install_id: &str) -> RequestBuilder { builder .basic_auth(get_username(), Some(install_id)) .header(header::USER_AGENT, build_user_agent()) diff --git a/src/cmd/session.rs b/src/cmd/session.rs index 108f64f..a3a46c9 100644 --- a/src/cmd/session.rs +++ b/src/cmd/session.rs @@ -14,7 +14,7 @@ use tracing::level_filters::LevelFilter; use tracing_subscriber::EnvFilter; use url::{form_urlencoded, Url}; -use crate::api; +use crate::api::{self, StreamChangeset, StreamResponse}; use crate::asciicast::{self, Version}; use crate::cli::{self, Format, RelayTarget}; use crate::config::{self, Config}; @@ -330,7 +330,7 @@ impl cli::Session { let relay = match target { RelayTarget::StreamId(id) => { - let stream = api::create_user_stream(id, config).await?; + let stream = self.start_stream(id, config).await?; let ws_producer_url = build_producer_url(&stream.ws_producer_url, metadata)?; Relay { @@ -348,6 +348,35 @@ impl cli::Session { Ok(Some(relay)) } + async fn start_stream(&self, id: &str, config: &Config) -> Result { + let changeset = StreamChangeset { + live: Some(true), + ..Default::default() + }; + + if id.is_empty() { + api::create_stream(changeset, config).await + } else { + match &api::list_user_streams(id, config).await?[..] { + [] => { + bail!("no stream matches \"{id}\""); + } + + [stream] => api::update_stream(stream.id, changeset, config).await, + + streams => { + let urls = streams + .iter() + .map(|s| s.url.clone()) + .collect::>() + .join("\n"); + + bail!("multiple streams match \"{id}\" prefix:\n\n{urls}"); + } + } + } + } + async fn get_tty(&self, quiet: bool) -> Result> { let (cols, rows) = self.window_size.unwrap_or((None, None));