Use new API v1 paths for streaming

This commit is contained in:
Marcin Kulik
2025-07-09 16:01:54 +02:00
parent e8b3591c10
commit 9c55a15b8a
2 changed files with 123 additions and 24 deletions

View File

@@ -2,9 +2,10 @@ use std::env;
use std::fmt::Debug;
use anyhow::{bail, Context, Result};
use reqwest::header;
use reqwest::{header, Response};
use reqwest::{multipart::Form, Client, RequestBuilder};
use serde::Deserialize;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::config::Config;
@@ -16,11 +17,20 @@ pub struct RecordingResponse {
}
#[derive(Debug, Deserialize)]
pub struct GetUserStreamResponse {
pub struct StreamResponse {
pub id: u64,
pub ws_producer_url: String,
pub url: String,
}
#[derive(Default, Serialize)]
pub struct StreamChangeset {
#[serde(skip_serializing_if = "Option::is_none")]
pub live: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<Option<u8>>,
}
#[derive(Debug, Deserialize)]
struct ErrorResponse {
reason: String,
@@ -69,16 +79,90 @@ async fn create_recording_request(
.header(header::ACCEPT, "application/json"))
}
pub async fn create_user_stream(stream_id: &str, config: &Config) -> Result<GetUserStreamResponse> {
pub async fn list_user_streams(prefix: &str, config: &Config) -> Result<Vec<StreamResponse>> {
let server_url = config.get_server_url()?;
let server_hostname = server_url.host().unwrap();
let install_id = config.get_install_id()?;
let response = user_stream_request(&server_url, stream_id, &install_id)
let response = list_user_streams_request(&server_url, prefix, &install_id)
.send()
.await
.context("cannot obtain stream producer endpoint - is the server down?")?;
parse_stream_response(response, &server_url).await
}
fn list_user_streams_request(server_url: &Url, prefix: &str, install_id: &str) -> RequestBuilder {
let client = Client::new();
let mut url = server_url.clone();
url.set_path("api/v1/user/streams");
url.set_query(Some(&format!("prefix={prefix}&limit=10")));
add_headers(client.get(url), install_id)
}
pub async fn create_stream(changeset: StreamChangeset, config: &Config) -> Result<StreamResponse> {
let server_url = config.get_server_url()?;
let install_id = config.get_install_id()?;
let response = create_stream_request(&server_url, &install_id, changeset)
.send()
.await
.context("cannot obtain stream producer endpoint - is the server down?")?;
parse_stream_response(response, &server_url).await
}
fn create_stream_request(
server_url: &Url,
install_id: &str,
changeset: StreamChangeset,
) -> RequestBuilder {
let client = Client::new();
let mut url = server_url.clone();
url.set_path("api/v1/streams");
let builder = client.post(url);
let builder = add_headers(builder, install_id);
builder.json(&changeset)
}
pub async fn update_stream(
stream_id: u64,
changeset: StreamChangeset,
config: &Config,
) -> Result<StreamResponse> {
let server_url = config.get_server_url()?;
let install_id = config.get_install_id()?;
let response = update_stream_request(&server_url, &install_id, stream_id, changeset)
.send()
.await
.context("cannot obtain stream producer endpoint - is the server down?")?;
parse_stream_response(response, &server_url).await
}
fn update_stream_request(
server_url: &Url,
install_id: &str,
stream_id: u64,
changeset: StreamChangeset,
) -> RequestBuilder {
let client = Client::new();
let mut url = server_url.clone();
url.set_path(&format!("api/v1/streams/{stream_id}"));
let builder = client.patch(url);
let builder = add_headers(builder, install_id);
builder.json(&changeset)
}
async fn parse_stream_response<T: DeserializeOwned>(
response: Response,
server_url: &Url,
) -> Result<T> {
let server_hostname = server_url.host().unwrap();
match response.status().as_u16() {
401 => bail!(
"this CLI hasn't been authenticated with {server_hostname} - run `asciinema auth` first"
@@ -99,24 +183,10 @@ pub async fn create_user_stream(stream_id: &str, config: &Config) -> Result<GetU
}
}
response
.json::<GetUserStreamResponse>()
.await
.map_err(|e| e.into())
response.json::<T>().await.map_err(|e| e.into())
}
fn user_stream_request(server_url: &Url, stream_id: &str, install_id: &str) -> RequestBuilder {
let client = Client::new();
let mut url = server_url.clone();
let builder = if stream_id.is_empty() {
url.set_path("api/streams");
client.post(url)
} else {
url.set_path(&format!("api/user/streams/{stream_id}"));
client.get(url)
};
fn add_headers(builder: RequestBuilder, install_id: &str) -> RequestBuilder {
builder
.basic_auth(get_username(), Some(install_id))
.header(header::USER_AGENT, build_user_agent())

View File

@@ -14,7 +14,7 @@ use tracing::level_filters::LevelFilter;
use tracing_subscriber::EnvFilter;
use url::{form_urlencoded, Url};
use crate::api;
use crate::api::{self, StreamChangeset, StreamResponse};
use crate::asciicast::{self, Version};
use crate::cli::{self, Format, RelayTarget};
use crate::config::{self, Config};
@@ -330,7 +330,7 @@ impl cli::Session {
let relay = match target {
RelayTarget::StreamId(id) => {
let stream = api::create_user_stream(id, config).await?;
let stream = self.start_stream(id, config).await?;
let ws_producer_url = build_producer_url(&stream.ws_producer_url, metadata)?;
Relay {
@@ -348,6 +348,35 @@ impl cli::Session {
Ok(Some(relay))
}
async fn start_stream(&self, id: &str, config: &Config) -> Result<StreamResponse> {
let changeset = StreamChangeset {
live: Some(true),
..Default::default()
};
if id.is_empty() {
api::create_stream(changeset, config).await
} else {
match &api::list_user_streams(id, config).await?[..] {
[] => {
bail!("no stream matches \"{id}\"");
}
[stream] => api::update_stream(stream.id, changeset, config).await,
streams => {
let urls = streams
.iter()
.map(|s| s.url.clone())
.collect::<Vec<_>>()
.join("\n");
bail!("multiple streams match \"{id}\" prefix:\n\n{urls}");
}
}
}
}
async fn get_tty(&self, quiet: bool) -> Result<Box<dyn Tty>> {
let (cols, rows) = self.window_size.unwrap_or((None, None));