Structure is there, fixed a few design things i made along the way
Some checks failed
ci/woodpecker/push/author-tests Pipeline failed
Some checks failed
ci/woodpecker/push/author-tests Pipeline failed
This commit is contained in:
parent
303b3cd37d
commit
97f88ecdb0
1 changed files with 160 additions and 61 deletions
|
@ -133,7 +133,7 @@ terminator for TLS connections
|
|||
|
||||
=back
|
||||
|
||||
=head2 auth_check($key, $ctx, $http_req)
|
||||
=head2 async auth_check($key, $ctx, $http_req)
|
||||
|
||||
This method requres async keyword.
|
||||
|
||||
|
@ -163,25 +163,25 @@ This method requres async keyword.
|
|||
|
||||
Handle an embedding request
|
||||
|
||||
=head2 async image_generate
|
||||
=head2 async image_generate($ctx, $image_req)
|
||||
|
||||
This method requres async keyword.
|
||||
|
||||
Unimplemented, but once present will be used to generate images with Dall-E (or for self hosted, stable diffusion).
|
||||
|
||||
=head2 async text_to_speech
|
||||
=head2 async text_to_speech($ctx, $tts_req)
|
||||
|
||||
This method requres async keyword.
|
||||
|
||||
Unimplemented, but can be used to turn text to speech using whatever algorithms/models are supported.
|
||||
|
||||
=head2 async speech_to_text
|
||||
=head2 async speech_to_text($ctx, $stt_req)
|
||||
|
||||
This method requres async keyword.
|
||||
|
||||
Unimplemented. The opposite of the above.
|
||||
|
||||
=head2 async vision
|
||||
=head2 async vision($ctx, $vision_req)
|
||||
|
||||
This method requres async keyword.
|
||||
|
||||
|
@ -191,6 +191,12 @@ Unimplemented, I've not investigated this one much yet but I believe it's to get
|
|||
|
||||
At least some for getting the list of models and some other meta information, those will be added next after I get some more documentation written
|
||||
|
||||
=head1 SERVER SENT EVENTS
|
||||
|
||||
Design for this is pending, I'll end making this use new methods, i.e. C<stream_chat_completion>, etc. These will take in a new $stream object, that can have an event written to it which will be sent without closing the connection.
|
||||
This is mostly because using this as a proxy will require handling a different kind of client to call another OpenAI endpoint which will necessitate a loop inside
|
||||
the method that is handling the other end.
|
||||
|
||||
=head1 See Also
|
||||
|
||||
L<IO::Async>, L<Future::AsyncAwait>, L<Net::Async::HTTP>
|
||||
|
@ -207,27 +213,14 @@ Ryan Voots, ... etc.
|
|||
|
||||
class OpenAIAsync::Server :repr(HASH) :isa(IO::Async::Notifier) :strict(params) {
|
||||
use JSON::MaybeXS qw//;
|
||||
use Net::Async::HTTP;
|
||||
use Net::Async::HTTP::Server;
|
||||
use Feature::Compat::Try;
|
||||
use URI;
|
||||
|
||||
field $_json = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1);
|
||||
field $http;
|
||||
field $http_servers;
|
||||
|
||||
# TODO document these directly, other options gets mixed in BEFORE all of these
|
||||
field $_http_max_in_flight :param(http_max_in_flight) = 2;
|
||||
field $_http_max_redirects :param(http_max_redirects) = 3;
|
||||
field $_http_max_connections_per_host :param(http_max_connections_per_host) = 2;
|
||||
field $_http_timeout :param(http_timeout) = 120; # My personal server is kinda slow, use a generous default
|
||||
field $_http_stall_timeout :param(http_stall_timeout) = 600; # generous for my slow personal server
|
||||
field $_http_other :param(http_other_options) = {};
|
||||
field $_http_user_agent :param(http_user_agent) = __PACKAGE__." Perl/$VERSION (Net::Async::HTTP/".$Net::Async::HTTP::VERSION." IO::Async/".$IO::Async::VERSION." Perl/$])";
|
||||
|
||||
field $api_base :param(api_base) = $ENV{OPENAI_API_BASE} // "https://api.openai.com/v1";
|
||||
field $api_key :param(api_key) = $ENV{OPENAI_API_KEY};
|
||||
|
||||
field $api_org_name :param(api_org_name) = undef;
|
||||
|
||||
field $io_async_notifier_params :param = undef;
|
||||
|
||||
method configure(%params) {
|
||||
|
@ -237,62 +230,168 @@ class OpenAIAsync::Server :repr(HASH) :isa(IO::Async::Notifier) :strict(params)
|
|||
IO::Async::Notifier::configure($self, %io_async_params);
|
||||
}
|
||||
|
||||
method __make_http() {
|
||||
die "Missing API Key for OpenAI" unless $api_key;
|
||||
method __make_http_server($port, $listen, $ctx, %args) {
|
||||
# TODO args?
|
||||
my $server_id = sprintf("%d\0%d", $listen, $port);
|
||||
$ctx->{server_id} = $server_id;
|
||||
|
||||
return Net::Async::HTTP->new(
|
||||
$_http_other->%*,
|
||||
user_agent => "SNN OpenAI Client 1.0",
|
||||
+headers => {
|
||||
"Authorization" => "Bearer $api_key",
|
||||
"Content-Type" => "application/json",
|
||||
$api_org_name ? (
|
||||
'OpenAI-Organization' => $api_org_name,
|
||||
) : ()
|
||||
},
|
||||
max_redirects => $_http_max_redirects,
|
||||
max_connections_per_host => $_http_max_connections_per_host,
|
||||
max_in_flight => $_http_max_in_flight,
|
||||
timeout => $_http_timeout,
|
||||
stall_timeout => $_http_stall_timeout,
|
||||
)
|
||||
my $httpserver = Net::Async::HTTP::Server->new(
|
||||
on_request => sub($httpself, $req) {
|
||||
my $f = $self->loop->new_future();
|
||||
|
||||
my $async_f = $self->_route_request($httpself, $req, $ctx);
|
||||
$f->on_done($async_f);
|
||||
$self->adopt_future($async_f);
|
||||
|
||||
$f->done();
|
||||
}
|
||||
);
|
||||
|
||||
$http_servers->{$server_id} = {server => $httpserver, ctx => $ctx};
|
||||
|
||||
$self->loop->add($httpserver);
|
||||
}
|
||||
|
||||
ADJUST {
|
||||
$http = $self->__make_http;
|
||||
|
||||
$api_base =~ s|/$||; # trim an accidental final / since we will be putting it on the endpoints
|
||||
}
|
||||
|
||||
async method _make_request($endpoint, $data) {
|
||||
my $json = $_json->encode($data);
|
||||
method _resp_custom($req, $code, $str, $json = 0) {
|
||||
my $response = HTTP::Response->new( $code );
|
||||
$response->content_type('text/plain') unless $json;
|
||||
$response->content_type('application/json') if $json;
|
||||
$response->add_content($str);
|
||||
$response->content_length(length $str);
|
||||
$req->respond($response);
|
||||
}
|
||||
|
||||
my $url = URI->new($api_base . $endpoint );
|
||||
# Pulled out into another method to let subclasses override things if they REALLY want to
|
||||
method _get_routes($httpserver, $req, $ctx) {
|
||||
my $routers = {
|
||||
'/' => {
|
||||
GET => async sub {$self->_resp_custom($req, 200, "I'm an AI teapot")},
|
||||
},
|
||||
'/v1/'.OpenAIAsync::Types::Requests::ChatCompletion->_endpoint() => {
|
||||
POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "ChatCompletion")}
|
||||
},
|
||||
'/v1/'.OpenAIAsync::Types::Requests::Completion->_endpoint() => {
|
||||
POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "Completion")}
|
||||
},
|
||||
'/v1/'.OpenAIAsync::Types::Requests::Embedding->_endpoint() => {
|
||||
POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "Embedding")}
|
||||
},
|
||||
};
|
||||
|
||||
my $result = await $http->do_request(
|
||||
uri => $url,
|
||||
method => "POST",
|
||||
content => $json,
|
||||
content_type => 'application/json',
|
||||
);
|
||||
return $routers;
|
||||
}
|
||||
|
||||
if ($result->is_success) {
|
||||
my $json = $result->decoded_content;
|
||||
my $out_data = $_json->decode($json);
|
||||
async method _route_request($httpserver, $req, $ctx) {
|
||||
my $routers = $self->_get_routers($httpserver, $req, $ctx);
|
||||
|
||||
return $out_data;
|
||||
} else {
|
||||
die "Failure in talking to OpenAI service: ".$result->status_line.": ".$result->decoded_content;
|
||||
my $method = $req->method();
|
||||
my $uri = URI->new($req->uri);
|
||||
my $path = $uri->path;
|
||||
|
||||
try {
|
||||
if (my $route = $routers->{$path}) {
|
||||
if (my $method_route = $route->{$method}) {
|
||||
my $f = Future->wrap($method_route->());
|
||||
$self->adopt_future($f);
|
||||
return $f;
|
||||
} else {
|
||||
$self->_resp_custom($req, 405, "Not allowed");
|
||||
}
|
||||
} else {
|
||||
my $f = await $self->route_request($httpserver, $req, $ctx);
|
||||
$self->adopt_future($f);
|
||||
return $f;
|
||||
}
|
||||
} catch {
|
||||
my $err = $@;
|
||||
|
||||
my $f = Future->wrap($self->_resp_custom($req, 400, "Error: ".$err));
|
||||
$self->adopt_future($f);
|
||||
return $f;
|
||||
}
|
||||
}
|
||||
|
||||
method _add_to_loop($loop) {
|
||||
$loop->add($http);
|
||||
async method route_request($httpserver, $req, $ctx) {
|
||||
# Base implementation, override in your subclass to do more advanced things
|
||||
$self->_resp_custom($req, 404, "Not found");
|
||||
}
|
||||
|
||||
method _remove_from_loop($loop) {
|
||||
$loop->remove($http);
|
||||
$http = $self->__make_http; # overkill? want to make sure we have a clean one
|
||||
# TODO decide if I need this for this setup? I think I don't.
|
||||
# method _add_to_loop($loop) {
|
||||
# $loop->add($http);
|
||||
# }
|
||||
#
|
||||
# method _remove_from_loop($loop) {
|
||||
# $loop->remove($http);
|
||||
# $http = $self->__make_http; # overkill? want to make sure we have a clean one
|
||||
# }
|
||||
|
||||
method _decode_req($req, $kind) {
|
||||
my $content_type = $req->header("Content-Type");
|
||||
|
||||
die "Wrong Content Type '$content_type'" unless $content_type eq 'application/json';
|
||||
|
||||
my $raw_content = $req->decoded_content();
|
||||
my $json = $_json->decode($raw_content);
|
||||
|
||||
if ($kind eq 'ChatCompletion') {
|
||||
return OpenAIAsync::Types::Requests::ChatCompletion->new($json);
|
||||
} elsif ($kind eq 'Completion') {
|
||||
return OpenAIAsync::Types::Requests::Completion->new($json);
|
||||
} elsif ($kind eq 'Embedding') {
|
||||
return OpenAIAsync::Types::Requests::Embedding->new($json);
|
||||
} else {
|
||||
die "Failed to handle kind $kind";
|
||||
}
|
||||
}
|
||||
|
||||
method _check_response($req, $kind, $content) {
|
||||
|
||||
return $kind eq 'ChatCompletion' ? $content isa OpenAIAsync::Types::Results::ChatCompletion :
|
||||
$kind eq 'Completion' ? $content isa OpenAIAsync::Types::Results::Completion :
|
||||
$kind eq 'Embedding' ? $content isa OpenAIAsync::Types::Results::Embedding :
|
||||
false;
|
||||
}
|
||||
|
||||
async method _handle_req($httpserver, $req, $ctx, $kind) {
|
||||
my $authed_f = await $self->auth_check($api_key, $ctx, $req);
|
||||
|
||||
if (not $authed_f->get()) {
|
||||
# Not authorized, give a 403
|
||||
$self->_resp_custom($req, 403, "Forbidden");
|
||||
my $dummy_f = $self->loop->new_future();
|
||||
$dummy_f->done();
|
||||
return $dummy_f;
|
||||
}
|
||||
|
||||
my $obj = $self->_decode_req($req, $kind);
|
||||
|
||||
if ($obj->can('stream')) {
|
||||
die "Streaming is unsupported" if $obj->stream;
|
||||
}
|
||||
|
||||
my $f;
|
||||
|
||||
if ($kind eq 'ChatCompletion') {
|
||||
$f = await $self->chat($ctx, $obj);
|
||||
} elsif ($kind eq 'Completion') {
|
||||
$f = await $self->completion($ctx, $obj);
|
||||
} elsif ($kind eq 'ChatCompletion') {
|
||||
$f = await $self->embeddding($ctx, $obj);
|
||||
} else {
|
||||
die "Unhandled kind $kind";
|
||||
}
|
||||
|
||||
$self->adopt_future($f);
|
||||
my $resp = $f->get();
|
||||
die "Bad response $obj" unless $self->_check_response($req, $kind, $resp);
|
||||
|
||||
my $json_resp = $_json->encode($resp);
|
||||
$self->_custom_resp($req, 200, $json_resp, 1);
|
||||
return $f;
|
||||
}
|
||||
|
||||
# This is the legacy completion api
|
||||
|
|
Loading…
Add table
Reference in a new issue