From 64832d6fefecf69e1fe6a8a769215bf2f4750175 Mon Sep 17 00:00:00 2001 From: Paul Schneider Date: Thu, 27 Jun 2019 10:30:28 +0100 Subject: [PATCH] A first flow was saved to disk --- .../FileSystem/FileSystemHelpers.cs | 3 - .../Messaging/PublicStreamInfo.cs | 11 + src/Yavsc.Server/Constants.cs | 1 + .../Streaming/LiveApiController.cs | 14 +- src/Yavsc/Helpers/FileSystemHelpers.cs | 79 +++++-- .../IConnexionManager.cs | 0 src/Yavsc/Interfaces/ILiveProcessor.cs | 28 +++ src/Yavsc/Services/LiveProcessor.cs | 212 ++++++++++++++++++ src/Yavsc/Startup/Startup.FileServer.cs | 2 +- src/Yavsc/Startup/Startup.OAuthHelpers.cs | 36 +-- src/Yavsc/Startup/Startup.cs | 207 ++--------------- .../ViewModels/Streaming/LiveCastMeta.cs | 5 +- src/Yavsc/wwwroot/js/chat.js | 6 + 13 files changed, 363 insertions(+), 241 deletions(-) create mode 100644 src/Yavsc.Abstract/Messaging/PublicStreamInfo.cs rename src/Yavsc/{Services => Interfaces}/IConnexionManager.cs (100%) create mode 100644 src/Yavsc/Interfaces/ILiveProcessor.cs create mode 100644 src/Yavsc/Services/LiveProcessor.cs diff --git a/src/Yavsc.Abstract/FileSystem/FileSystemHelpers.cs b/src/Yavsc.Abstract/FileSystem/FileSystemHelpers.cs index 31b010eb..c7d41c64 100644 --- a/src/Yavsc.Abstract/FileSystem/FileSystemHelpers.cs +++ b/src/Yavsc.Abstract/FileSystem/FileSystemHelpers.cs @@ -48,11 +48,8 @@ namespace Yavsc.Abstract.FileSystem { UserDirectoryInfo di = new UserDirectoryInfo(UserFilesDirName, user.Identity.Name, subdir); - return di; } - - } public static class FileSystemConstants diff --git a/src/Yavsc.Abstract/Messaging/PublicStreamInfo.cs b/src/Yavsc.Abstract/Messaging/PublicStreamInfo.cs new file mode 100644 index 00000000..c215c823 --- /dev/null +++ b/src/Yavsc.Abstract/Messaging/PublicStreamInfo.cs @@ -0,0 +1,11 @@ +namespace Yavsc.Models.Messaging +{ + public class PublicStreamInfo + { + public long id { get; set; } + public string sender { get; set; } + public string title { get; set; } + public string url { get; set; } + public string mediaType { get; set; } + } +} \ No newline at end of file diff --git a/src/Yavsc.Server/Constants.cs b/src/Yavsc.Server/Constants.cs index 408ead8c..e77b943a 100644 --- a/src/Yavsc.Server/Constants.cs +++ b/src/Yavsc.Server/Constants.cs @@ -14,6 +14,7 @@ namespace Yavsc LogoutPath = "~/signout", UserInfoPath = "~/api/me", SignalRPath = "/api/signalr", + LiveUserPath = "live", ApplicationAuthenticationSheme = "ServerCookie", ExternalAuthenticationSheme= "ExternalCookie", diff --git a/src/Yavsc/ApiControllers/Streaming/LiveApiController.cs b/src/Yavsc/ApiControllers/Streaming/LiveApiController.cs index 47474edc..d8c3223d 100644 --- a/src/Yavsc/ApiControllers/Streaming/LiveApiController.cs +++ b/src/Yavsc/ApiControllers/Streaming/LiveApiController.cs @@ -10,6 +10,7 @@ using Microsoft.Data.Entity; using Microsoft.Extensions.Logging; using Yavsc.Models; using Yavsc.Models.Streaming; +using Yavsc.Services; using Yavsc.ViewModels.Streaming; namespace Yavsc.Controllers @@ -17,8 +18,8 @@ namespace Yavsc.Controllers [Route("api/live")] public class LiveApiController : Controller { - public static ConcurrentDictionary Casters = new ConcurrentDictionary(); - + + ILiveProcessor _liveProcessor; private ApplicationDbContext _dbContext; ILogger _logger; @@ -30,11 +31,14 @@ namespace Yavsc.Controllers public LiveApiController( ILoggerFactory loggerFactory, - ApplicationDbContext context) + ApplicationDbContext context, + ILiveProcessor liveProcessor) { + _liveProcessor = liveProcessor; _dbContext = context; _logger = loggerFactory.CreateLogger(); } + [HttpGet("filenamehint/{id}")] public async Task GetFileNameHint(string id) { @@ -46,7 +50,7 @@ namespace Yavsc.Controllers { if (!HttpContext.WebSockets.IsWebSocketRequest) return new BadRequestResult(); var uid = User.GetUserId(); - var existent = Casters[id]; + var existent = _liveProcessor.Casters[id]; var socket = await HttpContext.WebSockets.AcceptWebSocketAsync(); if (existent.Listeners.TryAdd(uid,socket)) { return Ok(); @@ -65,7 +69,7 @@ namespace Yavsc.Controllers public IActionResult Index(long? id) { if (id==0) - return View("Index", Casters.Select(c=> new { UserName = c.Key, Listenning = c.Value.Listeners.Count })); + return View("Index", _liveProcessor.Casters.Select(c=> new { UserName = c.Key, Listenning = c.Value.Listeners.Count })); var flow = _dbContext.LiveFlow.SingleOrDefault(f=>f.Id == id); if (flow == null) return HttpNotFound(); diff --git a/src/Yavsc/Helpers/FileSystemHelpers.cs b/src/Yavsc/Helpers/FileSystemHelpers.cs index c93c1e75..b1241785 100644 --- a/src/Yavsc/Helpers/FileSystemHelpers.cs +++ b/src/Yavsc/Helpers/FileSystemHelpers.cs @@ -1,11 +1,14 @@ using System; +using System.Collections.Generic; using System.Drawing; using System.Drawing.Imaging; using System.IO; using System.Net.Mime; using System.Security.Claims; +using System.Threading; +using System.Threading.Tasks; using System.Web; using Microsoft.AspNet.Http; using Yavsc.Abstract.FileSystem; @@ -107,18 +110,20 @@ public static FileRecievedInfo ReceiveProSignature(this ClaimsPrincipal user, st { user.DiskQuota += quota; } - public static FileRecievedInfo ReceiveUserFile(this ApplicationUser user, string root, IFormFile f, string destFileName = null) { - // TODO lock user's disk usage for this scope, + return ReceiveUserFile(user, root, f.OpenReadStream(), destFileName ?? f.ContentDisposition, f.ContentType, CancellationToken.None); + } + + public static FileRecievedInfo ReceiveUserFile(this ApplicationUser user, string root, Queue> queue, string destFileName, string contentType, CancellationToken token) + { + // TODO lock user's disk usage for this scope, // this process is not safe at concurrent access. long usage = user.DiskUsage; var item = new FileRecievedInfo(); - // form-data; name="file"; filename="capt0008.jpg" - ContentDisposition contentDisposition = new ContentDisposition(f.ContentDisposition); - item.FileName = Yavsc.Abstract.FileSystem.AbstractFileSystemHelpers.FilterFileName (destFileName ?? contentDisposition.FileName); - item.MimeType = contentDisposition.DispositionType; + item.FileName = Yavsc.Abstract.FileSystem.AbstractFileSystemHelpers.FilterFileName (destFileName); + item.MimeType = contentType; item.DestDir = root; var fi = new FileInfo(Path.Combine(root, item.FileName)); if (fi.Exists) @@ -128,36 +133,66 @@ public static FileRecievedInfo ReceiveProSignature(this ClaimsPrincipal user, st } using (var dest = fi.OpenWrite()) { - using (var org = f.OpenReadStream()) - { - byte[] buffer = new byte[1024]; - long len = org.Length; - if (len > (user.DiskQuota - usage)) { - item.QuotaOffensed = true; - return item; + while (!token.IsCancellationRequested) + { + if (queue.Count==0) Task.Delay(300); + else { + var buffer = queue.Dequeue(); + dest.Write(buffer.Array,buffer.Offset, buffer.Count); + usage += buffer.Count; + } + if (usage >= user.DiskQuota) break; } - + user.DiskUsage = usage; + dest.Close(); + } + if (usage >= user.DiskQuota) { + item.QuotaOffensed = true; + } + user.DiskUsage = usage; + return item; + } + public static FileRecievedInfo ReceiveUserFile(this ApplicationUser user, string root, Stream inputStream, string destFileName, string contentType, CancellationToken token) + { + // TODO lock user's disk usage for this scope, + // this process is not safe at concurrent access. + long usage = user.DiskUsage; - while (len > 0 && usage < user.DiskQuota) + var item = new FileRecievedInfo(); + item.FileName = Yavsc.Abstract.FileSystem.AbstractFileSystemHelpers.FilterFileName (destFileName); + item.MimeType = contentType; + item.DestDir = root; + var fi = new FileInfo(Path.Combine(root, item.FileName)); + if (fi.Exists) + { + item.Overriden = true; + usage -= fi.Length; + } + using (var dest = fi.OpenWrite()) + { + using (inputStream) + { + const int blen = 1024; + byte[] buffer = new byte[blen]; + int len = 0; + while (!token.IsCancellationRequested && (len=inputStream.Read(buffer, 0, blen))>0) { - int blen = len > 1024 ? 1024 : (int)len; - org.Read(buffer, 0, blen); - dest.Write(buffer, 0, blen); - len -= blen; - usage += blen; + dest.Write(buffer, 0, len); + usage += len; + if (usage >= user.DiskQuota) break; } user.DiskUsage = usage; dest.Close(); - org.Close(); + inputStream.Close(); } } if (usage >= user.DiskQuota) { item.QuotaOffensed = true; - } user.DiskUsage = usage; return item; } + public static HtmlString FileLink(this RemoteFileInfo info, string username, string subpath) { return new HtmlString( Startup.UserFilesOptions.RequestPath+"/"+ username + diff --git a/src/Yavsc/Services/IConnexionManager.cs b/src/Yavsc/Interfaces/IConnexionManager.cs similarity index 100% rename from src/Yavsc/Services/IConnexionManager.cs rename to src/Yavsc/Interfaces/IConnexionManager.cs diff --git a/src/Yavsc/Interfaces/ILiveProcessor.cs b/src/Yavsc/Interfaces/ILiveProcessor.cs new file mode 100644 index 00000000..c0793831 --- /dev/null +++ b/src/Yavsc/Interfaces/ILiveProcessor.cs @@ -0,0 +1,28 @@ +using System.Collections.Concurrent; +using System.Threading.Tasks; +using Microsoft.AspNet.Http; +using Yavsc.ViewModels.Streaming; + +namespace Yavsc.Services +{ + public interface ILiveProcessor { + /// + /// instance keeping reference on + /// all collections of casting and listenning websockets + /// + /// + ConcurrentDictionary Casters { get; } + /// + /// Try and accept websocket from aspnet http context + /// + /// + /// + Task AcceptStream (HttpContext context); + + /// + /// live cast entry point + /// + /// + PathString LiveCastingPath {get; set;} + } +} \ No newline at end of file diff --git a/src/Yavsc/Services/LiveProcessor.cs b/src/Yavsc/Services/LiveProcessor.cs new file mode 100644 index 00000000..c8dd1dc2 --- /dev/null +++ b/src/Yavsc/Services/LiveProcessor.cs @@ -0,0 +1,212 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net.WebSockets; +using System.Security.Claims; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNet.Http; +using Microsoft.AspNet.SignalR; +using Microsoft.Data.Entity; +using Microsoft.Extensions.Logging; +using Yavsc.Helpers; +using Yavsc.Models; +using Yavsc.ViewModels.Streaming; +using Yavsc.Models.Messaging; + +namespace Yavsc.Services +{ + + public class LiveProcessor : ILiveProcessor { + IHubContext _hubContext; + private ILogger _logger; + ApplicationDbContext _dbContext; + public PathString LiveCastingPath {get; set;} = Constants.LivePath; + + + public ConcurrentDictionary Casters {get;} = new ConcurrentDictionary(); + + public LiveProcessor(ApplicationDbContext dbContext, ILoggerFactory loggerFactory) + { + _dbContext = dbContext; + _hubContext = GlobalHost.ConnectionManager.GetHubContext(); + _logger = loggerFactory.CreateLogger(); + } + + public async Task AcceptStream (HttpContext context) + { + // TODO defer request handling + var liveId = long.Parse(context.Request.Path.Value.Substring(LiveCastingPath.Value.Length + 1)); + var userId = context.User.GetUserId(); + var user = await _dbContext.Users.FirstAsync(u => u.Id == userId); + var uname = user.UserName; + var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == userId && f.Id == liveId)); + if (flow == null) + { + _logger.LogWarning("Aborting. Flow info was not found."); + context.Response.StatusCode = 400; + return false; + } + _logger.LogInformation("flow : "+flow.Title+" for "+uname); + LiveCastHandler meta = null; + if (Casters.ContainsKey(uname)) + { + meta = Casters[uname]; + if (meta.Socket.State == WebSocketState.Open || meta.Socket.State == WebSocketState.Connecting ) + { + // FIXME loosed connexion should be detected & disposed else where + await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None); + + } + if (!meta.TokenSource.IsCancellationRequested) { + meta.TokenSource.Cancel(); + } + meta.Socket.Dispose(); + meta.Socket = await context.WebSockets.AcceptWebSocketAsync(); + meta.TokenSource = new CancellationTokenSource(); + } + else + { + // Accept the socket + meta = new LiveCastHandler { Socket = await context.WebSockets.AcceptWebSocketAsync() }; + } + _logger.LogInformation("Accepted web socket"); + // Dispatch the flow + + try + { + if (meta.Socket != null && meta.Socket.State == WebSocketState.Open) + { + Casters[uname] = meta; + // TODO: Handle the socket here. + // Find receivers: others in the chat room + // send them the flow + var buffer = new byte[Constants.WebSocketsMaxBufLen]; + var sBuffer = new ArraySegment(buffer); + _logger.LogInformation("Receiving bytes..."); + + WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token); + _logger.LogInformation($"Received bytes : {received.Count}"); + _logger.LogInformation($"Is the end : {received.EndOfMessage}"); + + string destDir = context.User.InitPostToFileSystem("live"); + _logger.LogInformation($"Saving flow to {destDir}"); + string fileName = Path.Combine(destDir, flow.DifferedFileName); + FileInfo iFile = new FileInfo(fileName); + var fsio = new Queue>(); + var toFs = Task.Run( ()=> user.ReceiveUserFile(destDir, fsio, flow.DifferedFileName, flow.MediaType, meta.TokenSource.Token)); + var hubContext = GlobalHost.ConnectionManager.GetHubContext(); + + hubContext.Clients.All.addPublicStream(new PublicStreamInfo + { + id = flow.Id, + sender = flow.Owner.UserName, + title = flow.Title, + url = flow.GetFileUrl(), + mediaType = flow.MediaType + }, $"{flow.Owner.UserName} is starting a stream!"); + + Stack ToClose = new Stack(); + + try + { + + _logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); + // Echo anything we receive + // and send to all listner found + foreach (var cliItem in meta.Listeners) + { + var listenningSocket = cliItem.Value; + if (listenningSocket.State == WebSocketState.Open) { + await listenningSocket.SendAsync( + sBuffer, received.MessageType, received.EndOfMessage, meta.TokenSource.Token); + + } + else + if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent) + { + ToClose.Push(cliItem.Key); + } + } + fsio.Enqueue(sBuffer); + // logger.LogInformation("replying..."); + while (!received.CloseStatus.HasValue) + { + // reply echo + // await meta.Socket.SendAsync(new ArraySegment(buffer), received.MessageType, received.EndOfMessage, meta.TokenSource.Token); + + _logger.LogInformation("Receiving new bytes..."); + buffer = new byte[Constants.WebSocketsMaxBufLen]; + sBuffer = new ArraySegment(buffer); + + received = await meta.Socket.ReceiveAsync(sBuffer, meta.TokenSource.Token); + foreach (var cliItem in meta.Listeners) + { + var listenningSocket = cliItem.Value; + if (listenningSocket.State == WebSocketState.Open) { + await listenningSocket.SendAsync( + sBuffer, received.MessageType, received.EndOfMessage, meta.TokenSource.Token); + } + else + if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent) + { + ToClose.Push(cliItem.Key); + } + } + fsio.Enqueue(sBuffer); + _logger.LogInformation($"Received new bytes : {received.Count}"); + _logger.LogInformation($"Is the end : {received.EndOfMessage}"); + while (ToClose.Count >0) + { + string no = ToClose.Pop(); + _logger.LogInformation("Closing follower connection"); + WebSocket listenningSocket; + if (meta.Listeners.TryRemove(no, out listenningSocket)) + await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); + + } + } + _logger.LogInformation("Closing connection"); + await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); + meta.TokenSource.Cancel(); + Casters[uname] = null; + } + catch (Exception ex) + { + _logger.LogError($"Exception occured : {ex.Message}"); + _logger.LogError(ex.StackTrace); + meta.Socket.Dispose(); + Casters[uname] = null; + } + } + else + { // not meta.Socket != null && meta.Socket.State == WebSocketState.Open + if (meta.Socket != null) + { + _logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} "); + meta.Socket.Dispose(); + } + else + _logger.LogError("socket object is null"); + } + } + catch (IOException ex) + { + if (ex.Message == "Unexpected end of stream") + { + _logger.LogError($"Unexpected end of stream"); + } + else + { + _logger.LogError($"Really unexpected end of stream"); + } + await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None); + meta.Socket?.Dispose(); + Casters[uname] = null; + } + return true; + } + } +} \ No newline at end of file diff --git a/src/Yavsc/Startup/Startup.FileServer.cs b/src/Yavsc/Startup/Startup.FileServer.cs index a0749464..78fd1ced 100644 --- a/src/Yavsc/Startup/Startup.FileServer.cs +++ b/src/Yavsc/Startup/Startup.FileServer.cs @@ -65,7 +65,7 @@ namespace Yavsc }; GitOptions.DefaultFilesOptions.DefaultFileNames.Add("index.md"); GitOptions.StaticFileOptions.ServeUnknownFileTypes = true; - logger.LogInformation( $"{GitDirName}"); + _logger.LogInformation( $"{GitDirName}"); GitOptions.StaticFileOptions.OnPrepareResponse+= OnPrepareGitRepoResponse; app.UseFileServer(UserFilesOptions); diff --git a/src/Yavsc/Startup/Startup.OAuthHelpers.cs b/src/Yavsc/Startup/Startup.OAuthHelpers.cs index da0bef2e..6beab063 100644 --- a/src/Yavsc/Startup/Startup.OAuthHelpers.cs +++ b/src/Yavsc/Startup/Startup.OAuthHelpers.cs @@ -18,10 +18,10 @@ namespace Yavsc private Client GetApplication(string clientId) { if (_dbContext==null) - logger.LogError("no db!"); + _logger.LogError("no db!"); Client app = _dbContext.Applications.FirstOrDefault(x => x.Id == clientId); if (app==null) - logger.LogError($"no app for <{clientId}>"); + _logger.LogError($"no app for <{clientId}>"); return app; } private readonly ConcurrentDictionary _authenticationCodes = new ConcurrentDictionary(StringComparer.Ordinal); @@ -31,7 +31,7 @@ namespace Yavsc if (context == null) throw new InvalidOperationException("context == null"); var app = GetApplication(context.ClientId); if (app == null) return Task.FromResult(0); - Startup.logger.LogInformation($"ValidateClientRedirectUri: Validated ({app.RedirectUri})"); + Startup._logger.LogInformation($"ValidateClientRedirectUri: Validated ({app.RedirectUri})"); context.Validated(app.RedirectUri); return Task.FromResult(0); } @@ -43,7 +43,7 @@ namespace Yavsc if (context.TryGetBasicCredentials(out clientId, out clientSecret) || context.TryGetFormCredentials(out clientId, out clientSecret)) { - logger.LogInformation($"ValidateClientAuthentication: Got id: ({clientId} secret: {clientSecret})"); + _logger.LogInformation($"ValidateClientAuthentication: Got id: ({clientId} secret: {clientSecret})"); var client = GetApplication(clientId); if (client==null) { context.SetError("invalid_clientId", "Client secret is invalid."); @@ -51,10 +51,10 @@ namespace Yavsc } else if (client.Type == ApplicationTypes.NativeConfidential) { - logger.LogInformation($"NativeConfidential key"); + _logger.LogInformation($"NativeConfidential key"); if (string.IsNullOrWhiteSpace(clientSecret)) { - logger.LogInformation($"invalid_clientId: Client secret should be sent."); + _logger.LogInformation($"invalid_clientId: Client secret should be sent."); context.SetError("invalid_clientId", "Client secret should be sent."); return Task.FromResult(null); } @@ -65,7 +65,7 @@ namespace Yavsc if (client.Secret != clientSecret) { context.SetError("invalid_clientId", "Client secret is invalid."); - logger.LogInformation($"invalid_clientId: Client secret is invalid."); + _logger.LogInformation($"invalid_clientId: Client secret is invalid."); return Task.FromResult(null); } } @@ -74,25 +74,25 @@ namespace Yavsc if (!client.Active) { context.SetError("invalid_clientId", "Client is inactive."); - logger.LogInformation($"invalid_clientId: Client is inactive."); + _logger.LogInformation($"invalid_clientId: Client is inactive."); return Task.FromResult(null); } if (client != null && client.Secret == clientSecret) { - logger.LogInformation($"\\o/ ValidateClientAuthentication: Validated ({clientId})"); + _logger.LogInformation($"\\o/ ValidateClientAuthentication: Validated ({clientId})"); context.Validated(); } - else logger.LogInformation($":'( ValidateClientAuthentication: KO ({clientId})"); + else _logger.LogInformation($":'( ValidateClientAuthentication: KO ({clientId})"); } - else logger.LogWarning($"ValidateClientAuthentication: neither Basic nor Form credential were found"); + else _logger.LogWarning($"ValidateClientAuthentication: neither Basic nor Form credential were found"); return Task.FromResult(0); } UserManager _usermanager; private async Task GrantResourceOwnerCredentials(OAuthGrantResourceOwnerCredentialsContext context) { - logger.LogWarning($"GrantResourceOwnerCredentials task ... {context.UserName}"); + _logger.LogWarning($"GrantResourceOwnerCredentials task ... {context.UserName}"); ApplicationUser user = null; user = await _usermanager.FindByNameAsync(context.UserName); @@ -132,7 +132,7 @@ namespace Yavsc private void CreateAuthenticationCode(AuthenticationTokenCreateContext context) { - logger.LogInformation("CreateAuthenticationCode"); + _logger.LogInformation("CreateAuthenticationCode"); context.SetToken(Guid.NewGuid().ToString("n") + Guid.NewGuid().ToString("n")); _authenticationCodes[context.Token] = context.SerializeTicket(); } @@ -143,16 +143,16 @@ namespace Yavsc if (_authenticationCodes.TryRemove(context.Token, out value)) { context.DeserializeTicket(value); - logger.LogInformation("ReceiveAuthenticationCode: Success"); + _logger.LogInformation("ReceiveAuthenticationCode: Success"); } } private void CreateRefreshToken(AuthenticationTokenCreateContext context) { var uid = context.Ticket.Principal.GetUserId(); - logger.LogInformation($"CreateRefreshToken for {uid}"); + _logger.LogInformation($"CreateRefreshToken for {uid}"); foreach (var c in context.Ticket.Principal.Claims) - logger.LogInformation($"| User claim: {c.Type} {c.Value}"); + _logger.LogInformation($"| User claim: {c.Type} {c.Value}"); context.SetToken(context.SerializeTicket()); } @@ -160,9 +160,9 @@ namespace Yavsc private void ReceiveRefreshToken(AuthenticationTokenReceiveContext context) { var uid = context.Ticket.Principal.GetUserId(); - logger.LogInformation($"ReceiveRefreshToken for {uid}"); + _logger.LogInformation($"ReceiveRefreshToken for {uid}"); foreach (var c in context.Ticket.Principal.Claims) - logger.LogInformation($"| User claim: {c.Type} {c.Value}"); + _logger.LogInformation($"| User claim: {c.Type} {c.Value}"); context.DeserializeTicket(context.Token); } } diff --git a/src/Yavsc/Startup/Startup.cs b/src/Yavsc/Startup/Startup.cs index 093d98e8..2e344665 100755 --- a/src/Yavsc/Startup/Startup.cs +++ b/src/Yavsc/Startup/Startup.cs @@ -24,26 +24,18 @@ using Newtonsoft.Json; namespace Yavsc { using System.Collections.Generic; - using System.Linq; using System.Net; - using System.Net.WebSockets; using System.Security.Claims; - using System.Threading; using Formatters; using Google.Apis.Util.Store; using Microsoft.AspNet.Http; using Microsoft.AspNet.Identity; - using Microsoft.AspNet.SignalR; using Microsoft.Extensions.Localization; using Microsoft.Extensions.Logging; using Models; - using PayPal.Manager; using Services; using Yavsc.Abstract.FileSystem; using Yavsc.AuthorizationHandlers; - using Yavsc.Controllers; - using Yavsc.Helpers; - using Yavsc.ViewModels.Streaming; using static System.Environment; public partial class Startup @@ -58,11 +50,13 @@ namespace Yavsc public static string HostingFullName { get; set; } public static PayPalSettings PayPalSettings { get; private set; } - private static ILogger logger; + private static ILogger _logger; + + static ILiveProcessor _liveProcessor; // leave the final slash - PathString liveCastingPath = Constants.LivePath; + public Startup(IHostingEnvironment env, IApplicationEnvironment appEnv) { @@ -110,8 +104,8 @@ namespace Yavsc // never hit ... private void OnUnHandledException(object sender, UnhandledExceptionEventArgs e) { - logger.LogError(sender.ToString()); - logger.LogError(JsonConvert.SerializeObject(e.ExceptionObject)); + _logger.LogError(sender.ToString()); + _logger.LogError(JsonConvert.SerializeObject(e.ExceptionObject)); } public static string ConnectionString { get; set; } @@ -169,20 +163,6 @@ namespace Yavsc // These are the cultures the app supports for UI strings, i.e. we have localized resources for. options.SupportedUICultures = supportedUICultures; - // You can change which providers are configured to determine the culture for requests, or even add a custom - // provider with your own logic. The providers will be asked in order to provide a culture for each request, - // and the first to provide a non-null result that is in the configured supported cultures list will be used. - // By default, the following built-in providers are configured: - // - QueryStringRequestCultureProvider, sets culture via "culture" and "ui-culture" query string values, useful for testing - // - CookieRequestCultureProvider, sets culture via "ASPNET_CULTURE" cookie - // - AcceptLanguageHeaderRequestCultureProvider, sets culture via the "Accept-Language" request header - - //options.RequestCultureProviders.Insert(0, new CustomRequestCultureProvider(async context => - //{ - // // My custom request culture logic - // return new ProviderCultureResult("fr"); - //})); - options.RequestCultureProviders = new List { new QueryStringRequestCultureProvider { Options = options }, @@ -251,6 +231,7 @@ namespace Yavsc services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddMvc(config => { @@ -312,12 +293,14 @@ namespace Yavsc IOptions googleSettings, IStringLocalizer localizer, UserManager usermanager, + ILiveProcessor liveProcessor, ILoggerFactory loggerFactory) { Services = app.ApplicationServices; _dbContext = dbContext; _usermanager = usermanager; + _liveProcessor = liveProcessor; GoogleSettings = googleSettings.Value; ResourcesHelpers.GlobalLocalizer = localizer; SiteSetup = siteSettings.Value; @@ -344,7 +327,7 @@ namespace Yavsc loggerFactory.AddConsole(Configuration.GetSection("Logging")); loggerFactory.AddDebug(); - logger = loggerFactory.CreateLogger(); + _logger = loggerFactory.CreateLogger(); app.UseStatusCodePagesWithReExecute("/Home/Status/{0}"); if (env.IsDevelopment()) @@ -420,11 +403,11 @@ namespace Yavsc }); app.UseSession(); - ConfigureOAuthApp(app, SiteSetup, logger); + ConfigureOAuthApp(app, SiteSetup, _logger); ConfigureFileServerApp(app, SiteSetup, env, authorizationService); app.UseRequestLocalization(localizationOptions.Value, (RequestCulture)new RequestCulture((string)"en-US")); - ConfigureWorkflow(app, SiteSetup, logger); + ConfigureWorkflow(app, SiteSetup, _logger); // Empty this odd chat user list from db foreach (var p in dbContext.ChatConnection) { @@ -440,10 +423,10 @@ namespace Yavsc name: "default", template: "{controller=Home}/{action=Index}/{id?}"); }); - logger.LogInformation("LocalApplicationData: " + Environment.GetFolderPath(SpecialFolder.LocalApplicationData, SpecialFolderOption.DoNotVerify)); + _logger.LogInformation("LocalApplicationData: " + Environment.GetFolderPath(SpecialFolder.LocalApplicationData, SpecialFolderOption.DoNotVerify)); app.Use(async (context, next) => { - var liveCasting = context.Request.Path.StartsWithSegments(liveCastingPath); + var liveCasting = context.Request.Path.StartsWithSegments(_liveProcessor.LiveCastingPath); if (liveCasting) { @@ -451,168 +434,10 @@ namespace Yavsc if (context.WebSockets.IsWebSocketRequest) { if (!context.User.Identity.IsAuthenticated) - context.Abort(); + context.Response.StatusCode = 403; else { - // get the flow id from request path - var castid = long.Parse(context.Request.Path.Value.Substring(liveCastingPath.Value.Length + 1)); - - logger.LogInformation("Cast id : "+castid); - var uname = context.User.GetUserName(); - // ensure uniqueness of casting stream from this user - - var uid = context.User.GetUserId(); - // get some setup from user - var flow = _dbContext.LiveFlow.Include(f => f.Owner).SingleOrDefault(f => (f.OwnerId == uid && f.Id == castid)); - if (flow == null) - { - logger.LogWarning("Aborting. Flow info was not found."); - var socket = await context.WebSockets.AcceptWebSocketAsync(); - await socket.CloseAsync(WebSocketCloseStatus.PolicyViolation, "flow id invalid", CancellationToken.None); - } - else - { - logger.LogInformation("flow : "+flow.Title+" for "+uname); - LiveCastMeta meta = null; - if (LiveApiController.Casters.ContainsKey(uname)) - { - meta = LiveApiController.Casters[uname]; - if (meta.Socket.State != WebSocketState.Closed) - { - // FIXME loosed connexion should be detected & disposed else where - await meta.Socket.CloseAsync( WebSocketCloseStatus.EndpointUnavailable, "one by user", CancellationToken.None); - meta.Socket.Dispose(); - meta.Socket = await context.WebSockets.AcceptWebSocketAsync(); - } - else - { - meta.Socket.Dispose(); - // Accept the socket - meta.Socket = await context.WebSockets.AcceptWebSocketAsync(); - } - } - else - { - // Accept the socket - meta = new LiveCastMeta { Socket = await context.WebSockets.AcceptWebSocketAsync() }; - } - logger.LogInformation("Accepted web socket"); - // Dispatch the flow - try - { - if (meta.Socket != null && meta.Socket.State == WebSocketState.Open) - { - LiveApiController.Casters[uname] = meta; - // TODO: Handle the socket here. - // Find receivers: others in the chat room - // send them the flow - var buffer = new byte[Constants.WebSocketsMaxBufLen]; - var sBuffer = new ArraySegment(buffer); - logger.LogInformation("Receiving bytes..."); - - WebSocketReceiveResult received = await meta.Socket.ReceiveAsync(sBuffer, CancellationToken.None); - logger.LogInformation($"Received bytes : {received.Count}"); - logger.LogInformation($"Is the end : {received.EndOfMessage}"); -/* TODO - var hubContext = GlobalHost.ConnectionManager.GetHubContext(); - - hubContext.Clients.All.addPublicStream(new - { - id = flow.Id, - sender = flow.Owner.UserName, - title = flow.Title, - url = flow.GetFileUrl(), - mediaType = flow.MediaType - }, $"{flow.Owner.UserName} is starting a stream!"); -*/ - // FIXME do we really need to close those one in invalid state ? - // Stack ToClose = new Stack(); - - try - { - /* - logger.LogInformation($"Echoing {received.Count} bytes received in a {received.MessageType} message; Fin={received.EndOfMessage}"); - // Echo anything we receive - // and send to all listner found - foreach (var cliItem in meta.Listeners) - { - var listenningSocket = cliItem.Value; - if (listenningSocket.State == WebSocketState.Open) { - await listenningSocket.SendAsync( - sBuffer, received.MessageType, received.EndOfMessage, CancellationToken.None); - - } - else - if (listenningSocket.State == WebSocketState.CloseReceived || listenningSocket.State == WebSocketState.CloseSent) - { - ToClose.Push(cliItem.Key); - } - } - */ - // logger.LogInformation("replying..."); - while (!received.CloseStatus.HasValue) - { - - - // await meta.Socket.SendAsync(new ArraySegment(buffer), received.MessageType, received.EndOfMessage, CancellationToken.None); - - logger.LogInformation("Receiving new bytes..."); - - received = await meta.Socket.ReceiveAsync(new ArraySegment(buffer), CancellationToken.None); - - logger.LogInformation($"Received new bytes : {received.Count}"); - logger.LogInformation($"Is the end : {received.EndOfMessage}"); - } - logger.LogInformation("Closing connection"); - await meta.Socket.CloseAsync(received.CloseStatus.Value, received.CloseStatusDescription, CancellationToken.None); - - - - /* while (ToClose.Count >0) - { - string no = ToClose.Pop(); - WebSocket listenningSocket; - if (meta.Listeners.TryRemove(no, out listenningSocket)) - await listenningSocket.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, "State != WebSocketState.Open", CancellationToken.None); - - } */ - - LiveApiController.Casters[uname] = null; - } - catch (Exception ex) - { - logger.LogError($"Exception occured : {ex.Message}"); - logger.LogError(ex.StackTrace); - meta.Socket.Dispose(); - LiveApiController.Casters[uname] = null; - } - } - else - { // not meta.Socket != null && meta.Socket.State == WebSocketState.Open - if (meta.Socket != null) - { - logger.LogError($"meta.Socket.State not Open: {meta.Socket.State.ToString()} "); - meta.Socket.Dispose(); - } - else - logger.LogError("socket object is null"); - } - } - catch (IOException ex) - { - if (ex.Message == "Unexpected end of stream") - { - logger.LogError($"Unexpected end of stream"); - } - else - { - logger.LogError($"Really unexpected end of stream"); - } - await meta.Socket?.CloseAsync(WebSocketCloseStatus.EndpointUnavailable, ex.Message, CancellationToken.None); - meta.Socket?.Dispose(); - LiveApiController.Casters[uname] = null; - } - } + await _liveProcessor.AcceptStream(context); } } else { diff --git a/src/Yavsc/ViewModels/Streaming/LiveCastMeta.cs b/src/Yavsc/ViewModels/Streaming/LiveCastMeta.cs index 254af1e3..e0c40bd8 100644 --- a/src/Yavsc/ViewModels/Streaming/LiveCastMeta.cs +++ b/src/Yavsc/ViewModels/Streaming/LiveCastMeta.cs @@ -1,5 +1,6 @@ using System.Collections.Concurrent; using System.Net.WebSockets; +using System.Threading; namespace Yavsc.ViewModels.Streaming { @@ -13,10 +14,12 @@ namespace Yavsc.ViewModels.Streaming public string FlowId { get; set; } } - public class LiveCastMeta + public class LiveCastHandler { public WebSocket Socket { get; set; } public ConcurrentDictionary Listeners { get; set; } = new ConcurrentDictionary(); + + public CancellationTokenSource TokenSource { get; set; } = new CancellationTokenSource(); } } \ No newline at end of file diff --git a/src/Yavsc/wwwroot/js/chat.js b/src/Yavsc/wwwroot/js/chat.js index 67e251ba..34499da5 100644 --- a/src/Yavsc/wwwroot/js/chat.js +++ b/src/Yavsc/wwwroot/js/chat.js @@ -109,6 +109,12 @@ window.ChatHubHandler = (function ($) { $('
  • ').append(tag + ': ' ).append(message).addClass(tag).appendTo($('#room_' + room)); }; + chat.client.addPublicStream = function (pubStrInfo) + { + $('
  • ').append(pubStrInfo.sender + ': ') + .append(''+pubStrInfo.title+'').append('['+pubStrInfo.mediaType+']').addClass('streaminfo').appendTo(notifications); + }; + var setChanInfo = function (chanInfo) { if (chanInfo) { var chanId = 'r' + chanInfo.Name;