From 8f33acb3b9a60c9db8b2067f5cae0c1c08363e63 Mon Sep 17 00:00:00 2001 From: Ryan Voots Date: Mon, 4 Mar 2024 13:43:00 -0500 Subject: [PATCH] Server side of this new api setup --- lib/OpenAIAsync/Server.pm | 73 ++++++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/lib/OpenAIAsync/Server.pm b/lib/OpenAIAsync/Server.pm index 224eb2b..2430d08 100644 --- a/lib/OpenAIAsync/Server.pm +++ b/lib/OpenAIAsync/Server.pm @@ -244,6 +244,20 @@ class OpenAIAsync::Server :repr(HASH) :strict(params) { IO::Async::Notifier::configure($self, %io_async_params); } + # These might not stay internal only + method _make_future() { + # TODO make this workable with Future::IO too, but longer term goal + return $self->loop->new_future(); + } + + method _make_queue() { + my $queue = Future::Queue->new( + prototype => sub {$self->_make_future()} + max_items => 1, # set a max item count, this is so that await ->push() will cause things to yield + ); + return $queue; + } + method __make_http_server() { # TODO args? # TODO make this work during a reload @@ -340,30 +354,47 @@ class OpenAIAsync::Server :repr(HASH) :strict(params) { } try { - my ($result, @extra) = (await $route->{handle}->($req, $ctx, $obj, $params)); + # TODO make these use some other method for creation that is abstracted out, once I get rid of IO::Async + my $future_status = $self->_make_future(); + my $queue = $self->_make_queue(); + + # TODO can I redo this to eliminate the $future_status? I want it for internal chaining inside the handler + # that is needed for it to persist some code that's running in the future that populates the queue + await $route->{handle}->($req, $future_status, $queue, $ctx, $obj, $params); - if ($route->{result_class}) { - my $out_obj = $result; + my $status = await $future_status; + my $is_streaming_event = $status->{is_streaming}; - unless ($out_obj isa $route->{result_class}) { - $out_obj = $route->{result_class}->new(%$result); - } - - if (@extra) { - $self->_resp_custom($req, $extra[0], $out_obj); # TODO better design? - } else { - $self->_resp_custom($req, 200, $out_obj); - } - } else { - if (@extra) { - $self->_resp_custom($req, @extra); # TODO better design? - } else { - # Nothing to output directly - $self->_resp_custom($req, 200, ""); - } - - return; + my $headers = { + "Content-Type" => $is_streaming ? "text/event-stream" : $status->{content_type}, + $is_streaming ? ()"Cache-Control" => "no-store") : (), + # TODO others? } + my $response = HTTP::Response->new($status->{status_code}, $status->{status_message}, $status->{headers}); + + $req->write($response->as_string("\r\n")); + $req->write("\r\n"); # extra to end headers + + $req->write(sub { + my $body_obj = await $queue->pull(); + + if (defined $body_obj) { + my $body = $body_obj->serialize(); + my $event_name = $body_obj->event_name(); + + if ($is_streaming) { + return sprintf "event: %s\ndata: %s\n\n", $event_name, $body; + } eles { + return $body; + } + } else { + # Finished + $req->done(); + return undef; + } + }); + + return; } catch($err) { $self->_resp_custom($req, 500, "Server error: ".$err); return;