From cc54fb208351c97f0e5dc48600d489bae57929da Mon Sep 17 00:00:00 2001 From: Paul Schneider Date: Tue, 25 Jun 2019 17:11:16 +0100 Subject: [PATCH] a first file was sent by websocket --- src/Yavsc.Server/Constants.cs | 4 +- src/Yavsc/Startup/Startup.WebSockets.cs | 6 ++- src/Yavsc/Startup/Startup.cs | 69 ++++++++++++++++--------- src/cli/Commands/Streamer.cs | 34 +++++++++--- src/cli/Program.cs | 3 ++ src/cli/Settings/ConnectionSettings.cs | 5 +- 6 files changed, 86 insertions(+), 35 deletions(-) diff --git a/src/Yavsc.Server/Constants.cs b/src/Yavsc.Server/Constants.cs index e5159d32..408ead8c 100644 --- a/src/Yavsc.Server/Constants.cs +++ b/src/Yavsc.Server/Constants.cs @@ -1,5 +1,6 @@ namespace Yavsc { + using Microsoft.AspNet.Http; using Yavsc.Models.Auth; public static class Constants @@ -36,7 +37,7 @@ namespace Yavsc YavscConnectionStringEnvName = "YAVSC_DB_CONNECTION"; - public const int WebSocketsMaxBufLen = 6*1024; + public const int WebSocketsMaxBufLen = 4*1024; public static readonly long DefaultFSQ = 1024*1024*500; @@ -62,5 +63,6 @@ namespace Yavsc public const int MaxUserNameLength = 26; + public const string LivePath = "/live/cast"; } } diff --git a/src/Yavsc/Startup/Startup.WebSockets.cs b/src/Yavsc/Startup/Startup.WebSockets.cs index 7b06a305..2ef7d49d 100644 --- a/src/Yavsc/Startup/Startup.WebSockets.cs +++ b/src/Yavsc/Startup/Startup.WebSockets.cs @@ -16,10 +16,12 @@ namespace Yavsc { var webSocketOptions = new WebSocketOptions() { - KeepAliveInterval = TimeSpan.FromSeconds(120), + KeepAliveInterval = TimeSpan.FromSeconds(320), ReceiveBufferSize = Constants.WebSocketsMaxBufLen, - ReplaceFeature = true + ReplaceFeature = false + }; + app.UseWebSockets(webSocketOptions); app.UseSignalR(Constants.SignalRPath); } diff --git a/src/Yavsc/Startup/Startup.cs b/src/Yavsc/Startup/Startup.cs index bfc783c8..093d98e8 100755 --- a/src/Yavsc/Startup/Startup.cs +++ b/src/Yavsc/Startup/Startup.cs @@ -62,7 +62,7 @@ namespace Yavsc // leave the final slash - PathString liveCastingPath = "/live/cast"; + PathString liveCastingPath = Constants.LivePath; public Startup(IHostingEnvironment env, IApplicationEnvironment appEnv) { @@ -451,26 +451,28 @@ namespace Yavsc if (context.WebSockets.IsWebSocketRequest) { if (!context.User.Identity.IsAuthenticated) - context.Response.StatusCode = 403; + context.Abort(); else { // get the flow id from request path var castid = long.Parse(context.Request.Path.Value.Substring(liveCastingPath.Value.Length + 1)); - + logger.LogInformation("Cast id : "+castid); var uname = context.User.GetUserName(); // ensure uniqueness of casting stream from this user - var uid = context.User.GetUserId(); // get some setup from user var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == uid && f.Id == castid)); if (flow == null) { - context.Response.StatusCode = 400; + logger.LogWarning("Aborting. Flow info was not found."); + var socket = await context.WebSockets.AcceptWebSocketAsync(); + await socket.CloseAsync(WebSocketCloseStatus.PolicyViolation, "flow id invalid", CancellationToken.None); } else { + logger.LogInformation("flow : "+flow.Title+" for "+uname); LiveCastMeta meta = null; if (LiveApiController.Casters.ContainsKey(uname)) { @@ -478,6 +480,7 @@ namespace Yavsc if (meta.Socket.State != WebSocketState.Closed) { // FIXME loosed connexion should be detected & disposed else where + await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None); meta.Socket.Dispose(); meta.Socket = await context.WebSockets.AcceptWebSocketAsync(); } @@ -497,22 +500,20 @@ namespace Yavsc // Dispatch the flow try { - - - if (meta.Socket != null && meta.Socket.State == WebSocketState.Open) { LiveApiController.Casters[uname] = meta; // TODO: Handle the socket here. // Find receivers: others in the chat room // send them the flow - - var sBuffer = new ArraySegment(new byte[1024]); + var buffer = new byte[Constants.WebSocketsMaxBufLen]; + var sBuffer = new ArraySegment(buffer); logger.LogInformation("Receiving bytes..."); WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None); - logger.LogInformation("Received bytes!!!!"); - + logger.LogInformation($"Received bytes : {received.Count}"); + logger.LogInformation($"Is the end : {received.EndOfMessage}"); +/* TODO var hubContext = GlobalHost.ConnectionManager.GetHubContext(); hubContext.Clients.All.addPublicStream(new @@ -523,42 +524,59 @@ namespace Yavsc url = flow.GetFileUrl(), mediaType = flow.MediaType }, $"{flow.Owner.UserName} is starting a stream!"); - +*/ // FIXME do we really need to close those one in invalid state ? - Stack ToClose = new Stack(); + // Stack ToClose = new Stack(); try { - while (received.MessageType != WebSocketMessageType.Close) - { + /* logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); // Echo anything we receive // and send to all listner found foreach (var cliItem in meta.Listeners) { var listenningSocket = cliItem.Value; - if (listenningSocket.State == WebSocketState.Open) + if (listenningSocket.State == WebSocketState.Open) { await listenningSocket.SendAsync( sBuffer, received.MessageType, received.EndOfMessage, CancellationToken.None); + + } else if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent) { ToClose.Push(cliItem.Key); } } - received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None); + */ + // logger.LogInformation("replying..."); + while (!received.CloseStatus.HasValue) + { + + + // await meta.Socket.SendAsync(new ArraySegment(buffer), received.MessageType, received.EndOfMessage, CancellationToken.None); + + logger.LogInformation("Receiving new bytes..."); + + received = await meta.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); + + logger.LogInformation($"Received new bytes : {received.Count}"); + logger.LogInformation($"Is the end : {received.EndOfMessage}"); + } + logger.LogInformation("Closing connection"); + await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); + - string no; - do + + /* while (ToClose.Count >0) { - no = ToClose.Pop(); + string no = ToClose.Pop(); WebSocket listenningSocket; if (meta.Listeners.TryRemove(no, out listenningSocket)) await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); - } while (no != null); - } - await meta.Socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "eof", CancellationToken.None); + } */ + LiveApiController.Casters[uname] = null; } catch (Exception ex) @@ -597,6 +615,9 @@ namespace Yavsc } } } + else { + context.Response.StatusCode = 400; + } } else { diff --git a/src/cli/Commands/Streamer.cs b/src/cli/Commands/Streamer.cs index 5880b048..e25cfbb9 100644 --- a/src/cli/Commands/Streamer.cs +++ b/src/cli/Commands/Streamer.cs @@ -49,6 +49,7 @@ namespace cli { private async Task DoExecute() { + if (_fileOption.HasValue()){ var fi = new FileInfo(_fileOption.Value()); if (!fi.Exists) { @@ -57,6 +58,7 @@ namespace cli { } using (var stream = fi.OpenRead()) { + _logger.LogInformation("DoExecute from given file"); await DoStream(stream); } return 0; @@ -65,6 +67,7 @@ namespace cli { { using(var stream = Console.OpenStandardInput()) { + _logger.LogInformation("DoExecute from standard input"); await DoStream(stream); } return 0; @@ -72,20 +75,39 @@ namespace cli { } async Task DoStream (Stream stream) { + _tokenSource = new CancellationTokenSource(); - await _client.ConnectAsync( - new Uri(_cxSettings.StreamingUrl+"/"+_flowIdArg.Value), - _tokenSource.Token); + var url = _cxSettings.StreamingUrl+"/"+_flowIdArg.Value; + + _logger.LogInformation("Connecting to "+url); + await _client.ConnectAsync(new Uri(url), _tokenSource.Token); + _logger.LogInformation("Connected"); const int bufLen = Constants.WebSocketsMaxBufLen; byte [] buffer = new byte[bufLen]; const int offset=0; int read = 0; + /* + var reciving = Task.Run(async ()=> { + byte [] readbuffer = new byte[bufLen]; + var rb = new ArraySegment(readbuffer, 0, bufLen); + bool continueReading = false; + do { + var result = await _client.ReceiveAsync(rb, _tokenSource.Token); + _logger.LogInformation($"received {result.Count} bytes"); + continueReading = !result.CloseStatus.HasValue; + } while (continueReading); + } ); */ + do { read = await stream.ReadAsync(buffer, offset, bufLen); var segment = new ArraySegment(buffer, offset, read); - await _client.SendAsync(new ArraySegment(buffer), - WebSocketMessageType.Binary, false, _tokenSource.Token); - } while (read>0); + bool end = read < bufLen; + await _client.SendAsync(new ArraySegment(buffer), WebSocketMessageType.Binary, end, _tokenSource.Token); + _logger.LogInformation($"sent {read} bytes end:{end} "); + + } while (read>0 && stream.CanRead ); + // reciving.Wait(); + await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "EOF", _tokenSource.Token); } } } \ No newline at end of file diff --git a/src/cli/Program.cs b/src/cli/Program.cs index ce43b13d..2228812c 100644 --- a/src/cli/Program.cs +++ b/src/cli/Program.cs @@ -105,6 +105,8 @@ namespace cli // calling a Startup sequence var appBuilder = ConfigureApplication(); var loggerFactory = appBuilder.ApplicationServices.GetRequiredService(); + var cxSettings = appBuilder.ApplicationServices.GetRequiredService>(); + var usercxSettings = appBuilder.ApplicationServices.GetRequiredService>(); CommandOption rootCommandHelpOption = cliapp.HelpOption("-? | -h | --help"); @@ -113,6 +115,7 @@ namespace cli (new AuthCommander(loggerFactory)).Integrate(cliapp); (new CiBuildCommand()).Integrate(cliapp); (new GenerationCommander()).Integrate(cliapp); + (new Streamer(loggerFactory, cxSettings, usercxSettings )).Integrate(cliapp); if (args.Length == 0) { diff --git a/src/cli/Settings/ConnectionSettings.cs b/src/cli/Settings/ConnectionSettings.cs index d85569ed..1f98eae9 100644 --- a/src/cli/Settings/ConnectionSettings.cs +++ b/src/cli/Settings/ConnectionSettings.cs @@ -3,6 +3,7 @@ namespace cli using System.ComponentModel.DataAnnotations.Schema; using System.Runtime.Serialization; using Newtonsoft.Json; + using Yavsc; public class ConnectionSettings { @@ -38,8 +39,8 @@ namespace cli [NotMapped] [JsonIgnore] public string StreamingUrl { get { - return Port==0 ? $"{SiteAccessSheme}://{Authority}/ws": - $"{SiteAccessSheme}://{Authority}:{Port}/ws"; + return Port==0 ? $"ws://{Authority}"+Constants.LivePath: + $"ws://{Authority}:{Port}"+Constants.LivePath; } } } } \ No newline at end of file