From 97f88ecdb0c67d0c6d763952a1c90b4c59eea03b Mon Sep 17 00:00:00 2001 From: Ryan Voots Date: Thu, 28 Dec 2023 13:31:45 -0500 Subject: [PATCH] Structure is there, fixed a few design things i made along the way --- lib/OpenAIAsync/Server.pm | 221 +++++++++++++++++++++++++++----------- 1 file changed, 160 insertions(+), 61 deletions(-) diff --git a/lib/OpenAIAsync/Server.pm b/lib/OpenAIAsync/Server.pm index 86b2029..a98ddcb 100644 --- a/lib/OpenAIAsync/Server.pm +++ b/lib/OpenAIAsync/Server.pm @@ -133,7 +133,7 @@ terminator for TLS connections =back -=head2 auth_check($key, $ctx, $http_req) +=head2 async auth_check($key, $ctx, $http_req) This method requres async keyword. @@ -163,25 +163,25 @@ This method requres async keyword. Handle an embedding request -=head2 async image_generate +=head2 async image_generate($ctx, $image_req) This method requres async keyword. Unimplemented, but once present will be used to generate images with Dall-E (or for self hosted, stable diffusion). -=head2 async text_to_speech +=head2 async text_to_speech($ctx, $tts_req) This method requres async keyword. Unimplemented, but can be used to turn text to speech using whatever algorithms/models are supported. -=head2 async speech_to_text +=head2 async speech_to_text($ctx, $stt_req) This method requres async keyword. Unimplemented. The opposite of the above. -=head2 async vision +=head2 async vision($ctx, $vision_req) This method requres async keyword. @@ -191,6 +191,12 @@ Unimplemented, I've not investigated this one much yet but I believe it's to get At least some for getting the list of models and some other meta information, those will be added next after I get some more documentation written +=head1 SERVER SENT EVENTS + +Design for this is pending, I'll end making this use new methods, i.e. C, etc. These will take in a new $stream object, that can have an event written to it which will be sent without closing the connection. +This is mostly because using this as a proxy will require handling a different kind of client to call another OpenAI endpoint which will necessitate a loop inside +the method that is handling the other end. + =head1 See Also L, L, L @@ -207,27 +213,14 @@ Ryan Voots, ... etc. class OpenAIAsync::Server :repr(HASH) :isa(IO::Async::Notifier) :strict(params) { use JSON::MaybeXS qw//; - use Net::Async::HTTP; + use Net::Async::HTTP::Server; use Feature::Compat::Try; use URI; field $_json = JSON::MaybeXS->new(utf8 => 1, convert_blessed => 1); - field $http; + field $http_servers; # TODO document these directly, other options gets mixed in BEFORE all of these - field $_http_max_in_flight :param(http_max_in_flight) = 2; - field $_http_max_redirects :param(http_max_redirects) = 3; - field $_http_max_connections_per_host :param(http_max_connections_per_host) = 2; - field $_http_timeout :param(http_timeout) = 120; # My personal server is kinda slow, use a generous default - field $_http_stall_timeout :param(http_stall_timeout) = 600; # generous for my slow personal server - field $_http_other :param(http_other_options) = {}; - field $_http_user_agent :param(http_user_agent) = __PACKAGE__." Perl/$VERSION (Net::Async::HTTP/".$Net::Async::HTTP::VERSION." IO::Async/".$IO::Async::VERSION." Perl/$])"; - - field $api_base :param(api_base) = $ENV{OPENAI_API_BASE} // "https://api.openai.com/v1"; - field $api_key :param(api_key) = $ENV{OPENAI_API_KEY}; - - field $api_org_name :param(api_org_name) = undef; - field $io_async_notifier_params :param = undef; method configure(%params) { @@ -237,62 +230,168 @@ class OpenAIAsync::Server :repr(HASH) :isa(IO::Async::Notifier) :strict(params) IO::Async::Notifier::configure($self, %io_async_params); } - method __make_http() { - die "Missing API Key for OpenAI" unless $api_key; + method __make_http_server($port, $listen, $ctx, %args) { + # TODO args? + my $server_id = sprintf("%d\0%d", $listen, $port); + $ctx->{server_id} = $server_id; - return Net::Async::HTTP->new( - $_http_other->%*, - user_agent => "SNN OpenAI Client 1.0", - +headers => { - "Authorization" => "Bearer $api_key", - "Content-Type" => "application/json", - $api_org_name ? ( - 'OpenAI-Organization' => $api_org_name, - ) : () - }, - max_redirects => $_http_max_redirects, - max_connections_per_host => $_http_max_connections_per_host, - max_in_flight => $_http_max_in_flight, - timeout => $_http_timeout, - stall_timeout => $_http_stall_timeout, - ) + my $httpserver = Net::Async::HTTP::Server->new( + on_request => sub($httpself, $req) { + my $f = $self->loop->new_future(); + + my $async_f = $self->_route_request($httpself, $req, $ctx); + $f->on_done($async_f); + $self->adopt_future($async_f); + + $f->done(); + } + ); + + $http_servers->{$server_id} = {server => $httpserver, ctx => $ctx}; + + $self->loop->add($httpserver); } ADJUST { - $http = $self->__make_http; - - $api_base =~ s|/$||; # trim an accidental final / since we will be putting it on the endpoints } - async method _make_request($endpoint, $data) { - my $json = $_json->encode($data); + method _resp_custom($req, $code, $str, $json = 0) { + my $response = HTTP::Response->new( $code ); + $response->content_type('text/plain') unless $json; + $response->content_type('application/json') if $json; + $response->add_content($str); + $response->content_length(length $str); + $req->respond($response); + } - my $url = URI->new($api_base . $endpoint ); + # Pulled out into another method to let subclasses override things if they REALLY want to + method _get_routes($httpserver, $req, $ctx) { + my $routers = { + '/' => { + GET => async sub {$self->_resp_custom($req, 200, "I'm an AI teapot")}, + }, + '/v1/'.OpenAIAsync::Types::Requests::ChatCompletion->_endpoint() => { + POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "ChatCompletion")} + }, + '/v1/'.OpenAIAsync::Types::Requests::Completion->_endpoint() => { + POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "Completion")} + }, + '/v1/'.OpenAIAsync::Types::Requests::Embedding->_endpoint() => { + POST => async sub {$self->_handle_req($httpserver, $req, $ctx, "Embedding")} + }, + }; - my $result = await $http->do_request( - uri => $url, - method => "POST", - content => $json, - content_type => 'application/json', - ); + return $routers; + } - if ($result->is_success) { - my $json = $result->decoded_content; - my $out_data = $_json->decode($json); + async method _route_request($httpserver, $req, $ctx) { + my $routers = $self->_get_routers($httpserver, $req, $ctx); - return $out_data; - } else { - die "Failure in talking to OpenAI service: ".$result->status_line.": ".$result->decoded_content; + my $method = $req->method(); + my $uri = URI->new($req->uri); + my $path = $uri->path; + + try { + if (my $route = $routers->{$path}) { + if (my $method_route = $route->{$method}) { + my $f = Future->wrap($method_route->()); + $self->adopt_future($f); + return $f; + } else { + $self->_resp_custom($req, 405, "Not allowed"); + } + } else { + my $f = await $self->route_request($httpserver, $req, $ctx); + $self->adopt_future($f); + return $f; + } + } catch { + my $err = $@; + + my $f = Future->wrap($self->_resp_custom($req, 400, "Error: ".$err)); + $self->adopt_future($f); + return $f; } } - method _add_to_loop($loop) { - $loop->add($http); + async method route_request($httpserver, $req, $ctx) { + # Base implementation, override in your subclass to do more advanced things + $self->_resp_custom($req, 404, "Not found"); } - method _remove_from_loop($loop) { - $loop->remove($http); - $http = $self->__make_http; # overkill? want to make sure we have a clean one + # TODO decide if I need this for this setup? I think I don't. +# method _add_to_loop($loop) { +# $loop->add($http); +# } +# +# method _remove_from_loop($loop) { +# $loop->remove($http); +# $http = $self->__make_http; # overkill? want to make sure we have a clean one +# } + + method _decode_req($req, $kind) { + my $content_type = $req->header("Content-Type"); + + die "Wrong Content Type '$content_type'" unless $content_type eq 'application/json'; + + my $raw_content = $req->decoded_content(); + my $json = $_json->decode($raw_content); + + if ($kind eq 'ChatCompletion') { + return OpenAIAsync::Types::Requests::ChatCompletion->new($json); + } elsif ($kind eq 'Completion') { + return OpenAIAsync::Types::Requests::Completion->new($json); + } elsif ($kind eq 'Embedding') { + return OpenAIAsync::Types::Requests::Embedding->new($json); + } else { + die "Failed to handle kind $kind"; + } + } + + method _check_response($req, $kind, $content) { + + return $kind eq 'ChatCompletion' ? $content isa OpenAIAsync::Types::Results::ChatCompletion : + $kind eq 'Completion' ? $content isa OpenAIAsync::Types::Results::Completion : + $kind eq 'Embedding' ? $content isa OpenAIAsync::Types::Results::Embedding : + false; + } + + async method _handle_req($httpserver, $req, $ctx, $kind) { + my $authed_f = await $self->auth_check($api_key, $ctx, $req); + + if (not $authed_f->get()) { + # Not authorized, give a 403 + $self->_resp_custom($req, 403, "Forbidden"); + my $dummy_f = $self->loop->new_future(); + $dummy_f->done(); + return $dummy_f; + } + + my $obj = $self->_decode_req($req, $kind); + + if ($obj->can('stream')) { + die "Streaming is unsupported" if $obj->stream; + } + + my $f; + + if ($kind eq 'ChatCompletion') { + $f = await $self->chat($ctx, $obj); + } elsif ($kind eq 'Completion') { + $f = await $self->completion($ctx, $obj); + } elsif ($kind eq 'ChatCompletion') { + $f = await $self->embeddding($ctx, $obj); + } else { + die "Unhandled kind $kind"; + } + + $self->adopt_future($f); + my $resp = $f->get(); + die "Bad response $obj" unless $self->_check_response($req, $kind, $resp); + + my $json_resp = $_json->encode($resp); + $self->_custom_resp($req, 200, $json_resp, 1); + return $f; } # This is the legacy completion api