Fixing destination file @streaming

vnext
Paul Schneider 4 years ago
parent 29edbec8a7
commit 85a88e1f10
11 changed files with 193 additions and 10 deletions

@ -7,6 +7,8 @@ CONFIG=Debug
git_status := $(shell git status -s --porcelain |wc -l) git_status := $(shell git status -s --porcelain |wc -l)
all: yavscd
clean: clean:
rm -f src/Yavsc.Abstract/bin/$(CONFIG)/dnx451/Yavsc.Abstract.dll src/OAuth.AspNet.Token/bin/$(CONFIG)/dnx451/OAuth.AspNet.Token.dll src/OAuth.AspNet.AuthServer/bin/$(CONFIG)/dnx451/OAuth.AspNet.AuthServer.dll src/Yavsc.Server/bin/$(CONFIG)/dnx451/Yavsc.Server.dll src/Yavsc/bin/$(CONFIG)/dnx451/Yavsc.dll rm -f src/Yavsc.Abstract/bin/$(CONFIG)/dnx451/Yavsc.Abstract.dll src/OAuth.AspNet.Token/bin/$(CONFIG)/dnx451/OAuth.AspNet.Token.dll src/OAuth.AspNet.AuthServer/bin/$(CONFIG)/dnx451/OAuth.AspNet.AuthServer.dll src/Yavsc.Server/bin/$(CONFIG)/dnx451/Yavsc.Server.dll src/Yavsc/bin/$(CONFIG)/dnx451/Yavsc.dll

@ -60,5 +60,7 @@ namespace Yavsc
public const int MaxUserNameLength = 26; public const int MaxUserNameLength = 26;
public const string LivePath = "/live/cast"; public const string LivePath = "/live/cast";
public const string StreamingPath = "/api/stream/Put";
} }
} }

@ -38,6 +38,17 @@ namespace Yavsc.Helpers
return !name.Any(c => !ValidFileNameChars.Contains(c)); return !name.Any(c => !ValidFileNameChars.Contains(c));
} }
public static bool IsValidShortFileName(this string name)
{
if (name.Any(c => !ValidFileNameChars.Contains(c)))
return false;
if (!name.Any(c => !AlfaNum.Contains(c)))
return false;
return true;
}
// Ensure this path is canonical, // Ensure this path is canonical,
// No "dirto/./this", neither "dirt/to/that/" // No "dirto/./this", neither "dirt/to/that/"
// no .. and each char must be listed as valid in constants // no .. and each char must be listed as valid in constants
@ -69,6 +80,7 @@ namespace Yavsc.Helpers
// Server side only supports POSIX file systems // Server side only supports POSIX file systems
public const char RemoteDirectorySeparator = '/'; public const char RemoteDirectorySeparator = '/';
public static char[] AlfaNum = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".ToCharArray();
// Only accept descent remote file names // Only accept descent remote file names
public static char[] ValidFileNameChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-=_~. %#".ToCharArray(); public static char[] ValidFileNameChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-=_~. %#".ToCharArray();

@ -1,3 +1,4 @@
using System;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema; using System.ComponentModel.DataAnnotations.Schema;
using Newtonsoft.Json; using Newtonsoft.Json;
@ -8,6 +9,7 @@ using Yavsc.Models.Relationship;
namespace Yavsc.Server.Models.Access namespace Yavsc.Server.Models.Access
{ {
[Obsolete]
public class CircleAuthorizationToFile : ICircleAuthorization public class CircleAuthorizationToFile : ICircleAuthorization
{ {

@ -180,6 +180,8 @@ namespace Yavsc.ApiControllers
} }
return Ok(new { deleted=id }); return Ok(new { deleted=id });
} }
} }
} }

