From 85a88e1f10b10a3984df559398350950a1e5302b Mon Sep 17 00:00:00 2001 From: Paul Schneider Date: Sun, 18 Oct 2020 23:00:17 +0100 Subject: [PATCH] Fixing destination file @streaming --- Makefile | 2 + src/Yavsc.Abstract/Constants.cs | 2 + .../FileSystem/AbstractFileSystemHelpers.cs | 12 +++ .../Access/CircleAuthorizationToFile.cs | 2 + .../Blogspot/FileSystemApiController.cs | 2 + .../Blogspot/FileSystemStream.cs | 67 +++++++++++++ src/Yavsc/Services/LiveProcessor.cs | 9 +- src/Yavsc/Startup/Startup.WebSockets.cs | 4 +- .../ViewModels/Streaming/LiveCastHandler.cs | 2 +- src/cli/Commands/Streamer.cs | 99 ++++++++++++++++++- src/cli/Settings/ConnectionSettings.cs | 2 +- 11 files changed, 193 insertions(+), 10 deletions(-) create mode 100644 src/Yavsc/ApiControllers/Blogspot/FileSystemStream.cs diff --git a/Makefile b/Makefile index 8c977164..acb29c82 100644 --- a/Makefile +++ b/Makefile @@ -7,6 +7,8 @@ CONFIG=Debug git_status := $(shell git status -s --porcelain |wc -l) +all: yavscd + clean: rm -f src/Yavsc.Abstract/bin/$(CONFIG)/dnx451/Yavsc.Abstract.dll src/OAuth.AspNet.Token/bin/$(CONFIG)/dnx451/OAuth.AspNet.Token.dll src/OAuth.AspNet.AuthServer/bin/$(CONFIG)/dnx451/OAuth.AspNet.AuthServer.dll src/Yavsc.Server/bin/$(CONFIG)/dnx451/Yavsc.Server.dll src/Yavsc/bin/$(CONFIG)/dnx451/Yavsc.dll diff --git a/src/Yavsc.Abstract/Constants.cs b/src/Yavsc.Abstract/Constants.cs index cdf5bf61..c14b38b7 100644 --- a/src/Yavsc.Abstract/Constants.cs +++ b/src/Yavsc.Abstract/Constants.cs @@ -60,5 +60,7 @@ namespace Yavsc public const int MaxUserNameLength = 26; public const string LivePath = "/live/cast"; + + public const string StreamingPath = "/api/stream/Put"; } } diff --git a/src/Yavsc.Abstract/FileSystem/AbstractFileSystemHelpers.cs b/src/Yavsc.Abstract/FileSystem/AbstractFileSystemHelpers.cs index ccad8d88..7da35379 100644 --- a/src/Yavsc.Abstract/FileSystem/AbstractFileSystemHelpers.cs +++ b/src/Yavsc.Abstract/FileSystem/AbstractFileSystemHelpers.cs @@ -38,6 +38,17 @@ namespace Yavsc.Helpers return !name.Any(c => !ValidFileNameChars.Contains(c)); } + public static bool IsValidShortFileName(this string name) + { + if (name.Any(c => !ValidFileNameChars.Contains(c))) + return false; + + if (!name.Any(c => !AlfaNum.Contains(c))) + return false; + + return true; + } + // Ensure this path is canonical, // No "dirto/./this", neither "dirt/to/that/" // no .. and each char must be listed as valid in constants @@ -69,6 +80,7 @@ namespace Yavsc.Helpers // Server side only supports POSIX file systems public const char RemoteDirectorySeparator = '/'; + public static char[] AlfaNum = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".ToCharArray(); // Only accept descent remote file names public static char[] ValidFileNameChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-=_~. %#".ToCharArray(); diff --git a/src/Yavsc.Server/Models/Access/CircleAuthorizationToFile.cs b/src/Yavsc.Server/Models/Access/CircleAuthorizationToFile.cs index 9884838d..8eaba90d 100644 --- a/src/Yavsc.Server/Models/Access/CircleAuthorizationToFile.cs +++ b/src/Yavsc.Server/Models/Access/CircleAuthorizationToFile.cs @@ -1,3 +1,4 @@ +using System; using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations.Schema; using Newtonsoft.Json; @@ -8,6 +9,7 @@ using Yavsc.Models.Relationship; namespace Yavsc.Server.Models.Access { + [Obsolete] public class CircleAuthorizationToFile : ICircleAuthorization { diff --git a/src/Yavsc/ApiControllers/Blogspot/FileSystemApiController.cs b/src/Yavsc/ApiControllers/Blogspot/FileSystemApiController.cs index 1ad8dd17..13e9aec1 100644 --- a/src/Yavsc/ApiControllers/Blogspot/FileSystemApiController.cs +++ b/src/Yavsc/ApiControllers/Blogspot/FileSystemApiController.cs @@ -180,6 +180,8 @@ namespace Yavsc.ApiControllers } return Ok(new { deleted=id }); } + + } } diff --git a/src/Yavsc/ApiControllers/Blogspot/FileSystemStream.cs b/src/Yavsc/ApiControllers/Blogspot/FileSystemStream.cs new file mode 100644 index 00000000..e2987e3c --- /dev/null +++ b/src/Yavsc/ApiControllers/Blogspot/FileSystemStream.cs @@ -0,0 +1,67 @@ +using System.IO; +using System.Linq; +using System.Security.Claims; +using System.Threading.Tasks; +using Microsoft.AspNet.Authorization; +using Microsoft.AspNet.Mvc; +using Microsoft.Data.Entity; +using Microsoft.Extensions.Logging; +using Yavsc.Attributes.Validation; +using Yavsc.Helpers; +using Yavsc.Models; +using Yavsc.Models.Messaging; +using Yavsc.Services; + +namespace Yavsc.ApiControllers +{ + [Authorize, Route("api/stream")] + public partial class FileSystemStreamController : Controller + { + private readonly ILogger logger; + private readonly ILiveProcessor liveProcessor; + readonly ApplicationDbContext dbContext; + + public FileSystemStreamController(ApplicationDbContext context, ILiveProcessor liveProcessor, ILoggerFactory loggerFactory) + { + this.dbContext = context; + this.logger = loggerFactory.CreateLogger(); + this.liveProcessor = liveProcessor; + } + public async Task Put([ValidRemoteUserFilePath] string filename) + { + if (!HttpContext.WebSockets.IsWebSocketRequest) + return HttpBadRequest("not a web socket"); + if (!HttpContext.User.Identity.IsAuthenticated) + return new HttpUnauthorizedResult(); + var subdirs = filename.Split('/'); + var filePath = subdirs.Length > 1 ? string.Join("/", subdirs.Take(subdirs.Length-1)) : null; + var shortFileName = subdirs[subdirs.Length-1]; + if (shortFileName.IsValidShortFileName()) + return HttpBadRequest("invalid file name"); + + + var userName = User.GetUserName(); + var hubContext = Microsoft.AspNet.SignalR.GlobalHost.ConnectionManager.GetHubContext(); + string url = string.Format( + "{0}/{1}/{2}", + Startup.UserFilesOptions.RequestPath.ToUriComponent(), + userName, + filename + ); + + hubContext.Clients.All.addPublicStream(new PublicStreamInfo + { + sender = userName, + url = url, + }, $"{userName} is starting a stream!"); + + string destDir = HttpContext.User.InitPostToFileSystem(filePath); + logger.LogInformation($"Saving flow to {destDir}"); + var userId = User.GetUserId(); + var user = await dbContext.Users.FirstAsync(u => u.Id == userId); + + await liveProcessor.AcceptStream(HttpContext, user, destDir, shortFileName); + return Ok(); + } + } +} diff --git a/src/Yavsc/Services/LiveProcessor.cs b/src/Yavsc/Services/LiveProcessor.cs index 330a0d87..b13b4137 100644 --- a/src/Yavsc/Services/LiveProcessor.cs +++ b/src/Yavsc/Services/LiveProcessor.cs @@ -40,8 +40,6 @@ namespace Yavsc.Services public async Task AcceptStream(HttpContext context, ApplicationUser user, string destDir, string fileName) { // TODO defer request handling - - string uname = user.UserName; LiveCastHandler liveHandler = null; if (Casters.ContainsKey(uname)) @@ -94,6 +92,7 @@ namespace Yavsc.Services var fsInputQueue = new Queue>(); bool endOfInput = false; + sBuffer = new ArraySegment(buffer,0,received.Count); fsInputQueue.Enqueue(sBuffer); var taskWritingToFs = liveHandler.ReceiveUserFile(user, _logger, destDir, fsInputQueue, fileName, () => endOfInput); @@ -128,18 +127,20 @@ namespace Yavsc.Services _logger.LogInformation("try and receive new bytes"); buffer = new byte[Constants.WebSocketsMaxBufLen]; - sBuffer = new ArraySegment(buffer); received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token); _logger.LogInformation($"Received bytes : {received.Count}"); + + sBuffer = new ArraySegment(buffer,0,received.Count); _logger.LogInformation($"segment : offset: {sBuffer.Offset} count: {sBuffer.Count}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}"); - fsInputQueue.Enqueue(sBuffer); + if (received.CloseStatus.HasValue) { endOfInput=true; _logger.LogInformation($"received a close status: {received.CloseStatus.Value}: {received.CloseStatusDescription}"); } + else fsInputQueue.Enqueue(sBuffer); } else endOfInput=true; while (ToClose.Count > 0) diff --git a/src/Yavsc/Startup/Startup.WebSockets.cs b/src/Yavsc/Startup/Startup.WebSockets.cs index 60abe721..df3d8f70 100644 --- a/src/Yavsc/Startup/Startup.WebSockets.cs +++ b/src/Yavsc/Startup/Startup.WebSockets.cs @@ -16,8 +16,8 @@ namespace Yavsc var webSocketOptions = new WebSocketOptions() { KeepAliveInterval = TimeSpan.FromSeconds(30), - ReceiveBufferSize = Constants.WebSocketsMaxBufLen+4*sizeof(int), - ReplaceFeature = true + ReceiveBufferSize = Constants.WebSocketsMaxBufLen, + ReplaceFeature = false }; app.UseWebSockets(webSocketOptions); diff --git a/src/Yavsc/ViewModels/Streaming/LiveCastHandler.cs b/src/Yavsc/ViewModels/Streaming/LiveCastHandler.cs index 3ecd4514..fc71e760 100644 --- a/src/Yavsc/ViewModels/Streaming/LiveCastHandler.cs +++ b/src/Yavsc/ViewModels/Streaming/LiveCastHandler.cs @@ -64,7 +64,7 @@ namespace Yavsc.ViewModels.Streaming { var buffer = queue.Dequeue(); - logger.LogInformation($"writing {buffer.Array.Length} bytes..."); + logger.LogInformation($"writing {buffer.Count} bytes..."); await dest.WriteAsync(buffer.Array, buffer.Offset, buffer.Count); logger.LogInformation($"done."); diff --git a/src/cli/Commands/Streamer.cs b/src/cli/Commands/Streamer.cs index 406e62ed..484ca461 100644 --- a/src/cli/Commands/Streamer.cs +++ b/src/cli/Commands/Streamer.cs @@ -1,8 +1,10 @@ using System; using System.IO; using System.Net.WebSockets; +using System.Security.Policy; using System.Threading; using System.Threading.Tasks; +using System.Web; using cli.Model; using Microsoft.Extensions.CommandLineUtils; using Microsoft.Extensions.Logging; @@ -11,7 +13,7 @@ using Yavsc.Abstract; namespace cli { - public class Streamer: ICommander { + public class Streamer: ICommander { private readonly ClientWebSocket _client; private readonly ILogger _logger; private readonly ConnectionSettings _cxSettings; @@ -94,7 +96,100 @@ namespace cli { read = await stream.ReadAsync(buffer, offset, bufLen); lastFrame = read < Yavsc.Constants.WebSocketsMaxBufLen; ArraySegment segment = new ArraySegment(buffer, offset, read); - await _client.SendAsync(new ArraySegment(buffer), pckType, lastFrame, _tokenSource.Token); + await _client.SendAsync(segment, pckType, lastFrame, _tokenSource.Token); + _logger.LogInformation($"sent {segment.Count} "); + } while (!lastFrame); + _logger.LogInformation($"Closing socket"); + await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "EOF", _tokenSource.Token); + } + } + + public class NStreamer: ICommander { + private readonly ClientWebSocket _client; + private readonly ILogger _logger; + private readonly ConnectionSettings _cxSettings; + private readonly UserConnectionSettings _userCxSettings; + private CommandArgument _sourceArg; + private CommandArgument _destArg; + private CancellationTokenSource _tokenSource; + + public NStreamer(ILoggerFactory loggerFactory, + IOptions cxSettings, + IOptions userCxSettings + ) + { + _logger = loggerFactory.CreateLogger(); + _cxSettings = cxSettings.Value; + _userCxSettings = userCxSettings.Value; + _client = new ClientWebSocket(); + _client.Options.SetRequestHeader("Authorization", $"Bearer {_userCxSettings.AccessToken}"); + } + + public CommandLineApplication Integrate(CommandLineApplication rootApp) + { + CommandLineApplication streamCmd = rootApp.Command("stream", + (target) => + { + target.FullName = "Stream to server"; + target.Description = "Stream arbitrary binary data to your server channel"; + + _sourceArg = target.Argument("source", "Source file to send, use '-' for standard input", false); + _destArg = target.Argument("destination", "destination file name", false); + + target.HelpOption("-? | -h | --help"); + }); + streamCmd.OnExecute(async() => await DoExecute()); + return streamCmd; + } + + private async Task DoExecute() + { + + if (_sourceArg.Value != "-") + { + var fi = new FileInfo(_sourceArg.Value); + if (!fi.Exists) { + _logger.LogError("Input file doesn´t exist."); + return -2; + } + using (var stream = fi.OpenRead()) + { + _logger.LogInformation("DoExecute from given file"); + await DoStream(stream); + } + return 0; + } + else + { + using(var stream = Console.OpenStandardInput()) + { + _logger.LogInformation("DoExecute from standard input"); + await DoStream(stream); + } + return 0; + } + } + async Task DoStream (Stream stream) + { + _tokenSource = new CancellationTokenSource(); + var url = _cxSettings.StreamingUrl + "/" + HttpUtility.UrlEncode(_destArg.Value); + + _logger.LogInformation("Connecting to " + url); + await _client.ConnectAsync(new Uri(url), _tokenSource.Token); + _logger.LogInformation("Connected"); + const int bufLen = Yavsc.Constants.WebSocketsMaxBufLen; + byte [] buffer = new byte[bufLen]; + const int offset=0; + int read; + bool lastFrame; + + WebSocketMessageType pckType = WebSocketMessageType.Binary; + do + { + read = await stream.ReadAsync(buffer, offset, bufLen); + lastFrame = read < Yavsc.Constants.WebSocketsMaxBufLen; + ArraySegment segment = new ArraySegment(buffer, offset, read); + await _client.SendAsync(segment, pckType, lastFrame, _tokenSource.Token); _logger.LogInformation($"sent {segment.Count} "); } while (!lastFrame); _logger.LogInformation($"Closing socket"); diff --git a/src/cli/Settings/ConnectionSettings.cs b/src/cli/Settings/ConnectionSettings.cs index 1f98eae9..7148b019 100644 --- a/src/cli/Settings/ConnectionSettings.cs +++ b/src/cli/Settings/ConnectionSettings.cs @@ -43,4 +43,4 @@ namespace cli $"ws://{Authority}:{Port}"+Constants.LivePath; } } } -} \ No newline at end of file +}