Server side of this new api setup
This commit is contained in:
parent
5651230712
commit
8f33acb3b9
1 changed files with 52 additions and 21 deletions
|
@ -244,6 +244,20 @@ class OpenAIAsync::Server :repr(HASH) :strict(params) {
|
||||||
IO::Async::Notifier::configure($self, %io_async_params);
|
IO::Async::Notifier::configure($self, %io_async_params);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# These might not stay internal only
|
||||||
|
method _make_future() {
|
||||||
|
# TODO make this workable with Future::IO too, but longer term goal
|
||||||
|
return $self->loop->new_future();
|
||||||
|
}
|
||||||
|
|
||||||
|
method _make_queue() {
|
||||||
|
my $queue = Future::Queue->new(
|
||||||
|
prototype => sub {$self->_make_future()}
|
||||||
|
max_items => 1, # set a max item count, this is so that await ->push() will cause things to yield
|
||||||
|
);
|
||||||
|
return $queue;
|
||||||
|
}
|
||||||
|
|
||||||
method __make_http_server() {
|
method __make_http_server() {
|
||||||
# TODO args?
|
# TODO args?
|
||||||
# TODO make this work during a reload
|
# TODO make this work during a reload
|
||||||
|
@ -340,30 +354,47 @@ class OpenAIAsync::Server :repr(HASH) :strict(params) {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
my ($result, @extra) = (await $route->{handle}->($req, $ctx, $obj, $params));
|
# TODO make these use some other method for creation that is abstracted out, once I get rid of IO::Async
|
||||||
|
my $future_status = $self->_make_future();
|
||||||
|
my $queue = $self->_make_queue();
|
||||||
|
|
||||||
if ($route->{result_class}) {
|
# TODO can I redo this to eliminate the $future_status? I want it for internal chaining inside the handler
|
||||||
my $out_obj = $result;
|
# that is needed for it to persist some code that's running in the future that populates the queue
|
||||||
|
await $route->{handle}->($req, $future_status, $queue, $ctx, $obj, $params);
|
||||||
|
|
||||||
unless ($out_obj isa $route->{result_class}) {
|
my $status = await $future_status;
|
||||||
$out_obj = $route->{result_class}->new(%$result);
|
my $is_streaming_event = $status->{is_streaming};
|
||||||
}
|
|
||||||
|
|
||||||
if (@extra) {
|
my $headers = {
|
||||||
$self->_resp_custom($req, $extra[0], $out_obj); # TODO better design?
|
"Content-Type" => $is_streaming ? "text/event-stream" : $status->{content_type},
|
||||||
} else {
|
$is_streaming ? ()"Cache-Control" => "no-store") : (),
|
||||||
$self->_resp_custom($req, 200, $out_obj);
|
# TODO others?
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (@extra) {
|
|
||||||
$self->_resp_custom($req, @extra); # TODO better design?
|
|
||||||
} else {
|
|
||||||
# Nothing to output directly
|
|
||||||
$self->_resp_custom($req, 200, "");
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
my $response = HTTP::Response->new($status->{status_code}, $status->{status_message}, $status->{headers});
|
||||||
|
|
||||||
|
$req->write($response->as_string("\r\n"));
|
||||||
|
$req->write("\r\n"); # extra to end headers
|
||||||
|
|
||||||
|
$req->write(sub {
|
||||||
|
my $body_obj = await $queue->pull();
|
||||||
|
|
||||||
|
if (defined $body_obj) {
|
||||||
|
my $body = $body_obj->serialize();
|
||||||
|
my $event_name = $body_obj->event_name();
|
||||||
|
|
||||||
|
if ($is_streaming) {
|
||||||
|
return sprintf "event: %s\ndata: %s\n\n", $event_name, $body;
|
||||||
|
} eles {
|
||||||
|
return $body;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
# Finished
|
||||||
|
$req->done();
|
||||||
|
return undef;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
} catch($err) {
|
} catch($err) {
|
||||||
$self->_resp_custom($req, 500, "Server error: ".$err);
|
$self->_resp_custom($req, 500, "Server error: ".$err);
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Add table
Reference in a new issue