@ -0,0 +1,67 @@
using System.IO;
using System.Linq;
using System.Security.Claims;
using System.Threading.Tasks;
using Microsoft.AspNet.Authorization;
using Microsoft.AspNet.Mvc;
using Microsoft.Data.Entity;
using Microsoft.Extensions.Logging;
using Yavsc.Attributes.Validation;
using Yavsc.Helpers;
using Yavsc.Models;
using Yavsc.Models.Messaging;
using Yavsc.Services;
namespace Yavsc.ApiControllers
{
[Authorize, Route("api/stream")]
public partial class FileSystemStreamController : Controller
{
private readonly ILogger logger;
private readonly ILiveProcessor liveProcessor;
readonly ApplicationDbContext dbContext;
public FileSystemStreamController(ApplicationDbContext context, ILiveProcessor liveProcessor, ILoggerFactory loggerFactory)
{
this.dbContext = context;
this.logger = loggerFactory.CreateLogger<FileSystemStreamController>();
this.liveProcessor = liveProcessor;
}
public async Task<IActionResult> Put([ValidRemoteUserFilePath] string filename)
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
return HttpBadRequest("not a web socket");
if (!HttpContext.User.Identity.IsAuthenticated)
return new HttpUnauthorizedResult();
var subdirs = filename.Split('/');
var filePath = subdirs.Length > 1 ? string.Join("/", subdirs.Take(subdirs.Length-1)) : null;
var shortFileName = subdirs[subdirs.Length-1];
if (shortFileName.IsValidShortFileName())
return HttpBadRequest("invalid file name");
var userName = User.GetUserName();
var hubContext = Microsoft.AspNet.SignalR.GlobalHost.ConnectionManager.GetHubContext<ChatHub>();
string url = string.Format(
"{0}/{1}/{2}",
Startup.UserFilesOptions.RequestPath.ToUriComponent(),
userName,
filename
);
hubContext.Clients.All.addPublicStream(new PublicStreamInfo
{
sender = userName,
url = url,
}, $"{userName} is starting a stream!");
string destDir = HttpContext.User.InitPostToFileSystem(filePath);
logger.LogInformation($"Saving flow to {destDir}");
var userId = User.GetUserId();
var user = await dbContext.Users.FirstAsync(u => u.Id == userId);
await liveProcessor.AcceptStream(HttpContext, user, destDir, shortFileName);
return Ok();
}
}
}

@ -40,8 +40,6 @@ namespace Yavsc.Services
public async Task<bool> AcceptStream(HttpContext context, ApplicationUser user, string destDir, string fileName) public async Task<bool> AcceptStream(HttpContext context, ApplicationUser user, string destDir, string fileName)
{ {
// TODO defer request handling // TODO defer request handling
string uname = user.UserName; string uname = user.UserName;
LiveCastHandler liveHandler = null; LiveCastHandler liveHandler = null;
if (Casters.ContainsKey(uname)) if (Casters.ContainsKey(uname))
@ -94,6 +92,7 @@ namespace Yavsc.Services
var fsInputQueue = new Queue<ArraySegment<byte>>(); var fsInputQueue = new Queue<ArraySegment<byte>>();
bool endOfInput = false; bool endOfInput = false;
sBuffer = new ArraySegment<byte>(buffer,0,received.Count);
fsInputQueue.Enqueue(sBuffer); fsInputQueue.Enqueue(sBuffer);
var taskWritingToFs = liveHandler.ReceiveUserFile(user, _logger, destDir, fsInputQueue, fileName, () => endOfInput); var taskWritingToFs = liveHandler.ReceiveUserFile(user, _logger, destDir, fsInputQueue, fileName, () => endOfInput);
@ -128,18 +127,20 @@ namespace Yavsc.Services
_logger.LogInformation("try and receive new bytes"); _logger.LogInformation("try and receive new bytes");
buffer = new byte[Constants.WebSocketsMaxBufLen]; buffer = new byte[Constants.WebSocketsMaxBufLen];
sBuffer = new ArraySegment<byte>(buffer);
received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token); received = await liveHandler.Socket.ReceiveAsync(sBuffer, liveHandler.TokenSource.Token);
_logger.LogInformation($"Received bytes : {received.Count}"); _logger.LogInformation($"Received bytes : {received.Count}");
sBuffer = new ArraySegment<byte>(buffer,0,received.Count);
_logger.LogInformation($"segment : offset: {sBuffer.Offset} count: {sBuffer.Count}"); _logger.LogInformation($"segment : offset: {sBuffer.Offset} count: {sBuffer.Count}");
_logger.LogInformation($"Is the end : {received.EndOfMessage}"); _logger.LogInformation($"Is the end : {received.EndOfMessage}");
fsInputQueue.Enqueue(sBuffer);
if (received.CloseStatus.HasValue) if (received.CloseStatus.HasValue)
{ {
endOfInput=true; endOfInput=true;
_logger.LogInformation($"received a close status: {received.CloseStatus.Value}: {received.CloseStatusDescription}"); _logger.LogInformation($"received a close status: {received.CloseStatus.Value}: {received.CloseStatusDescription}");
} }
else fsInputQueue.Enqueue(sBuffer);
} }
else endOfInput=true; else endOfInput=true;
while (ToClose.Count > 0) while (ToClose.Count > 0)

@ -16,8 +16,8 @@ namespace Yavsc
var webSocketOptions = new WebSocketOptions() var webSocketOptions = new WebSocketOptions()
{ {
KeepAliveInterval = TimeSpan.FromSeconds(30), KeepAliveInterval = TimeSpan.FromSeconds(30),
ReceiveBufferSize = Constants.WebSocketsMaxBufLen+4*sizeof(int), ReceiveBufferSize = Constants.WebSocketsMaxBufLen,
ReplaceFeature = true ReplaceFeature = false
}; };
app.UseWebSockets(webSocketOptions); app.UseWebSockets(webSocketOptions);

@ -64,7 +64,7 @@ namespace Yavsc.ViewModels.Streaming
{ {
var buffer = queue.Dequeue(); var buffer = queue.Dequeue();
logger.LogInformation($"writing {buffer.Array.Length} bytes..."); logger.LogInformation($"writing {buffer.Count} bytes...");
await dest.WriteAsync(buffer.Array, buffer.Offset, buffer.Count); await dest.WriteAsync(buffer.Array, buffer.Offset, buffer.Count);
logger.LogInformation($"done."); logger.LogInformation($"done.");

@ -1,8 +1,10 @@
using System; using System;
using System.IO; using System.IO;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Security.Policy;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Web;
using cli.Model; using cli.Model;
using Microsoft.Extensions.CommandLineUtils; using Microsoft.Extensions.CommandLineUtils;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -94,7 +96,100 @@ namespace cli {
read = await stream.ReadAsync(buffer, offset, bufLen); read = await stream.ReadAsync(buffer, offset, bufLen);
lastFrame = read < Yavsc.Constants.WebSocketsMaxBufLen; lastFrame = read < Yavsc.Constants.WebSocketsMaxBufLen;
ArraySegment<byte> segment = new ArraySegment<byte>(buffer, offset, read); ArraySegment<byte> segment = new ArraySegment<byte>(buffer, offset, read);
await _client.SendAsync(new ArraySegment<byte>(buffer), pckType, lastFrame, _tokenSource.Token); await _client.SendAsync(segment, pckType, lastFrame, _tokenSource.Token);
_logger.LogInformation($"sent {segment.Count} ");
} while (!lastFrame);
_logger.LogInformation($"Closing socket");
await _client.CloseAsync(WebSocketCloseStatus.NormalClosure, "EOF", _tokenSource.Token);
}
}
public class NStreamer: ICommander {
private readonly ClientWebSocket _client;
private readonly ILogger _logger;
private readonly ConnectionSettings _cxSettings;
private readonly UserConnectionSettings _userCxSettings;
private CommandArgument _sourceArg;
private CommandArgument _destArg;
private CancellationTokenSource _tokenSource;
public NStreamer(ILoggerFactory loggerFactory,
IOptions<ConnectionSettings> cxSettings,
IOptions<UserConnectionSettings> userCxSettings
)
{
_logger = loggerFactory.CreateLogger<Streamer>();
_cxSettings = cxSettings.Value;
_userCxSettings = userCxSettings.Value;
_client = new ClientWebSocket();
_client.Options.SetRequestHeader("Authorization", $"Bearer {_userCxSettings.AccessToken}");
}
public CommandLineApplication Integrate(CommandLineApplication rootApp)
{
CommandLineApplication streamCmd = rootApp.Command("stream",
(target) =>
{
target.FullName = "Stream to server";
target.Description = "Stream arbitrary binary data to your server channel";
_sourceArg = target.Argument("source", "Source file to send, use '-' for standard input", false);
_destArg = target.Argument("destination", "destination file name", false);
target.HelpOption("-? | -h | --help");
});
streamCmd.OnExecute(async() => await DoExecute());
return streamCmd;
}
private async Task <int> DoExecute()
{
if (_sourceArg.Value != "-")
{
var fi = new FileInfo(_sourceArg.Value);
if (!fi.Exists) {
_logger.LogError("Input file doesn´t exist.");
return -2;
}
using (var stream = fi.OpenRead())
{
_logger.LogInformation("DoExecute from given file");
await DoStream(stream);
}
return 0;
}
else
{
using(var stream = Console.OpenStandardInput())
{
_logger.LogInformation("DoExecute from standard input");
await DoStream(stream);
}
return 0;
}
}
async Task DoStream (Stream stream)
{
_tokenSource = new CancellationTokenSource();
var url = _cxSettings.StreamingUrl + "/" + HttpUtility.UrlEncode(_destArg.Value);
_logger.LogInformation("Connecting to " + url);
await _client.ConnectAsync(new Uri(url), _tokenSource.Token);
_logger.LogInformation("Connected");
const int bufLen = Yavsc.Constants.WebSocketsMaxBufLen;
byte [] buffer = new byte[bufLen];
const int offset=0;
int read;
bool lastFrame;
WebSocketMessageType pckType = WebSocketMessageType.Binary;
do
{
read = await stream.ReadAsync(buffer, offset, bufLen);
lastFrame = read < Yavsc.Constants.WebSocketsMaxBufLen;
ArraySegment<byte> segment = new ArraySegment<byte>(buffer, offset, read);
await _client.SendAsync(segment, pckType, lastFrame, _tokenSource.Token);
_logger.LogInformation($"sent {segment.Count} "); _logger.LogInformation($"sent {segment.Count} ");
} while (!lastFrame); } while (!lastFrame);
_logger.LogInformation($"Closing socket"); _logger.LogInformation($"Closing socket");

Loading…